From 8c1db6e6af7ece5c8578ebfa350f0594cde5e454 Mon Sep 17 00:00:00 2001 From: lauti7 Date: Tue, 23 Apr 2024 15:39:49 -0300 Subject: [PATCH] components and business logic --- src/adapters/pubsub.ts | 57 +++++ src/adapters/rpcServer.ts | 318 +++++++++++++++++++++---- src/components.ts | 19 +- src/controllers/handlers/ws-handler.ts | 5 +- src/logic/friendships.ts | 174 ++++++++++++++ src/ports/redis.ts | 37 +++ src/types.ts | 73 +++++- src/utils/emitterToGenerator.ts | 58 +++++ 8 files changed, 688 insertions(+), 53 deletions(-) create mode 100644 src/adapters/pubsub.ts create mode 100644 src/logic/friendships.ts create mode 100644 src/ports/redis.ts create mode 100644 src/utils/emitterToGenerator.ts diff --git a/src/adapters/pubsub.ts b/src/adapters/pubsub.ts new file mode 100644 index 0000000..edcc862 --- /dev/null +++ b/src/adapters/pubsub.ts @@ -0,0 +1,57 @@ +import { IBaseComponent } from '@well-known-components/interfaces' +import { AppComponents, SubscriptionEventsEmitter } from '../types' + +const FRIENDSHIP_UPDATES_CHANNEL = 'FRIENDSHIP_UPDATES' + +export type IPubSubComponent = IBaseComponent & { + subscribeToFriendshipUpdates(cb: (message: string) => void): Promise + publishFriendshipUpdate(update: SubscriptionEventsEmitter['update']): Promise +} + +export default function createPubSubComponent(components: Pick): IPubSubComponent { + const { logs, redis } = components + const logger = logs.getLogger('pubsub-component') + + const subClient = redis.client.duplicate() + const pubClient = redis.client.duplicate() + + let friendshipUpdatesCb: (message: string) => void | undefined + + return { + async start() { + if (!subClient.isReady) { + await subClient.connect() + } + + if (!pubClient.isReady) { + await pubClient.connect() + } + }, + async stop() { + if (subClient.isReady) { + await subClient.disconnect() + } + + if (pubClient.isReady) { + await pubClient.disconnect() + } + }, + async subscribeToFriendshipUpdates(cb) { + try { + friendshipUpdatesCb = cb + await subClient.subscribe(FRIENDSHIP_UPDATES_CHANNEL, friendshipUpdatesCb) + } catch (error) { + logger.error(error as any) + } + }, + async publishFriendshipUpdate(update) { + try { + const message = JSON.stringify(update) + logger.debug('publishing update to FRIENDSHIP_UPDATES > ', { update: message }) + await pubClient.publish(FRIENDSHIP_UPDATES_CHANNEL, message) + } catch (error) { + logger.error(error as any) + } + } + } +} diff --git a/src/adapters/rpcServer.ts b/src/adapters/rpcServer.ts index 5c441f1..a05bb2f 100644 --- a/src/adapters/rpcServer.ts +++ b/src/adapters/rpcServer.ts @@ -1,74 +1,288 @@ -import { createRpcServer } from '@dcl/rpc' +import { Transport, createRpcServer } from '@dcl/rpc' +import { SocialServiceDefinition } from '@dcl/protocol/out-js/decentraland/social_service_v2/social_service.gen' import { registerService } from '@dcl/rpc/dist/codegen' +import mitt from 'mitt' +import { IBaseComponent } from '@well-known-components/interfaces' import { - FriendshipsServiceDefinition, - UsersResponse, - SubscribeFriendshipEventsUpdatesResponse, - RequestEventsResponse, - UpdateFriendshipResponse -} from '../friendships_ea' -import { AppComponents, RpcServerContext } from '../types' + Action, + AppComponents, + Friendship, + FriendshipStatus, + RpcServerContext, + SubscriptionEventsEmitter +} from '../types' +import { + getNewFriendshipStatus, + parseEmittedUpdateToFriendshipUpdate, + parseUpsertFriendshipRequest, + validateNewFriendshipAction +} from '../logic/friendships' +import emitterToAsyncGenerator from '../utils/emitterToGenerator' + +export type IRPCServerComponent = IBaseComponent & { + attachUser(user: { transport: Transport; address: string }): void +} + +const FRIENDSHIPS_COUNT_PAGE_STREAM = 20 -export default function createRpcServerComponent(components: Pick) { - const { logs } = components +const INTERNAL_SERVER_ERROR = 'SERVER ERROR' + +export default async function createRpcServerComponent( + components: Pick +): Promise { + const { logs, db, pubsub } = components + + const SHARED_CONTEXT: Pick = { + subscribers: {} + } const server = createRpcServer({ - logger: logs.getLogger('rpc-server') + logger: logs.getLogger('rpcserver') }) - const _logger = logs.getLogger('rpc-server-handler') - // Mocked server until we get the new service definition & db queries done + const logger = logs.getLogger('rpcserver-handler') + server.setHandler(async function handler(port) { - registerService(port, FriendshipsServiceDefinition, async () => ({ - getFriends(_request, _context) { + registerService(port, SocialServiceDefinition, async () => ({ + getFriends(_request, context) { + logger.debug('getting friends for ', { address: context.address }) + let friendsGenerator: AsyncGenerator | undefined + try { + friendsGenerator = db.getFriends(context.address) + } catch (error) { + logger.error(error as any) + // throw an error bc there is no sense to create a generator to send an error + // as it's done in the previous Social Service + throw new Error(INTERNAL_SERVER_ERROR) + } + const generator = async function* () { - const response: UsersResponse = { - users: { users: [] } + let users = [] + for await (const friendship of friendsGenerator) { + const { address_requested, address_requester } = friendship + if (context.address === address_requested) { + users.push({ address: address_requester }) + } else { + users.push({ address: address_requested }) + } + + if (users.length === FRIENDSHIPS_COUNT_PAGE_STREAM) { + const response = { + users: [...users] + } + users = [] + yield response + } + } + + if (users.length) { + const response = { + users + } + yield response } - yield response } return generator() }, - getMutualFriends(_request, _context) { + getMutualFriends(request, context) { + logger.debug(`getting mutual friends ${context.address}<>${request.user!.address}`) + let mutualFriends: AsyncGenerator<{ address: string }> | undefined + try { + mutualFriends = db.getMutualFriends(context.address, request.user!.address) + } catch (error) { + logger.error(error as any) + // throw an error bc there is no sense to create a generator to send an error + // as it's done in the previous Social Service + throw new Error(INTERNAL_SERVER_ERROR) + } + const generator = async function* () { - const response: UsersResponse = { - users: { users: [] } + const users = [] + for await (const friendship of mutualFriends) { + const { address } = friendship + users.push({ address }) + if (users.length === FRIENDSHIPS_COUNT_PAGE_STREAM) { + const response = { + users + } + yield response + } + } + + if (users.length) { + const response = { + users + } + yield response } - yield response } return generator() }, - async getRequestEvents(_request, _context) { - const res: RequestEventsResponse = { - events: { - outgoing: { items: [], total: 0 }, - incoming: { items: [], total: 0 } + async getPendingFriendshipRequests(_request, context) { + try { + const pendingRequests = await db.getReceivedFriendshipRequests(context.address) + const mappedRequestss = pendingRequests.map(({ address, timestamp, metadata }) => ({ + user: { address }, + createdAt: new Date(timestamp).getTime(), + message: metadata?.message || '' + })) + + return { + response: { + $case: 'requests', + requests: { + requests: mappedRequestss + } + } + } + } catch (error) { + logger.error(error as any) + return { + response: { + $case: 'internalServerError', + internalServerError: {} + } } } - return res }, - async updateFriendshipEvent(_request, _context) { - const res: UpdateFriendshipResponse = { - event: { - accept: { - user: { - address: '0xa' + async getSentFriendshipRequests(_request, context) { + try { + const pendingRequests = await db.getSentFriendshipRequests(context.address) + const mappedRequestss = pendingRequests.map(({ address, timestamp, metadata }) => ({ + user: { address }, + createdAt: new Date(timestamp).getTime(), + message: metadata?.message || '' + })) + + return { + response: { + $case: 'requests', + requests: { + requests: mappedRequestss } } } + } catch (error) { + logger.error(error as any) + return { + response: { + $case: 'internalServerError', + internalServerError: {} + } + } } - return res }, - subscribeFriendshipEventsUpdates(_request, _context) { + async upsertFriendship(request, context) { + const parsedRequest = parseUpsertFriendshipRequest(request) + if (!parsedRequest) { + logger.error('upsert friendship received unkwown message: ', request as any) + return { + response: { + $case: 'internalServerError', + internalServerError: {} + } + } + } + + logger.debug(`upsert friendship > `, parsedRequest as Record) + + try { + const friendship = await db.getFriendship([context.address, parsedRequest.user!]) + let lastAction = undefined + if (friendship) { + const lastRecordedAction = await db.getLastFriendshipAction(friendship.id) + lastAction = lastRecordedAction + } + + if ( + !validateNewFriendshipAction( + context.address, + { action: parsedRequest.action, user: parsedRequest.user }, + lastAction + ) + ) { + logger.error('invalid action for a friendship') + return { + response: { + $case: 'invalidFriendshipAction', + invalidFriendshipAction: {} + } + } + } + + const friendshipStatus = getNewFriendshipStatus(parsedRequest.action) + const isActive = friendshipStatus === FriendshipStatus.Friends + + logger.debug('friendshipstatus > ', { isActive: JSON.stringify(isActive), friendshipStatus }) + + const id = await db.executeTx(async (tx) => { + let id + if (friendship) { + await db.updateFriendshipStatus(friendship.id, isActive, tx) + id = friendship.id + } else { + const newFriendshipId = await db.createFriendship([context.address, parsedRequest.user!], isActive, tx) + id = newFriendshipId + } + + await db.recordFriendshipAction( + id, + context.address, + parsedRequest.action, + parsedRequest.action === Action.REQUEST ? parsedRequest.metadata : null, + tx + ) + return id + }) + + logger.debug(`${id} friendship was upserted successfully`) + + await pubsub.publishFriendshipUpdate({ + from: context.address, + to: parsedRequest.user, + action: parsedRequest.action, + timestamp: Date.now(), + metadata: + parsedRequest.action === Action.REQUEST + ? parsedRequest.metadata + ? parsedRequest.metadata + : undefined + : undefined + }) + + return { + response: { + $case: 'accepted', + accepted: {} + } + } + } catch (error) { + logger.error(error as any) + return { + response: { + $case: 'internalServerError', + internalServerError: {} + } + } + } + }, + subscribeToFriendshipUpdates(_request, context) { + const eventEmitter = mitt() + context.subscribers[context.address] = eventEmitter + const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'update') + const generator = async function* () { - const response: SubscribeFriendshipEventsUpdatesResponse = { - events: { - responses: [] + for await (const update of updatesGenerator) { + logger.debug('> friendship update received, sending: ', { update: update as any }) + const updateToResponse = parseEmittedUpdateToFriendshipUpdate(update) + if (updateToResponse) { + yield updateToResponse + } else { + logger.error('> unable to parse update to FriendshipUpdate > ', { update: update as any }) } } - yield response } return generator() @@ -76,5 +290,27 @@ export default function createRpcServerComponent(components: Pick { + try { + const update = JSON.parse(message) as SubscriptionEventsEmitter['update'] + const updateEmitter = SHARED_CONTEXT.subscribers[update.to] + if (updateEmitter) { + updateEmitter.emit('update', update) + } + } catch (error) { + logger.error(error as any) + } + }) + }, + attachUser({ transport, address }) { + transport.on('close', () => { + if (SHARED_CONTEXT.subscribers[address]) { + delete SHARED_CONTEXT.subscribers[address] + } + }) + server.attachTransport(transport, { subscribers: SHARED_CONTEXT.subscribers, address }) + } + } } diff --git a/src/components.ts b/src/components.ts index e2ed54e..600477d 100644 --- a/src/components.ts +++ b/src/components.ts @@ -7,13 +7,15 @@ import { } from '@well-known-components/http-server' import { createLogComponent } from '@well-known-components/logger' import { createMetricsComponent } from '@well-known-components/metrics' +import { createFetchComponent } from '@well-known-components/fetch-component' +import { createPgComponent } from '@well-known-components/pg-component' import { AppComponents, GlobalContext } from './types' import { metricDeclarations } from './metrics' -import { createPgComponent } from '@well-known-components/pg-component' import { createDBComponent } from './adapters/db' -import { createWsComponent } from './adapters/ws' +import { createWsComponent } from './ports/ws' import createRpcServerComponent from './adapters/rpcServer' -import { createFetchComponent } from '@well-known-components/fetch-component' +import createRedisComponent from './ports/redis' +import createPubSubComponent from './adapters/pubsub' // Initialize all the components of the app export async function initComponents(): Promise { @@ -24,7 +26,6 @@ export async function initComponents(): Promise { const ws = await createWsComponent() const server = await createServerComponent({ config, logs, ws: ws.ws }, {}) const statusChecks = await createStatusCheckComponent({ server, config }) - const rpcServer = createRpcServerComponent({ logs }) const fetcher = createFetchComponent() @@ -53,6 +54,10 @@ export async function initComponents(): Promise { const db = createDBComponent({ pg, logs }) + const redis = await createRedisComponent({ logs, config }) + const pubsub = createPubSubComponent({ logs, redis }) + const rpcServer = await createRpcServerComponent({ logs, db, pubsub }) + await instrumentHttpServerWithPromClientRegistry({ metrics, server, config, registry: metrics.registry! }) return { @@ -64,7 +69,9 @@ export async function initComponents(): Promise { pg, db, ws, - rpcServer, - fetcher + fetcher, + redis, + pubsub, + rpcServer } } diff --git a/src/controllers/handlers/ws-handler.ts b/src/controllers/handlers/ws-handler.ts index 2ac23d9..2dd140c 100644 --- a/src/controllers/handlers/ws-handler.ts +++ b/src/controllers/handlers/ws-handler.ts @@ -19,7 +19,6 @@ export async function wsHandler(context: IHttpServerComponent.DefaultContext ', { address: authchainVerifyResult.auth }) - rpcServer.attachTransport(wsTransport, { components: context.components, address: authchainVerifyResult.auth }) + const address = authchainVerifyResult.auth.toLowerCase() + + rpcServer.attachUser({ transport: wsTransport, address }) wsTransport.on('error', (err) => { if (err && err.message) { diff --git a/src/logic/friendships.ts b/src/logic/friendships.ts new file mode 100644 index 0000000..828d432 --- /dev/null +++ b/src/logic/friendships.ts @@ -0,0 +1,174 @@ +import { + FriendshipUpdate, + UpsertFriendshipPayload +} from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' +import { + Action, + FRIENDSHIP_ACTION_TRANSITIONS, + FriendshipAction, + FriendshipStatus, + SubscriptionEventsEmitter +} from '../types' + +export function isFriendshipActionValid(from: Action | null, to: Action) { + return FRIENDSHIP_ACTION_TRANSITIONS[to].includes(from) +} + +export function isUserActionValid( + actingUser: string, + newAction: { action: Action; user: string }, + lastAction?: FriendshipAction +) { + if (!lastAction) { + if (newAction.action === Action.REQUEST && actingUser === newAction.user) return false + + return true + } + + if (lastAction.acting_user === actingUser) { + switch (newAction.action) { + case Action.ACCEPT: + case Action.REJECT: + return false + default: + return true + } + } else { + if (newAction.action === Action.CANCEL) return false + return true + } +} + +export function getNewFriendshipStatus(action: Action) { + switch (action) { + case Action.REQUEST: + return FriendshipStatus.Requested + case Action.ACCEPT: + return FriendshipStatus.Friends + case Action.CANCEL: + case Action.REJECT: + case Action.DELETE: + default: + return FriendshipStatus.NotFriends + } +} + +export function validateNewFriendshipAction( + actingUser: string, + newAction: { action: Action; user: string }, + lastAction?: FriendshipAction +): boolean { + if (!isFriendshipActionValid(lastAction?.action || null, newAction.action)) return false + return isUserActionValid(actingUser, newAction, lastAction) +} + +type CommonParsedRequest = { + action: A + user: string +} + +type ParsedUpsertFriendshipRequest = + | (CommonParsedRequest & { metadata: { message: string } | null }) + | CommonParsedRequest + | CommonParsedRequest + | CommonParsedRequest + | CommonParsedRequest + +export function parseUpsertFriendshipRequest(request: UpsertFriendshipPayload): ParsedUpsertFriendshipRequest | null { + switch (request.action?.$case) { + case 'accept': + return { + action: Action.ACCEPT, + user: request.action.accept.user!.address + } + case 'cancel': + return { + action: Action.CANCEL, + user: request.action.cancel.user!.address + } + case 'delete': + return { + action: Action.DELETE, + user: request.action.delete.user!.address + } + case 'reject': + return { + action: Action.REJECT, + user: request.action.reject.user!.address + } + case 'request': + return { + action: Action.REQUEST, + user: request.action.request.user!.address, + metadata: request.action.request.message ? { message: request.action.request.message } : null + } + default: + return null + } +} + +export function parseEmittedUpdateToFriendshipUpdate( + update: SubscriptionEventsEmitter['update'] +): FriendshipUpdate | null { + switch (update.action) { + case Action.REQUEST: + return { + update: { + $case: 'request', + request: { + createdAt: update.timestamp, + user: { + address: update.from + }, + message: update.metadata?.message + } + } + } + case Action.CANCEL: + return { + update: { + $case: 'cancel', + cancel: { + user: { + address: update.from + } + } + } + } + case Action.DELETE: + return { + update: { + $case: 'delete', + delete: { + user: { + address: update.from + } + } + } + } + case Action.REJECT: + return { + update: { + $case: 'reject', + reject: { + user: { + address: update.from + } + } + } + } + case Action.ACCEPT: + return { + update: { + $case: 'accept', + accept: { + user: { + address: update.from + } + } + } + } + default: + return null + } +} diff --git a/src/ports/redis.ts b/src/ports/redis.ts new file mode 100644 index 0000000..9828729 --- /dev/null +++ b/src/ports/redis.ts @@ -0,0 +1,37 @@ +import { createClient } from 'redis' +import { AppComponents } from '../types' +import { IBaseComponent } from '@well-known-components/interfaces' + +export interface IRedisComponent extends IBaseComponent { + client: ReturnType +} + +export default async function createRedisComponent( + components: Pick +): Promise { + const { logs, config } = components + const logger = logs.getLogger('redis-component') + const REDIS_URL = (await config.getString('REDIS_CONNECTION_STRING')) || `redis://127.0.0.1:6379` + + const client = createClient({ + url: REDIS_URL + }) + + client.on('error', (err) => { + logger.error(err) + }) + + async function start() { + await client.connect() + } + + async function stop() { + await client.disconnect() + } + + return { + client, + start, + stop + } +} diff --git a/src/types.ts b/src/types.ts index de35bc1..d9e5ed0 100644 --- a/src/types.ts +++ b/src/types.ts @@ -7,10 +7,13 @@ import type { IFetchComponent } from '@well-known-components/interfaces' import { IPgComponent } from '@well-known-components/pg-component' +import { WebSocketServer } from 'ws' +import { Emitter } from 'mitt' import { metricDeclarations } from './metrics' import { IDatabaseComponent } from './adapters/db' -import { WebSocketServer } from 'ws' -import { RpcServer } from '@dcl/rpc' +import { IRedisComponent } from './ports/redis' +import { IRPCServerComponent } from './adapters/rpcServer' +import { IPubSubComponent } from './adapters/pubsub' export type GlobalContext = { components: BaseComponents @@ -25,8 +28,10 @@ export type BaseComponents = { pg: IPgComponent db: IDatabaseComponent ws: IWebSocketComponent - rpcServer: RpcServer + rpcServer: IRPCServerComponent fetcher: IFetchComponent + redis: IRedisComponent + pubsub: IPubSubComponent } // components used in runtime @@ -57,4 +62,64 @@ export type IWebSocketComponent = IBaseComponent & { ws: WebSocketServer } -export type RpcServerContext = GlobalContext & { address: string } +export type RpcServerContext = { + address: string + subscribers: Record> +} + +export type Friendship = { + id: string + address_requester: string + address_requested: string + is_active: boolean + created_at: string + updated_at: string +} + +export enum Action { + REQUEST = 'request', // request a friendship + CANCEL = 'cancel', // cancel a friendship request + ACCEPT = 'accept', // accept a friendship request + REJECT = 'reject', // reject a friendship request + DELETE = 'delete' // delete a friendship +} + +// [to]: [from] +export const FRIENDSHIP_ACTION_TRANSITIONS: Record = { + [Action.REQUEST]: [Action.CANCEL, Action.REJECT, Action.DELETE, null], + [Action.ACCEPT]: [Action.REQUEST], + [Action.CANCEL]: [Action.REQUEST], + [Action.REJECT]: [Action.REQUEST], + [Action.DELETE]: [Action.ACCEPT] +} + +export type FriendshipAction = { + id: string + friendship_id: string + action: Action + acting_user: string + metadata?: Record + timestamp: string +} + +export enum FriendshipStatus { + Requested, + Friends, + NotFriends +} + +export type FriendshipRequest = { + address: string + timestamp: string + metadata: Record | null +} + +export type SubscriptionEventsEmitter = { + update: { + to: string + from: string + action: Action + timestamp: number + metadata?: { message: string } + } +} diff --git a/src/utils/emitterToGenerator.ts b/src/utils/emitterToGenerator.ts new file mode 100644 index 0000000..f94fd23 --- /dev/null +++ b/src/utils/emitterToGenerator.ts @@ -0,0 +1,58 @@ +import { EventType, Emitter } from 'mitt' + +/** + * Turns an `EventEmitter` into an `AsyncGenerator` + * @param emitter `Emitter` from `mitt` package + * @param event type of event to listen to + * @returns `AsyncGenerator` + */ +export default function emitterToAsyncGenerator, T extends keyof Events>( + emitter: Emitter, + event: T +): AsyncGenerator { + const isDone = false + const nextQueue: { + resolve: (value: IteratorResult | PromiseLike>) => void + reject: (reason?: any) => void + }[] = [] + const valueQueue: Events[T][] = [] + + function eventHandler(value: Events[T]) { + if (nextQueue.length > 0) { + const { resolve } = nextQueue.shift()! + resolve({ done: false, value }) + return + } + valueQueue.push(value) + } + + emitter.on(event, eventHandler) + + return { + [Symbol.asyncIterator]() { + return this + }, + async next() { + if (valueQueue.length) { + const value = valueQueue.shift()! + return { + done: isDone && valueQueue.length === 0, + value + } + } + + return new Promise((resolve, reject) => { + nextQueue.push({ resolve, reject }) + }) + }, + async return(value) { + return { + done: true, + value + } + }, + async throw(e) { + throw e + } + } +}