From 19b23c01357108e1e564c36076f8ac453ebe2a23 Mon Sep 17 00:00:00 2001 From: Kevin Szuchet <31735779+kevinszuchet@users.noreply.github.com> Date: Wed, 22 Jan 2025 16:57:34 -0300 Subject: [PATCH] feat: Stream connectivity status (#36) * chore: Remove double exlamation mark * feat: 1st approach of getting connected peers * fix: Include setup test in jest config * feat: Scheduler adapter to store connected peers in redis * feat: Memory cache or Redis * feat: Get Friends filter by connectivity status using Redis * refactor: Move component interfaces to root types file * chore: Remove unused imports * feat: GetFriends does not filter by connectivity status * test: Adapt tests * feat: Send id of the request to subscribe, get sent and get pending * feat: Peer Tracking + Subscribe to Friends Status Updates * test: Peer tracking tests * refactor: Rename of one of the base components * refactor: Reuse common queries * test: Archipelago Stats tests * test: Friend Updates Streamer tests * test: PubSub tests * style: Missing eof * chore: Add TODO * fix: Propagate error if the fetch to stats fails * refactor: Rename confusing db function to filterActiveFriendshipsFromAddresses * test: New friendship function test * chore: First README iteration * chore: Update DER * chore: Improve seq diagram * chore: More readme improvements * chore: Some fixes in the seq diag * chore: Add TODO comment * test: Tiny improvements on tests for more coverage --- .env.default | 1 + README.md | 229 ++++++++++++++++++ jest.config.js | 3 +- package.json | 4 +- src/adapters/archipelago-stats.ts | 34 +++ src/adapters/db.ts | 158 ++++++------ src/adapters/memory-cache.ts | 22 ++ src/adapters/peer-tracking.ts | 74 ++++++ src/adapters/peers-synchronizer.ts | 50 ++++ src/adapters/pubsub.ts | 34 +-- src/adapters/redis.ts | 60 ++++- src/adapters/rpc-server/rpc-server.ts | 88 ++++--- .../rpc-server/services/get-friends.ts | 17 +- .../services/get-friendship-status.ts | 2 +- .../rpc-server/services/get-mutual-friends.ts | 2 +- .../get-pending-friendship-requests.ts | 5 +- .../services/get-sent-friendship-requests.ts | 5 +- .../services/subscribe-to-friend-updates.ts | 49 ++++ .../subscribe-to-friendship-updates.ts | 8 +- .../rpc-server/services/upsert-friendship.ts | 12 +- src/components.ts | 20 +- src/controllers/handlers/ws-handler.ts | 4 +- src/logic/friendships.ts | 18 +- src/types.ts | 144 ++++++++--- src/utils/peers.ts | 1 + test/mocks/components/archipelago-stats.ts | 6 + test/mocks/components/db.ts | 4 +- test/mocks/components/fetcher.ts | 5 + test/mocks/components/index.ts | 4 + test/mocks/components/logs.ts | 2 - test/mocks/components/nats.ts | 15 ++ test/mocks/components/pg.ts | 1 - test/mocks/components/pubsub.ts | 6 +- test/mocks/components/redis.ts | 28 +++ test/mocks/friendship-request.ts | 14 +- test/unit/adapters/archipelago-stats.spec.ts | 49 ++++ test/unit/adapters/db.spec.ts | 110 ++++++++- test/unit/adapters/memory-cache.spec.ts | 11 + test/unit/adapters/peers-synchronizer.spec.ts | 78 ++++++ test/unit/adapters/pubsub.spec.ts | 104 ++++++++ test/unit/adapters/redis.spec.ts | 88 +++++++ test/unit/adapters/rpc-server.spec.ts | 24 +- .../rpc-server/services/get-friends.spec.ts | 21 +- .../services/get-friendship-status.spec.ts | 2 +- .../services/get-mutual-friends.spec.ts | 2 +- .../get-pending-friendship-requests.spec.ts | 14 +- .../get-sent-friendship-requests.spec.ts | 15 +- .../subscribe-to-friend-updates.spec.ts | 53 ++++ .../services/upsert-friendship.spec.ts | 7 +- test/unit/logic/friendships.spec.ts | 26 +- test/unit/peer-tracking.spec.ts | 73 ++++++ test/unit/utils/pagination.spec.ts | 2 +- yarn.lock | 56 ++++- 53 files changed, 1595 insertions(+), 269 deletions(-) create mode 100644 src/adapters/archipelago-stats.ts create mode 100644 src/adapters/memory-cache.ts create mode 100644 src/adapters/peer-tracking.ts create mode 100644 src/adapters/peers-synchronizer.ts create mode 100644 src/adapters/rpc-server/services/subscribe-to-friend-updates.ts create mode 100644 src/utils/peers.ts create mode 100644 test/mocks/components/archipelago-stats.ts create mode 100644 test/mocks/components/fetcher.ts create mode 100644 test/mocks/components/nats.ts create mode 100644 test/mocks/components/redis.ts create mode 100644 test/unit/adapters/archipelago-stats.spec.ts create mode 100644 test/unit/adapters/memory-cache.spec.ts create mode 100644 test/unit/adapters/peers-synchronizer.spec.ts create mode 100644 test/unit/adapters/pubsub.spec.ts create mode 100644 test/unit/adapters/redis.spec.ts create mode 100644 test/unit/adapters/rpc-server/services/subscribe-to-friend-updates.spec.ts create mode 100644 test/unit/peer-tracking.spec.ts diff --git a/.env.default b/.env.default index c107a35..19b70bf 100644 --- a/.env.default +++ b/.env.default @@ -13,3 +13,4 @@ HTTP_SERVER_HOST=0.0.0.0 # reset metrics at 00:00UTC WKC_METRICS_RESET_AT_NIGHT=false +NATS_URL=localhost:4222 diff --git a/README.md b/README.md index 09312b1..dfb2e03 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,232 @@ # EA Social Service [![Coverage Status](https://coveralls.io/repos/github/decentraland/social-service-ea/badge.svg)](https://coveralls.io/github/decentraland/social-service-ea) + +A microservice that handles social interactions (friendships) for Decentraland, built using the Well Known Components architecture pattern. + +## Table of Contents + +- [๐ŸŒŸ Features](#-features) +- [๐Ÿ— Architecture](#-architecture) + - [Component-Based Architecture](#component-based-architecture) + - [Database Design](#database-design) + - [Flow Diagrams](#flow-diagrams) +- [๐Ÿš€ Getting Started](#-getting-started) + - [Prerequisites](#prerequisites) + - [Local Development](#local-development) + - [Environment Variables](#environment-variables) +- [๐Ÿงช Testing](#-testing) + - [Test Coverage](#test-coverage) +- [๐Ÿ”„ CI/CD](#-ci/cd) + - [Deployment Environments](#deployment-environments) + +## ๐ŸŒŸ Features + +- Friendship management (requests, accepts, rejects, cancellations) +- Real-time friend status updates +- Mutual friends discovery +- Online status tracking +- Integration with Archipelago for peer synchronization + +## ๐Ÿ— Architecture + +### Component-Based Architecture + +This service follows the Well Known Components pattern, where each component is a self-contained unit with a clear interface. The main components are: + +- **Database (PostgreSQL)**: Stores friendship relationships and actions +- **Cache (Redis)**: Handles temporary data and real-time status +- **RPC Server**: Manages client-server RPC communication +- **PubSub**: Handles real-time updates +- **Archipelago Stats**: Integrates with Decentraland's peer discovery system +- **Peer Tracking**: Monitors online status of users through the NATS messaging system +- **Peers Synchronization**: Synchronizes peers with the Archipelago Stats service and store them in Redis + +### Database Design + +```plantuml +@startuml +!define table(x) class x << (T,#FFAAAA) >> +!define primary_key(x) x +!define foreign_key(x) #x# +hide methods +hide stereotypes + +table(friendships) { + primary_key(id): uuid + address_requester: varchar + address_requested: varchar + is_active: boolean + created_at: timestamp + updated_at: timestamp + -- + indexes + .. + hash(address_requester) + hash(address_requested) + btree(LOWER(address_requester)) + btree(LOWER(address_requested)) +} + +table(friendship_actions) { + primary_key(id): uuid + foreign_key(friendship_id): uuid + action: varchar + acting_user: varchar + metadata: jsonb + timestamp: timestamp + -- + indexes + .. + btree(friendship_id) +} + +friendships ||--|{ friendship_actions +@enduml +``` + +The database schema supports: + +- Bidirectional friendships +- Action history tracking +- Metadata for requests +- Optimized queries with proper indexes + +See migrations for details: [migrations](./src/migrations) + +### Flow Diagrams + +```mermaid +sequenceDiagram + participant Client + participant WebSocket + participant RPC Server + participant Redis + participant NATS + participant DB + + Note over Client,DB: Connection Setup + Client->>WebSocket: WS Handshake + activate WebSocket + WebSocket-->>Client: Connection Established + Client->>WebSocket: Auth Message + WebSocket->>RPC Server: Attach Transport + activate RPC Server + + Note over RPC Server,NATS: Subscriptions Setup + RPC Server->>Redis: Subscribe to updates channels + activate Redis + Note over Redis: friendship.updates + Note over Redis: friend.status.updates + RPC Server->>NATS: Subscribe to peer events + activate NATS + Note over NATS: peer.*.connected + Note over NATS: peer.*.disconnected + Note over NATS: peer.*.heartbeat + + Note over Client,DB: Friendship Request Flow + Client->>RPC Server: Friend Request + RPC Server->>DB: Create Friendship Record + DB-->>RPC Server: Friendship Created + RPC Server->>DB: Record Friendship Action + RPC Server->>Redis: Publish Friendship Update + RPC Server-->>Client: Request Confirmation + + Note over Client,DB: Friendship Updates Flow + Redis-->>RPC Server: Friendship Update Event + RPC Server-->>Client: Stream Friendship Updates + Note over RPC Server: (accept/cancel/reject/delete) + + Note over Client,DB: Friends Lifecycle + NATS-->>Redis: Peer Heartbeat + Redis-->>RPC Server: Friend Status Update + RPC Server->>Redis: Request Cached Peers + Redis-->>RPC Server: Cached Peers + RPC Server->>DB: Request Online Friends + DB-->>RPC Server: Online Friends + RPC Server->>DB: Query Online Friends + RPC Server-->>Client: Stream Friend Status Updates + Note over RPC Server: (online/offline) + + Note over Client,DB: Cleanup + Client->>WebSocket: Connection Close + WebSocket->>RPC Server: Detach Transport + RPC Server->>Redis: Unsubscribe + RPC Server->>NATS: Unsubscribe + deactivate WebSocket + deactivate RPC Server + deactivate Redis + deactivate NATS +``` + +## ๐Ÿš€ Getting Started + +### Prerequisites + +- Node.js v18.20.4 +- Docker and Docker Compose +- PostgreSQL +- Redis + +### Local Development + +1. Clone the repository +2. Install dependencies: + +```bash +yarn install +``` + +3. Start the development environment: + +```bash +docker-compose up -d +``` + +4. Run migrations: + +```bash +yarn migrate up +``` + +5. Run the service: + +```bash +yarn start +``` + +### Environment Variables + +Key environment variables needed: + +- `REDIS_HOST`: URL of the Redis instance +- `PG_COMPONENT_PSQL_CONNECTION_STRING`: URL of the PostgreSQL instance +- `ARCHIPELAGO_STATS_URL`: URL of the Archipelago Stats service + +See `.env.default` for all available options. + +## ๐Ÿงช Testing + +The project uses Jest for testing. Run tests with: + +```bash +yarn test +``` + +### Test Coverage + +Coverage reports are generated in the `/coverage` directory and uploaded to Coveralls. + +## ๐Ÿ”„ CI/CD + +The project uses GitHub Actions for: + +- Continuous Integration +- Docker image building +- Automated deployments to dev/prod environments +- Dependency management with Dependabot + +### Deployment Environments + +- **Development**: Automatic deployments on main branch +- **Production**: Manual deployments via GitHub releases diff --git a/jest.config.js b/jest.config.js index c50d4de..18b6a04 100644 --- a/jest.config.js +++ b/jest.config.js @@ -8,5 +8,6 @@ module.exports = { coverageDirectory: 'coverage', collectCoverageFrom: ['src/**/*.ts', 'src/**/*.js', '!src/migrations/**'], testMatch: ['**/*.spec.(ts)'], - testEnvironment: 'node' + testEnvironment: 'node', + setupFilesAfterEnv: ['/test/setupTests.ts'] } diff --git a/package.json b/package.json index a544e5c..84996c4 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,7 @@ }, "dependencies": { "@dcl/platform-crypto-middleware": "^1.1.0", - "@dcl/protocol": "^1.0.0-12815643167.commit-c4162c4", + "@dcl/protocol": "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12890706635.commit-a7e4210.tgz", "@dcl/rpc": "^1.1.2", "@well-known-components/env-config-provider": "^1.2.0", "@well-known-components/fetch-component": "^2.0.2", @@ -38,9 +38,11 @@ "@well-known-components/interfaces": "^1.4.3", "@well-known-components/logger": "^3.1.3", "@well-known-components/metrics": "^2.1.0", + "@well-known-components/nats-component": "^2.0.0", "@well-known-components/pg-component": "^0.2.2", "@well-known-components/uws-http-server": "^0.0.2", "fp-future": "^1.0.1", + "lru-cache": "^10.4.3", "mitt": "^3.0.1", "redis": "^4.6.13", "sql-template-strings": "^2.2.2", diff --git a/src/adapters/archipelago-stats.ts b/src/adapters/archipelago-stats.ts new file mode 100644 index 0000000..df41bf5 --- /dev/null +++ b/src/adapters/archipelago-stats.ts @@ -0,0 +1,34 @@ +import { AppComponents, IArchipelagoStatsComponent } from '../types' +import { PEERS_CACHE_KEY } from '../utils/peers' + +export async function createArchipelagoStatsComponent({ + logs, + config, + fetcher, + redis +}: Pick): Promise { + const logger = logs.getLogger('archipelago-stats-component') + const url = await config.getString('ARCHIPELAGO_STATS_URL') + + return { + async getPeers() { + try { + const response = await fetcher.fetch(`${url}/comms/peers`) + + if (!response.ok) { + throw new Error(`Error fetching peers: ${response.statusText}`) + } + + const { peers } = await response.json() + + return peers.map((peer: { id: string }) => peer.id) + } catch (error: any) { + logger.error(`Error fetching peers from archipelago stats: ${error.message}`) + throw error + } + }, + async getPeersFromCache() { + return (await redis.get(PEERS_CACHE_KEY)) || [] + } + } +} diff --git a/src/adapters/db.ts b/src/adapters/db.ts index 2695330..b4da8a5 100644 --- a/src/adapters/db.ts +++ b/src/adapters/db.ts @@ -1,59 +1,82 @@ import SQL, { SQLStatement } from 'sql-template-strings' import { randomUUID } from 'node:crypto' import { PoolClient } from 'pg' -import { Action, AppComponents, Friendship, FriendshipAction, FriendshipRequest, Mutual, Pagination } from '../types' +import { AppComponents, Friendship, FriendshipAction, FriendshipRequest, IDatabaseComponent, Friend } from '../types' import { FRIENDSHIPS_PER_PAGE } from './rpc-server/constants' -export interface IDatabaseComponent { - createFriendship( - users: [string, string], - isActive: boolean, - txClient?: PoolClient - ): Promise<{ - id: string - created_at: Date - }> - updateFriendshipStatus(friendshipId: string, isActive: boolean, txClient?: PoolClient): Promise - getFriends( - userAddress: string, - options?: { - pagination?: Pagination - onlyActive?: boolean - } - ): Promise - getFriendsCount( - userAddress: string, - options?: { - onlyActive?: boolean - } - ): Promise - getMutualFriends(userAddress1: string, userAddress2: string, pagination?: Pagination): Promise - getMutualFriendsCount(userAddress1: string, userAddress2: string): Promise - getFriendship(userAddresses: [string, string]): Promise - getLastFriendshipAction(friendshipId: string): Promise - getLastFriendshipActionByUsers(loggedUser: string, friendUser: string): Promise - recordFriendshipAction( - friendshipId: string, - actingUser: string, - action: Action, - metadata: Record | null, - txClient?: PoolClient - ): Promise - getReceivedFriendshipRequests(userAddress: string, pagination?: Pagination): Promise - getSentFriendshipRequests(userAddress: string, pagination?: Pagination): Promise - executeTx(cb: (client: PoolClient) => Promise): Promise -} - export function createDBComponent(components: Pick): IDatabaseComponent { const { pg, logs } = components const logger = logs.getLogger('db-component') + // TODO: abstract common statements in a util file + function getFriendsBaseQuery(userAddress: string) { + return SQL` + SELECT DISTINCT + CASE + WHEN address_requester = ${userAddress} THEN address_requested + ELSE address_requester + END as address + FROM friendships + WHERE (address_requester = ${userAddress} OR address_requested = ${userAddress})` + } + + function getFriendshipRequestBaseQuery(userAddress: string, type: 'sent' | 'received'): SQLStatement { + const columnMapping = { + sent: SQL` f.address_requested`, + received: SQL` f.address_requester` + } + const filterMapping = { + sent: SQL`f.address_requester`, + received: SQL`f.address_requested` + } + + const baseQuery = SQL`SELECT fa.id,` + baseQuery.append(columnMapping[type].append(', as address')) + baseQuery.append(SQL` + fa.timestamp, fa.metadata + FROM friendships f + INNER JOIN friendship_actions fa ON f.id = fa.friendship_id + WHERE + `) + + baseQuery.append(filterMapping[type].append(SQL` = ${userAddress}`)) + + baseQuery.append(SQL` + AND fa.action = 'request' + AND f.is_active IS FALSE + AND fa.timestamp = ( + SELECT MAX(fa2.timestamp) + FROM friendship_actions fa2 + WHERE fa2.friendship_id = fa.friendship_id + ) + ORDER BY fa.timestamp DESC + `) + + return baseQuery + } + + function filterActiveFriendshipsFromAddresses(userAddress: string, userAddresses: string[]) { + return SQL` + SELECT DISTINCT + CASE + WHEN address_requester = ${userAddress} THEN address_requested + ELSE address_requester + END as address + FROM friendships + WHERE ( + (address_requester = ${userAddress} AND address_requested = ANY(${userAddresses})) + OR + (address_requested = ${userAddress} AND address_requester = ANY(${userAddresses})) + ) + AND is_active = true` + } + return { async getFriends(userAddress, { onlyActive = true, pagination = { limit: FRIENDSHIPS_PER_PAGE, offset: 0 } } = {}) { const { limit, offset } = pagination - const query: SQLStatement = SQL`SELECT * FROM friendships WHERE (address_requester = ${userAddress} OR address_requested = ${userAddress})` + const query: SQLStatement = getFriendsBaseQuery(userAddress) if (onlyActive) { query.append(SQL` AND is_active = true`) @@ -61,7 +84,7 @@ export function createDBComponent(components: Pick query.append(SQL` ORDER BY created_at DESC OFFSET ${offset} LIMIT ${limit}`) - const result = await pg.query(query) + const result = await pg.query(query) return result.rows }, async getFriendsCount(userAddress, { onlyActive } = { onlyActive: true }) { @@ -76,7 +99,7 @@ export function createDBComponent(components: Pick }, async getMutualFriends(userAddress1, userAddress2, pagination = { limit: FRIENDSHIPS_PER_PAGE, offset: 0 }) { const { limit, offset } = pagination - const result = await pg.query( + const result = await pg.query( SQL`WITH friendsA as ( SELECT CASE @@ -244,32 +267,18 @@ export function createDBComponent(components: Pick await pg.query(query) } - return true + return uuid }, async getReceivedFriendshipRequests(userAddress, pagination) { const { limit, offset } = pagination || {} - const query = SQL` - SELECT f.address_requester as address, fa.timestamp, fa.metadata - FROM friendships f - INNER JOIN friendship_actions fa ON f.id = fa.friendship_id - WHERE - f.address_requested = ${userAddress} - AND fa.action = 'request' - AND f.is_active IS FALSE - AND fa.timestamp = ( - SELECT MAX(fa2.timestamp) - FROM friendship_actions fa2 - WHERE fa2.friendship_id = fa.friendship_id - ) - ORDER BY fa.timestamp DESC - ` + const query = getFriendshipRequestBaseQuery(userAddress, 'received') if (limit) { query.append(SQL` LIMIT ${limit}`) } - if (!!offset) { + if (offset) { query.append(SQL` OFFSET ${offset}`) } @@ -279,21 +288,7 @@ export function createDBComponent(components: Pick }, async getSentFriendshipRequests(userAddress, pagination) { const { limit, offset } = pagination || {} - const query = SQL` - SELECT f.address_requested as address, fa.timestamp, fa.metadata - FROM friendships f - INNER JOIN friendship_actions fa ON f.id = fa.friendship_id - WHERE - f.address_requester = ${userAddress} - AND fa.action = 'request' - AND f.is_active IS FALSE - AND fa.timestamp = ( - SELECT MAX(fa2.timestamp) - FROM friendship_actions fa2 - WHERE fa2.friendship_id = fa.friendship_id - ) - ORDER BY fa.timestamp DESC - ` + const query = getFriendshipRequestBaseQuery(userAddress, 'sent') if (limit) { query.append(SQL` LIMIT ${limit}`) @@ -307,6 +302,17 @@ export function createDBComponent(components: Pick return results.rows }, + streamOnlineFriends(userAddress: string, onlinePeers: string[]) { + const query: SQLStatement = filterActiveFriendshipsFromAddresses(userAddress, onlinePeers) + return pg.streamQuery(query) + }, + async getOnlineFriends(userAddress: string, potentialFriends: string[]) { + if (potentialFriends.length === 0) return [] + + const query: SQLStatement = filterActiveFriendshipsFromAddresses(userAddress, potentialFriends) + const results = await pg.query(query) + return results.rows + }, async executeTx(cb: (client: PoolClient) => Promise): Promise { const pool = pg.getPool() const client = await pool.connect() diff --git a/src/adapters/memory-cache.ts b/src/adapters/memory-cache.ts new file mode 100644 index 0000000..50c09ce --- /dev/null +++ b/src/adapters/memory-cache.ts @@ -0,0 +1,22 @@ +import { ICacheComponent } from '@well-known-components/interfaces' +import { LRUCache } from 'lru-cache' + +export function createInMemoryCacheComponent(): ICacheComponent { + const cache = new LRUCache({ + max: 1000, + ttl: 1000 * 60 * 60 * 2 // 2 hours + }) + + async function get(key: string): Promise { + return cache.get(key) + } + + async function put(key: string, value: any): Promise { + cache.set(key, value) + } + + return { + get, + put + } +} diff --git a/src/adapters/peer-tracking.ts b/src/adapters/peer-tracking.ts new file mode 100644 index 0000000..d0da90f --- /dev/null +++ b/src/adapters/peer-tracking.ts @@ -0,0 +1,74 @@ +import { Subscription } from '@well-known-components/nats-component' +import { IPeerTrackingComponent } from '../types' +import { AppComponents } from '../types' +import { ConnectivityStatus } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' +import { NatsMsg } from '@well-known-components/nats-component/dist/types' +import { FRIEND_STATUS_UPDATES_CHANNEL } from './pubsub' + +export type PeerStatusHandler = { + event: string + pattern: string + status: ConnectivityStatus +} + +export const PEER_STATUS_HANDLERS: PeerStatusHandler[] = [ + { event: 'connect', pattern: 'peer.*.connect', status: ConnectivityStatus.OFFLINE }, + { event: 'disconnect', pattern: 'peer.*.disconnect', status: ConnectivityStatus.OFFLINE }, + { event: 'heartbeat', pattern: 'peer.*.heartbeat', status: ConnectivityStatus.ONLINE } +] + +export function createPeerTrackingComponent({ + logs, + pubsub, + nats +}: Pick): IPeerTrackingComponent { + const logger = logs.getLogger('peer-tracking-component') + const subscriptions = new Map() + + async function notifyPeerStatusChange(peerId: string, status: ConnectivityStatus) { + try { + await pubsub.publishInChannel(FRIEND_STATUS_UPDATES_CHANNEL, { + address: peerId, + status + }) + } catch (error: any) { + logger.error('Error notifying peer status change:', { + error: error.message, + peerId, + status + }) + } + } + + function createMessageHandler(handler: PeerStatusHandler) { + return async (err: Error | null, message: NatsMsg) => { + if (err) { + logger.error(`Error processing peer ${handler.event} message:`, { + error: err.message, + pattern: handler.pattern + }) + return + } + + const peerId = message.subject.split('.')[1] + await notifyPeerStatusChange(peerId, handler.status) + } + } + + return { + async start() { + PEER_STATUS_HANDLERS.forEach((handler) => { + const subscription = nats.subscribe(handler.pattern, createMessageHandler(handler)) + subscriptions.set(handler.event, subscription) + }) + }, + async stop() { + subscriptions.forEach((subscription) => subscription.unsubscribe()) + subscriptions.clear() + }, + // Exposed for testing + getSubscriptions() { + return subscriptions + } + } +} diff --git a/src/adapters/peers-synchronizer.ts b/src/adapters/peers-synchronizer.ts new file mode 100644 index 0000000..8979504 --- /dev/null +++ b/src/adapters/peers-synchronizer.ts @@ -0,0 +1,50 @@ +import { AppComponents, IPeersSynchronizer } from '../types' +import { PEERS_CACHE_KEY } from '../utils/peers' + +export const FIVE_SECS_IN_MS = 5000 +export const TEN_SECS_IN_MS = 10000 + +export async function createPeersSynchronizerComponent({ + logs, + archipelagoStats, + redis, + config +}: Pick): Promise { + const logger = logs.getLogger('peers-synchronizer-component') + let intervalId: NodeJS.Timeout | null = null + const syncIntervalMs = (await config.getNumber('PEER_SYNC_INTERVAL_MS')) || FIVE_SECS_IN_MS + const cacheTTLInSeconds = Math.floor(((await config.getNumber('PEERS_SYNC_CACHE_TTL_MS')) || TEN_SECS_IN_MS) / 1000) + + async function syncPeers() { + try { + const currentPeers = await archipelagoStats.getPeers() + + await redis.put(PEERS_CACHE_KEY, JSON.stringify(currentPeers), { + EX: cacheTTLInSeconds + }) + + logger.debug('Synced peers to Redis', { + peersCount: Object.keys(currentPeers).length, + timestamp: Date.now() + }) + } catch (error: any) { + logger.error('Error syncing peers:', error) + } + } + + return { + async start() { + logger.info('Starting scheduler component', { syncIntervalMs }) + await syncPeers() + intervalId = setInterval(syncPeers, syncIntervalMs) + }, + + async stop() { + logger.info('Stopping scheduler component') + if (intervalId) { + clearInterval(intervalId) + intervalId = null + } + } + } +} diff --git a/src/adapters/pubsub.ts b/src/adapters/pubsub.ts index edcc862..05f7252 100644 --- a/src/adapters/pubsub.ts +++ b/src/adapters/pubsub.ts @@ -1,22 +1,15 @@ -import { IBaseComponent } from '@well-known-components/interfaces' -import { AppComponents, SubscriptionEventsEmitter } from '../types' +import { AppComponents, IPubSubComponent } from '../types' -const FRIENDSHIP_UPDATES_CHANNEL = 'FRIENDSHIP_UPDATES' +export const FRIENDSHIP_UPDATES_CHANNEL = 'friendship.updates' +export const FRIEND_STATUS_UPDATES_CHANNEL = 'friend.status.updates' -export type IPubSubComponent = IBaseComponent & { - subscribeToFriendshipUpdates(cb: (message: string) => void): Promise - publishFriendshipUpdate(update: SubscriptionEventsEmitter['update']): Promise -} - -export default function createPubSubComponent(components: Pick): IPubSubComponent { +export function createPubSubComponent(components: Pick): IPubSubComponent { const { logs, redis } = components const logger = logs.getLogger('pubsub-component') const subClient = redis.client.duplicate() const pubClient = redis.client.duplicate() - let friendshipUpdatesCb: (message: string) => void | undefined - return { async start() { if (!subClient.isReady) { @@ -36,21 +29,20 @@ export default function createPubSubComponent(components: Pick void) { try { - friendshipUpdatesCb = cb - await subClient.subscribe(FRIENDSHIP_UPDATES_CHANNEL, friendshipUpdatesCb) - } catch (error) { - logger.error(error as any) + await subClient.subscribe(channel, cb) + } catch (error: any) { + logger.error(`Error while subscribing to channel ${channel}: ${error.message}`) } }, - async publishFriendshipUpdate(update) { + async publishInChannel(channel: string, update: T) { try { const message = JSON.stringify(update) - logger.debug('publishing update to FRIENDSHIP_UPDATES > ', { update: message }) - await pubClient.publish(FRIENDSHIP_UPDATES_CHANNEL, message) - } catch (error) { - logger.error(error as any) + logger.debug(`Publishing update to channel ${channel}:`, { update: message }) + await pubClient.publish(channel, message) + } catch (error: any) { + logger.error(`Error while publishing update to channel ${channel}: ${error.message}`) } } } diff --git a/src/adapters/redis.ts b/src/adapters/redis.ts index d8eb424..0e97fcb 100644 --- a/src/adapters/redis.ts +++ b/src/adapters/redis.ts @@ -1,14 +1,11 @@ -import { createClient } from 'redis' -import { AppComponents } from '../types' -import { IBaseComponent } from '@well-known-components/interfaces' +import { createClient, SetOptions } from 'redis' +import { AppComponents, IRedisComponent, ICacheComponent } from '../types' -export interface IRedisComponent extends IBaseComponent { - client: ReturnType -} +const TWO_HOURS_IN_SECONDS = 60 * 60 * 2 -export default async function createRedisComponent( +export async function createRedisComponent( components: Pick -): Promise { +): Promise { const { logs, config } = components const logger = logs.getLogger('redis-component') const REDIS_HOST = (await config.getString('REDIS_HOST')) || '127.0.0.1' @@ -24,16 +21,57 @@ export default async function createRedisComponent( }) async function start() { - await client.connect() + try { + logger.debug('Connecting to Redis', { url }) + await client.connect() + logger.debug('Successfully connected to Redis') + } catch (err: any) { + logger.error('Error connecting to Redis', err) + throw err + } } async function stop() { - await client.disconnect() + try { + logger.debug('Disconnecting from Redis') + await client.disconnect() + logger.debug('Successfully disconnected from Redis') + } catch (err: any) { + logger.error('Error disconnecting from Redis', err) + } + } + + async function get(key: string): Promise { + try { + const serializedValue = await client.get(key) + if (serializedValue) { + return JSON.parse(serializedValue) as T + } + return null + } catch (err: any) { + logger.error(`Error getting key "${key}"`, err) + throw err + } + } + + async function put(key: string, value: T, options?: SetOptions): Promise { + try { + const serializedValue = JSON.stringify(value) + await client.set(key, serializedValue, { + EX: options?.EX || TWO_HOURS_IN_SECONDS + }) + logger.debug(`Successfully set key "${key}"`) + } catch (err: any) { + logger.error(`Error setting key "${key}"`, err) + throw err + } } return { client, start, - stop + stop, + get, + put } } diff --git a/src/adapters/rpc-server/rpc-server.ts b/src/adapters/rpc-server/rpc-server.ts index 5a057cf..5cd7bc3 100644 --- a/src/adapters/rpc-server/rpc-server.ts +++ b/src/adapters/rpc-server/rpc-server.ts @@ -1,34 +1,38 @@ -import { Transport, createRpcServer } from '@dcl/rpc' +import { createRpcServer } from '@dcl/rpc' import { registerService } from '@dcl/rpc/dist/codegen' -import { IBaseComponent } from '@well-known-components/interfaces' -import { AppComponents, RpcServerContext, SubscriptionEventsEmitter } from '../../types' +import { AppComponents, IRPCServerComponent, RpcServerContext, SubscriptionEventsEmitter } from '../../types' import { getFriendsService } from './services/get-friends' import { getMutualFriendsService } from './services/get-mutual-friends' import { getPendingFriendshipRequestsService } from './services/get-pending-friendship-requests' import { upsertFriendshipService } from './services/upsert-friendship' import { subscribeToFriendshipUpdatesService } from './services/subscribe-to-friendship-updates' -import { SocialServiceDefinition } from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +import { SocialServiceDefinition } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { getSentFriendshipRequestsService } from './services/get-sent-friendship-requests' import { getFriendshipStatusService } from './services/get-friendship-status' +import { subscribeToFriendUpdatesService } from './services/subscribe-to-friend-updates' +import { FRIEND_STATUS_UPDATES_CHANNEL, FRIENDSHIP_UPDATES_CHANNEL } from '../pubsub' -export type IRPCServerComponent = IBaseComponent & { - attachUser(user: { transport: Transport; address: string }): void -} - -export async function createRpcServerComponent( - components: Pick -): Promise { - const { logs, db, pubsub, config, server } = components - +export async function createRpcServerComponent({ + logs, + db, + pubsub, + config, + server, + archipelagoStats +}: Pick< + AppComponents, + 'logs' | 'db' | 'pubsub' | 'config' | 'server' | 'nats' | 'archipelagoStats' | 'redis' +>): Promise { + // TODO: this should be a redis if we want to have more than one instance of the server const SHARED_CONTEXT: Pick = { subscribers: {} } const rpcServer = createRpcServer({ - logger: logs.getLogger('rpcServer') + logger: logs.getLogger('rpc-server') }) - const logger = logs.getLogger('rpcServer-handler') + const logger = logs.getLogger('rpc-server-handler') const rpcServerPort = (await config.getNumber('RPC_SERVER_PORT')) || 8085 @@ -37,8 +41,44 @@ export async function createRpcServerComponent( const getPendingFriendshipRequests = getPendingFriendshipRequestsService({ components: { logs, db } }) const getSentFriendshipRequests = getSentFriendshipRequestsService({ components: { logs, db } }) const upsertFriendship = upsertFriendshipService({ components: { logs, db, pubsub } }) - const subscribeToFriendshipUpdates = subscribeToFriendshipUpdatesService({ components: { logs } }) const getFriendshipStatus = getFriendshipStatusService({ components: { logs, db } }) + const subscribeToFriendshipUpdates = subscribeToFriendshipUpdatesService({ components: { logs } }) + const subscribeToFriendUpdates = subscribeToFriendUpdatesService({ + components: { logs, db, archipelagoStats } + }) + + function handleFriendshipUpdate(message: string) { + try { + const update = JSON.parse(message) as SubscriptionEventsEmitter['friendshipUpdate'] + const updateEmitter = SHARED_CONTEXT.subscribers[update.to] + if (updateEmitter) { + updateEmitter.emit('friendshipUpdate', update) + } + } catch (error: any) { + logger.error(`Error handling friendship update: ${error.message}`, { + message + }) + } + } + + async function handleFriendStatusUpdate(message: string) { + try { + // TODO: this may be a problem if the user has a lot of friends or there are a lot of users online + const update = JSON.parse(message) as SubscriptionEventsEmitter['friendStatusUpdate'] + const friends = await db.getOnlineFriends(update.address, Object.keys(SHARED_CONTEXT.subscribers)) + + friends.forEach(({ address: friendAddress }) => { + const emitter = SHARED_CONTEXT.subscribers[friendAddress] + if (emitter) { + emitter.emit('friendStatusUpdate', update) + } + }) + } catch (error: any) { + logger.error(`Error handling friend status update: ${error.message}`, { + message + }) + } + } rpcServer.setHandler(async function handler(port) { registerService(port, SocialServiceDefinition, async () => ({ @@ -48,7 +88,8 @@ export async function createRpcServerComponent( getSentFriendshipRequests, getFriendshipStatus, upsertFriendship, - subscribeToFriendshipUpdates + subscribeToFriendshipUpdates, + subscribeToFriendUpdates })) }) @@ -58,17 +99,8 @@ export async function createRpcServerComponent( logger.info(`[RPC] RPC Server listening on port ${rpcServerPort}`) }) - await pubsub.subscribeToFriendshipUpdates((message) => { - try { - const update = JSON.parse(message) as SubscriptionEventsEmitter['update'] - const updateEmitter = SHARED_CONTEXT.subscribers[update.to] - if (updateEmitter) { - updateEmitter.emit('update', update) - } - } catch (error) { - logger.error(error as any) - } - }) + await pubsub.subscribeToChannel(FRIENDSHIP_UPDATES_CHANNEL, handleFriendshipUpdate) + await pubsub.subscribeToChannel(FRIEND_STATUS_UPDATES_CHANNEL, handleFriendStatusUpdate) }, attachUser({ transport, address }) { transport.on('close', () => { diff --git a/src/adapters/rpc-server/services/get-friends.ts b/src/adapters/rpc-server/services/get-friends.ts index 577b246..f61654a 100644 --- a/src/adapters/rpc-server/services/get-friends.ts +++ b/src/adapters/rpc-server/services/get-friends.ts @@ -1,30 +1,27 @@ -import { Friendship, RpcServerContext, RPCServiceContext } from '../../../types' +import { RpcServerContext, RPCServiceContext } from '../../../types' import { getPage } from '../../../utils/pagination' import { FRIENDSHIPS_PER_PAGE, INTERNAL_SERVER_ERROR } from '../constants' import { GetFriendsPayload, PaginatedUsersResponse -} from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' export function getFriendsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) { const logger = logs.getLogger('get-friends-service') return async function (request: GetFriendsPayload, context: RpcServerContext): Promise { - const { pagination, status: _status } = request + const { pagination } = request const { address: loggedUserAddress } = context + try { + // TODO: can use the getPeersFromCache to get the online friends and sort online friends first const [friends, total] = await Promise.all([ db.getFriends(loggedUserAddress, { pagination }), db.getFriendsCount(loggedUserAddress) ]) - // TODO: retrieve peers and filter by connectivity status - // connecting to NATS and maintaining the same logic as stats/peers - return { - users: friends.map((friend) => ({ - address: friend.address_requested === loggedUserAddress ? friend.address_requester : friend.address_requested - })), + users: friends, paginationData: { total, page: getPage(pagination?.limit || FRIENDSHIPS_PER_PAGE, pagination?.offset) @@ -32,8 +29,6 @@ export function getFriendsService({ components: { logs, db } }: RPCServiceContex } } catch (error) { logger.error(error as any) - // throw an error bc there is no sense to create a generator to send an error - // as it's done in the previous Social Service throw new Error(INTERNAL_SERVER_ERROR) } } diff --git a/src/adapters/rpc-server/services/get-friendship-status.ts b/src/adapters/rpc-server/services/get-friendship-status.ts index 4fd74b8..8d6c7e4 100644 --- a/src/adapters/rpc-server/services/get-friendship-status.ts +++ b/src/adapters/rpc-server/services/get-friendship-status.ts @@ -3,7 +3,7 @@ import { RpcServerContext, RPCServiceContext } from '../../../types' import { GetFriendshipStatusPayload, GetFriendshipStatusResponse -} from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' export function getFriendshipStatusService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) { const logger = logs.getLogger('get-sent-friendship-requests-service') diff --git a/src/adapters/rpc-server/services/get-mutual-friends.ts b/src/adapters/rpc-server/services/get-mutual-friends.ts index 6c5a672..6da62d5 100644 --- a/src/adapters/rpc-server/services/get-mutual-friends.ts +++ b/src/adapters/rpc-server/services/get-mutual-friends.ts @@ -3,7 +3,7 @@ import { INTERNAL_SERVER_ERROR, FRIENDSHIPS_PER_PAGE } from '../constants' import { GetMutualFriendsPayload, PaginatedUsersResponse -} from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { normalizeAddress } from '../../../utils/address' import { getPage } from '../../../utils/pagination' diff --git a/src/adapters/rpc-server/services/get-pending-friendship-requests.ts b/src/adapters/rpc-server/services/get-pending-friendship-requests.ts index 02f1dd8..7b241e3 100644 --- a/src/adapters/rpc-server/services/get-pending-friendship-requests.ts +++ b/src/adapters/rpc-server/services/get-pending-friendship-requests.ts @@ -2,7 +2,7 @@ import { RpcServerContext, RPCServiceContext } from '../../../types' import { PaginatedFriendshipRequestsResponse, GetFriendshipRequestsPayload -} from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' export function getPendingFriendshipRequestsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) { const logger = logs.getLogger('get-pending-friendship-requests-service') @@ -13,7 +13,8 @@ export function getPendingFriendshipRequestsService({ components: { logs, db } } ): Promise { try { const pendingRequests = await db.getReceivedFriendshipRequests(context.address, request.pagination) - const mappedRequests = pendingRequests.map(({ address, timestamp, metadata }) => ({ + const mappedRequests = pendingRequests.map(({ id, address, timestamp, metadata }) => ({ + id, user: { address }, createdAt: new Date(timestamp).getTime(), message: metadata?.message || '' diff --git a/src/adapters/rpc-server/services/get-sent-friendship-requests.ts b/src/adapters/rpc-server/services/get-sent-friendship-requests.ts index db63503..5b984bd 100644 --- a/src/adapters/rpc-server/services/get-sent-friendship-requests.ts +++ b/src/adapters/rpc-server/services/get-sent-friendship-requests.ts @@ -2,7 +2,7 @@ import { RpcServerContext, RPCServiceContext } from '../../../types' import { PaginatedFriendshipRequestsResponse, GetFriendshipRequestsPayload -} from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' export function getSentFriendshipRequestsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) { const logger = logs.getLogger('get-sent-friendship-requests-service') @@ -13,7 +13,8 @@ export function getSentFriendshipRequestsService({ components: { logs, db } }: R ): Promise { try { const pendingRequests = await db.getSentFriendshipRequests(context.address, request.pagination) - const mappedRequests = pendingRequests.map(({ address, timestamp, metadata }) => ({ + const mappedRequests = pendingRequests.map(({ id, address, timestamp, metadata }) => ({ + id, user: { address }, createdAt: new Date(timestamp).getTime(), message: metadata?.message || '' diff --git a/src/adapters/rpc-server/services/subscribe-to-friend-updates.ts b/src/adapters/rpc-server/services/subscribe-to-friend-updates.ts new file mode 100644 index 0000000..625ea2f --- /dev/null +++ b/src/adapters/rpc-server/services/subscribe-to-friend-updates.ts @@ -0,0 +1,49 @@ +import { Empty } from '@dcl/protocol/out-js/google/protobuf/empty.gen' +import { SubscriptionEventsEmitter, RpcServerContext, RPCServiceContext } from '../../../types' +import { + FriendUpdate, + ConnectivityStatus +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' +import mitt from 'mitt' +import emitterToAsyncGenerator from '../../../utils/emitterToGenerator' +import { parseEmittedUpdateToFriendStatusUpdate } from '../../../logic/friendships' + +export function subscribeToFriendUpdatesService({ + components: { logs, db, archipelagoStats } +}: RPCServiceContext<'logs' | 'db' | 'archipelagoStats'>) { + const logger = logs.getLogger('subscribe-to-friend-updates-service') + + return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator { + try { + const eventEmitter = context.subscribers[context.address] || mitt() + + if (!context.subscribers[context.address]) { + context.subscribers[context.address] = eventEmitter + } + + const onlinePeers = await archipelagoStats.getPeersFromCache() + const onlineFriends = db.streamOnlineFriends(context.address, onlinePeers) + + for await (const friend of onlineFriends) { + yield { + user: { address: friend.address }, + status: ConnectivityStatus.ONLINE + } + } + + const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'friendStatusUpdate') + for await (const update of updatesGenerator) { + logger.debug('Friend status update received:', { update: JSON.stringify(update) }) + const updateToResponse = parseEmittedUpdateToFriendStatusUpdate(update) + if (updateToResponse) { + yield updateToResponse + } else { + logger.error('Unable to parse friend status update: ', { update: JSON.stringify(update) }) + } + } + } catch (error: any) { + logger.error('Error in friend updates subscription:', error) + throw error + } + } +} diff --git a/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts b/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts index b9f3d2a..3a5ab42 100644 --- a/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts +++ b/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts @@ -1,6 +1,6 @@ import { Empty } from '@dcl/protocol/out-js/google/protobuf/empty.gen' import { RpcServerContext, RPCServiceContext, SubscriptionEventsEmitter } from '../../../types' -import { FriendshipUpdate } from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +import { FriendshipUpdate } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import mitt from 'mitt' import { parseEmittedUpdateToFriendshipUpdate } from '../../../logic/friendships' import emitterToAsyncGenerator from '../../../utils/emitterToGenerator' @@ -15,15 +15,15 @@ export function subscribeToFriendshipUpdatesService({ components: { logs } }: RP context.subscribers[context.address] = eventEmitter } - const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'update') + const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'friendshipUpdate') for await (const update of updatesGenerator) { - logger.debug('> friendship update received, sending: ', { update: update as any }) + logger.debug('Friendship update received:', { update: JSON.stringify(update) }) const updateToResponse = parseEmittedUpdateToFriendshipUpdate(update) if (updateToResponse) { yield updateToResponse } else { - logger.error('> unable to parse update to FriendshipUpdate > ', { update: update as any }) + logger.error('Unable to parse friendship update: ', { update: JSON.stringify(update) }) } } } diff --git a/src/adapters/rpc-server/services/upsert-friendship.ts b/src/adapters/rpc-server/services/upsert-friendship.ts index 658184e..b392017 100644 --- a/src/adapters/rpc-server/services/upsert-friendship.ts +++ b/src/adapters/rpc-server/services/upsert-friendship.ts @@ -2,12 +2,13 @@ import { Action, FriendshipStatus, RpcServerContext, RPCServiceContext } from '. import { UpsertFriendshipPayload, UpsertFriendshipResponse -} from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { parseUpsertFriendshipRequest, validateNewFriendshipAction, getNewFriendshipStatus } from '../../../logic/friendships' +import { FRIENDSHIP_UPDATES_CHANNEL } from '../../pubsub' export function upsertFriendshipService({ components: { logs, db, pubsub } @@ -60,7 +61,7 @@ export function upsertFriendshipService({ logger.debug('friendship status > ', { isActive: JSON.stringify(isActive), friendshipStatus }) - const { id, createdAt } = await db.executeTx(async (tx) => { + const { id, actionId, createdAt } = await db.executeTx(async (tx) => { let id: string, createdAt: number if (friendship) { @@ -77,7 +78,7 @@ export function upsertFriendshipService({ createdAt = new Date(created_at).getTime() } - await db.recordFriendshipAction( + const actionId = await db.recordFriendshipAction( id, context.address, parsedRequest.action, @@ -85,12 +86,13 @@ export function upsertFriendshipService({ tx ) - return { id, createdAt } + return { id, actionId, createdAt } }) logger.debug(`${id} friendship was upsert successfully`) - await pubsub.publishFriendshipUpdate({ + await pubsub.publishInChannel(FRIENDSHIP_UPDATES_CHANNEL, { + id: actionId, from: context.address, to: parsedRequest.user, action: parsedRequest.action, diff --git a/src/components.ts b/src/components.ts index d7d44b2..feb4713 100644 --- a/src/components.ts +++ b/src/components.ts @@ -8,9 +8,13 @@ import { AppComponents } from './types' import { metricDeclarations } from './metrics' import { createDBComponent } from './adapters/db' import { createRpcServerComponent } from './adapters/rpc-server' -import createRedisComponent from './adapters/redis' -import createPubSubComponent from './adapters/pubsub' +import { createRedisComponent } from './adapters/redis' +import { createPubSubComponent } from './adapters/pubsub' import { createUWsComponent } from '@well-known-components/uws-http-server' +import { createArchipelagoStatsComponent } from './adapters/archipelago-stats' +import { createPeersSynchronizerComponent } from './adapters/peers-synchronizer' +import { createNatsComponent } from '@well-known-components/nats-component' +import { createPeerTrackingComponent } from './adapters/peer-tracking' // Initialize all the components of the app export async function initComponents(): Promise { @@ -49,7 +53,11 @@ export async function initComponents(): Promise { const redis = await createRedisComponent({ logs, config }) const pubsub = createPubSubComponent({ logs, redis }) - const rpcServer = await createRpcServerComponent({ logs, db, pubsub, server, config }) + const archipelagoStats = await createArchipelagoStatsComponent({ logs, config, fetcher, redis }) + const nats = await createNatsComponent({ logs, config }) + const rpcServer = await createRpcServerComponent({ logs, db, pubsub, server, config, nats, archipelagoStats, redis }) + const peersSynchronizer = await createPeersSynchronizerComponent({ logs, archipelagoStats, redis, config }) + const peerTracking = createPeerTrackingComponent({ logs, pubsub, nats }) return { config, @@ -61,6 +69,10 @@ export async function initComponents(): Promise { fetcher, redis, pubsub, - rpcServer + rpcServer, + archipelagoStats, + peersSynchronizer, + nats, + peerTracking } } diff --git a/src/controllers/handlers/ws-handler.ts b/src/controllers/handlers/ws-handler.ts index b000c85..c3c049d 100644 --- a/src/controllers/handlers/ws-handler.ts +++ b/src/controllers/handlers/ws-handler.ts @@ -42,7 +42,7 @@ export async function registerWsHandler( if (isNotAuthenticated(data)) { data.timeout = setTimeout(() => { try { - logger.error('closing connection, no authchain received') + logger.error('closing connection, no auth chain received') ws.end() } catch (err) {} }, 30000) @@ -66,7 +66,7 @@ export async function registerWsHandler( }) const address = normalizeAddress(verifyResult.auth) - logger.debug('addresss > ', { address }) + logger.debug('address > ', { address }) const eventEmitter = mitt() changeStage(data, { auth: true, address, eventEmitter, isConnected: true }) diff --git a/src/logic/friendships.ts b/src/logic/friendships.ts index 9027ccc..9c4ec74 100644 --- a/src/logic/friendships.ts +++ b/src/logic/friendships.ts @@ -1,8 +1,9 @@ import { FriendshipUpdate, UpsertFriendshipPayload, - FriendshipStatus as FriendshipRequestStatus -} from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' + FriendshipStatus as FriendshipRequestStatus, + FriendUpdate +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { Action, FRIENDSHIP_ACTION_TRANSITIONS, @@ -123,7 +124,7 @@ export function parseUpsertFriendshipRequest(request: UpsertFriendshipPayload): } export function parseEmittedUpdateToFriendshipUpdate( - update: SubscriptionEventsEmitter['update'] + update: SubscriptionEventsEmitter['friendshipUpdate'] ): FriendshipUpdate | null { switch (update.action) { case Action.REQUEST: @@ -131,6 +132,7 @@ export function parseEmittedUpdateToFriendshipUpdate( update: { $case: 'request', request: { + id: update.id, createdAt: update.timestamp, user: { address: update.from @@ -188,6 +190,16 @@ export function parseEmittedUpdateToFriendshipUpdate( } } +export function parseEmittedUpdateToFriendStatusUpdate({ + address, + status +}: SubscriptionEventsEmitter['friendStatusUpdate']): FriendUpdate | null { + return { + user: { address }, + status: status + } +} + export function getFriendshipRequestStatus( { action, acting_user }: FriendshipAction, loggedUserAddress: string diff --git a/src/types.ts b/src/types.ts index c526cfe..8151559 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,18 +4,20 @@ import type { IHttpServerComponent, IBaseComponent, IMetricsComponent, - IFetchComponent + IFetchComponent, + ICacheComponent as IBaseCacheComponent } from '@well-known-components/interfaces' import { IPgComponent } from '@well-known-components/pg-component' import { WebSocketServer } from 'ws' import { Emitter } from 'mitt' import { metricDeclarations } from './metrics' -import { IDatabaseComponent } from './adapters/db' -import { IRedisComponent } from './adapters/redis' -import { IRPCServerComponent } from './adapters/rpc-server/rpc-server' -import { IPubSubComponent } from './adapters/pubsub' import { HttpRequest, HttpResponse, IUWsComponent, WebSocket } from '@well-known-components/uws-http-server' import { IUWebSocketEventMap } from './utils/UWebSocketTransport' +import { Transport } from '@dcl/rpc' +import { PoolClient } from 'pg' +import { createClient, SetOptions } from 'redis' +import { INatsComponent, Subscription } from '@well-known-components/nats-component/dist/types' +import { ConnectivityStatus } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' export type GlobalContext = { components: BaseComponents @@ -31,8 +33,12 @@ export type BaseComponents = { db: IDatabaseComponent rpcServer: IRPCServerComponent fetcher: IFetchComponent - redis: IRedisComponent + redis: IRedisComponent & ICacheComponent pubsub: IPubSubComponent + archipelagoStats: IArchipelagoStatsComponent + peersSynchronizer: IPeersSynchronizer + nats: INatsComponent + peerTracking: IPeerTrackingComponent } // components used in runtime @@ -44,6 +50,89 @@ export type TestComponents = BaseComponents & { localFetch: IFetchComponent } +export type IRPCServerComponent = IBaseComponent & { + attachUser(user: { transport: Transport; address: string }): void +} +export interface IDatabaseComponent { + createFriendship( + users: [string, string], + isActive: boolean, + txClient?: PoolClient + ): Promise<{ + id: string + created_at: Date + }> + updateFriendshipStatus(friendshipId: string, isActive: boolean, txClient?: PoolClient): Promise + getFriends( + userAddress: string, + options?: { + pagination?: Pagination + onlyActive?: boolean + } + ): Promise + getFriendsCount( + userAddress: string, + options?: { + onlyActive?: boolean + } + ): Promise + getMutualFriends(userAddress1: string, userAddress2: string, pagination?: Pagination): Promise + getMutualFriendsCount(userAddress1: string, userAddress2: string): Promise + getFriendship(userAddresses: [string, string]): Promise + getLastFriendshipAction(friendshipId: string): Promise + getLastFriendshipActionByUsers(loggedUser: string, friendUser: string): Promise + recordFriendshipAction( + friendshipId: string, + actingUser: string, + action: Action, + metadata: Record | null, + txClient?: PoolClient + ): Promise + getReceivedFriendshipRequests(userAddress: string, pagination?: Pagination): Promise + getSentFriendshipRequests(userAddress: string, pagination?: Pagination): Promise + streamOnlineFriends(userAddress: string, onlinePeers: string[]): AsyncGenerator + getOnlineFriends(userAddress: string, potentialFriends: string[]): Promise + executeTx(cb: (client: PoolClient) => Promise): Promise +} +export interface IRedisComponent extends IBaseComponent { + client: ReturnType +} + +export interface ICacheComponent extends IBaseCacheComponent { + get: (key: string) => Promise + put: (key: string, value: T, options?: SetOptions) => Promise +} + +export type IPubSubComponent = IBaseComponent & { + subscribeToChannel(channel: string, cb: (message: string) => void): Promise + publishInChannel(channel: string, update: T): Promise +} + +export type IArchipelagoStatsComponent = IBaseComponent & { + getPeers(): Promise + getPeersFromCache(): Promise +} + +export type IPeersSynchronizer = IBaseComponent +export type IPeerTrackingComponent = IBaseComponent & { + getSubscriptions(): Map +} + +// this type simplifies the typings of http handlers +export type HandlerContextWithPath< + ComponentNames extends keyof AppComponents, + Path extends string = any +> = IHttpServerComponent.PathAwareContext< + IHttpServerComponent.DefaultContext<{ + components: Pick + }>, + Path +> + +export type IWebSocketComponent = IBaseComponent & { + ws: WebSocketServer +} + export type JsonBody = Record export type ResponseBody = JsonBody | string @@ -79,26 +168,28 @@ export type RPCServiceContext = { components: Pick } -// this type simplifies the typings of http handlers -export type HandlerContextWithPath< - ComponentNames extends keyof AppComponents, - Path extends string = any -> = IHttpServerComponent.PathAwareContext< - IHttpServerComponent.DefaultContext<{ - components: Pick - }>, - Path -> - export type Context = IHttpServerComponent.PathAwareContext -export type IWebSocketComponent = IBaseComponent & { - ws: WebSocketServer +export type SubscriptionEventsEmitter = { + friendshipUpdate: { + id: string + to: string + from: string + action: Action + timestamp: number + metadata?: { message: string } + } + friendStatusUpdate: { + address: string + status: ConnectivityStatus + } } +export type Subscribers = Record> + export type RpcServerContext = { address: string - subscribers: Record> + subscribers: Subscribers } export type Friendship = { @@ -110,7 +201,7 @@ export type Friendship = { updated_at: string } -export type Mutual = { +export type Friend = { address: string } @@ -147,21 +238,12 @@ export enum FriendshipStatus { } export type FriendshipRequest = { + id: string address: string timestamp: string metadata: Record | null } -export type SubscriptionEventsEmitter = { - update: { - to: string - from: string - action: Action - timestamp: number - metadata?: { message: string } - } -} - export type Pagination = { limit: number offset: number diff --git a/src/utils/peers.ts b/src/utils/peers.ts new file mode 100644 index 0000000..2c605ff --- /dev/null +++ b/src/utils/peers.ts @@ -0,0 +1 @@ +export const PEERS_CACHE_KEY = 'connected-peers' diff --git a/test/mocks/components/archipelago-stats.ts b/test/mocks/components/archipelago-stats.ts new file mode 100644 index 0000000..4a6c2c1 --- /dev/null +++ b/test/mocks/components/archipelago-stats.ts @@ -0,0 +1,6 @@ +import { IArchipelagoStatsComponent } from '../../../src/types' + +export const mockArchipelagoStats: jest.Mocked = { + getPeers: jest.fn(), + getPeersFromCache: jest.fn() +} diff --git a/test/mocks/components/db.ts b/test/mocks/components/db.ts index 7c9a105..3008188 100644 --- a/test/mocks/components/db.ts +++ b/test/mocks/components/db.ts @@ -1,10 +1,12 @@ -import { IDatabaseComponent } from '../../../src/adapters/db' +import { IDatabaseComponent } from '../../../src/types' export const mockDb: jest.Mocked = { createFriendship: jest.fn(), updateFriendshipStatus: jest.fn(), getFriends: jest.fn(), getFriendsCount: jest.fn(), + streamOnlineFriends: jest.fn(), + getOnlineFriends: jest.fn(), getMutualFriends: jest.fn(), getMutualFriendsCount: jest.fn(), getFriendship: jest.fn(), diff --git a/test/mocks/components/fetcher.ts b/test/mocks/components/fetcher.ts new file mode 100644 index 0000000..0d9e5de --- /dev/null +++ b/test/mocks/components/fetcher.ts @@ -0,0 +1,5 @@ +import { IFetchComponent } from '@well-known-components/interfaces' + +export const mockFetcher: jest.Mocked = { + fetch: jest.fn() +} diff --git a/test/mocks/components/index.ts b/test/mocks/components/index.ts index f8808c1..e5dfeda 100644 --- a/test/mocks/components/index.ts +++ b/test/mocks/components/index.ts @@ -4,3 +4,7 @@ export * from './pubsub' export * from './pg' export * from './config' export * from './uws' +export * from './redis' +export * from './archipelago-stats' +export * from './nats' +export * from './fetcher' diff --git a/test/mocks/components/logs.ts b/test/mocks/components/logs.ts index 0235740..ad53e69 100644 --- a/test/mocks/components/logs.ts +++ b/test/mocks/components/logs.ts @@ -1,6 +1,4 @@ -import { IMetricsComponent } from '@well-known-components/interfaces' import { ILoggerComponent } from '@well-known-components/interfaces/dist/components/logger' -import { createLogComponent } from '@well-known-components/logger' export const mockLogs: jest.Mocked = { getLogger: jest.fn().mockReturnValue({ diff --git a/test/mocks/components/nats.ts b/test/mocks/components/nats.ts new file mode 100644 index 0000000..071d847 --- /dev/null +++ b/test/mocks/components/nats.ts @@ -0,0 +1,15 @@ +import { INatsComponent, NatsEvents } from '@well-known-components/nats-component/dist/types' +import { Emitter } from 'mitt' + +export const mockNats: jest.Mocked = { + subscribe: jest.fn().mockReturnValue({ unsubscribe: jest.fn() }), + publish: jest.fn(), + start: jest.fn(), + stop: jest.fn(), + events: jest.fn().mockReturnValue({ + all: jest.fn(), + on: jest.fn(), + off: jest.fn(), + emit: jest.fn() + }) as unknown as jest.Mocked> +} diff --git a/test/mocks/components/pg.ts b/test/mocks/components/pg.ts index d341699..33d234f 100644 --- a/test/mocks/components/pg.ts +++ b/test/mocks/components/pg.ts @@ -1,5 +1,4 @@ import { IPgComponent } from '@well-known-components/pg-component' -import { release } from 'os' export const mockPg: jest.Mocked = { streamQuery: jest.fn(), diff --git a/test/mocks/components/pubsub.ts b/test/mocks/components/pubsub.ts index 95f6485..d20ae80 100644 --- a/test/mocks/components/pubsub.ts +++ b/test/mocks/components/pubsub.ts @@ -1,8 +1,8 @@ -import { IPubSubComponent } from '../../../src/adapters/pubsub' +import { IPubSubComponent } from '../../../src/types' export const mockPubSub: jest.Mocked = { start: jest.fn(), stop: jest.fn(), - subscribeToFriendshipUpdates: jest.fn(), - publishFriendshipUpdate: jest.fn() + subscribeToChannel: jest.fn(), + publishInChannel: jest.fn() } diff --git a/test/mocks/components/redis.ts b/test/mocks/components/redis.ts new file mode 100644 index 0000000..a2d1a80 --- /dev/null +++ b/test/mocks/components/redis.ts @@ -0,0 +1,28 @@ +import { ICacheComponent, IRedisComponent } from '../../../src/types' +import { createClient } from 'redis' + +jest.mock('redis', () => { + const mockClient = { + on: jest.fn(), + connect: jest.fn(), + disconnect: jest.fn(), + get: jest.fn(), + set: jest.fn(), + del: jest.fn(), + quit: jest.fn(), + subscribe: jest.fn(), + publish: jest.fn() + } + return { + createClient: jest.fn().mockReturnValue({ + ...mockClient, + duplicate: jest.fn().mockReturnValue(mockClient) + }) + } +}) + +export const mockRedis: jest.Mocked = { + client: createClient(), + get: jest.fn(), + put: jest.fn() +} diff --git a/test/mocks/friendship-request.ts b/test/mocks/friendship-request.ts index 8b5cc6a..095cb95 100644 --- a/test/mocks/friendship-request.ts +++ b/test/mocks/friendship-request.ts @@ -4,10 +4,12 @@ import { FriendshipRequest } from '../../src/types' * Creates a mock friendship request from given parameters. */ export const createMockFriendshipRequest = ( + id: string, address: string, timestamp: string, message?: string ): FriendshipRequest => ({ + id, address, timestamp, metadata: message ? { message } : undefined @@ -16,8 +18,14 @@ export const createMockFriendshipRequest = ( /** * Creates the expected mapped response for a friendship request. */ -export const createMockExpectedFriendshipRequest = (address: string, createdAt: string, message: string) => ({ +export const createMockExpectedFriendshipRequest = ( + id: string, + address: string, + createdAt?: string, + message?: string +) => ({ + id, user: { address }, - createdAt: new Date(createdAt).getTime(), - message + createdAt: createdAt ? new Date(createdAt).getTime() : new Date(createdAt).getTime(), + message: message || '' }) diff --git a/test/unit/adapters/archipelago-stats.spec.ts b/test/unit/adapters/archipelago-stats.spec.ts new file mode 100644 index 0000000..347c6f6 --- /dev/null +++ b/test/unit/adapters/archipelago-stats.spec.ts @@ -0,0 +1,49 @@ +import { json } from 'stream/consumers' +import { createArchipelagoStatsComponent } from '../../../src/adapters/archipelago-stats' +import { IArchipelagoStatsComponent } from '../../../src/types' +import { mockConfig, mockFetcher, mockLogs, mockRedis } from '../../mocks/components' + +describe('ArchipelagoStatsComponent', () => { + let archipelagoStats: IArchipelagoStatsComponent + + beforeEach(async () => { + archipelagoStats = await createArchipelagoStatsComponent({ + logs: mockLogs, + redis: mockRedis, + config: mockConfig, + fetcher: mockFetcher + }) + }) + + describe('getPeers', () => { + it('should return online peers when the fetch is successful', async () => { + mockFetcher.fetch.mockResolvedValue({ + ok: true, + json: jest.fn().mockResolvedValue({ + peers: [{ id: '0x123' }, { id: '0x456' }] + }) + } as any) + const result = await archipelagoStats.getPeers() + expect(result).toEqual(['0x123', '0x456']) + }) + + it('should throw an error when the fetch fails', async () => { + mockFetcher.fetch.mockRejectedValue(new Error('Fetch failed')) + await expect(archipelagoStats.getPeers()).rejects.toThrow('Fetch failed') + }) + }) + + describe('getPeersFromCache', () => { + it('should return cached peers', async () => { + mockRedis.get.mockResolvedValue(['0x123', '0x456']) + const result = await archipelagoStats.getPeersFromCache() + expect(result).toEqual(['0x123', '0x456']) + }) + + it('should return an empty array when no peers are cached', async () => { + mockRedis.get.mockResolvedValue(null) + const result = await archipelagoStats.getPeersFromCache() + expect(result).toEqual([]) + }) + }) +}) diff --git a/test/unit/adapters/db.spec.ts b/test/unit/adapters/db.spec.ts index 98f291c..da72bed 100644 --- a/test/unit/adapters/db.spec.ts +++ b/test/unit/adapters/db.spec.ts @@ -1,7 +1,11 @@ import { createDBComponent } from '../../../src/adapters/db' import { Action } from '../../../src/types' -import SQL from 'sql-template-strings' -import { mockDb, mockLogs, mockPg } from '../../mocks/components' +import SQL, { SQLStatement } from 'sql-template-strings' +import { mockLogs, mockPg } from '../../mocks/components' + +jest.mock('node:crypto', () => ({ + randomUUID: jest.fn().mockReturnValue('mock-uuid') +})) describe('db', () => { let dbComponent: ReturnType @@ -17,11 +21,50 @@ describe('db', () => { ] mockPg.query.mockResolvedValueOnce({ rows: mockFriends, rowCount: mockFriends.length }) - const result = await dbComponent.getFriends('0x123', { onlyActive: true }) + const userAddress = '0x123' + + const result = await dbComponent.getFriends(userAddress, { onlyActive: true }) + + const expectedFragmentsOfTheQuery = [ + { + text: 'WHEN address_requester =', + values: ['0x123'] + }, + { + text: 'WHERE (address_requester =', + values: ['0x123'] + }, + { + text: 'OR address_requested =', + values: ['0x123'] + }, + { + text: 'AND is_active = true', + values: [] + }, + { + text: 'ORDER BY created_at DESC', + values: [] + }, + { + text: 'OFFSET', + values: [expect.any(Number)] + }, + { + text: 'LIMIT', + values: [expect.any(Number)] + } + ] + + expectedFragmentsOfTheQuery.forEach(({ text, values }) => { + expect(mockPg.query).toHaveBeenCalledWith( + expect.objectContaining({ + text: expect.stringContaining(text), + values: expect.arrayContaining(values) + }) + ) + }) - expect(mockPg.query).toHaveBeenCalledWith( - SQL`SELECT * FROM friendships WHERE (address_requester = ${'0x123'} OR address_requested = ${'0x123'}) AND is_active = true ORDER BY created_at DESC OFFSET ${expect.any(Number)} LIMIT ${expect.any(Number)}` - ) expect(result).toEqual(mockFriends) }) @@ -34,7 +77,9 @@ describe('db', () => { const result = await dbComponent.getFriends('0x123', { onlyActive: false }) expect(mockPg.query).toHaveBeenCalledWith( - SQL`SELECT * FROM friendships WHERE (address_requester = ${'0x123'} OR address_requested = ${'0x123'}) ORDER BY created_at DESC OFFSET ${expect.any(Number)} LIMIT ${expect.any(Number)}` + expect.not.objectContaining({ + text: expect.stringContaining('AND is_active = true') + }) ) expect(result).toEqual(mockFriends) }) @@ -201,6 +246,7 @@ describe('db', () => { it('should retrieve received friendship requests', async () => { const mockRequests = [ { + id: expect.any(String), address: '0x123', timestamp: '2025-01-01T00:00:00.000Z', metadata: { message: 'Hello' } @@ -226,6 +272,7 @@ describe('db', () => { it('should retrieve sent friendship requests', async () => { const mockRequests = [ { + id: expect.any(String), address: '0x456', timestamp: '2025-01-01T00:00:00.000Z', metadata: { message: 'Hi there' } @@ -236,6 +283,12 @@ describe('db', () => { const result = await dbComponent.getSentFriendshipRequests('0x123', { limit: 10, offset: 5 }) expect(result).toEqual(mockRequests) + expect(mockPg.query).toHaveBeenCalledWith( + expect.objectContaining({ + text: expect.stringContaining('f.address_requester ='), + values: expect.arrayContaining(['0x123']) + }) + ) expectPaginatedQueryToHaveBeenCalledWithProperLimitAndOffset(10, 5) }) }) @@ -253,7 +306,7 @@ describe('db', () => { mockClient ) - expect(result).toBe(true) + expect(result).toBe('mock-uuid') expect(withTxClient ? mockClient.query : mockPg.query).toHaveBeenCalledWith( expect.objectContaining({ text: expect.stringContaining( @@ -271,6 +324,47 @@ describe('db', () => { }) }) + describe('areFriendsOf', () => { + it('should return empty array for empty potential friends', async () => { + const result = await dbComponent.getOnlineFriends('0x123', []) + expect(result).toEqual([]) + expect(mockPg.query).not.toHaveBeenCalled() + }) + + it('should query friendships for potential friends', async () => { + const mockResult = { + rows: [{ address: '0x456' }, { address: '0x789' }], + rowCount: 2 + } + mockPg.query.mockResolvedValueOnce(mockResult) + + const potentialFriends = ['0x456', '0x789', '0x999'] + const result = await dbComponent.getOnlineFriends('0x123', potentialFriends) + + const queryExpectations = [ + { text: 'address_requester =', values: ['0x123'] }, + { text: 'AND address_requested = ANY(' }, + { text: 'address_requested =', values: ['0x123'] }, + { text: 'address_requester = ANY(' } + ] + + queryExpectations.forEach(({ text, values }) => { + expect(mockPg.query).toHaveBeenCalledWith( + expect.objectContaining({ + text: expect.stringContaining(text) + }) + ) + if (values) { + expect(mockPg.query).toHaveBeenCalledWith( + expect.objectContaining({ + values: expect.arrayContaining(values) + }) + ) + } + }) + }) + }) + describe('executeTx', () => { it('should execute a transaction successfully', async () => { const result = await dbComponent.executeTx(async (client) => { diff --git a/test/unit/adapters/memory-cache.spec.ts b/test/unit/adapters/memory-cache.spec.ts new file mode 100644 index 0000000..43b41f9 --- /dev/null +++ b/test/unit/adapters/memory-cache.spec.ts @@ -0,0 +1,11 @@ +import { createInMemoryCacheComponent } from '../../../src/adapters/memory-cache' + +describe('Memory Cache', () => { + const cache = createInMemoryCacheComponent() + + it('should get and put values in the cache', async () => { + await cache.put('key', 'value') + const value = await cache.get('key') + expect(value).toBe('value') + }) +}) diff --git a/test/unit/adapters/peers-synchronizer.spec.ts b/test/unit/adapters/peers-synchronizer.spec.ts new file mode 100644 index 0000000..2f63b78 --- /dev/null +++ b/test/unit/adapters/peers-synchronizer.spec.ts @@ -0,0 +1,78 @@ +import { + createPeersSynchronizerComponent, + FIVE_SECS_IN_MS, + TEN_SECS_IN_MS +} from '../../../src/adapters/peers-synchronizer' +import { mockLogs, mockRedis, mockArchipelagoStats, mockConfig } from '../../mocks/components' +import { IPeersSynchronizer } from '../../../src/types' +import { PEERS_CACHE_KEY } from '../../../src/utils/peers' + +describe('peers-synchronizer', () => { + let scheduler: IPeersSynchronizer + + beforeEach(async () => { + scheduler = await createPeersSynchronizerComponent({ + logs: mockLogs, + archipelagoStats: mockArchipelagoStats, + redis: mockRedis, + config: mockConfig + }) + + jest.useFakeTimers() + }) + + afterEach(() => { + jest.useRealTimers() + }) + + it('should sync peers on start', async () => { + const mockPeers = ['0x123', '0x456'] + mockArchipelagoStats.getPeers.mockResolvedValueOnce(mockPeers) + + await scheduler.start({} as any) + await scheduler.stop() + + expect(mockArchipelagoStats.getPeers).toHaveBeenCalled() + expect(mockRedis.put).toHaveBeenCalledWith( + PEERS_CACHE_KEY, + JSON.stringify(mockPeers), + expect.objectContaining({ EX: TEN_SECS_IN_MS / 1000 }) + ) + }) + + it('should sync peers periodically', async () => { + const mockPeers = ['0x123'] + mockArchipelagoStats.getPeers.mockResolvedValue(mockPeers) + + await scheduler.start({} as any) + + // Advance timer to trigger next sync + jest.advanceTimersByTime(FIVE_SECS_IN_MS) + + await scheduler.stop() + + expect(mockArchipelagoStats.getPeers).toHaveBeenCalledTimes(2) + expect(mockRedis.put).toHaveBeenCalledTimes(2) + }) + + it('should stop syncing when stopped', async () => { + mockArchipelagoStats.getPeers.mockResolvedValue([]) + + await scheduler.start({} as any) + await scheduler.stop() + + jest.advanceTimersByTime(FIVE_SECS_IN_MS) + + // Should only have the initial sync + expect(mockArchipelagoStats.getPeers).toHaveBeenCalledTimes(1) + }) + + it('should handle errors gracefully', async () => { + mockArchipelagoStats.getPeers.mockRejectedValue(new Error('Network error')) + + await scheduler.start({} as any) + await scheduler.stop() + + expect(mockLogs.getLogger('scheduler-component').error).toHaveBeenCalled() + }) +}) diff --git a/test/unit/adapters/pubsub.spec.ts b/test/unit/adapters/pubsub.spec.ts new file mode 100644 index 0000000..d08e9da --- /dev/null +++ b/test/unit/adapters/pubsub.spec.ts @@ -0,0 +1,104 @@ +import { createPubSubComponent } from '../../../src/adapters/pubsub' +import { mockLogs, mockRedis } from '../../mocks/components' +import { FRIEND_STATUS_UPDATES_CHANNEL, FRIENDSHIP_UPDATES_CHANNEL } from '../../../src/adapters/pubsub' +import { ConnectivityStatus } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' +import { IPubSubComponent } from '../../../src/types' +import { RedisClientType } from 'redis' + +describe('PubSubComponent', () => { + let pubsub: IPubSubComponent + + let mockSubClient: jest.Mocked + let mockPubClient: jest.Mocked + + beforeEach(() => { + pubsub = createPubSubComponent({ + logs: mockLogs, + redis: mockRedis + }) + + mockSubClient = mockRedis.client.duplicate() as jest.Mocked + mockPubClient = mockRedis.client.duplicate() as jest.Mocked + }) + + describe('publishInChannel', () => { + it('should publish friend status updates', async () => { + const update = { + address: '0x123', + status: ConnectivityStatus.ONLINE + } + + await pubsub.publishInChannel(FRIEND_STATUS_UPDATES_CHANNEL, update) + + expect(mockPubClient.publish).toHaveBeenCalledWith(FRIEND_STATUS_UPDATES_CHANNEL, JSON.stringify(update)) + }) + + it('should publish friendship updates', async () => { + const update = { + userAddress: '0x123', + friendAddress: '0x456', + action: 'REQUEST' + } + + await pubsub.publishInChannel(FRIENDSHIP_UPDATES_CHANNEL, update) + + expect(mockPubClient.publish).toHaveBeenCalledWith(FRIENDSHIP_UPDATES_CHANNEL, JSON.stringify(update)) + }) + }) + + describe('subscribeToChannel', () => { + it('should subscribe to friend status updates', async () => { + const handler = jest.fn() + await pubsub.subscribeToChannel(FRIEND_STATUS_UPDATES_CHANNEL, handler) + + expect(mockSubClient.subscribe).toHaveBeenCalledWith(FRIEND_STATUS_UPDATES_CHANNEL, handler) + }) + + it('should subscribe to friendship updates', async () => { + const handler = jest.fn() + await pubsub.subscribeToChannel(FRIENDSHIP_UPDATES_CHANNEL, handler) + + expect(mockSubClient.subscribe).toHaveBeenCalledWith(FRIENDSHIP_UPDATES_CHANNEL, handler) + }) + }) + + describe('start/stop', () => { + it('should connect on start if redis is not ready', async () => { + Object.defineProperty(mockSubClient, 'isReady', { value: false, writable: true }) + Object.defineProperty(mockPubClient, 'isReady', { value: false, writable: true }) + + await pubsub.start({} as any) + + expect(mockSubClient.connect).toHaveBeenCalled() + expect(mockPubClient.connect).toHaveBeenCalled() + }) + + it('should do nothing on start if redis is already connected', async () => { + Object.defineProperty(mockSubClient, 'isReady', { value: true, writable: true }) + Object.defineProperty(mockPubClient, 'isReady', { value: true, writable: true }) + + await pubsub.start({} as any) + + expect(mockSubClient.connect).not.toHaveBeenCalled() + expect(mockPubClient.connect).not.toHaveBeenCalled() + }) + + it('should disconnect on stop if redis is ready', async () => { + Object.defineProperty(mockSubClient, 'isReady', { value: true, writable: true }) + Object.defineProperty(mockPubClient, 'isReady', { value: true, writable: true }) + + await pubsub.stop() + expect(mockSubClient.disconnect).toHaveBeenCalled() + expect(mockPubClient.disconnect).toHaveBeenCalled() + }) + + it('should disconnect on stop if redis is ready', async () => { + Object.defineProperty(mockSubClient, 'isReady', { value: false, writable: true }) + Object.defineProperty(mockPubClient, 'isReady', { value: false, writable: true }) + + await pubsub.stop() + expect(mockSubClient.disconnect).not.toHaveBeenCalled() + expect(mockPubClient.disconnect).not.toHaveBeenCalled() + }) + }) +}) diff --git a/test/unit/adapters/redis.spec.ts b/test/unit/adapters/redis.spec.ts new file mode 100644 index 0000000..ce13be2 --- /dev/null +++ b/test/unit/adapters/redis.spec.ts @@ -0,0 +1,88 @@ +import { createClient } from 'redis' +import { createRedisComponent } from '../../../src/adapters/redis' +import { mockConfig, mockLogs } from '../../mocks/components' +import { IRedisComponent } from '../../../src/types' +import { ICacheComponent } from '@well-known-components/interfaces' + +jest.mock('redis', () => ({ + createClient: jest.fn().mockReturnValue({ + on: jest.fn(), + connect: jest.fn(), + disconnect: jest.fn(), + get: jest.fn(), + set: jest.fn(), + del: jest.fn(), + quit: jest.fn() + }) +})) + +describe('redis', () => { + let redis: IRedisComponent & ICacheComponent + let mockClient: ReturnType + + beforeEach(async () => { + redis = await createRedisComponent({ + logs: mockLogs, + config: mockConfig + }) + + mockClient = createClient({ url: 'redis://localhost:6379' }) + }) + + describe('start()', () => { + it('should start the redis client', async () => { + await redis.start({} as any) + expect(mockClient.connect).toHaveBeenCalled() + }) + + it('when connect fails, should throw an error', async () => { + mockClient.connect = jest.fn().mockRejectedValueOnce(new Error('Connection failed')) + await expect(redis.start({} as any)).rejects.toThrow('Connection failed') + }) + }) + + describe('get()', () => { + it('should get a value from the redis client and return null if it was not found', async () => { + const value = await redis.get('key') + expect(mockClient.get).toHaveBeenCalledWith('key') + expect(value).toBe(null) + }) + + it('should get a value from the redis client and parse it correctly', async () => { + mockClient.get = jest.fn().mockResolvedValueOnce(JSON.stringify('value')) + const value = await redis.get('key') + expect(mockClient.get).toHaveBeenCalledWith('key') + expect(value).toBe('value') + }) + + it('when get fails, should throw an error', async () => { + mockClient.get = jest.fn().mockRejectedValueOnce(new Error('Get failed')) + await expect(redis.get('key')).rejects.toThrow('Get failed') + }) + }) + + describe('put()', () => { + it('should set a value in the redis client', async () => { + await redis.put('key', 'value') + expect(mockClient.set).toHaveBeenCalledWith('key', JSON.stringify('value'), { EX: 7200 }) + }) + + it('when put fails, should throw an error', async () => { + mockClient.set = jest.fn().mockRejectedValueOnce(new Error('Set failed')) + await expect(redis.put('key', 'value')).rejects.toThrow('Set failed') + }) + }) + + describe('stop()', () => { + it('should stop the redis client', async () => { + await redis.stop() + expect(mockClient.disconnect).toHaveBeenCalled() + }) + + it('when disconnect fails, should throw an error', async () => { + mockClient.disconnect = jest.fn().mockRejectedValueOnce(new Error('Disconnection failed')) + await redis.stop() + expect(mockClient.disconnect).toHaveBeenCalled() + }) + }) +}) diff --git a/test/unit/adapters/rpc-server.spec.ts b/test/unit/adapters/rpc-server.spec.ts index b6f90ca..028fa6b 100644 --- a/test/unit/adapters/rpc-server.spec.ts +++ b/test/unit/adapters/rpc-server.spec.ts @@ -1,7 +1,17 @@ -import { createRpcServerComponent, IRPCServerComponent } from ' ../../../src/adapters/rpc-server/rpc-server' -import { RpcServerContext } from '../../../src/types' +import { createRpcServerComponent } from ' ../../../src/adapters/rpc-server' +import { IRPCServerComponent, RpcServerContext } from '../../../src/types' import { RpcServer, Transport, createRpcServer } from '@dcl/rpc' -import { mockConfig, mockDb, mockLogs, mockPubSub, mockUWs } from '../../mocks/components' +import { + mockArchipelagoStats, + mockConfig, + mockDb, + mockLogs, + mockNats, + mockPubSub, + mockRedis, + mockUWs +} from '../../mocks/components' +import { FRIEND_STATUS_UPDATES_CHANNEL, FRIENDSHIP_UPDATES_CHANNEL } from '../../../src/adapters/pubsub' jest.mock('@dcl/rpc', () => ({ createRpcServer: jest.fn().mockReturnValue({ @@ -29,7 +39,10 @@ describe('createRpcServerComponent', () => { db: mockDb, pubsub: mockPubSub, config: mockConfig, - server: mockUWs + server: mockUWs, + nats: mockNats, + archipelagoStats: mockArchipelagoStats, + redis: mockRedis }) }) @@ -44,7 +57,8 @@ describe('createRpcServerComponent', () => { await rpcServer.start({} as any) expect(mockUWs.app.listen).toHaveBeenCalledWith(8085, expect.any(Function)) - expect(mockPubSub.subscribeToFriendshipUpdates).toHaveBeenCalledWith(expect.any(Function)) + expect(mockPubSub.subscribeToChannel).toHaveBeenCalledWith(FRIENDSHIP_UPDATES_CHANNEL, expect.any(Function)) + expect(mockPubSub.subscribeToChannel).toHaveBeenCalledWith(FRIEND_STATUS_UPDATES_CHANNEL, expect.any(Function)) }) }) diff --git a/test/unit/adapters/rpc-server/services/get-friends.spec.ts b/test/unit/adapters/rpc-server/services/get-friends.spec.ts index cfc1bf7..f296fa3 100644 --- a/test/unit/adapters/rpc-server/services/get-friends.spec.ts +++ b/test/unit/adapters/rpc-server/services/get-friends.spec.ts @@ -1,7 +1,7 @@ import { mockDb, mockLogs } from '../../../../mocks/components' import { getFriendsService } from '../../../../../src/adapters/rpc-server/services/get-friends' import { FRIENDSHIPS_PER_PAGE, INTERNAL_SERVER_ERROR } from '../../../../../src/adapters/rpc-server/constants' -import { RpcServerContext, Friendship, AppComponents } from '../../../../../src/types' +import { RpcServerContext, AppComponents, Friend } from '../../../../../src/types' describe('getFriendsService', () => { let components: jest.Mocked> @@ -18,11 +18,7 @@ describe('getFriendsService', () => { }) it('should return the correct list of friends with pagination data', async () => { - const mockFriends = [ - createMockFriendship('0x123', '0x456'), - createMockFriendship('0x123', '0x789'), - createMockFriendship('0x987', '0x123') - ] + const mockFriends = [createMockFriend('0x456'), createMockFriend('0x789'), createMockFriend('0x987')] const totalFriends = 2 mockDb.getFriends.mockResolvedValueOnce(mockFriends) @@ -40,9 +36,7 @@ describe('getFriendsService', () => { }) it('should respect the pagination limit', async () => { - const mockFriends = Array.from({ length: FRIENDSHIPS_PER_PAGE }, (_, i) => - createMockFriendship(`0x${i + 1}`, '0x123') - ) + const mockFriends = Array.from({ length: FRIENDSHIPS_PER_PAGE }, (_, i) => createMockFriend(`0x${i + 1}`)) const totalFriends = FRIENDSHIPS_PER_PAGE + 5 mockDb.getFriends.mockResolvedValueOnce(mockFriends) @@ -83,12 +77,7 @@ describe('getFriendsService', () => { }) // Helper to create a mock friendship object - const createMockFriendship = (requester: string, requested: string): Friendship => ({ - address_requester: requester, - address_requested: requested, - id: 'mock-friendship-id', - is_active: false, - created_at: new Date().toISOString(), - updated_at: new Date().toISOString() + const createMockFriend = (address): Friend => ({ + address }) }) diff --git a/test/unit/adapters/rpc-server/services/get-friendship-status.spec.ts b/test/unit/adapters/rpc-server/services/get-friendship-status.spec.ts index ffe92af..133e433 100644 --- a/test/unit/adapters/rpc-server/services/get-friendship-status.spec.ts +++ b/test/unit/adapters/rpc-server/services/get-friendship-status.spec.ts @@ -4,7 +4,7 @@ import { FriendshipStatus, GetFriendshipStatusPayload, GetFriendshipStatusResponse -} from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { mockDb, mockLogs } from '../../../../mocks/components' describe('getFriendshipStatusService', () => { diff --git a/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts b/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts index 029ab3d..9dcf4cc 100644 --- a/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts +++ b/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts @@ -1,7 +1,7 @@ import { mockDb, mockLogs } from '../../../../mocks/components' import { getMutualFriendsService } from '../../../../../src/adapters/rpc-server/services/get-mutual-friends' import { INTERNAL_SERVER_ERROR, FRIENDSHIPS_PER_PAGE } from '../../../../../src/adapters/rpc-server/constants' -import { GetMutualFriendsPayload } from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +import { GetMutualFriendsPayload } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { RpcServerContext, AppComponents } from '../../../../../src/types' describe('getMutualFriendsService', () => { diff --git a/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts b/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts index 2c52c1c..243f1d3 100644 --- a/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts +++ b/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts @@ -1,7 +1,7 @@ import { mockDb, mockLogs } from '../../../../mocks/components' import { getPendingFriendshipRequestsService } from '../../../../../src/adapters/rpc-server/services/get-pending-friendship-requests' import { RpcServerContext, AppComponents } from '../../../../../src/types' -import { PaginatedFriendshipRequestsResponse } from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +import { PaginatedFriendshipRequestsResponse } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { emptyRequest } from '../../../../mocks/empty-request' import { createMockFriendshipRequest, createMockExpectedFriendshipRequest } from '../../../../mocks/friendship-request' @@ -21,8 +21,8 @@ describe('getPendingFriendshipRequestsService', () => { it('should return the correct list of pending friendship requests', async () => { const mockPendingRequests = [ - createMockFriendshipRequest('0x456', '2025-01-01T00:00:00Z', 'Hi!'), - createMockFriendshipRequest('0x789', '2025-01-02T00:00:00Z') + createMockFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z', 'Hi!'), + createMockFriendshipRequest('id2', '0x789', '2025-01-02T00:00:00Z') ] mockDb.getReceivedFriendshipRequests.mockResolvedValueOnce(mockPendingRequests) @@ -34,8 +34,8 @@ describe('getPendingFriendshipRequestsService', () => { $case: 'requests', requests: { requests: [ - createMockExpectedFriendshipRequest('0x456', '2025-01-01T00:00:00Z', 'Hi!'), - createMockExpectedFriendshipRequest('0x789', '2025-01-02T00:00:00Z', '') + createMockExpectedFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z', 'Hi!'), + createMockExpectedFriendshipRequest('id2', '0x789', '2025-01-02T00:00:00Z') ] } } @@ -58,7 +58,7 @@ describe('getPendingFriendshipRequestsService', () => { }) it('should map metadata.message to an empty string if undefined', async () => { - const mockPendingRequests = [createMockFriendshipRequest('0x456', '2025-01-01T00:00:00Z')] + const mockPendingRequests = [createMockFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z', 'Hi!')] mockDb.getReceivedFriendshipRequests.mockResolvedValueOnce(mockPendingRequests) @@ -68,7 +68,7 @@ describe('getPendingFriendshipRequestsService', () => { response: { $case: 'requests', requests: { - requests: [createMockExpectedFriendshipRequest('0x456', '2025-01-01T00:00:00Z', '')] + requests: [createMockExpectedFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z', 'Hi!')] } } }) diff --git a/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts b/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts index a595e0f..a5d2d3e 100644 --- a/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts +++ b/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts @@ -3,7 +3,7 @@ import { getSentFriendshipRequestsService } from '../../../../../src/adapters/rp import { RpcServerContext, AppComponents } from '../../../../../src/types' import { emptyRequest } from '../../../../mocks/empty-request' import { createMockFriendshipRequest, createMockExpectedFriendshipRequest } from '../../../../mocks/friendship-request' -import { PaginatedFriendshipRequestsResponse } from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +import { PaginatedFriendshipRequestsResponse } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' describe('getSentFriendshipRequestsService', () => { let components: jest.Mocked> @@ -21,8 +21,8 @@ describe('getSentFriendshipRequestsService', () => { it('should return the correct list of sent friendship requests', async () => { const mockSentRequests = [ - createMockFriendshipRequest('0x456', '2025-01-01T00:00:00Z', 'Hello!'), - createMockFriendshipRequest('0x789', '2025-01-02T00:00:00Z') + createMockFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z', 'Hello!'), + createMockFriendshipRequest('id2', '0x789', '2025-01-02T00:00:00Z') ] mockDb.getSentFriendshipRequests.mockResolvedValueOnce(mockSentRequests) @@ -34,8 +34,8 @@ describe('getSentFriendshipRequestsService', () => { $case: 'requests', requests: { requests: [ - createMockExpectedFriendshipRequest('0x456', '2025-01-01T00:00:00Z', 'Hello!'), - createMockExpectedFriendshipRequest('0x789', '2025-01-02T00:00:00Z', '') + createMockExpectedFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z', 'Hello!'), + createMockExpectedFriendshipRequest('id2', '0x789', '2025-01-02T00:00:00Z') ] } } @@ -56,9 +56,8 @@ describe('getSentFriendshipRequestsService', () => { } }) }) - it('should map metadata.message to an empty string if undefined', async () => { - const mockSentRequests = [createMockFriendshipRequest('0x456', '2025-01-01T00:00:00Z')] + const mockSentRequests = [createMockFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z')] mockDb.getSentFriendshipRequests.mockResolvedValueOnce(mockSentRequests) @@ -68,7 +67,7 @@ describe('getSentFriendshipRequestsService', () => { response: { $case: 'requests', requests: { - requests: [createMockExpectedFriendshipRequest('0x456', '2025-01-01T00:00:00Z', '')] + requests: [createMockExpectedFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z')] } } }) diff --git a/test/unit/adapters/rpc-server/services/subscribe-to-friend-updates.spec.ts b/test/unit/adapters/rpc-server/services/subscribe-to-friend-updates.spec.ts new file mode 100644 index 0000000..e85c304 --- /dev/null +++ b/test/unit/adapters/rpc-server/services/subscribe-to-friend-updates.spec.ts @@ -0,0 +1,53 @@ +import { Empty } from '@dcl/protocol/out-js/google/protobuf/empty.gen' +import { Friend, RpcServerContext } from '../../../../../src/types' +import { mockLogs, mockArchipelagoStats, mockDb } from '../../../../mocks/components' +import { subscribeToFriendUpdatesService } from '../../../../../src/adapters/rpc-server/services/subscribe-to-friend-updates' +import { ConnectivityStatus } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' + +describe('subscribeToFriendStatusService', () => { + let subscribeToFriendUpdates: ReturnType + let rpcContext: RpcServerContext + + beforeEach(() => { + subscribeToFriendUpdates = subscribeToFriendUpdatesService({ + components: { + logs: mockLogs, + db: mockDb, + archipelagoStats: mockArchipelagoStats + } + }) + + rpcContext = { + address: '0x123', + subscribers: {} + } + + mockDb.streamOnlineFriends.mockImplementationOnce(async function* () { + yield { address: '0x456' } + }) + }) + + it('should get initial online friends from archipelago stats', async () => { + mockArchipelagoStats.getPeers.mockResolvedValue(['0x456', '0x789']) + + const generator = subscribeToFriendUpdates({} as Empty, rpcContext) + const result = await generator.next() + + expect(mockArchipelagoStats.getPeersFromCache).toHaveBeenCalled() + expect(result.value).toEqual({ + user: { address: '0x456' }, + status: ConnectivityStatus.ONLINE + }) + }) + + it('should add the status subscriber to context', async () => { + const generator = subscribeToFriendUpdates({} as Empty, rpcContext) + generator.next() + + expect(rpcContext.subscribers['0x123']).toBeDefined() + generator.return(undefined) + }) + + it.todo('should yield parsed updates when an update is emitted') + it.todo('should skip unparsable updates') +}) diff --git a/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts b/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts index 5810ae9..d3ac367 100644 --- a/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts +++ b/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts @@ -5,8 +5,9 @@ import * as FriendshipsLogic from '../../../../../src/logic/friendships' import { UpsertFriendshipPayload, UpsertFriendshipResponse -} from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { ParsedUpsertFriendshipRequest } from '../../../../../src/logic/friendships' +import { FRIENDSHIP_UPDATES_CHANNEL } from '../../../../../src/adapters/pubsub' jest.mock('../../../../../src/logic/friendships') @@ -149,10 +150,12 @@ describe('upsertFriendshipService', () => { mockDb.getFriendship.mockResolvedValueOnce(existingFriendship) mockDb.getLastFriendshipAction.mockResolvedValueOnce(lastFriendshipAction) + mockDb.recordFriendshipAction.mockResolvedValueOnce(lastFriendshipAction.id) const result: UpsertFriendshipResponse = await upsertFriendship(mockRequest, rpcContext) - expect(components.pubsub.publishFriendshipUpdate).toHaveBeenCalledWith({ + expect(components.pubsub.publishInChannel).toHaveBeenCalledWith(FRIENDSHIP_UPDATES_CHANNEL, { + id: lastFriendshipAction.id, from: rpcContext.address, to: userAddress, action: mockParsedRequest.action, diff --git a/test/unit/logic/friendships.spec.ts b/test/unit/logic/friendships.spec.ts index b66d446..c2cc519 100644 --- a/test/unit/logic/friendships.spec.ts +++ b/test/unit/logic/friendships.spec.ts @@ -4,11 +4,15 @@ import { isFriendshipActionValid, isUserActionValid, parseEmittedUpdateToFriendshipUpdate, + parseEmittedUpdateToFriendStatusUpdate, parseUpsertFriendshipRequest, validateNewFriendshipAction } from '../../../src/logic/friendships' import { Action, FriendshipStatus } from '../../../src/types' -import { FriendshipStatus as FriendshipRequestStatus } from '@dcl/protocol/out-js/decentraland/social_service/v3/social_service_v3.gen' +import { + ConnectivityStatus, + FriendshipStatus as FriendshipRequestStatus +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' describe('isFriendshipActionValid()', () => { test('it should be valid if from is null and to is REQUEST ', () => { @@ -369,10 +373,13 @@ describe('parseUpsertFriendshipRequest()', () => { }) describe('parseEmittedUpdateToFriendshipUpdate()', () => { + const id = 'id' + test('it should parse REQUEST update properly', () => { const now = Date.now() expect( parseEmittedUpdateToFriendshipUpdate({ + id, action: Action.REQUEST, timestamp: now, from: '0xA', @@ -382,6 +389,7 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { update: { $case: 'request', request: { + id, createdAt: now, user: { address: '0xA' @@ -393,6 +401,7 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { expect( parseEmittedUpdateToFriendshipUpdate({ + id, action: Action.REQUEST, timestamp: now, from: '0xA', @@ -405,6 +414,7 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { update: { $case: 'request', request: { + id, createdAt: now, user: { address: '0xA' @@ -419,6 +429,7 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { const now = Date.now() expect( parseEmittedUpdateToFriendshipUpdate({ + id, action: Action.CANCEL, timestamp: now, from: '0xA', @@ -440,6 +451,7 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { const now = Date.now() expect( parseEmittedUpdateToFriendshipUpdate({ + id, action: Action.DELETE, timestamp: now, from: '0xA', @@ -461,6 +473,7 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { const now = Date.now() expect( parseEmittedUpdateToFriendshipUpdate({ + id, action: Action.REJECT, timestamp: now, from: '0xA', @@ -482,6 +495,7 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { const now = Date.now() expect( parseEmittedUpdateToFriendshipUpdate({ + id, action: Action.ACCEPT, timestamp: now, from: '0xA', @@ -503,6 +517,7 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { const now = Date.now() expect( parseEmittedUpdateToFriendshipUpdate({ + id, action: 'whaterver' as Action, timestamp: now, from: '0xA', @@ -540,3 +555,12 @@ describe('getFriendshipRequestStatus()', () => { expect(getFriendshipRequestStatus(requestMadeByAnotherUser, '0x123')).toBe(FriendshipRequestStatus.REQUEST_RECEIVED) }) }) + +describe('parseEmittedUpdateToFriendStatusUpdate()', () => { + test('it should parse ONLINE update properly', () => { + expect(parseEmittedUpdateToFriendStatusUpdate({ address: '0x123', status: ConnectivityStatus.ONLINE })).toEqual({ + user: { address: '0x123' }, + status: ConnectivityStatus.ONLINE + }) + }) +}) diff --git a/test/unit/peer-tracking.spec.ts b/test/unit/peer-tracking.spec.ts new file mode 100644 index 0000000..5e36530 --- /dev/null +++ b/test/unit/peer-tracking.spec.ts @@ -0,0 +1,73 @@ +import { createPeerTrackingComponent, PEER_STATUS_HANDLERS } from '../../src/adapters/peer-tracking' +import { FRIEND_STATUS_UPDATES_CHANNEL } from '../../src/adapters/pubsub' +import { mockLogs, mockNats, mockPubSub } from '../mocks/components' +import { IPeerTrackingComponent } from '../../src/types' + +describe('PeerTrackingComponent', () => { + let peerTracking: IPeerTrackingComponent + + beforeEach(() => { + jest.clearAllMocks() + peerTracking = createPeerTrackingComponent({ logs: mockLogs, nats: mockNats, pubsub: mockPubSub }) + }) + + describe('start', () => { + it('should subscribe to all peer status patterns', async () => { + await peerTracking.start({} as any) + + const subscriptions = peerTracking.getSubscriptions() + expect(subscriptions.size).toBe(PEER_STATUS_HANDLERS.length) + + PEER_STATUS_HANDLERS.forEach((handler) => { + expect(mockNats.subscribe).toHaveBeenCalledWith(handler.pattern, expect.any(Function)) + expect(subscriptions.has(handler.event)).toBe(true) + }) + }) + }) + + describe('stop', () => { + it('should unsubscribe and clear all subscriptions', async () => { + await peerTracking.start({} as any) + await peerTracking.stop() + + const subscriptions = peerTracking.getSubscriptions() + expect(subscriptions.size).toBe(0) + }) + }) + + describe('message handling', () => { + PEER_STATUS_HANDLERS.forEach((handler) => { + it(`should handle ${handler.event} messages correctly`, async () => { + await peerTracking.start({} as any) + + const messageHandler = mockNats.subscribe.mock.calls.find((call) => call[0] === handler.pattern)?.[1] + + expect(messageHandler).toBeDefined() + + await messageHandler(null, { + subject: `peer.0x123.${handler.event}`, + data: undefined + }) + + expect(mockPubSub.publishInChannel).toHaveBeenCalledWith(FRIEND_STATUS_UPDATES_CHANNEL, { + address: '0x123', + status: handler.status + }) + }) + + it(`should handle ${handler.event} message errors`, async () => { + await peerTracking.start({} as any) + + const messageHandler = mockNats.subscribe.mock.calls.find((call) => call[0] === handler.pattern)?.[1] + + const error = new Error('Test error') + await messageHandler(error, { + subject: `peer.0x123.${handler.event}`, + data: undefined + }) + + expect(mockPubSub.publishInChannel).not.toHaveBeenCalled() + }) + }) + }) +}) diff --git a/test/unit/utils/pagination.spec.ts b/test/unit/utils/pagination.spec.ts index ffb2728..2caa640 100644 --- a/test/unit/utils/pagination.spec.ts +++ b/test/unit/utils/pagination.spec.ts @@ -3,7 +3,7 @@ import { getPage } from '../../../src/utils/pagination' describe('pagination', () => { describe('getPage', () => { it('should return the correct page number', () => { - expect(getPage(10, 0)).toBe(1) + expect(getPage(10)).toBe(1) expect(getPage(10, 10)).toBe(2) expect(getPage(10, 20)).toBe(3) }) diff --git a/yarn.lock b/yarn.lock index f304985..df38fb6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -344,10 +344,15 @@ "@well-known-components/fetch-component" "^2.0.2" "@well-known-components/interfaces" "^1.4.2" -"@dcl/protocol@^1.0.0-12815643167.commit-c4162c4": - version "1.0.0-12815778152.commit-3614ead" - resolved "https://registry.yarnpkg.com/@dcl/protocol/-/protocol-1.0.0-12815778152.commit-3614ead.tgz#5f61db13d426e61ecda6c6a237d148fcdfcb6f8e" - integrity sha512-JrtwOtKICHVdwFRC9c4XQbmsw8QpGkFwAiuHp7MBuBbfITIx8ZlWELBlmQH6sq18jzTl83Uz7IohTeAmz5U/wg== +"@dcl/protocol@https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12875508193.commit-808a662.tgz": + version "1.0.0-12875508193.commit-808a662" + resolved "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12875508193.commit-808a662.tgz#53592864b3d9e7ea382363ce225ce20e6792321a" + dependencies: + "@dcl/ts-proto" "1.154.0" + +"@dcl/protocol@https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12890706635.commit-a7e4210.tgz": + version "1.0.0-12890706635.commit-a7e4210" + resolved "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12890706635.commit-a7e4210.tgz#26bac792c70a98cd33346847e4298540eacd1b27" dependencies: "@dcl/ts-proto" "1.154.0" @@ -1176,7 +1181,7 @@ on-finished "^2.4.1" path-to-regexp "^6.2.1" -"@well-known-components/interfaces@^1.1.0", "@well-known-components/interfaces@^1.4.1", "@well-known-components/interfaces@^1.4.2", "@well-known-components/interfaces@^1.4.3": +"@well-known-components/interfaces@^1.1.0", "@well-known-components/interfaces@^1.1.1", "@well-known-components/interfaces@^1.4.1", "@well-known-components/interfaces@^1.4.2", "@well-known-components/interfaces@^1.4.3": version "1.4.3" resolved "https://registry.npmjs.org/@well-known-components/interfaces/-/interfaces-1.4.3.tgz" integrity sha512-roVtoOHG6uaH+nL4C0ISnAwkkopc2FLsS7fqX+roI22EdX9PAknPoImhPU8/3u6jgRAVpglX5Zj4nWZkSaXPkQ== @@ -1197,6 +1202,16 @@ dependencies: prom-client "^15.1.0" +"@well-known-components/nats-component@^2.0.0": + version "2.0.0" + resolved "https://registry.yarnpkg.com/@well-known-components/nats-component/-/nats-component-2.0.0.tgz#abe0a1258d7e9b6fa36d0c56b9bd2d7c125ef8af" + integrity sha512-czUnm7Hv2BzwsgAtzFg75BYf6sKj/R4AQqHyvMk4QHIUmjUys7FL0eJKbrM3RLLLGCwugCqVBj4kfYOWdpTiHg== + dependencies: + "@well-known-components/interfaces" "^1.1.1" + "@well-known-components/pushable-channel" "^1.0.0" + mitt "^3.0.0" + nats "^2.7.1" + "@well-known-components/pg-component@^0.2.2": version "0.2.2" resolved "https://registry.npmjs.org/@well-known-components/pg-component/-/pg-component-0.2.2.tgz" @@ -1209,6 +1224,13 @@ pg-query-stream "^4.2.3" sql-template-strings "^2.2.2" +"@well-known-components/pushable-channel@^1.0.0": + version "1.0.3" + resolved "https://registry.yarnpkg.com/@well-known-components/pushable-channel/-/pushable-channel-1.0.3.tgz#b8a803c483bb52c03339a98813dd370857129059" + integrity sha512-8ibswJXQx7YfmUgzXp02xsIBTw6zrVXgNybV8asvEr1vE/0m/xmZi41+NwTcZmLCNRzsE/i+aRUzNO+oyq/g2g== + dependencies: + mitt "^3.0.0" + "@well-known-components/test-helpers@^1.5.6": version "1.5.6" resolved "https://registry.npmjs.org/@well-known-components/test-helpers/-/test-helpers-1.5.6.tgz" @@ -2946,6 +2968,11 @@ long@^5.0.0, long@^5.2.3: resolved "https://registry.yarnpkg.com/long/-/long-5.2.3.tgz#a3ba97f3877cf1d778eccbcb048525ebb77499e1" integrity sha512-lcHwpNoggQTObv5apGNCTdJrO69eHOZMi4BNC+rTLER8iHAqGrUVeLh/irVIM7zTw2bOXA8T6uNPeujwOLg/2Q== +lru-cache@^10.4.3: + version "10.4.3" + resolved "https://registry.yarnpkg.com/lru-cache/-/lru-cache-10.4.3.tgz#410fc8a17b70e598013df257c2446b7f3383f119" + integrity sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ== + lru-cache@^5.1.1: version "5.1.1" resolved "https://registry.npmjs.org/lru-cache/-/lru-cache-5.1.1.tgz" @@ -3048,6 +3075,13 @@ ms@^2.1.1: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== +nats@^2.7.1: + version "2.29.1" + resolved "https://registry.yarnpkg.com/nats/-/nats-2.29.1.tgz#0612053e030dfb31107987d6083d17e50fb295c7" + integrity sha512-OHVsxrQCITTdMKG3So0jhtnBd5jS2u1xpS91UCws7VklsaCbctwg5vT/8lYpVldPW0x3aHGF8uuAoMfCoJy7Sg== + dependencies: + nkeys.js "1.1.0" + natural-compare@^1.4.0: version "1.4.0" resolved "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz" @@ -3064,6 +3098,13 @@ nise@^5.1.5: just-extend "^6.2.0" path-to-regexp "^6.2.1" +nkeys.js@1.1.0: + version "1.1.0" + resolved "https://registry.yarnpkg.com/nkeys.js/-/nkeys.js-1.1.0.tgz#de83a9a13f396c5b6d7c412788f4b9f7f35d4c18" + integrity sha512-tB/a0shZL5UZWSwsoeyqfTszONTt4k2YS0tuQioMOD180+MbombYVgzDUYHlx+gejYK6rgf08n/2Df99WY0Sxg== + dependencies: + tweetnacl "1.0.3" + node-fetch@^2.6.12, node-fetch@^2.6.9: version "2.7.0" resolved "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz" @@ -3918,6 +3959,11 @@ tslib@^2.6.2: resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.6.2.tgz#703ac29425e7b37cd6fd456e92404d46d1f3e4ae" integrity sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q== +tweetnacl@1.0.3: + version "1.0.3" + resolved "https://registry.yarnpkg.com/tweetnacl/-/tweetnacl-1.0.3.tgz#ac0af71680458d8a6378d0d0d050ab1407d35596" + integrity sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw== + type-check@^0.4.0, type-check@~0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/type-check/-/type-check-0.4.0.tgz#07b8203bfa7056c0657050e3ccd2c37730bab8f1"