Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use uws #12

Merged
merged 6 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions build-local-proto.sh

This file was deleted.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
"@dcl/eslint-config": "^2.0.0",
"@protobuf-ts/protoc": "^2.9.4",
"@types/node": "^20.11.28",
"@types/redis": "^4.0.11",
"@types/ws": "^8.5.10",
"@well-known-components/test-helpers": "^1.5.6",
"nodemon": "^3.1.0",
"ts-node": "^10.9.2",
"typescript": "^5.4.2",
"@types/redis": "^4.0.11"
"typescript": "^5.4.2"
},
"prettier": {
"printWidth": 120,
Expand All @@ -39,7 +39,7 @@
"@well-known-components/logger": "^3.1.3",
"@well-known-components/metrics": "^2.1.0",
"@well-known-components/pg-component": "^0.2.2",
"@well-known-components/uws-http-server": "^0.0.1-20240314125425.commit-711dd8f",
"@well-known-components/uws-http-server": "^0.0.2",
"fp-future": "^1.0.1",
"mitt": "^3.0.1",
"redis": "^4.6.13",
Expand Down
17 changes: 3 additions & 14 deletions src/components.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,24 @@
import { resolve } from 'path'
import { createDotEnvConfigComponent } from '@well-known-components/env-config-provider'
import {
createServerComponent,
createStatusCheckComponent,
instrumentHttpServerWithPromClientRegistry
} from '@well-known-components/http-server'
import { createLogComponent } from '@well-known-components/logger'
import { createMetricsComponent } from '@well-known-components/metrics'
import { createFetchComponent } from '@well-known-components/fetch-component'
import { createPgComponent } from '@well-known-components/pg-component'
import { AppComponents, GlobalContext } from './types'
import { AppComponents } from './types'
import { metricDeclarations } from './metrics'
import { createDBComponent } from './adapters/db'
import { createWsComponent } from './adapters/ws'
import createRpcServerComponent from './adapters/rpcServer'
import createRedisComponent from './adapters/redis'
import createPubSubComponent from './adapters/pubsub'
import { createUWsComponent } from '@well-known-components/uws-http-server'

// Initialize all the components of the app
export async function initComponents(): Promise<AppComponents> {
const config = await createDotEnvConfigComponent({ path: ['.env.default', '.env'] })
const metrics = await createMetricsComponent(metricDeclarations, { config })
const logs = await createLogComponent({ metrics })

const ws = await createWsComponent()
const server = await createServerComponent<GlobalContext>({ config, logs, ws: ws.ws }, {})
const statusChecks = await createStatusCheckComponent({ server, config })
const server = await createUWsComponent({ config, logs })

const fetcher = createFetchComponent()

Expand Down Expand Up @@ -58,17 +51,13 @@ export async function initComponents(): Promise<AppComponents> {
const pubsub = createPubSubComponent({ logs, redis })
const rpcServer = await createRpcServerComponent({ logs, db, pubsub })

await instrumentHttpServerWithPromClientRegistry({ metrics, server, config, registry: metrics.registry! })

return {
config,
logs,
server,
statusChecks,
metrics,
pg,
db,
ws,
fetcher,
redis,
pubsub,
Expand Down
22 changes: 22 additions & 0 deletions src/controllers/handlers/status-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { AppComponents } from '../../types'

export async function createStatusHandler(components: Pick<AppComponents, 'config'>) {
const { config } = components
const [commitHash, version] = await Promise.all([
config.getString('COMMIT_HASH'),
config.getString('CURRENT_VERSION')
])

return {
path: '/status',
f: async () => {
return {
body: {
version: version ?? '',
currentTime: Date.now(),
commitHash: commitHash ?? ''
}
}
}
}
}
141 changes: 79 additions & 62 deletions src/controllers/handlers/ws-handler.ts
Original file line number Diff line number Diff line change
@@ -1,81 +1,98 @@
import { IHttpServerComponent } from '@well-known-components/interfaces'
import { upgradeWebSocketResponse } from '@well-known-components/http-server/dist/ws'
import { WebSocket, MessageEvent } from 'ws'
import { WebSocketTransport } from '@dcl/rpc/dist/transports/WebSocket'
import future from 'fp-future'
import mitt from 'mitt'
import { onRequestEnd, onRequestStart } from '@well-known-components/uws-http-server'
import { verify } from '@dcl/platform-crypto-middleware'
import { GlobalContext } from '../../types'
import { AppComponents, WsUserData } from '../../types'
import { normalizeAddress } from '../../utils/address'
import { IUWebSocketEventMap, UWebSocketTransport } from '../../utils/UWebSocketTransport'

export async function wsHandler(context: IHttpServerComponent.DefaultContext<GlobalContext>) {
const { logs, rpcServer, fetcher } = context.components
const logger = logs.getLogger('ws-handler')

return upgradeWebSocketResponse(async (socket) => {
let isAlive = true
const ws = socket as any as WebSocket
// it's needed bc of cloudflare
const pingInterval = setInterval(() => {
if (isAlive === false) {
logger.warn('terminating ws because of ping timeout')
return ws.terminate()
}
isAlive = false
ws.ping()
}, 30000)
const textDecoder = new TextDecoder()

ws.on('close', () => {
logger.debug('closing websocket')
clearInterval(pingInterval)
})

ws.on('pong', () => {
logger.debug('PONG')
isAlive = true
})
export async function registerWsHandler(
components: Pick<AppComponents, 'logs' | 'server' | 'metrics' | 'fetcher' | 'rpcServer'>
) {
const { logs, server, metrics, fetcher, rpcServer } = components
const logger = logs.getLogger('ws-handler')

const authChainPromise = future()
function changeStage(data: WsUserData, newData: WsUserData) {
Object.assign(data, newData)
}

function receiveAuthchainAsFirstMessage(event: MessageEvent) {
if (typeof event.data === 'string') {
authChainPromise.resolve(JSON.parse(event.data))
} else {
authChainPromise.reject(new Error('INVALID_MESSAGE'))
server.app.ws<WsUserData>('/', {
idleTimeout: 0,
upgrade: (res, req, context) => {
logger.debug('upgrade requested')
const { labels, end } = onRequestStart(metrics, req.getMethod(), '/ws')
res.upgrade(
{
isConnected: false,
auth: false
},
req.getHeader('sec-websocket-key'),
req.getHeader('sec-websocket-protocol'),
req.getHeader('sec-websocket-extensions'),
context
)
onRequestEnd(metrics, labels, 101, end)
},
open: (ws) => {
logger.debug('ws open')
const data = ws.getUserData()
// just for type assertion
if (!data.auth) {
data.timeout = setTimeout(() => {
try {
logger.error('closing connection, no authchain received')
ws.end()
} catch (err) {}
}, 30000)
}
}
data.isConnected = true
},
message: async (ws, message) => {
const data = ws.getUserData()

ws.addEventListener('message', receiveAuthchainAsFirstMessage)
if (data.auth) {
data.eventEmitter.emit('message', message)
} else {
clearTimeout(data.timeout)
data.timeout = undefined

try {
const authChain = await Promise.race([sleep30Secs(), authChainPromise])
ws.removeEventListener('message', receiveAuthchainAsFirstMessage)
try {
const authChainMessage = textDecoder.decode(message)

const authchainVerifyResult = await verify('get', '/', authChain, {
fetcher
})
const verifyResult = await verify('get', '/', JSON.parse(authChainMessage), {
fetcher
})
const address = normalizeAddress(verifyResult.auth)

const wsTransport = WebSocketTransport(socket)
logger.debug('addresss > ', { address })

logger.debug('addresss > ', { address: authchainVerifyResult.auth })
const eventEmitter = mitt<IUWebSocketEventMap>()
changeStage(data, { auth: true, address, eventEmitter, isConnected: true })

const address = normalizeAddress(authchainVerifyResult.auth)
const transport = UWebSocketTransport(ws, eventEmitter)

rpcServer.attachUser({ transport: wsTransport, address })
rpcServer.attachUser({ transport, address })

wsTransport.on('error', (err) => {
if (err && err.message) {
logger.error(err)
transport.on('error', (err) => {
if (err && err.message) {
logger.error(err)
}
})
} catch (error) {
console.log(error)
logger.error(error as any)
ws.close()
}
})
} catch (error) {
// rejects if timeout, invalid first message or authchain verify error
logger.error(error as Error)
ws.close()
}
},
close: (ws, code, _message) => {
logger.debug(`Websocket closed ${code}`)
const data = ws.getUserData()
if (data.auth) {
data.isConnected = false
data.eventEmitter.emit('close', code)
}
}
})
}

const sleep30Secs = () =>
new Promise((_resolve, reject) => {
setTimeout(() => reject(new Error('TIMEOUT_WAITING_FOR_AUTCHAIN')), 30000)
})
81 changes: 73 additions & 8 deletions src/controllers/routes.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,77 @@
import { Router } from '@well-known-components/http-server'
import { wsHandler } from './handlers/ws-handler'
import { GlobalContext } from '../types'
import {
HttpRequest,
HttpResponse,
createMetricsHandler,
onRequestEnd,
onRequestStart
} from '@well-known-components/uws-http-server'
import { AppComponents, IHandler, TestComponents } from '../types'
import { createStatusHandler } from './handlers/status-handler'
import { registerWsHandler } from './handlers/ws-handler'

// We return the entire router because it will be easier to test than a whole server
export async function setupRouter(_globalContext: GlobalContext): Promise<Router<GlobalContext>> {
const router = new Router<GlobalContext>()
export async function setupRoutes(components: AppComponents | TestComponents): Promise<void> {
const { metrics, server } = components

router.get('/', wsHandler)
function wrap(h: IHandler) {
return async (res: HttpResponse, req: HttpRequest) => {
const { labels, end } = onRequestStart(metrics, req.getMethod(), h.path)
let status = 500
try {
const result = await h.f(res, req)
status = result.status ?? 200
res.writeStatus(`${status}`)

return router
const headers = new Headers(result.headers ?? {})

if (!headers.has('Access-Control-Allow-Origin')) {
headers.set('Access-Control-Allow-Origin', '*')
}

headers.forEach((v, k) => res.writeHeader(k, v))

if (result.body === undefined) {
res.end()
} else if (typeof result.body === 'string') {
res.end(result.body)
} else {
res.writeHeader('content-type', 'application/json')
res.end(JSON.stringify(result.body))
}
} catch (err) {
res.writeStatus(`${status}`)
res.end()
} finally {
onRequestEnd(metrics, labels, status, end)
}
}
}

await registerWsHandler(components)

{
const handler = await createStatusHandler(components)
server.app.get(handler.path, wrap(handler))
}

{
const { path, handler } = await createMetricsHandler(components, metrics.registry!)
server.app.get(path, handler)
}

server.app.any('/health/live', (res, req) => {
const { end, labels } = onRequestStart(metrics, req.getMethod(), '/health/live')
res.writeStatus('200 OK')
res.writeHeader('Access-Control-Allow-Origin', '*')
res.end('alive')
onRequestEnd(metrics, labels, 404, end)
})

server.app.any('/*', (res, req) => {
const { end, labels } = onRequestStart(metrics, req.getMethod(), '')
res.writeStatus('404 Not Found')
res.writeHeader('Access-Control-Allow-Origin', '*')
res.writeHeader('content-type', 'application/json')
res.end(JSON.stringify({ error: 'Not Found' }))
onRequestEnd(metrics, labels, 404, end)
})
}
17 changes: 3 additions & 14 deletions src/service.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
import { Lifecycle } from '@well-known-components/interfaces'
import { setupRouter } from './controllers/routes'
import { AppComponents, GlobalContext, TestComponents } from './types'
import { setupRoutes } from './controllers/routes'
import { AppComponents, TestComponents } from './types'

// this function wires the business logic (adapters & controllers) with the components (ports)
export async function main(program: Lifecycle.EntryPointParameters<AppComponents | TestComponents>) {
const { components, startComponents } = program
const globalContext: GlobalContext = {
components
}

// wire the HTTP router (make it automatic? TBD)
const router = await setupRouter(globalContext)
// register routes middleware
components.server.use(router.middleware())
// register not implemented/method not allowed/cors responses middleware
components.server.use(router.allowedMethods())
// set the context to be passed to the handlers
components.server.setContext(globalContext)
await setupRoutes(components)

// start ports: db, listeners, synchronizations, etc
await startComponents()
}
Loading
Loading