Skip to content

Commit

Permalink
metadata shard subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Oct 24, 2023
1 parent 944dfda commit b0c3c2c
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 41 deletions.
4 changes: 4 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,10 @@ proc start*(node: WakuNode) {.async.} =

node.started = true

node.wakuMetadata.start()

asyncSpawn node.wakuMetadata.subscriptionsListener(node.topicSubscriptionQueue)

info "Node started successfully"

proc stop*(node: WakuNode) {.async.} =
Expand Down
106 changes: 65 additions & 41 deletions waku/waku_metadata/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ else:
{.push raises: [].}

import
std/[options, sequtils, random],
std/[options, sequtils, random, sets],
stew/results,
chronicles,
chronos,
Expand All @@ -27,57 +27,51 @@ const RpcResponseMaxBytes* = 1024
type
WakuMetadata* = ref object of LPProtocol
clusterId*: uint32
shards*: seq[uint32]
shards*: HashSet[uint32]

proc respond(m: WakuMetadata, conn: Connection): Future[Result[void, string]] {.async, gcsafe.} =
try:
await conn.writeLP(WakuMetadataResponse(
clusterId: some(m.clusterId),
shards: m.shards
).encode().buffer)
except CatchableError as exc:
return err(exc.msg)
let response = WakuMetadataResponse(
clusterId: some(m.clusterId),
shards: toSeq(m.shards)
)

let res = catch: await conn.writeLP(response.encode().buffer)
if res.isErr():
return err(res.error.msg)

return ok()

proc request*(m: WakuMetadata, conn: Connection): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} =
var buffer: seq[byte]
var error: string
try:
await conn.writeLP(WakuMetadataRequest(
clusterId: some(m.clusterId),
shards: m.shards,
).encode().buffer)
buffer = await conn.readLp(RpcResponseMaxBytes)
except CatchableError as exc:
error = $exc.msg
finally:
# close, no more data is expected
await conn.closeWithEof()
let request = WakuMetadataRequest(clusterId: some(m.clusterId), shards: toSeq(m.shards))

let writeRes = catch: await conn.writeLP(request.encode().buffer)
if writeRes.isErr():
return err("write failed: " & writeRes.error.msg)

if error.len > 0:
return err("write/read failed: " & error)
let readRes = catch: await conn.readLp(RpcResponseMaxBytes)
let buffer =
if readRes.isErr():
return err("read failed: " & readRes.error.msg)
else: readRes.get()

let decodedBuff = WakuMetadataResponse.decode(buffer)
if decodedBuff.isErr():
return err("decode failed: " & $decodedBuff.error)
let closeRes = catch: await conn.closeWithEof()
if closeRes.isErr():
return err("close failed: " & closeRes.error.msg)

echo decodedBuff.get().clusterId
return ok(decodedBuff.get())
let response = WakuMetadataResponse.decode(buffer).valueOr:
return err("decode failed: " & $error)

return ok(response)

proc initProtocolHandler*(m: WakuMetadata) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var buffer: seq[byte]
try:
buffer = await conn.readLp(RpcResponseMaxBytes)
except CatchableError as exc:
let res = catch: await conn.readLp(RpcResponseMaxBytes)
let buffer = res.valueOr:
return

let decBuf = WakuMetadataResponse.decode(buffer)
if decBuf.isErr():
let response = WakuMetadataResponse.decode(buffer).valueOr:
return

let response = decBuf.get()
debug "Received WakuMetadata request",
remoteClusterId=response.clusterId,
remoteShards=response.shards,
Expand All @@ -93,11 +87,41 @@ proc initProtocolHandler*(m: WakuMetadata) =
m.codec = WakuMetadataCodec

proc new*(T: type WakuMetadata, clusterId: uint32): T =
let m = WakuMetadata(
clusterId: clusterId,
# TODO: must be updated real time
shards: @[],
)
let m = WakuMetadata(clusterId: clusterId)
m.initProtocolHandler()
info "Created WakuMetadata protocol", clusterId=clusterId
return m

proc start*(wm: WakuMetadata) =
wm.started = true

proc stop*(wm: WakuMetadata) =
wm.started = false

proc subscriptionsListener*(
wm: WakuMetadata,
topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
) {.async.} =
## Listen for pubsub topics subscriptions changes

let key = topicSubscriptionQueue.register()

while wm.started:
let events = await topicSubscriptionQueue.waitEvents(key)

for event in events:
let parsedTopic = NsPubsubTopic.parse(event.topic).valueOr:
continue

if parsedTopic.kind != NsPubsubTopicKind.StaticSharding:
continue

case event.kind:
of PubsubSub:
wm.shards.incl(parsedTopic.shardId)
of PubsubUnsub:
wm.shards.excl(parsedTopic.shardId)
else:
continue

topicSubscriptionQueue.unregister(key)

0 comments on commit b0c3c2c

Please sign in to comment.