From 143c702a5aa4837e67206211398df2ed3a453515 Mon Sep 17 00:00:00 2001 From: lauti7 Date: Wed, 10 Apr 2024 15:52:03 -0300 Subject: [PATCH] ws & rpc connections --- src/adapters/rpcServer.ts | 94 ++++++++++++++++++++++++++ src/adapters/ws.ts | 25 +++++++ src/components.ts | 15 +++- src/controllers/handlers/ws-handler.ts | 80 ++++++++++++++++++++++ src/controllers/routes.ts | 5 +- src/index.ts | 6 +- src/types.ts | 11 +++ 7 files changed, 230 insertions(+), 6 deletions(-) create mode 100644 src/adapters/rpcServer.ts create mode 100644 src/adapters/ws.ts create mode 100644 src/controllers/handlers/ws-handler.ts diff --git a/src/adapters/rpcServer.ts b/src/adapters/rpcServer.ts new file mode 100644 index 0000000..ecd547f --- /dev/null +++ b/src/adapters/rpcServer.ts @@ -0,0 +1,94 @@ +import { createRpcServer } from '@dcl/rpc' +import { registerService } from '@dcl/rpc/dist/codegen' +import { + FriendshipsServiceDefinition, + UsersResponse, + SubscribeFriendshipEventsUpdatesResponse +} from '@dcl/protocol/out-ts/decentraland/social/friendships_ea/friendships_ea.gen' +import { AppComponents, RpcServerContext } from '../types' + +export default function createRpcServerComponent(components: Pick) { + const { logs } = components + + const server = createRpcServer({ + logger: logs.getLogger('rpc-server') + }) + + const _logger = logs.getLogger('rpc-server-handler') + // Mocked server until we get the new service definition done + server.setHandler(async function handler(port) { + registerService(port, FriendshipsServiceDefinition, async () => ({ + getFriends(_request, _context) { + const generator = async function* () { + const response: UsersResponse = { + response: { + $case: 'users', + users: { users: [] } + } + } + yield response + } + + return generator() + }, + getMutualFriends(_request, _context) { + const generator = async function* () { + const response: UsersResponse = { + response: { + $case: 'users', + users: { users: [] } + } + } + yield response + } + + return generator() + }, + async getRequestEvents(_request, _context) { + return { + response: { + $case: 'events', + events: { + outgoing: { total: 0, items: [] }, + incoming: { total: 0, items: [] } + } + } + } + }, + async updateFriendshipEvent(_request, _context) { + return { + response: { + $case: 'event', + event: { + body: { + $case: 'accept', + accept: { + user: { + address: '0xA' + } + } + } + } + } + } + }, + subscribeFriendshipEventsUpdates(_request, _context) { + const generator = async function* () { + const response: SubscribeFriendshipEventsUpdatesResponse = { + response: { + $case: 'events', + events: { + responses: [] + } + } + } + yield response + } + + return generator() + } + })) + }) + + return server +} diff --git a/src/adapters/ws.ts b/src/adapters/ws.ts new file mode 100644 index 0000000..da31fe0 --- /dev/null +++ b/src/adapters/ws.ts @@ -0,0 +1,25 @@ +import { WebSocketServer } from 'ws' +import { IWebSocketComponent } from '../types' + +export async function createWsComponent(): Promise { + let wss: WebSocketServer | undefined + + async function start() { + if (wss) return + + wss = new WebSocketServer({ noServer: true }) + } + + async function stop() { + wss?.close() + wss = undefined + } + + await start() + + return { + start, + stop, + ws: wss! + } +} diff --git a/src/components.ts b/src/components.ts index 94ee677..e2ed54e 100644 --- a/src/components.ts +++ b/src/components.ts @@ -11,14 +11,22 @@ import { AppComponents, GlobalContext } from './types' import { metricDeclarations } from './metrics' import { createPgComponent } from '@well-known-components/pg-component' import { createDBComponent } from './adapters/db' +import { createWsComponent } from './adapters/ws' +import createRpcServerComponent from './adapters/rpcServer' +import { createFetchComponent } from '@well-known-components/fetch-component' // Initialize all the components of the app export async function initComponents(): Promise { const config = await createDotEnvConfigComponent({ path: ['.env.default', '.env'] }) const metrics = await createMetricsComponent(metricDeclarations, { config }) const logs = await createLogComponent({ metrics }) - const server = await createServerComponent({ config, logs }, {}) + + const ws = await createWsComponent() + const server = await createServerComponent({ config, logs, ws: ws.ws }, {}) const statusChecks = await createStatusCheckComponent({ server, config }) + const rpcServer = createRpcServerComponent({ logs }) + + const fetcher = createFetchComponent() let databaseUrl: string | undefined = await config.getString('PG_COMPONENT_PSQL_CONNECTION_STRING') if (!databaseUrl) { @@ -54,6 +62,9 @@ export async function initComponents(): Promise { statusChecks, metrics, pg, - db + db, + ws, + rpcServer, + fetcher } } diff --git a/src/controllers/handlers/ws-handler.ts b/src/controllers/handlers/ws-handler.ts new file mode 100644 index 0000000..1c48044 --- /dev/null +++ b/src/controllers/handlers/ws-handler.ts @@ -0,0 +1,80 @@ +import { IHttpServerComponent } from '@well-known-components/interfaces' +import { upgradeWebSocketResponse } from '@well-known-components/http-server/dist/ws' +import { WebSocket, MessageEvent } from 'ws' +import { WebSocketTransport } from '@dcl/rpc/dist/transports/WebSocket' +import future from 'fp-future' +import { verify } from '@dcl/platform-crypto-middleware' +import { GlobalContext } from '../../types' + +export async function wsHandler(context: IHttpServerComponent.DefaultContext) { + const { logs, rpcServer, fetcher } = context.components + const logger = logs.getLogger('ws-handler') + + return upgradeWebSocketResponse(async (socket) => { + let isAlive = true + const ws = socket as any as WebSocket + // it's needed bc of cloudflare + const pingInterval = setInterval(() => { + if (isAlive === false) { + logger.warn('terminating ws because of ping timeout') + return ws.terminate() + } + logger.debug('pinging websocket bc of cloudflare') + isAlive = false + ws.ping() + }, 30000) + + ws.on('close', () => { + logger.debug('closing websocket') + clearInterval(pingInterval) + }) + + ws.on('pong', () => { + logger.debug('PONG') + isAlive = true + }) + + const authChainPromise = future() + + function receiveAuthchainAsFirstMessage(event: MessageEvent) { + if (typeof event.data === 'string') { + authChainPromise.resolve(JSON.parse(event.data)) + } else { + authChainPromise.reject(new Error('INVALID_MESSAGE')) + } + } + + ws.addEventListener('message', receiveAuthchainAsFirstMessage) + + try { + const authChain = await Promise.race([sleep30Secs(), authChainPromise]) + ws.removeEventListener('message', receiveAuthchainAsFirstMessage) + + const authchainVerifyResult = await verify('get', '/', authChain, { + fetcher, + expiration: 1000 * 240 + }) + + const wsTransport = WebSocketTransport(socket) + + logger.debug('addresss > ', { address: authchainVerifyResult.auth }) + + rpcServer.attachTransport(wsTransport, { components: context.components, address: authchainVerifyResult.auth }) + + wsTransport.on('error', (err) => { + if (err && err.message) { + logger.error(err) + } + }) + } catch (error) { + // rejects if timeout, invalid first message or authchain verify error + logger.error(error as Error) + ws.close() + } + }) +} + +const sleep30Secs = () => + new Promise((_resolve, reject) => { + setTimeout(() => reject(new Error('TIMEOUT_WAITING_FOR_AUTCHAIN')), 30000) + }) diff --git a/src/controllers/routes.ts b/src/controllers/routes.ts index e36be14..8f98351 100644 --- a/src/controllers/routes.ts +++ b/src/controllers/routes.ts @@ -1,9 +1,12 @@ import { Router } from '@well-known-components/http-server' +import { wsHandler } from './handlers/ws-handler' import { GlobalContext } from '../types' // We return the entire router because it will be easier to test than a whole server -export async function setupRouter(globalContext: GlobalContext): Promise> { +export async function setupRouter(_globalContext: GlobalContext): Promise> { const router = new Router() + router.get('/', wsHandler) + return router } diff --git a/src/index.ts b/src/index.ts index 9b2cddb..51be60f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ -import { Lifecycle } from "@well-known-components/interfaces" -import { initComponents } from "./components" -import { main } from "./service" +import { Lifecycle } from '@well-known-components/interfaces' +import { initComponents } from './components' +import { main } from './service' // This file is the program entry point, it only calls the Lifecycle function Lifecycle.run({ main, initComponents }) diff --git a/src/types.ts b/src/types.ts index aa85610..de35bc1 100644 --- a/src/types.ts +++ b/src/types.ts @@ -9,6 +9,8 @@ import type { import { IPgComponent } from '@well-known-components/pg-component' import { metricDeclarations } from './metrics' import { IDatabaseComponent } from './adapters/db' +import { WebSocketServer } from 'ws' +import { RpcServer } from '@dcl/rpc' export type GlobalContext = { components: BaseComponents @@ -22,6 +24,9 @@ export type BaseComponents = { metrics: IMetricsComponent pg: IPgComponent db: IDatabaseComponent + ws: IWebSocketComponent + rpcServer: RpcServer + fetcher: IFetchComponent } // components used in runtime @@ -47,3 +52,9 @@ export type HandlerContextWithPath< > export type Context = IHttpServerComponent.PathAwareContext + +export type IWebSocketComponent = IBaseComponent & { + ws: WebSocketServer +} + +export type RpcServerContext = GlobalContext & { address: string }