Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: topic health tracking #3212

Merged
merged 14 commits into from
Dec 24, 2024
23 changes: 23 additions & 0 deletions library/events/json_topic_health_change_event.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import system, results, std/json
import stew/byteutils
import ../../waku/common/base64, ./json_base_event
import ../../waku/waku_relay

type JsonTopicHealthChangeEvent* = ref object of JsonEvent
pubsubTopic*: string
topicHealth*: TopicHealth

proc new*(
T: type JsonTopicHealthChangeEvent, pubsubTopic: string, topicHealth: TopicHealth
): T =
# Returns a TopicHealthChange event as indicated in
# https://rfc.vac.dev/spec/36/#jsonmessageevent-type

return JsonTopicHealthChangeEvent(
eventType: "relay_topic_health_change",
pubsubTopic: pubsubTopic,
topicHealth: topicHealth,
)

method `$`*(jsonTopicHealthChange: JsonTopicHealthChangeEvent): string =
$(%*jsonTopicHealthChange)
34 changes: 31 additions & 3 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import
waku/node/waku_node,
waku/waku_core/topics/pubsub_topic,
waku/waku_core/subscription/push_handler,
waku/waku_relay/protocol,
./events/json_message_event,
waku/waku_relay,
./events/[json_message_event, json_topic_health_change_event],
./waku_thread/waku_thread,
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
./waku_thread/inter_thread_communication/requests/peer_manager_request,
Expand Down Expand Up @@ -84,6 +84,31 @@ proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)

proc onTopicHealthChange(ctx: ptr WakuContext): TopicHealthChangeHandler =
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
# Callback that hadles the Waku Relay events. i.e. messages or errors.
if isNil(ctx[].eventCallback):
error "onTopicHealthChange - eventCallback is nil"
return

if isNil(ctx[].eventUserData):
error "onTopicHealthChange - eventUserData is nil"
return

foreignThreadGc:
try:
let event = $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg =
"Exception onTopicHealthChange when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)

### End of not-exported components
################################################################################

