Skip to content

Commit

Permalink
feat: topic health tracking (#3212)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer authored Dec 24, 2024
1 parent 6627e34 commit 6020a67
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 27 deletions.
23 changes: 23 additions & 0 deletions library/events/json_topic_health_change_event.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import system, results, std/json
import stew/byteutils
import ../../waku/common/base64, ./json_base_event
import ../../waku/waku_relay

type JsonTopicHealthChangeEvent* = ref object of JsonEvent
pubsubTopic*: string
topicHealth*: TopicHealth

proc new*(
T: type JsonTopicHealthChangeEvent, pubsubTopic: string, topicHealth: TopicHealth
): T =
# Returns a TopicHealthChange event as indicated in
# https://rfc.vac.dev/spec/36/#jsonmessageevent-type

return JsonTopicHealthChangeEvent(
eventType: "relay_topic_health_change",
pubsubTopic: pubsubTopic,
topicHealth: topicHealth,
)

method `$`*(jsonTopicHealthChange: JsonTopicHealthChangeEvent): string =
$(%*jsonTopicHealthChange)
34 changes: 31 additions & 3 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import
waku/node/waku_node,
waku/waku_core/topics/pubsub_topic,
waku/waku_core/subscription/push_handler,
waku/waku_relay/protocol,
./events/json_message_event,
waku/waku_relay,
./events/[json_message_event, json_topic_health_change_event],
./waku_thread/waku_thread,
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
./waku_thread/inter_thread_communication/requests/peer_manager_request,
Expand Down Expand Up @@ -84,6 +84,31 @@ proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)

proc onTopicHealthChange(ctx: ptr WakuContext): TopicHealthChangeHandler =
return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
# Callback that hadles the Waku Relay events. i.e. messages or errors.
if isNil(ctx[].eventCallback):
error "onTopicHealthChange - eventCallback is nil"
return

if isNil(ctx[].eventUserData):
error "onTopicHealthChange - eventUserData is nil"
return

foreignThreadGc:
try:
let event = $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg =
"Exception onTopicHealthChange when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)

### End of not-exported components
################################################################################

