Skip to content

Commit

Permalink
feat: REST APIs discovery handlers (#2109)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Oct 27, 2023
1 parent b8bcb1e commit 7ca516a
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 117 deletions.
30 changes: 25 additions & 5 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/waku_api/message_cache,
../../waku/waku_api/cache_handlers,
../../waku/waku_api/handlers,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
Expand Down Expand Up @@ -679,18 +679,38 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache)

let filterCache = rest_filter_api.MessageCache.init()
rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache)

let filterDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter))
else: none(DiscoveryHandler)

rest_filter_api.installFilterRestApiHandlers(
server.router,
app.node,
filterCache,
filterDiscoHandler,
)
else:
notInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode"


## Store REST API
installStoreApiHandlers(server.router, app.node)
let storeDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store))
else: none(DiscoveryHandler)

installStoreApiHandlers(server.router, app.node, storeDiscoHandler)

## Light push API
if conf.lightpushnode != "" and
app.node.wakuLightpushClient != nil:
rest_lightpush_api.installLightPushRequestHandler(server.router, app.node)
let lightDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
else: none(DiscoveryHandler)

rest_lightpush_api.installLightPushRequestHandler(server.router, app.node, lightDiscoHandler)
else:
notInstalledTab["lightpush"] = "/lightpush endpoints are not available. Please check your configuration: --lightpushnode"

Expand Down
2 changes: 1 addition & 1 deletion tests/wakunode_rest/test_rest_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ procSuite "Waku v2 Rest API - Store":
response.status == 412
$response.contentType == $MIMETYPE_TEXT
response.data.messages.len == 0
response.data.error_message.get == "Missing known store-peer node"
response.data.error_message.get == NoPeerNoDiscError.errobj.message

# Now add the storenode from "config"
node.peerManager.addServicePeer(remotePeerInfo,
Expand Down
23 changes: 0 additions & 23 deletions waku/waku_api/cache_handlers.nim

This file was deleted.

50 changes: 50 additions & 0 deletions waku/waku_api/handlers.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
chronos,
chronicles,
std/[options, sequtils],
stew/results
import
../waku_discv5,
../waku_relay,
../waku_core,
./message_cache

### Discovery

type DiscoveryHandler* = proc(): Future[Result[Option[RemotePeerInfo], string]] {.async, closure.}

proc defaultDiscoveryHandler*(discv5: WakuDiscoveryV5, cap: Capabilities): DiscoveryHandler =
proc(): Future[Result[Option[RemotePeerInfo], string]] {.async, closure.} =
#Discv5 is already filtering peers by shards no need to pass a predicate.
let findPeers = discv5.findRandomPeers()

if not await findPeers.withTimeout(60.seconds):
return err("discovery process timed out!")

var peers = findPeers.read()

peers.keepItIf(it.supportsCapability(cap))

if peers.len == 0:
return ok(none(RemotePeerInfo))

let remotePeerInfo = peers[0].toRemotePeerInfo().valueOr:
return err($error)

return ok(some(remotePeerInfo))

### Message Cache

proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
cache.addMessage(PubSubTopic(pubsubTopic), msg)

proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
if cache.isSubscribed(msg.contentTopic):
cache.addMessage(msg.contentTopic, msg)
2 changes: 1 addition & 1 deletion waku/waku_api/jsonrpc/relay/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import
../../../waku_rln_relay/rln/wrappers,
../../../waku_node,
../../message_cache,
../../cache_handlers,
../../handlers,
../message

from std/times import getTime
Expand Down
146 changes: 94 additions & 52 deletions waku/waku_api/rest/filter/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import
../../../waku_filter_v2/client as filter_protocol_client,
../../../waku_filter_v2/common as filter_protocol_type,
../../message_cache,
../../handlers,
../serdes,
../responses,
./types
Expand Down Expand Up @@ -145,11 +146,18 @@ proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type

return resp.get()

proc filterPostPutSubscriptionRequestHandler(node: WakuNode,
contentBody: Option[ContentBody],
cache: MessageCache):
Future[RestApiResponse]
{.async.} =
const NoPeerNoDiscoError = FilterSubscribeError.serviceUnavailable(
"No suitable service peer & no discovery method")

const NoPeerNoneFoundError = FilterSubscribeError.serviceUnavailable(
"No suitable service peer & none discovered")

proc filterPostPutSubscriptionRequestHandler(
node: WakuNode,
contentBody: Option[ContentBody],
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
): Future[RestApiResponse] {.async.} =
## handles any filter subscription requests, adds or modifies.

let decodedBody = decodeRequestBody[FilterSubscribeRequest](contentBody)
Expand All @@ -159,14 +167,17 @@ proc filterPostPutSubscriptionRequestHandler(node: WakuNode,

let req: FilterSubscribeRequest = decodedBody.value()

let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(req.requestId, NoPeerNoDiscoError)

let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)

if peerOpt.isNone():
return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("No suitable peers"))
peerOp.valueOr:
return makeRestResponse(req.requestId, NoPeerNoneFoundError)

