From d2578553530f789147104cd9ac7c035615289b21 Mon Sep 17 00:00:00 2001
From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Date: Fri, 12 Apr 2024 17:47:32 +0200
Subject: [PATCH 1/2] fix: rest store: content_topic -> contentTopic in the response (#2584)

---
 waku/waku_api/rest/store/types.nim | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/waku/waku_api/rest/store/types.nim b/waku/waku_api/rest/store/types.nim
index 8d08e8693c..7519b4fc69 100644
--- a/waku/waku_api/rest/store/types.nim
+++ b/waku/waku_api/rest/store/types.nim
@@ -140,7 +140,7 @@ proc writeValue*(
   writer.beginRecord()
   writer.writeField("payload", $value.payload)
   if value.contentTopic.isSome():
-    writer.writeField("content_topic", value.contentTopic.get())
+    writer.writeField("contentTopic", value.contentTopic.get())
   if value.version.isSome():
     writer.writeField("version", value.version.get())
   if value.timestamp.isSome():
@@ -176,7 +176,7 @@ proc readValue*(
     case fieldName
     of "payload":
       payload = some(reader.readValue(Base64String))
-    of "content_topic":
+    of "contentTopic":
       contentTopic = some(reader.readValue(ContentTopic))
    of "version":
       version = some(reader.readValue(uint32))

From 480a62facdecd899c014ecdeedcc3f0be0085fd5 Mon Sep 17 00:00:00 2001
From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com>
Date: Fri, 12 Apr 2024 19:02:48 +0300
Subject: [PATCH 2/2] fix(rln-relay): reduce sync time (#2577)

* fix(rln-relay): reduce sync time

* fix: add batch handling of futures to prevent over utilization of cpu

* fix: need to handle the futures on last iteration when it isnt full
---
 .../group_manager/on_chain/group_manager.nim | 42 +++++++++++++++----
 1 file changed, 35 insertions(+), 7 deletions(-)

diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
index 6237b287bc..118e6c5abe 100644
--- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
+++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
@@ -597,6 +597,19 @@ proc startListeningToEvents(
   let newBlockCallback = g.getNewBlockCallback()
   g.runInInterval(newBlockCallback, DefaultBlockPollRate)
 
+proc batchAwaitBlockHandlingFuture(
+    g: OnchainGroupManager, futs: seq[Future[bool]]
+): Future[void] {.async: (raises: [Exception]).} =
+  for fut in futs:
+    try:
+      var handleBlockRes: bool
+      g.retryWrapper(handleBlockRes, "Failed to handle block"):
+        await fut
+    except CatchableError:
+      raise newException(
+        CatchableError, "could not fetch events from block: " & getCurrentExceptionMsg()
+      )
+
 proc startOnchainSync(
     g: OnchainGroupManager
 ): Future[void] {.async: (raises: [Exception]).} =
@@ -606,6 +619,10 @@ proc startOnchainSync(
 
   # static block chunk size
   let blockChunkSize = 2_000
+  # delay between rpc calls to not overload the rate limit
+  let rpcDelay = 200.milliseconds
+  # max number of futures to run concurrently
+  let maxFutures = 10
 
   var fromBlock =
     if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
@@ -616,25 +633,36 @@ proc startOnchainSync(
         blockNumber = g.rlnContractDeployedBlockNumber
       g.rlnContractDeployedBlockNumber
 
+  var futs = newSeq[Future[bool]]()
+  var currentLatestBlock: BlockNumber
+  g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
+    cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
+
   try:
     # we always want to sync from last processed block => latest
     # chunk events
     while true:
-      var currentLatestBlock: BlockNumber
-      g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
-        cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
+      # if the fromBlock is less than 2k blocks behind the current block
+      # then fetch the new toBlock
       if fromBlock >= currentLatestBlock:
         break
+
+      if fromBlock + blockChunkSize.uint > currentLatestBlock.uint:
+        g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
+          cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
+
       let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
       debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
-      var handleBlockRes: bool
-      g.retryWrapper(handleBlockRes, "Failed to handle old blocks"):
-        await g.getAndHandleEvents(fromBlock, toBlock)
+      await sleepAsync(rpcDelay)
+      futs.add(g.getAndHandleEvents(fromBlock, toBlock))
+      if futs.len >= maxFutures or toBlock == currentLatestBlock:
+        await g.batchAwaitBlockHandlingFuture(futs)
+        futs = newSeq[Future[bool]]()
       fromBlock = toBlock + 1
   except CatchableError:
     raise newException(
-      ValueError,
+      CatchableError,
      "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg(),
     )
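
Reviewer note on PATCH 1/2: after this change, writeValue and readValue in waku/waku_api/rest/store/types.nim agree on the same JSON key, so a stored message in a REST store response is emitted and parsed as, for example (illustrative values only):

  {"payload": "<base64>", "contentTopic": "/waku/2/default-content/proto", "version": 0, "timestamp": 1712931000000000000}

Clients still reading the old content_topic key would need to switch to contentTopic.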
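
Reviewer note on PATCH 2/2: a minimal, self-contained sketch of the batching pattern this patch introduces, assuming only chronos is available. fetchChunk, awaitBatch, and syncRange are hypothetical stand-ins for getAndHandleEvents, batchAwaitBlockHandlingFuture, and the loop in startOnchainSync; the real code additionally wraps each await in retryWrapper and refreshes currentLatestBlock via eth_blockNumber:

import chronos

const
  blockChunkSize = 2_000'u64 # block range fetched per RPC call, as in the patch
  maxFutures = 10            # max number of in-flight fetches before a batch await
  rpcDelay = 200             # milliseconds between RPC calls, as in the patch

proc fetchChunk(fromBlock, toBlock: uint64): Future[bool] {.async.} =
  # hypothetical stand-in for getAndHandleEvents: fetch and handle one block range
  await sleepAsync(10.milliseconds)
  return true

proc awaitBatch(futs: seq[Future[bool]]) {.async.} =
  # hypothetical stand-in for batchAwaitBlockHandlingFuture: drain the queued fetches
  for fut in futs:
    discard await fut

proc syncRange(startBlock, latestBlock: uint64) {.async.} =
  # chunked sync loop: queue up to maxFutures fetches, then await them as a batch;
  # the batch is also flushed on the last chunk so no future is left unhandled
  var fromBlock = startBlock
  var futs: seq[Future[bool]]
  while fromBlock < latestBlock:
    let toBlock = min(fromBlock + blockChunkSize, latestBlock)
    await sleepAsync(rpcDelay.milliseconds) # spread RPC calls to respect rate limits
    futs.add(fetchChunk(fromBlock, toBlock))
    if futs.len >= maxFutures or toBlock == latestBlock:
      await awaitBatch(futs)
      futs.setLen(0)
    fromBlock = toBlock + 1

when isMainModule:
  waitFor syncRange(0'u64, 10_000'u64)

The point of the batch is to keep at most maxFutures block-range fetches in flight instead of awaiting each chunk serially, while the fixed delay between calls avoids tripping the RPC provider's rate limit.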