Skip to content

Commit

Permalink
ws & rpc connections
Browse files Browse the repository at this point in the history
  • Loading branch information
lauti7 committed Apr 10, 2024
1 parent fdd6b6b commit 143c702
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 6 deletions.
94 changes: 94 additions & 0 deletions src/adapters/rpcServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { createRpcServer } from '@dcl/rpc'
import { registerService } from '@dcl/rpc/dist/codegen'
import {
FriendshipsServiceDefinition,
UsersResponse,
SubscribeFriendshipEventsUpdatesResponse
} from '@dcl/protocol/out-ts/decentraland/social/friendships_ea/friendships_ea.gen'
import { AppComponents, RpcServerContext } from '../types'

export default function createRpcServerComponent(components: Pick<AppComponents, 'logs'>) {
const { logs } = components

const server = createRpcServer<RpcServerContext>({
logger: logs.getLogger('rpc-server')
})

const _logger = logs.getLogger('rpc-server-handler')
// Mocked server until we get the new service definition done
server.setHandler(async function handler(port) {
registerService(port, FriendshipsServiceDefinition, async () => ({
getFriends(_request, _context) {
const generator = async function* () {
const response: UsersResponse = {
response: {
$case: 'users',
users: { users: [] }
}
}
yield response
}

return generator()
},
getMutualFriends(_request, _context) {
const generator = async function* () {
const response: UsersResponse = {
response: {
$case: 'users',
users: { users: [] }
}
}
yield response
}

return generator()
},
async getRequestEvents(_request, _context) {
return {
response: {
$case: 'events',
events: {
outgoing: { total: 0, items: [] },
incoming: { total: 0, items: [] }
}
}
}
},
async updateFriendshipEvent(_request, _context) {
return {
response: {
$case: 'event',
event: {
body: {
$case: 'accept',
accept: {
user: {
address: '0xA'
}
}
}
}
}
}
},
subscribeFriendshipEventsUpdates(_request, _context) {
const generator = async function* () {
const response: SubscribeFriendshipEventsUpdatesResponse = {
response: {
$case: 'events',
events: {
responses: []
}
}
}
yield response
}

return generator()
}
}))
})

return server
}
25 changes: 25 additions & 0 deletions src/adapters/ws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { WebSocketServer } from 'ws'
import { IWebSocketComponent } from '../types'

export async function createWsComponent(): Promise<IWebSocketComponent> {
let wss: WebSocketServer | undefined

async function start() {
if (wss) return

wss = new WebSocketServer({ noServer: true })
}

async function stop() {
wss?.close()
wss = undefined
}

await start()

return {
start,
stop,
ws: wss!
}
}
15 changes: 13 additions & 2 deletions src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,22 @@ import { AppComponents, GlobalContext } from './types'
import { metricDeclarations } from './metrics'
import { createPgComponent } from '@well-known-components/pg-component'
import { createDBComponent } from './adapters/db'
import { createWsComponent } from './adapters/ws'
import createRpcServerComponent from './adapters/rpcServer'
import { createFetchComponent } from '@well-known-components/fetch-component'