Expand Down Expand Up @@ -139,7 +164,10 @@ proc waku_new(

ctx.userData = userData

let appCallbacks = AppCallbacks(relayHandler: onReceivedMessage(ctx))
let appCallbacks = AppCallbacks(
relayHandler: onReceivedMessage(ctx),
topicHealthChangeHandler: onTopicHealthChange(ctx),
)

let retCode = handleRequest(
ctx,
Expand Down
3 changes: 2 additions & 1 deletion waku/factory/app_callbacks.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ../waku_relay/protocol
import ../waku_relay

type AppCallbacks* = ref object
relayHandler*: WakuRelayHandler
topicHealthChangeHandler*: TopicHealthChangeHandler
8 changes: 7 additions & 1 deletion waku/factory/waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,13 @@ proc setupAppCallbacks(
for shard in shards:
discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler)

return ok()
if not appCallbacks.topicHealthChangeHandler.isNil():
if node.wakuRelay.isNil():
return
err("Cannot configure topicHealthChangeHandler callback without Relay mounted")
node.wakuRelay.onTopicHealthChange = appCallbacks.topicHealthChangeHandler

return ok()

proc new*(
T: type Waku, confCopy: var WakuNodeConf, appCallbacks: AppCallbacks = nil
Expand Down
4 changes: 2 additions & 2 deletions waku/waku_relay.nim
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import ./waku_relay/protocol
import ./waku_relay/[protocol, topic_health]

export protocol
export protocol, topic_health
91 changes: 71 additions & 20 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/stream/connection,
libp2p/switch
import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer
import
../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer

from ../waku_core/codecs import WakuRelayCodec
export WakuRelayCodec
Expand Down Expand Up @@ -131,6 +132,9 @@ type
# a map of validators to error messages to return when validation fails
validatorInserted: Table[PubsubTopic, bool]
publishObservers: seq[PublishObserver]
topicsHealth*: Table[string, TopicHealth]
onTopicHealthChange*: TopicHealthChangeHandler
topicHealthLoopHandle*: Future[void]

proc initProtocolHandler(w: WakuRelay) =
proc handler(conn: Connection, proto: string) {.async.} =
Expand Down Expand Up @@ -289,6 +293,7 @@ proc new*(
procCall GossipSub(w).initPubSub()
w.initProtocolHandler()
w.initRelayObservers()
w.topicsHealth = initTable[string, TopicHealth]()
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())

Expand All @@ -309,13 +314,78 @@ proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
## Observes when a message is sent/received from the GossipSub PoV
procCall GossipSub(w).addObserver(observer)

proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
## Returns the number of peers in a mesh defined by the passed pubsub topic.
## The 'mesh' atribute is defined in the GossipSub ref object.

if not w.mesh.hasKey(pubsubTopic):
debug "getNumPeersInMesh - there is no mesh peer for the given pubsub topic",
pubsubTopic = pubsubTopic
return ok(0)

let peersRes = catch:
w.mesh[pubsubTopic]

let peers: HashSet[PubSubPeer] = peersRes.valueOr:
return
err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)

return ok(peers.len)

proc calculateTopicHealth(wakuRelay: WakuRelay, topic: string): TopicHealth =
let numPeersInMesh = wakuRelay.getNumPeersInMesh(topic).valueOr:
error "Could not calculate topic health", topic = topic, error = error
return TopicHealth.UNHEALTHY

if numPeersInMesh < 1:
return TopicHealth.UNHEALTHY
elif numPeersInMesh < wakuRelay.parameters.dLow:
return TopicHealth.MINIMALLY_HEALTHY
return TopicHealth.SUFFICIENTLY_HEALTHY

proc updateTopicsHealth(wakuRelay: WakuRelay) {.async.} =
var futs = newSeq[Future[void]]()
for topic in toSeq(wakuRelay.topics.keys):
## loop over all the topics I'm subscribed to
let
oldHealth = wakuRelay.topicsHealth.getOrDefault(topic)
currentHealth = wakuRelay.calculateTopicHealth(topic)

if oldHealth == currentHealth:
continue

wakuRelay.topicsHealth[topic] = currentHealth
if not wakuRelay.onTopicHealthChange.isNil():
let fut = wakuRelay.onTopicHealthChange(topic, currentHealth)
if not fut.completed(): # Fast path for successful sync handlers
futs.add(fut)

if futs.len() > 0:
# slow path - we have to wait for the handlers to complete
try:
futs = await allFinished(futs)
except CancelledError:
# check for errors in futures
for fut in futs:
if fut.failed:
let err = fut.readError()
warn "Error in health change handler", description = err.msg

proc topicsHealthLoop(wakuRelay: WakuRelay) {.async.} =
while true:
await wakuRelay.updateTopicsHealth()
await sleepAsync(10.seconds)

method start*(w: WakuRelay) {.async, base.} =
debug "start"
await procCall GossipSub(w).start()
w.topicHealthLoopHandle = w.topicsHealthLoop()

method stop*(w: WakuRelay) {.async, base.} =
debug "stop"
await procCall GossipSub(w).stop()
if not w.topicHealthLoopHandle.isNil():
await w.topicHealthLoopHandle.cancelAndWait()

proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
GossipSub(w).topics.hasKey(topic)
Expand Down Expand Up @@ -455,25 +525,6 @@ proc publish*(

return relayedPeerCount

proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
## Returns the number of peers in a mesh defined by the passed pubsub topic.
## The 'mesh' atribute is defined in the GossipSub ref object.

if not w.mesh.hasKey(pubsubTopic):
return err(
"getNumPeersInMesh - there is no mesh peer for the given pubsub topic: " &
pubsubTopic
)

let peersRes = catch:
w.mesh[pubsubTopic]

let peers: HashSet[PubSubPeer] = peersRes.valueOr:
return
err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)

return ok(peers.len)

proc getNumConnectedPeers*(
w: WakuRelay, pubsubTopic: PubsubTopic
): Result[int, string] =
Expand Down
19 changes: 19 additions & 0 deletions waku/waku_relay/topic_health.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import chronos

import ../waku_core

type TopicHealth* = enum
UNHEALTHY
MINIMALLY_HEALTHY
SUFFICIENTLY_HEALTHY

proc `$`*(t: TopicHealth): string =
result =
case t
of UNHEALTHY: "UnHealthy"
of MINIMALLY_HEALTHY: "MinimallyHealthy"
of SUFFICIENTLY_HEALTHY: "SufficientlyHealthy"

type TopicHealthChangeHandler* = proc(
pubsubTopic: PubsubTopic, topicHealth: TopicHealth
): Future[void] {.gcsafe, raises: [Defect].}

0 comments on commit 6020a67

Please sign in to comment.