Skip to content

Commit

Permalink
feat: use uws instead of ws
Browse files Browse the repository at this point in the history
  • Loading branch information
lauti7 committed Apr 24, 2024
1 parent b525744 commit d681890
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 108 deletions.
17 changes: 3 additions & 14 deletions src/components.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,24 @@
import { resolve } from 'path'
import { createDotEnvConfigComponent } from '@well-known-components/env-config-provider'
import {
createServerComponent,
createStatusCheckComponent,
instrumentHttpServerWithPromClientRegistry
} from '@well-known-components/http-server'
import { createLogComponent } from '@well-known-components/logger'
import { createMetricsComponent } from '@well-known-components/metrics'
import { createFetchComponent } from '@well-known-components/fetch-component'
import { createPgComponent } from '@well-known-components/pg-component'
import { AppComponents, GlobalContext } from './types'
import { AppComponents } from './types'
import { metricDeclarations } from './metrics'
import { createDBComponent } from './adapters/db'
import { createWsComponent } from './adapters/ws'
import createRpcServerComponent from './adapters/rpcServer'
import createRedisComponent from './adapters/redis'
import createPubSubComponent from './adapters/pubsub'
import { createUWsComponent } from '@well-known-components/uws-http-server'

// Initialize all the components of the app
export async function initComponents(): Promise<AppComponents> {
const config = await createDotEnvConfigComponent({ path: ['.env.default', '.env'] })
const metrics = await createMetricsComponent(metricDeclarations, { config })
const logs = await createLogComponent({ metrics })

const ws = await createWsComponent()
const server = await createServerComponent<GlobalContext>({ config, logs, ws: ws.ws }, {})
const statusChecks = await createStatusCheckComponent({ server, config })
const server = await createUWsComponent({ config, logs })

const fetcher = createFetchComponent()

Expand Down Expand Up @@ -58,17 +51,13 @@ export async function initComponents(): Promise<AppComponents> {
const pubsub = createPubSubComponent({ logs, redis })
const rpcServer = await createRpcServerComponent({ logs, db, pubsub })

await instrumentHttpServerWithPromClientRegistry({ metrics, server, config, registry: metrics.registry! })

return {
config,
logs,
server,
statusChecks,
metrics,
pg,
db,
ws,
fetcher,
redis,
pubsub,
Expand Down
22 changes: 22 additions & 0 deletions src/controllers/handlers/status-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { AppComponents } from '../../types'

export async function createStatusHandler(components: Pick<AppComponents, 'config'>) {
const { config } = components
const [commitHash, version] = await Promise.all([
config.getString('COMMIT_HASH'),
config.getString('CURRENT_VERSION')
])

return {
path: '/status',
f: async () => {
return {
body: {
version: version ?? '',
currentTime: Date.now(),
commitHash: commitHash ?? ''
}
}
}
}
}
139 changes: 77 additions & 62 deletions src/controllers/handlers/ws-handler.ts
Original file line number Diff line number Diff line change
@@ -1,81 +1,96 @@
import { IHttpServerComponent } from '@well-known-components/interfaces'
import { upgradeWebSocketResponse } from '@well-known-components/http-server/dist/ws'
import { WebSocket, MessageEvent } from 'ws'
import { WebSocketTransport } from '@dcl/rpc/dist/transports/WebSocket'
import future from 'fp-future'
import mitt from 'mitt'
import { onRequestEnd, onRequestStart } from '@well-known-components/uws-http-server'
import { verify } from '@dcl/platform-crypto-middleware'
import { GlobalContext } from '../../types'
import { AppComponents, WsUserData } from '../../types'
import { normalizeAddress } from '../../utils/address'
import { IUWebSocketEventMap, UWebSocketTransport } from '../../utils/UWebSocketTransport'

export async function wsHandler(context: IHttpServerComponent.DefaultContext<GlobalContext>) {
const { logs, rpcServer, fetcher } = context.components
export async function registerWsHandler(
components: Pick<AppComponents, 'logs' | 'server' | 'metrics' | 'fetcher' | 'rpcServer'>
) {
const { logs, server, metrics, fetcher, rpcServer } = components
const logger = logs.getLogger('ws-handler')

return upgradeWebSocketResponse(async (socket) => {
let isAlive = true
const ws = socket as any as WebSocket
// it's needed bc of cloudflare
const pingInterval = setInterval(() => {
if (isAlive === false) {
logger.warn('terminating ws because of ping timeout')
return ws.terminate()
}
isAlive = false
ws.ping()
}, 30000)

ws.on('close', () => {
logger.debug('closing websocket')
clearInterval(pingInterval)
})

ws.on('pong', () => {
logger.debug('PONG')
isAlive = true
})
function changeStage(data: WsUserData, newData: WsUserData) {
Object.assign(data, newData)
}

const authChainPromise = future()

function receiveAuthchainAsFirstMessage(event: MessageEvent) {
if (typeof event.data === 'string') {
authChainPromise.resolve(JSON.parse(event.data))
} else {
authChainPromise.reject(new Error('INVALID_MESSAGE'))
server.app.ws<WsUserData>('/', {
idleTimeout: 0,
upgrade: (res, req, context) => {
logger.debug('upgrade requested')
const { labels, end } = onRequestStart(metrics, req.getMethod(), '/ws')
res.upgrade(
{
isConnected: false,
auth: false
},
req.getHeader('sec-websocket-key'),
req.getHeader('sec-websocket-protocol'),
req.getHeader('sec-websocket-extensions'),
context
)
onRequestEnd(metrics, labels, 101, end)
},
open: (ws) => {
logger.debug('ws open')
const data = ws.getUserData()
// just for type assertion
if (!data.auth) {
data.timeout = setTimeout(() => {
try {
logger.error('closing connection, no authchain received')
ws.end()
} catch (err) {}
}, 30000)
}
}
data.isConnected = true
},
message: async (ws, message) => {
const data = ws.getUserData()

ws.addEventListener('message', receiveAuthchainAsFirstMessage)
if (data.auth) {
data.eventEmitter.emit('message', message)
} else {
clearTimeout(data.timeout)
data.timeout = undefined

try {
const authChain = await Promise.race([sleep30Secs(), authChainPromise])
ws.removeEventListener('message', receiveAuthchainAsFirstMessage)
try {
const authChainMessage = new TextDecoder().decode(message)

const authchainVerifyResult = await verify('get', '/', authChain, {
fetcher
})
const veirfyResult = await verify('get', '/', JSON.parse(authChainMessage), {
fetcher
})
const address = normalizeAddress(veirfyResult.auth)

const wsTransport = WebSocketTransport(socket)
logger.debug('addresss > ', { address })

logger.debug('addresss > ', { address: authchainVerifyResult.auth })
const emitter = mitt<IUWebSocketEventMap>()
changeStage(data, { auth: true, address, eventEmitter: emitter, isConnected: true })

const address = normalizeAddress(authchainVerifyResult.auth)
const transport = UWebSocketTransport(ws, emitter)

rpcServer.attachUser({ transport: wsTransport, address })
rpcServer.attachUser({ transport, address })

wsTransport.on('error', (err) => {
if (err && err.message) {
logger.error(err)
transport.on('error', (err) => {
if (err && err.message) {
logger.error(err)
}
})
} catch (error) {
console.log(error)
logger.error(error as any)
ws.close()
}
})
} catch (error) {
// rejects if timeout, invalid first message or authchain verify error
logger.error(error as Error)
ws.close()
}
},
close: (ws, code, _message) => {
logger.debug(`Websocket closed ${code}`)
const data = ws.getUserData()
if (data.auth) {
data.isConnected = false
data.eventEmitter.emit('close', code)
}
}
})
}

const sleep30Secs = () =>
new Promise((_resolve, reject) => {
setTimeout(() => reject(new Error('TIMEOUT_WAITING_FOR_AUTCHAIN')), 30000)
})
81 changes: 73 additions & 8 deletions src/controllers/routes.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,77 @@
import { Router } from '@well-known-components/http-server'
import { wsHandler } from './handlers/ws-handler'
import { GlobalContext } from '../types'
import {
HttpRequest,
HttpResponse,
createMetricsHandler,
onRequestEnd,
onRequestStart
} from '@well-known-components/uws-http-server'
import { AppComponents, IHandler, TestComponents } from '../types'
import { createStatusHandler } from './handlers/status-handler'
import { registerWsHandler } from './handlers/ws-handler'

// We return the entire router because it will be easier to test than a whole server
export async function setupRouter(_globalContext: GlobalContext): Promise<Router<GlobalContext>> {
const router = new Router<GlobalContext>()
export async function setupRoutes(components: AppComponents | TestComponents): Promise<void> {
const { metrics, server } = components

router.get('/', wsHandler)
function wrap(h: IHandler) {
return async (res: HttpResponse, req: HttpRequest) => {
const { labels, end } = onRequestStart(metrics, req.getMethod(), h.path)
let status = 500
try {
const result = await h.f(res, req)
status = result.status ?? 200
res.writeStatus(`${status}`)

return router
const headers = new Headers(result.headers ?? {})

if (!headers.has('Access-Control-Allow-Origin')) {
headers.set('Access-Control-Allow-Origin', '*')
}

headers.forEach((v, k) => res.writeHeader(k, v))

if (result.body === undefined) {
res.end()
} else if (typeof result.body === 'string') {
res.end(result.body)
} else {
res.writeHeader('content-type', 'application/json')
res.end(JSON.stringify(result.body))
}
} catch (err) {
res.writeStatus(`${status}`)
res.end()
} finally {
onRequestEnd(metrics, labels, status, end)
}
}
}

await registerWsHandler(components)

{
const handler = await createStatusHandler(components)
server.app.get(handler.path, wrap(handler))
}

{
const { path, handler } = await createMetricsHandler(components, metrics.registry!)
server.app.get(path, handler)
}

server.app.any('/health/live', (res, req) => {
const { end, labels } = onRequestStart(metrics, req.getMethod(), '/health/live')
res.writeStatus('200 OK')
res.writeHeader('Access-Control-Allow-Origin', '*')
res.end('alive')
onRequestEnd(metrics, labels, 404, end)
})

server.app.any('/*', (res, req) => {
const { end, labels } = onRequestStart(metrics, req.getMethod(), '')
res.writeStatus('404 Not Found')
res.writeHeader('Access-Control-Allow-Origin', '*')
res.writeHeader('content-type', 'application/json')
res.end(JSON.stringify({ error: 'Not Found' }))
onRequestEnd(metrics, labels, 404, end)
})
}
17 changes: 3 additions & 14 deletions src/service.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
import { Lifecycle } from '@well-known-components/interfaces'
import { setupRouter } from './controllers/routes'
import { AppComponents, GlobalContext, TestComponents } from './types'
import { setupRoutes } from './controllers/routes'
import { AppComponents, TestComponents } from './types'

// this function wires the business logic (adapters & controllers) with the components (ports)
export async function main(program: Lifecycle.EntryPointParameters<AppComponents | TestComponents>) {
const { components, startComponents } = program
const globalContext: GlobalContext = {
components
}

// wire the HTTP router (make it automatic? TBD)
const router = await setupRouter(globalContext)
// register routes middleware
components.server.use(router.middleware())
// register not implemented/method not allowed/cors responses middleware
components.server.use(router.allowedMethods())
// set the context to be passed to the handlers
components.server.setContext(globalContext)
await setupRoutes(components)

// start ports: db, listeners, synchronizations, etc
await startComponents()
}
Loading

0 comments on commit d681890

Please sign in to comment.