diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 83207024e1..3d622ef164 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1108,6 +1108,10 @@ proc start*(node: WakuNode) {.async.} = node.started = true + node.wakuMetadata.start() + + asyncSpawn node.wakuMetadata.subscriptionsListener(node.topicSubscriptionQueue) + info "Node started successfully" proc stop*(node: WakuNode) {.async.} = diff --git a/waku/waku_metadata/protocol.nim b/waku/waku_metadata/protocol.nim index a8b5ae227b..111ae2d334 100644 --- a/waku/waku_metadata/protocol.nim +++ b/waku/waku_metadata/protocol.nim @@ -4,7 +4,7 @@ else: {.push raises: [].} import - std/[options, sequtils, random], + std/[options, sequtils, random, sets], stew/results, chronicles, chronos, @@ -27,57 +27,51 @@ const RpcResponseMaxBytes* = 1024 type WakuMetadata* = ref object of LPProtocol clusterId*: uint32 - shards*: seq[uint32] + shards*: HashSet[uint32] proc respond(m: WakuMetadata, conn: Connection): Future[Result[void, string]] {.async, gcsafe.} = - try: - await conn.writeLP(WakuMetadataResponse( - clusterId: some(m.clusterId), - shards: m.shards - ).encode().buffer) - except CatchableError as exc: - return err(exc.msg) + let response = WakuMetadataResponse( + clusterId: some(m.clusterId), + shards: toSeq(m.shards) + ) + + let res = catch: await conn.writeLP(response.encode().buffer) + if res.isErr(): + return err(res.error.msg) return ok() proc request*(m: WakuMetadata, conn: Connection): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} = - var buffer: seq[byte] - var error: string - try: - await conn.writeLP(WakuMetadataRequest( - clusterId: some(m.clusterId), - shards: m.shards, - ).encode().buffer) - buffer = await conn.readLp(RpcResponseMaxBytes) - except CatchableError as exc: - error = $exc.msg - finally: - # close, no more data is expected - await conn.closeWithEof() + let request = WakuMetadataRequest(clusterId: some(m.clusterId), shards: toSeq(m.shards)) + + let writeRes = catch: await conn.writeLP(request.encode().buffer) + if writeRes.isErr(): + return err("write failed: " & writeRes.error.msg) - if error.len > 0: - return err("write/read failed: " & error) + let readRes = catch: await conn.readLp(RpcResponseMaxBytes) + let buffer = + if readRes.isErr(): + return err("read failed: " & readRes.error.msg) + else: readRes.get() - let decodedBuff = WakuMetadataResponse.decode(buffer) - if decodedBuff.isErr(): - return err("decode failed: " & $decodedBuff.error) + let closeRes = catch: await conn.closeWithEof() + if closeRes.isErr(): + return err("close failed: " & closeRes.error.msg) - echo decodedBuff.get().clusterId - return ok(decodedBuff.get()) + let response = WakuMetadataResponse.decode(buffer).valueOr: + return err("decode failed: " & $error) + + return ok(response) proc initProtocolHandler*(m: WakuMetadata) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - var buffer: seq[byte] - try: - buffer = await conn.readLp(RpcResponseMaxBytes) - except CatchableError as exc: + let res = catch: await conn.readLp(RpcResponseMaxBytes) + let buffer = res.valueOr: return - let decBuf = WakuMetadataResponse.decode(buffer) - if decBuf.isErr(): + let response = WakuMetadataResponse.decode(buffer).valueOr: return - let response = decBuf.get() debug "Received WakuMetadata request", remoteClusterId=response.clusterId, remoteShards=response.shards, @@ -93,11 +87,41 @@ proc initProtocolHandler*(m: WakuMetadata) = m.codec = WakuMetadataCodec proc new*(T: type WakuMetadata, clusterId: uint32): T = - let m = WakuMetadata( - clusterId: clusterId, - # TODO: must be updated real time - shards: @[], - ) + let m = WakuMetadata(clusterId: clusterId) m.initProtocolHandler() info "Created WakuMetadata protocol", clusterId=clusterId return m + +proc start*(wm: WakuMetadata) = + wm.started = true + +proc stop*(wm: WakuMetadata) = + wm.started = false + +proc subscriptionsListener*( + wm: WakuMetadata, + topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent] + ) {.async.} = + ## Listen for pubsub topics subscriptions changes + + let key = topicSubscriptionQueue.register() + + while wm.started: + let events = await topicSubscriptionQueue.waitEvents(key) + + for event in events: + let parsedTopic = NsPubsubTopic.parse(event.topic).valueOr: + continue + + if parsedTopic.kind != NsPubsubTopicKind.StaticSharding: + continue + + case event.kind: + of PubsubSub: + wm.shards.incl(parsedTopic.shardId) + of PubsubUnsub: + wm.shards.excl(parsedTopic.shardId) + else: + continue + + topicSubscriptionQueue.unregister(key) \ No newline at end of file