diff --git a/src/components.ts b/src/components.ts index 7d9123e..22244bb 100644 --- a/src/components.ts +++ b/src/components.ts @@ -1,21 +1,16 @@ import { resolve } from 'path' import { createDotEnvConfigComponent } from '@well-known-components/env-config-provider' -import { - createServerComponent, - createStatusCheckComponent, - instrumentHttpServerWithPromClientRegistry -} from '@well-known-components/http-server' import { createLogComponent } from '@well-known-components/logger' import { createMetricsComponent } from '@well-known-components/metrics' import { createFetchComponent } from '@well-known-components/fetch-component' import { createPgComponent } from '@well-known-components/pg-component' -import { AppComponents, GlobalContext } from './types' +import { AppComponents } from './types' import { metricDeclarations } from './metrics' import { createDBComponent } from './adapters/db' -import { createWsComponent } from './adapters/ws' import createRpcServerComponent from './adapters/rpcServer' import createRedisComponent from './adapters/redis' import createPubSubComponent from './adapters/pubsub' +import { createUWsComponent } from '@well-known-components/uws-http-server' // Initialize all the components of the app export async function initComponents(): Promise { @@ -23,9 +18,7 @@ export async function initComponents(): Promise { const metrics = await createMetricsComponent(metricDeclarations, { config }) const logs = await createLogComponent({ metrics }) - const ws = await createWsComponent() - const server = await createServerComponent({ config, logs, ws: ws.ws }, {}) - const statusChecks = await createStatusCheckComponent({ server, config }) + const server = await createUWsComponent({ config, logs }) const fetcher = createFetchComponent() @@ -58,17 +51,13 @@ export async function initComponents(): Promise { const pubsub = createPubSubComponent({ logs, redis }) const rpcServer = await createRpcServerComponent({ logs, db, pubsub }) - await instrumentHttpServerWithPromClientRegistry({ metrics, server, config, registry: metrics.registry! }) - return { config, logs, server, - statusChecks, metrics, pg, db, - ws, fetcher, redis, pubsub, diff --git a/src/controllers/handlers/status-handler.ts b/src/controllers/handlers/status-handler.ts new file mode 100644 index 0000000..16b1557 --- /dev/null +++ b/src/controllers/handlers/status-handler.ts @@ -0,0 +1,22 @@ +import { AppComponents } from '../../types' + +export async function createStatusHandler(components: Pick) { + const { config } = components + const [commitHash, version] = await Promise.all([ + config.getString('COMMIT_HASH'), + config.getString('CURRENT_VERSION') + ]) + + return { + path: '/status', + f: async () => { + return { + body: { + version: version ?? '', + currentTime: Date.now(), + commitHash: commitHash ?? '' + } + } + } + } +} diff --git a/src/controllers/handlers/ws-handler.ts b/src/controllers/handlers/ws-handler.ts index abb3dac..e8f552c 100644 --- a/src/controllers/handlers/ws-handler.ts +++ b/src/controllers/handlers/ws-handler.ts @@ -1,81 +1,96 @@ -import { IHttpServerComponent } from '@well-known-components/interfaces' -import { upgradeWebSocketResponse } from '@well-known-components/http-server/dist/ws' -import { WebSocket, MessageEvent } from 'ws' -import { WebSocketTransport } from '@dcl/rpc/dist/transports/WebSocket' -import future from 'fp-future' +import mitt from 'mitt' +import { onRequestEnd, onRequestStart } from '@well-known-components/uws-http-server' import { verify } from '@dcl/platform-crypto-middleware' -import { GlobalContext } from '../../types' +import { AppComponents, WsUserData } from '../../types' import { normalizeAddress } from '../../utils/address' +import { IUWebSocketEventMap, UWebSocketTransport } from '../../utils/UWebSocketTransport' -export async function wsHandler(context: IHttpServerComponent.DefaultContext) { - const { logs, rpcServer, fetcher } = context.components +export async function registerWsHandler( + components: Pick +) { + const { logs, server, metrics, fetcher, rpcServer } = components const logger = logs.getLogger('ws-handler') - return upgradeWebSocketResponse(async (socket) => { - let isAlive = true - const ws = socket as any as WebSocket - // it's needed bc of cloudflare - const pingInterval = setInterval(() => { - if (isAlive === false) { - logger.warn('terminating ws because of ping timeout') - return ws.terminate() - } - isAlive = false - ws.ping() - }, 30000) - - ws.on('close', () => { - logger.debug('closing websocket') - clearInterval(pingInterval) - }) - - ws.on('pong', () => { - logger.debug('PONG') - isAlive = true - }) + function changeStage(data: WsUserData, newData: WsUserData) { + Object.assign(data, newData) + } - const authChainPromise = future() - - function receiveAuthchainAsFirstMessage(event: MessageEvent) { - if (typeof event.data === 'string') { - authChainPromise.resolve(JSON.parse(event.data)) - } else { - authChainPromise.reject(new Error('INVALID_MESSAGE')) + server.app.ws('/', { + idleTimeout: 0, + upgrade: (res, req, context) => { + logger.debug('upgrade requested') + const { labels, end } = onRequestStart(metrics, req.getMethod(), '/ws') + res.upgrade( + { + isConnected: false, + auth: false + }, + req.getHeader('sec-websocket-key'), + req.getHeader('sec-websocket-protocol'), + req.getHeader('sec-websocket-extensions'), + context + ) + onRequestEnd(metrics, labels, 101, end) + }, + open: (ws) => { + logger.debug('ws open') + const data = ws.getUserData() + // just for type assertion + if (!data.auth) { + data.timeout = setTimeout(() => { + try { + logger.error('closing connection, no authchain received') + ws.end() + } catch (err) {} + }, 30000) } - } + data.isConnected = true + }, + message: async (ws, message) => { + const data = ws.getUserData() - ws.addEventListener('message', receiveAuthchainAsFirstMessage) + if (data.auth) { + data.eventEmitter.emit('message', message) + } else { + clearTimeout(data.timeout) + data.timeout = undefined - try { - const authChain = await Promise.race([sleep30Secs(), authChainPromise]) - ws.removeEventListener('message', receiveAuthchainAsFirstMessage) + try { + const authChainMessage = new TextDecoder().decode(message) - const authchainVerifyResult = await verify('get', '/', authChain, { - fetcher - }) + const veirfyResult = await verify('get', '/', JSON.parse(authChainMessage), { + fetcher + }) + const address = normalizeAddress(veirfyResult.auth) - const wsTransport = WebSocketTransport(socket) + logger.debug('addresss > ', { address }) - logger.debug('addresss > ', { address: authchainVerifyResult.auth }) + const emitter = mitt() + changeStage(data, { auth: true, address, eventEmitter: emitter, isConnected: true }) - const address = normalizeAddress(authchainVerifyResult.auth) + const transport = UWebSocketTransport(ws, emitter) - rpcServer.attachUser({ transport: wsTransport, address }) + rpcServer.attachUser({ transport, address }) - wsTransport.on('error', (err) => { - if (err && err.message) { - logger.error(err) + transport.on('error', (err) => { + if (err && err.message) { + logger.error(err) + } + }) + } catch (error) { + console.log(error) + logger.error(error as any) + ws.close() } - }) - } catch (error) { - // rejects if timeout, invalid first message or authchain verify error - logger.error(error as Error) - ws.close() + } + }, + close: (ws, code, _message) => { + logger.debug(`Websocket closed ${code}`) + const data = ws.getUserData() + if (data.auth) { + data.isConnected = false + data.eventEmitter.emit('close', code) + } } }) } - -const sleep30Secs = () => - new Promise((_resolve, reject) => { - setTimeout(() => reject(new Error('TIMEOUT_WAITING_FOR_AUTCHAIN')), 30000) - }) diff --git a/src/controllers/routes.ts b/src/controllers/routes.ts index 8f98351..388266d 100644 --- a/src/controllers/routes.ts +++ b/src/controllers/routes.ts @@ -1,12 +1,77 @@ -import { Router } from '@well-known-components/http-server' -import { wsHandler } from './handlers/ws-handler' -import { GlobalContext } from '../types' +import { + HttpRequest, + HttpResponse, + createMetricsHandler, + onRequestEnd, + onRequestStart +} from '@well-known-components/uws-http-server' +import { AppComponents, IHandler, TestComponents } from '../types' +import { createStatusHandler } from './handlers/status-handler' +import { registerWsHandler } from './handlers/ws-handler' -// We return the entire router because it will be easier to test than a whole server -export async function setupRouter(_globalContext: GlobalContext): Promise> { - const router = new Router() +export async function setupRoutes(components: AppComponents | TestComponents): Promise { + const { metrics, server } = components - router.get('/', wsHandler) + function wrap(h: IHandler) { + return async (res: HttpResponse, req: HttpRequest) => { + const { labels, end } = onRequestStart(metrics, req.getMethod(), h.path) + let status = 500 + try { + const result = await h.f(res, req) + status = result.status ?? 200 + res.writeStatus(`${status}`) - return router + const headers = new Headers(result.headers ?? {}) + + if (!headers.has('Access-Control-Allow-Origin')) { + headers.set('Access-Control-Allow-Origin', '*') + } + + headers.forEach((v, k) => res.writeHeader(k, v)) + + if (result.body === undefined) { + res.end() + } else if (typeof result.body === 'string') { + res.end(result.body) + } else { + res.writeHeader('content-type', 'application/json') + res.end(JSON.stringify(result.body)) + } + } catch (err) { + res.writeStatus(`${status}`) + res.end() + } finally { + onRequestEnd(metrics, labels, status, end) + } + } + } + + await registerWsHandler(components) + + { + const handler = await createStatusHandler(components) + server.app.get(handler.path, wrap(handler)) + } + + { + const { path, handler } = await createMetricsHandler(components, metrics.registry!) + server.app.get(path, handler) + } + + server.app.any('/health/live', (res, req) => { + const { end, labels } = onRequestStart(metrics, req.getMethod(), '/health/live') + res.writeStatus('200 OK') + res.writeHeader('Access-Control-Allow-Origin', '*') + res.end('alive') + onRequestEnd(metrics, labels, 404, end) + }) + + server.app.any('/*', (res, req) => { + const { end, labels } = onRequestStart(metrics, req.getMethod(), '') + res.writeStatus('404 Not Found') + res.writeHeader('Access-Control-Allow-Origin', '*') + res.writeHeader('content-type', 'application/json') + res.end(JSON.stringify({ error: 'Not Found' })) + onRequestEnd(metrics, labels, 404, end) + }) } diff --git a/src/service.ts b/src/service.ts index a30ba9f..c02ffd1 100644 --- a/src/service.ts +++ b/src/service.ts @@ -1,23 +1,12 @@ import { Lifecycle } from '@well-known-components/interfaces' -import { setupRouter } from './controllers/routes' -import { AppComponents, GlobalContext, TestComponents } from './types' +import { setupRoutes } from './controllers/routes' +import { AppComponents, TestComponents } from './types' // this function wires the business logic (adapters & controllers) with the components (ports) export async function main(program: Lifecycle.EntryPointParameters) { const { components, startComponents } = program - const globalContext: GlobalContext = { - components - } - // wire the HTTP router (make it automatic? TBD) - const router = await setupRouter(globalContext) - // register routes middleware - components.server.use(router.middleware()) - // register not implemented/method not allowed/cors responses middleware - components.server.use(router.allowedMethods()) - // set the context to be passed to the handlers - components.server.setContext(globalContext) + await setupRoutes(components) - // start ports: db, listeners, synchronizations, etc await startComponents() } diff --git a/src/types.ts b/src/types.ts index 7fb052f..795c491 100644 --- a/src/types.ts +++ b/src/types.ts @@ -14,6 +14,8 @@ import { IDatabaseComponent } from './adapters/db' import { IRedisComponent } from './adapters/redis' import { IRPCServerComponent } from './adapters/rpcServer' import { IPubSubComponent } from './adapters/pubsub' +import { HttpRequest, HttpResponse, IUWsComponent, WebSocket } from '@well-known-components/uws-http-server' +import { IUWebSocketEventMap } from './utils/UWebSocketTransport' export type GlobalContext = { components: BaseComponents @@ -23,11 +25,10 @@ export type GlobalContext = { export type BaseComponents = { config: IConfigComponent logs: ILoggerComponent - server: IHttpServerComponent + server: IUWsComponent metrics: IMetricsComponent pg: IPgComponent db: IDatabaseComponent - ws: IWebSocketComponent rpcServer: IRPCServerComponent fetcher: IFetchComponent redis: IRedisComponent @@ -35,9 +36,7 @@ export type BaseComponents = { } // components used in runtime -export type AppComponents = BaseComponents & { - statusChecks: IBaseComponent -} +export type AppComponents = BaseComponents // components used in tests export type TestComponents = BaseComponents & { @@ -45,6 +44,35 @@ export type TestComponents = BaseComponents & { localFetch: IFetchComponent } +export type JsonBody = Record +export type ResponseBody = JsonBody | string + +export type IHandlerResult = { + status?: number + headers?: Record + body?: ResponseBody +} + +export type IHandler = { + path: string + f: (res: HttpResponse, req: HttpRequest) => Promise +} + +export type WsUserData = + | { + isConnected: boolean + auth: false + timeout?: NodeJS.Timeout + } + | { + isConnected: boolean + eventEmitter: Emitter + auth: true + address: string + } + +export type InternalWebSocket = WebSocket + // this type simplifies the typings of http handlers export type HandlerContextWithPath< ComponentNames extends keyof AppComponents, diff --git a/src/utils/UWebSocketTransport.ts b/src/utils/UWebSocketTransport.ts new file mode 100644 index 0000000..04e9848 --- /dev/null +++ b/src/utils/UWebSocketTransport.ts @@ -0,0 +1,93 @@ +import { Transport, TransportEvents } from '@dcl/rpc' +import mitt, { Emitter } from 'mitt' + +export const defer = Promise.prototype.then.bind(Promise.resolve()) + +export type RecognizedString = + | string + | ArrayBuffer + | Uint8Array + | Int8Array + | Uint16Array + | Int16Array + | Uint32Array + | Int32Array + | Float32Array + | Float64Array + +export type IUWebSocketEventMap = { + close: any + message: RecognizedString +} + +export interface IUWebSocket { + end(code?: number, shortMessage?: RecognizedString): void + + send(message: RecognizedString, isBinary?: boolean, compress?: boolean): number + + close(): void + + getUserData(): T +} + +export function UWebSocketTransport( + socket: IUWebSocket, + uServerEmitter: Emitter +): Transport { + const queue: Uint8Array[] = [] + + function flush() { + for (const $ of queue) { + send($) + queue.length = 0 + } + } + + function send(msg: string | Uint8Array | ArrayBuffer | SharedArrayBuffer) { + if (msg instanceof Uint8Array || msg instanceof ArrayBuffer || msg instanceof SharedArrayBuffer) { + socket.send(msg, true) + } else { + throw new Error(`WebSocketTransport only accepts Uint8Array`) + } + } + + const events = mitt() + + uServerEmitter.on('close', () => { + events.emit('close', {}) + }) + + uServerEmitter.on('message', (message) => { + if (message instanceof ArrayBuffer) { + events.emit('message', new Uint8Array(message)) + } else { + throw new Error(`WebSocketTransport: Received unknown type of message, expecting Uint8Array`) + } + }) + + // socket already connected at this point + void defer(() => events.emit('connect', {})) + void defer(() => flush()) + + const api: Transport = { + ...events, + get isConnected() { + return socket.getUserData().isConnected + }, + sendMessage(message: any) { + if (message instanceof Uint8Array) { + if (true) { + send(message) + } else { + } + } else { + throw new Error(`WebSocketTransport: Received unknown type of message, expecting Uint8Array`) + } + }, + close() { + socket.close() + } + } + + return api +} diff --git a/yarn.lock b/yarn.lock index e3657ae..6096c95 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1171,7 +1171,7 @@ "@well-known-components/http-server@^2.1.0": version "2.1.0" - resolved "https://registry.npmjs.org/@well-known-components/http-server/-/http-server-2.1.0.tgz" + resolved "https://registry.yarnpkg.com/@well-known-components/http-server/-/http-server-2.1.0.tgz#23a18edc82904b3a575452c2d7e618c7da37a07f" integrity sha512-IHD7aLTA+9DYEchQubHDBwc4FmVEmQC+2TWbi8Tz+QlkiQdtndcuba8XHH+EwqlB5sna/EAJGZGXPxS7okcHKA== dependencies: "@types/http-errors" "^2.0.1" @@ -1227,10 +1227,10 @@ sinon "^17.0.0" ts-jest "^29.1.0" -"@well-known-components/uws-http-server@^0.0.1-20240314125425.commit-711dd8f": - version "0.0.1-20240318121702.commit-ac5a332" - resolved "https://registry.yarnpkg.com/@well-known-components/uws-http-server/-/uws-http-server-0.0.1-20240318121702.commit-ac5a332.tgz#d74b59ad6b9ce439ea9dc81eced047f3512069e5" - integrity sha512-eT6bPvpxB6k9bw9uJCejtq1/JNzzbD3i6aWmTUWI7ZTTR60kBRvN7sRv5qVo7tcrBYwfvUVe3TgZ7H3Q6GoxSw== +"@well-known-components/uws-http-server@^0.0.2": + version "0.0.2" + resolved "https://registry.yarnpkg.com/@well-known-components/uws-http-server/-/uws-http-server-0.0.2.tgz#5ba577fd895bd63b67936b15f7803b2b7e9c1149" + integrity sha512-zwq7kpXnr+ml3py1bcuU3ILOGVO+6tg5e8+pf01G2wudWQlLMV3Cp66/U8tlrAFgIo5Ob7ZQmKdAnCQpHm9gFA== dependencies: "@well-known-components/interfaces" "^1.4.3" uWebSockets.js "github:uNetworking/uWebSockets.js#v20.43.0"