Skip to content

Commit

Permalink
components and business logic
Browse files Browse the repository at this point in the history
  • Loading branch information
lauti7 committed Apr 23, 2024
1 parent 20f6c04 commit 8c1db6e
Show file tree
Hide file tree
Showing 8 changed files with 688 additions and 53 deletions.
57 changes: 57 additions & 0 deletions src/adapters/pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { IBaseComponent } from '@well-known-components/interfaces'
import { AppComponents, SubscriptionEventsEmitter } from '../types'

const FRIENDSHIP_UPDATES_CHANNEL = 'FRIENDSHIP_UPDATES'

export type IPubSubComponent = IBaseComponent & {
subscribeToFriendshipUpdates(cb: (message: string) => void): Promise<void>
publishFriendshipUpdate(update: SubscriptionEventsEmitter['update']): Promise<void>
}

export default function createPubSubComponent(components: Pick<AppComponents, 'logs' | 'redis'>): IPubSubComponent {
const { logs, redis } = components
const logger = logs.getLogger('pubsub-component')

const subClient = redis.client.duplicate()
const pubClient = redis.client.duplicate()

let friendshipUpdatesCb: (message: string) => void | undefined

return {
async start() {
if (!subClient.isReady) {
await subClient.connect()
}

if (!pubClient.isReady) {
await pubClient.connect()
}
},
async stop() {
if (subClient.isReady) {
await subClient.disconnect()
}

if (pubClient.isReady) {
await pubClient.disconnect()
}
},
async subscribeToFriendshipUpdates(cb) {
try {
friendshipUpdatesCb = cb
await subClient.subscribe(FRIENDSHIP_UPDATES_CHANNEL, friendshipUpdatesCb)
} catch (error) {
logger.error(error as any)
}
},
async publishFriendshipUpdate(update) {
try {
const message = JSON.stringify(update)
logger.debug('publishing update to FRIENDSHIP_UPDATES > ', { update: message })
await pubClient.publish(FRIENDSHIP_UPDATES_CHANNEL, message)
} catch (error) {
logger.error(error as any)
}
}
}
}
Loading

0 comments on commit 8c1db6e

Please sign in to comment.