Skip to content

Commit

Permalink
chore: libwaku tweaks (#3233)
Browse files Browse the repository at this point in the history
* make lightpush return msg hash after successful publish
* libwaku avoid the use of string
* library alloc.nim allocate memory when nil cstring is passed
* libwaku store_request remove extra destroyShared(self)
  • Loading branch information
Ivansete-status authored Jan 8, 2025
1 parent e81a551 commit 625c8ee
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 59 deletions.
5 changes: 5 additions & 0 deletions library/alloc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int]
proc alloc*(str: cstring): cstring =
# Byte allocation from the given address.
# There should be the corresponding manual deallocation with deallocShared !
if str.isNil():
var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator
ret[0] = '\0' # Set the null terminator
return ret

let ret = cast[cstring](allocShared(len(str) + 1))
copyMem(ret, str, len(str) + 1)
return ret
Expand Down
32 changes: 7 additions & 25 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,10 @@ proc waku_relay_publish(
defer:
deallocShared(pst)

let targetPubSubTopic =
if len(pst) == 0:
DefaultPubsubTopic
else:
$pst

handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.PUBLISH, PubsubTopic($pst), nil, wakuMessage),
RelayRequest.createShared(RelayMsgType.PUBLISH, pst, nil, wakuMessage),
callback,
userData,
)
Expand Down Expand Up @@ -370,9 +364,7 @@ proc waku_relay_subscribe(
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(
RelayMsgType.SUBSCRIBE, PubsubTopic($pst), WakuRelayHandler(cb)
),
RelayRequest.createShared(RelayMsgType.SUBSCRIBE, pst, WakuRelayHandler(cb)),
callback,
userData,
)
Expand All @@ -398,7 +390,7 @@ proc waku_relay_add_protected_shard(
RelayMsgType.ADD_PROTECTED_SHARD,
clusterId = clusterId,
shardId = shardId,
publicKey = $pubk,
publicKey = pubk,
),
callback,
userData,
Expand All @@ -421,9 +413,7 @@ proc waku_relay_unsubscribe(
ctx,
RequestType.RELAY,
RelayRequest.createShared(
RelayMsgType.UNSUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(onReceivedMessage(ctx)),
RelayMsgType.UNSUBSCRIBE, pst, WakuRelayHandler(onReceivedMessage(ctx))
),
callback,
userData,
Expand All @@ -445,7 +435,7 @@ proc waku_relay_get_num_connected_peers(
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, PubsubTopic($pst)),
RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, pst),
callback,
userData,
)
Expand All @@ -466,7 +456,7 @@ proc waku_relay_get_num_peers_in_mesh(
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, PubsubTopic($pst)),
RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, pst),
callback,
userData,
)
Expand Down Expand Up @@ -557,18 +547,10 @@ proc waku_lightpush_publish(
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let targetPubSubTopic =
if len(pst) == 0:
DefaultPubsubTopic
else:
$pst

handleRequest(
ctx,
RequestType.LIGHTPUSH,
LightpushRequest.createShared(
LightpushMsgType.PUBLISH, PubsubTopic($pst), wakuMessage
),
LightpushRequest.createShared(LightpushMsgType.PUBLISH, pst, wakuMessage),
callback,
userData,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type LightpushRequest* = object
proc createShared*(
T: type LightpushRequest,
op: LightpushMsgType,
pubsubTopic: PubsubTopic,
pubsubTopic: cstring,
m = WakuMessage(),
): ptr type T =
var ret = createShared(T)
Expand Down Expand Up @@ -97,12 +97,12 @@ proc process*(
error "PUBLISH failed", error = errorMsg
return err(errorMsg)

(
let msgHashHex = (
await waku.node.wakuLightpushClient.publish(
pubsubTopic, msg, peer = peerOpt.get()
)
).isOkOr:
).valueOr:
error "PUBLISH failed", error = error
return err("LightpushRequest error publishing: " & $error)

return ok("")
return ok(msgHashHex)
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ type RelayRequest* = object
proc createShared*(
T: type RelayRequest,
op: RelayMsgType,
pubsubTopic: PubsubTopic = "",
pubsubTopic: cstring = nil,
relayEventCallback: WakuRelayHandler = nil,
m = WakuMessage(),
clusterId: cint = 0,
shardId: cint = 0,
publicKey: string = "",
publicKey: cstring = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ proc destroyShared(self: ptr StoreRequest) =
proc process_remote_query(
self: ptr StoreRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

let jsonContentRes = catch:
parseJson($self[].jsonQuery)

Expand Down
14 changes: 4 additions & 10 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ proc lightpushPublish*(
pubsubTopic: Option[PubsubTopic],
message: WakuMessage,
peer: RemotePeerInfo,
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
## Returns whether relaying was successful or not.
## `WakuMessage` should contain a `contentTopic` field for light node
Expand All @@ -986,7 +986,7 @@ proc lightpushPublish*(
pubsubTopic: PubsubTopic,
message: WakuMessage,
peer: RemotePeerInfo,
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
if not node.wakuLightpushClient.isNil():
notice "publishing message with lightpush",
Expand Down Expand Up @@ -1023,7 +1023,7 @@ proc lightpushPublish*(
# TODO: Move to application module (e.g., wakunode2.nim)
proc lightpushPublish*(
node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage
): Future[WakuLightPushResult[void]] {.
): Future[WakuLightPushResult[string]] {.
async, gcsafe, deprecated: "Use 'node.lightpushPublish()' instead"
.} =
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
Expand All @@ -1040,13 +1040,7 @@ proc lightpushPublish*(
elif not node.wakuLightPush.isNil():
peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId))

let publishRes =
await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get())

if publishRes.isErr():
error "failed to publish message", error = publishRes.error

return publishRes
return await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get())

## Waku RLN Relay
proc mountRlnRelay*(
Expand Down
23 changes: 11 additions & 12 deletions waku/waku_lightpush/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,23 @@ proc publish*(
wl: WakuLightPushClient,
pubSubTopic: PubsubTopic,
message: WakuMessage,
peer: PeerId | RemotePeerInfo,
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
when peer is PeerId:
info "publish",
peerId = shortLog(peer),
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
else:
info "publish",
peerId = shortLog(peer.peerId),
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex

peer: RemotePeerInfo,
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
## On success, returns the msg_hash of the published message
let msg_hash_hex_str = computeMessageHash(pubsubTopic, message).to0xHex()
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)

for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)

return ok()
notice "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msg_hash_hex_str

return ok(msg_hash_hex_str)

proc publishToAny*(
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
Expand Down
15 changes: 12 additions & 3 deletions waku/waku_lightpush/self_req_handler.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
## which spawn a full service Waku node
## that could be used also as a lightpush client, helping testing and development.

import results, chronos, std/options, metrics
import results, chronos, chronicles, std/options, metrics, stew/byteutils
import
../waku_core,
./protocol,
Expand All @@ -21,9 +21,10 @@ import

proc handleSelfLightPushRequest*(
self: WakuLightPush, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
): Future[WakuLightPushResult[string]] {.async.} =
## Handles the lightpush requests made by the node to itself.
## Normally used in REST-lightpush requests
## On success, returns the msg_hash of the published message.

try:
# provide self peerId as now this node is used directly, thus there is no light client sender peer.
Expand All @@ -45,6 +46,14 @@ proc handleSelfLightPushRequest*(
else:
return err("unknown failure")

return ok()
let msg_hash_hex_str = computeMessageHash(pubSubTopic, message).to0xHex()

notice "publishing message with self hosted lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
self_peer_id = selfPeerId,
msg_hash = msg_hash_hex_str

return ok(msg_hash_hex_str)
except Exception:
return err("exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg())

0 comments on commit 625c8ee

Please sign in to comment.