Expand Down Expand Up @@ -139,7 +164,10 @@ proc waku_new(

ctx.userData = userData

let appCallbacks = AppCallbacks(relayHandler: onReceivedMessage(ctx))
let appCallbacks = AppCallbacks(
relayHandler: onReceivedMessage(ctx),
topicHealthChangeHandler: onTopicHealthChange(ctx),
)

let retCode = handleRequest(
ctx,
Expand Down
3 changes: 2 additions & 1 deletion waku/factory/app_callbacks.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ../waku_relay/protocol
import ../waku_relay

type AppCallbacks* = ref object
relayHandler*: WakuRelayHandler
topicHealthChangeHandler*: TopicHealthChangeHandler
8 changes: 7 additions & 1 deletion waku/factory/waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,13 @@ proc setupAppCallbacks(
for shard in shards:
discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler)

return ok()
if not appCallbacks.topicHealthChangeHandler.isNil():
if node.wakuRelay.isNil():
return
err("Cannot configure topicHealthChangeHandler callback without Relay mounted")
node.wakuRelay.onTopicHealthChange = appCallbacks.topicHealthChangeHandler

return ok()

proc new*(
T: type Waku, confCopy: var WakuNodeConf, appCallbacks: AppCallbacks = nil
Expand Down
4 changes: 2 additions & 2 deletions waku/waku_relay.nim
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import ./waku_relay/protocol
import ./waku_relay/[protocol, topic_health]

export protocol
export protocol, topic_health
91 changes: 71 additions & 20 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/stream/connection,
libp2p/switch
import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer
import
../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer

from ../waku_core/codecs import WakuRelayCodec
export WakuRelayCodec
Expand Down Expand Up @@ -131,6 +132,9 @@ type
# a map of validators to error messages to return when validation fails
validatorInserted: Table[PubsubTopic, bool]
publishObservers: seq[PublishObserver]
topicsHealth*: Table[string, TopicHealth]
onTopicHealthChange*: TopicHealthChangeHandler
topicHealthLoopHandle*: Future[void]

proc initProtocolHandler(w: WakuRelay) =
proc handler(conn: Connection, proto: string) {.async.} =
Expand Down Expand Up @@ -289,6 +293,7 @@ proc new*(
procCall GossipSub(w).initPubSub()
w.initProtocolHandler()
w.initRelayObservers()
w.topicsHealth = initTable[string, TopicHealth]()
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())

Expand All @@ -309,13 +314,78 @@ proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
## Observes when a message is sent/received from the GossipSub PoV
procCall GossipSub(w).addObserver(observer)

proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
## Returns the number of peers in a mesh defined by the passed pubsub topic.
## The 'mesh' atribute is defined in the GossipSub ref object.

if not w.mesh.hasKey(pubsubTopic):
debug "getNumPeersInMesh - there is no mesh peer for the given pubsub topic",
pubsubTopic = pubsubTopic
return ok(0)

let peersRes = catch:
w.mesh[pubsubTopic]

let peers: HashSet[PubSubPeer] = peersRes.valueOr:
return
err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)

return ok(peers.len)

proc calculateTopicHealth(wakuRelay: WakuRelay, topic: string): TopicHealth =
let numPeersInMesh = wakuRelay.getNumPeersInMesh(topic).valueOr:
error "Could not calculate topic health", topic = topic, error = error
return TopicHealth.UNHEALTHY

if numPeersInMesh < 1:
return TopicHealth.UNHEALTHY
elif numPeersInMesh < wakuRelay.parameters.dLow:
return TopicHealth.MINIMALLY_HEALTHY
return TopicHealth.SUFFICIENTLY_HEALTHY

proc updateTopicsHealth(wakuRelay: WakuRelay) {.async.} =
var futs = newSeq[Future[void]]()
for topic in toSeq(wakuRelay.topics.keys):
## loop over all the topics I'm subscribed to
let
oldHealth = wakuRelay.topicsHealth.getOrDefault(topic)
currentHealth = wakuRelay.calculateTopicHealth(topic)

if oldHealth == currentHealth:
continue

wakuRelay.topicsHealth[topic] = currentHealth
if not wakuRelay.onTopicHealthChange.isNil():
let fut = wakuRelay.onTopicHealthChange(topic, currentHealth)
if not fut.completed(): # Fast path for successful sync handlers
futs.add(fut)

if futs.len() > 0:
# slow path - we have to wait for the handlers to complete
try:
futs = await allFinished(futs)
except CancelledError:
# check for errors in futures
for fut in futs:
if fut.failed:
let err = fut.readError()
warn "Error in health change handler", description = err.msg

proc topicsHealthLoop(wakuRelay: WakuRelay) {.async.} =
while true:
await wakuRelay.updateTopicsHealth()
await sleepAsync(10.seconds)

method start*(w: WakuRelay) {.async, base.} =
debug "start"
await procCall GossipSub(w).start()
w.topicHealthLoopHandle = w.topicsHealthLoop()

method stop*(w: WakuRelay) {.async, base.} =
debug "stop"
await procCall GossipSub(w).stop()
if not w.topicHealthLoopHandle.isNil():
await w.topicHealthLoopHandle.cancelAndWait()

proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
GossipSub(w).topics.hasKey(topic)
Expand Down Expand Up @@ -455,25 +525,6 @@ proc publish*(

return relayedPeerCount

proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
## Returns the number of peers in a mesh defined by the passed pubsub topic.
## The 'mesh' atribute is defined in the GossipSub ref object.

if not w.mesh.hasKey(pubsubTopic):
return err(
"getNumPeersInMesh - there is no mesh peer for the given pubsub topic: " &
pubsubTopic
)

let peersRes = catch:
w.mesh[pubsubTopic]

let peers: HashSet[PubSubPeer] = peersRes.valueOr:
return
err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)

return ok(peers.len)

proc getNumConnectedPeers*(
w: WakuRelay, pubsubTopic: PubsubTopic
): Result[int, string] =
Expand Down
19 changes: 19 additions & 0 deletions waku/waku_relay/topic_health.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import chronos

import ../waku_core

type TopicHealth* = enum
UNHEALTHY
MINIMALLY_HEALTHY
SUFFICIENTLY_HEALTHY

proc `$`*(t: TopicHealth): string =
result =
case t
of UNHEALTHY: "UnHealthy"
of MINIMALLY_HEALTHY: "MinimallyHealthy"
of SUFFICIENTLY_HEALTHY: "SufficientlyHealthy"

type TopicHealthChangeHandler* = proc(
pubsubTopic: PubsubTopic, topicHealth: TopicHealth
): Future[void] {.gcsafe, raises: [Defect].}
Loading