Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

code review for the notifications aggregator service. #13

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/server/NotificationServiceHTTPServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ export class NotificationServiceHTTPServer {
/**
* Creates an instance of NotificationServiceHTTPServer.
* @param {number} port - The port where the HTTP server will listen.
* @param {string[]} pod_url - The location of the Solid Pod from which the notifications are retrieved.
* @param {*} logger - The logger object.
* @memberof NotificationServiceHTTPServer
*/
Expand All @@ -40,7 +39,7 @@ export class NotificationServiceHTTPServer {
this.websocket_handler = new WebSocketServerHandler(this.websocket_server);
this.setupServer(port);
this.connect_to_websocket_server('ws://localhost:8085/');
this.websocket_handler.handle_communication(this.cacheService);
this.websocket_handler.handle_communication();

}
/**
Expand Down Expand Up @@ -165,6 +164,11 @@ export class NotificationServiceHTTPServer {
response.end('OK');
}

/**
* Sends a message to the WebSocket server.
* @param {string} message - The message to send.
* @memberof NotificationServiceHTTPServer
*/
public send_to_websocket_server(message: string) {
if (this.connection.connected) {
this.connection.sendUTF(message);
Expand All @@ -175,7 +179,11 @@ export class NotificationServiceHTTPServer {
});
}
}

/**
* Connects to the WebSocket server.
* @param {string} wss_url - The URL of the WebSocket server.
* @memberof NotificationServiceHTTPServer
*/
public async connect_to_websocket_server(wss_url: string) {
this.client.connect(wss_url, 'solid-stream-notifications-aggregator');
this.client.on('connect', (connection: typeof websocket.connection) => {
Expand Down
37 changes: 25 additions & 12 deletions src/server/WebSocketServerHandler.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,32 @@
import * as WebSocket from 'websocket';
import { CacheService } from '../service/CacheService';
import { SubscribeNotification } from '../service/SubscribeNotification';

/**
* This class is used to handle the WebSocket server.
* @class WebSocketServerHandler
*/
export class WebSocketServerHandler {

public websocket_server: any;
public websocket_connections: Map<string, WebSocket>;
public subscribe_notification: SubscribeNotification;


/**
* Creates an instance of WebSocketServerHandler.
* @param {WebSocket.server} websocket_server - The WebSocket server.
* @memberof WebSocketServerHandler
*/
constructor(websocket_server: WebSocket.server) {
this.websocket_server = websocket_server;
this.websocket_connections = new Map<string, WebSocket>();
this.subscribe_notification = new SubscribeNotification();
}

public async handle_communication(cache_service: CacheService) {
/**
* Handles the communication for the WebSocket server, including subscribing to the notification server and sending messages to the client.
* @memberof WebSocketServerHandler
*/
public async handle_communication() {
console.log(`Handling the communication for the WebSocket server.`);
this.websocket_server.on('connect', (connection: any) => {
console.log(`Connection received from the client with address: ${connection.remoteAddress}`);
this.websocket_server.on('connect', () => {
console.log(`Connected to the WebSocket server.`);
});

this.websocket_server.on('request', (request: any) => {
Expand All @@ -29,15 +37,15 @@ export class WebSocketServerHandler {
const ws_message = JSON.parse(message_utf8);
if (Object.keys(ws_message).includes('subscribe')) {
console.log(`Received a subscribe message from the client.`);
let stream_to_subscribe = ws_message.subscribe;
for (let stream of stream_to_subscribe){
const stream_to_subscribe = ws_message.subscribe;
for (const stream of stream_to_subscribe) {
this.subscribe_notification.subscribe(stream);
console.log(`Subscribed to the stream: ${stream}`);
this.set_connections(stream, connection);
}
}
else if (Object.keys(ws_message).includes('event')) {
let connection = this.websocket_connections.get(ws_message.stream);
const connection = this.websocket_connections.get(ws_message.stream);
if (connection !== undefined) {
connection.send(JSON.stringify(ws_message));
}
Expand All @@ -46,7 +54,12 @@ export class WebSocketServerHandler {
});
});
}

/**
* Sets the connections for the WebSocket server's Map.
* @param {string} subscribed_stream - The stream to subscribe to.
* @param {WebSocket} connection - The WebSocket connection.
* @memberof WebSocketServerHandler
*/
public set_connections(subscribed_stream: string, connection: WebSocket): void {
this.websocket_connections.set(subscribed_stream, connection);
}
Expand Down
20 changes: 18 additions & 2 deletions src/service/CacheService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ export class CacheService {
}


/**
* Get the most recent key-value pair from the Redis cache.
* @param {string} ldes_stream - The LDES stream to get the most recent key-value pair for.
* @returns {Promise<{ key: string, value: string }>} - A promise that resolves to an object containing the most recent key-value pair in the cache.
* @memberof CacheService
*/
async get_recent_key_value(ldes_stream: string): Promise<{ key: string, value: string }> {
try {
// example of a key is, stream:http://localhost:3000/aggregation_pod/aggregation/:1710250027636
Expand All @@ -140,14 +146,24 @@ export class CacheService {
await this.client.quit();
}
}




/**
* Sets the time to live for a key in the Redis cache.
* @param {string} key - The key to set the time to live for.
* @param {number} time_to_live - The time to live for the key in seconds.
* @memberof CacheService
*/
async setTimeToLive(key: string, time_to_live: number) {
// setting the time to live for the key in seconds.
await this.client.expire(key, time_to_live);
}


/**
* Clears the Redis cache.
* @memberof CacheService
*/
async clearCache() {
await this.client.flushall();
}
Expand Down
2 changes: 1 addition & 1 deletion src/service/SubscribeNotification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import { extract_ldp_inbox, extract_subscription_server } from "../utils/Util";
export class SubscribeNotification {
/**
* Creates an instance of SubscribeNotification.
* @param {string[]} streams - An array of LDES streams to subscribe to, for real-time notifications.
* @memberof SubscribeNotification
*/
constructor() {
}

/**
* Subscribes to the notification server for each LDES stream in the constructor, using the inbox and subscription server.
* @param {string} ldes_stream - The LDES stream to subscribe to.
* @returns {(Promise<boolean | undefined>)} - Returns a promise with a boolean or undefined. If the subscription is successful, it returns true. If the subscription fails, it throws an error.
* @memberof SubscribeNotification
*/
Expand Down