// Initialize all the components of the app
export async function initComponents(): Promise<AppComponents> {
const config = await createDotEnvConfigComponent({ path: ['.env.default', '.env'] })
const metrics = await createMetricsComponent(metricDeclarations, { config })
const logs = await createLogComponent({ metrics })
const server = await createServerComponent<GlobalContext>({ config, logs }, {})

const ws = await createWsComponent()
const server = await createServerComponent<GlobalContext>({ config, logs, ws: ws.ws }, {})
const statusChecks = await createStatusCheckComponent({ server, config })
const rpcServer = createRpcServerComponent({ logs })

const fetcher = createFetchComponent()

let databaseUrl: string | undefined = await config.getString('PG_COMPONENT_PSQL_CONNECTION_STRING')
if (!databaseUrl) {
Expand Down Expand Up @@ -54,6 +62,9 @@ export async function initComponents(): Promise<AppComponents> {
statusChecks,
metrics,
pg,
db
db,
ws,
rpcServer,
fetcher
}
}
80 changes: 80 additions & 0 deletions src/controllers/handlers/ws-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { IHttpServerComponent } from '@well-known-components/interfaces'
import { upgradeWebSocketResponse } from '@well-known-components/http-server/dist/ws'
import { WebSocket, MessageEvent } from 'ws'
import { WebSocketTransport } from '@dcl/rpc/dist/transports/WebSocket'
import future from 'fp-future'
import { verify } from '@dcl/platform-crypto-middleware'
import { GlobalContext } from '../../types'

export async function wsHandler(context: IHttpServerComponent.DefaultContext<GlobalContext>) {
const { logs, rpcServer, fetcher } = context.components
const logger = logs.getLogger('ws-handler')

return upgradeWebSocketResponse(async (socket) => {
let isAlive = true
const ws = socket as any as WebSocket
// it's needed bc of cloudflare
const pingInterval = setInterval(() => {
if (isAlive === false) {
logger.warn('terminating ws because of ping timeout')
return ws.terminate()
}
logger.debug('pinging websocket bc of cloudflare')
isAlive = false
ws.ping()
}, 30000)

ws.on('close', () => {
logger.debug('closing websocket')
clearInterval(pingInterval)
})

ws.on('pong', () => {
logger.debug('PONG')
isAlive = true
})

const authChainPromise = future()

function receiveAuthchainAsFirstMessage(event: MessageEvent) {
if (typeof event.data === 'string') {
authChainPromise.resolve(JSON.parse(event.data))
} else {
authChainPromise.reject(new Error('INVALID_MESSAGE'))
}
}

ws.addEventListener('message', receiveAuthchainAsFirstMessage)

try {
const authChain = await Promise.race([sleep30Secs(), authChainPromise])
ws.removeEventListener('message', receiveAuthchainAsFirstMessage)

const authchainVerifyResult = await verify('get', '/', authChain, {
fetcher,
expiration: 1000 * 240
})

const wsTransport = WebSocketTransport(socket)

logger.debug('addresss > ', { address: authchainVerifyResult.auth })

rpcServer.attachTransport(wsTransport, { components: context.components, address: authchainVerifyResult.auth })

wsTransport.on('error', (err) => {
if (err && err.message) {
logger.error(err)
}
})
} catch (error) {
// rejects if timeout, invalid first message or authchain verify error
logger.error(error as Error)
ws.close()
}
})
}

const sleep30Secs = () =>
new Promise((_resolve, reject) => {
setTimeout(() => reject(new Error('TIMEOUT_WAITING_FOR_AUTCHAIN')), 30000)
})
5 changes: 4 additions & 1 deletion src/controllers/routes.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { Router } from '@well-known-components/http-server'
import { wsHandler } from './handlers/ws-handler'
import { GlobalContext } from '../types'

// We return the entire router because it will be easier to test than a whole server
export async function setupRouter(globalContext: GlobalContext): Promise<Router<GlobalContext>> {
export async function setupRouter(_globalContext: GlobalContext): Promise<Router<GlobalContext>> {
const router = new Router<GlobalContext>()

router.get('/', wsHandler)

return router
}
6 changes: 3 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Lifecycle } from "@well-known-components/interfaces"
import { initComponents } from "./components"
import { main } from "./service"
import { Lifecycle } from '@well-known-components/interfaces'
import { initComponents } from './components'
import { main } from './service'

// This file is the program entry point, it only calls the Lifecycle function
Lifecycle.run({ main, initComponents })
11 changes: 11 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import type {
import { IPgComponent } from '@well-known-components/pg-component'
import { metricDeclarations } from './metrics'
import { IDatabaseComponent } from './adapters/db'
import { WebSocketServer } from 'ws'
import { RpcServer } from '@dcl/rpc'

export type GlobalContext = {
components: BaseComponents
Expand All @@ -22,6 +24,9 @@ export type BaseComponents = {
metrics: IMetricsComponent<keyof typeof metricDeclarations>
pg: IPgComponent
db: IDatabaseComponent
ws: IWebSocketComponent
rpcServer: RpcServer<RpcServerContext>
fetcher: IFetchComponent
}

// components used in runtime
Expand All @@ -47,3 +52,9 @@ export type HandlerContextWithPath<
>

export type Context<Path extends string = any> = IHttpServerComponent.PathAwareContext<GlobalContext, Path>

export type IWebSocketComponent = IBaseComponent & {
ws: WebSocketServer
}

export type RpcServerContext = GlobalContext & { address: string }

0 comments on commit 143c702

Please sign in to comment.