let subFut = node.filterSubscribe(req.pubsubTopic,
req.contentFilters,
peerOpt.get())
let subFut = node.filterSubscribe(req.pubsubTopic, req.contentFilters, peer)

if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to subscribe to contentFilters do to timeout!"
Expand All @@ -178,29 +189,36 @@ proc filterPostPutSubscriptionRequestHandler(node: WakuNode,

return makeRestResponse(req.requestId, subFut.read())

proc installFilterPostSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterPostSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
debug "post", ROUTE_FILTER_SUBSCRIPTIONS, contentBody

let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache)
return response
return await filterPostPutSubscriptionRequestHandler(node, contentBody, cache, discHandler)

proc installFilterPutSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterPutSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodPut, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Modifies a subscribtion of a node to a list of contentTopics of a pubsubTopic
debug "put", ROUTE_FILTER_SUBSCRIPTIONS, contentBody

let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache)
return response
return await filterPostPutSubscriptionRequestHandler(node, contentBody, cache, discHandler)

proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterDeleteSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
Expand All @@ -213,13 +231,18 @@ proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,

let req: FilterUnsubscribeRequest = decodedBody.value()

let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(req.requestId, NoPeerNoDiscoError)

if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)

peerOp.valueOr:
return makeRestResponse(req.requestId, NoPeerNoneFoundError)

let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peer)

let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return makeRestResponse(req.requestId,
Expand All @@ -233,9 +256,12 @@ proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,
# Successfully unsubscribed from all requested contentTopics
return makeRestResponse(req.requestId, unsubFut.read())

proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterDeleteAllSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodDelete, ROUTE_FILTER_ALL_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_ALL_SUBSCRIPTIONS, contentBody
Expand All @@ -248,13 +274,18 @@ proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,

let req: FilterUnsubscribeAllRequest = decodedBody.value()

let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(req.requestId, NoPeerNoDiscoError)

if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)

peerOp.valueOr:
return makeRestResponse(req.requestId, NoPeerNoneFoundError)

let unsubFut = node.filterUnsubscribeAll(peerOpt.get())
let unsubFut = node.filterUnsubscribeAll(peer)

if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return makeRestResponse(req.requestId,
Expand All @@ -268,18 +299,26 @@ proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,

const ROUTE_FILTER_SUBSCRIBER_PING* = "/filter/v2/subscriptions/{requestId}"

proc installFilterPingSubscriberHandler(router: var RestRouter,
node: WakuNode) =
proc installFilterPingSubscriberHandler(
router: var RestRouter,
node: WakuNode,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodGet, ROUTE_FILTER_SUBSCRIBER_PING) do (requestId: string) -> RestApiResponse:
## Checks if a node has valid subscription or not.
debug "get", ROUTE_FILTER_SUBSCRIBER_PING, requestId

let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(requestId.get(),
FilterSubscribeError.serviceUnavailable("No suitable remote filter peers"))
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(requestId.get(), NoPeerNoDiscoError)

let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)

peerOp.valueOr:
return makeRestResponse(requestId.get(), NoPeerNoneFoundError)

let pingFutRes = node.wakuFilterClient.ping(peerOpt.get())
let pingFutRes = node.wakuFilterClient.ping(peer)

if not await pingFutRes.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to ping filter service peer due to timeout!"
Expand Down Expand Up @@ -325,12 +364,15 @@ proc installFilterGetMessagesHandler(router: var RestRouter,

return resp.get()

proc installFilterRestApiHandlers*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
installFilterPingSubscriberHandler(router, node)
installFilterPostSubscriptionsHandler(router, node, cache)
installFilterPutSubscriptionsHandler(router, node, cache)
installFilterDeleteSubscriptionsHandler(router, node, cache)
installFilterDeleteAllSubscriptionsHandler(router, node, cache)
proc installFilterRestApiHandlers*(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
installFilterPingSubscriberHandler(router, node, discHandler)
installFilterPostSubscriptionsHandler(router, node, cache, discHandler)
installFilterPutSubscriptionsHandler(router, node, cache, discHandler)
installFilterDeleteSubscriptionsHandler(router, node, cache, discHandler)
installFilterDeleteAllSubscriptionsHandler(router, node, cache, discHandler)
installFilterGetMessagesHandler(router, node, cache)
Loading

0 comments on commit 7ca516a

Please sign in to comment.