Skip to content

Commit

Permalink
Merge branch 'master' into chore-updating-chronicles
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer authored Apr 15, 2024
2 parents 20e98fc + 480a62f commit c1a8b1e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
4 changes: 2 additions & 2 deletions waku/waku_api/rest/store/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ proc writeValue*(
writer.beginRecord()
writer.writeField("payload", $value.payload)
if value.contentTopic.isSome():
writer.writeField("content_topic", value.contentTopic.get())
writer.writeField("contentTopic", value.contentTopic.get())
if value.version.isSome():
writer.writeField("version", value.version.get())
if value.timestamp.isSome():
Expand Down Expand Up @@ -176,7 +176,7 @@ proc readValue*(
case fieldName
of "payload":
payload = some(reader.readValue(Base64String))
of "content_topic":
of "contentTopic":
contentTopic = some(reader.readValue(ContentTopic))
of "version":
version = some(reader.readValue(uint32))
Expand Down
42 changes: 35 additions & 7 deletions waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,19 @@ proc startListeningToEvents(
let newBlockCallback = g.getNewBlockCallback()
g.runInInterval(newBlockCallback, DefaultBlockPollRate)

proc batchAwaitBlockHandlingFuture(
g: OnchainGroupManager, futs: seq[Future[bool]]
): Future[void] {.async: (raises: [Exception]).} =
for fut in futs:
try:
var handleBlockRes: bool
g.retryWrapper(handleBlockRes, "Failed to handle block"):
await fut
except CatchableError:
raise newException(
CatchableError, "could not fetch events from block: " & getCurrentExceptionMsg()
)

proc startOnchainSync(
g: OnchainGroupManager
): Future[void] {.async: (raises: [Exception]).} =
Expand All @@ -606,6 +619,10 @@ proc startOnchainSync(

# static block chunk size
let blockChunkSize = 2_000
# delay between rpc calls to not overload the rate limit
let rpcDelay = 200.milliseconds
# max number of futures to run concurrently
let maxFutures = 10

var fromBlock =
if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
Expand All @@ -616,25 +633,36 @@ proc startOnchainSync(
blockNumber = g.rlnContractDeployedBlockNumber
g.rlnContractDeployedBlockNumber

var futs = newSeq[Future[bool]]()
var currentLatestBlock: BlockNumber
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())

try:
# we always want to sync from last processed block => latest
# chunk events
while true:
var currentLatestBlock: BlockNumber
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
# if the fromBlock is less than 2k blocks behind the current block
# then fetch the new toBlock
if fromBlock >= currentLatestBlock:
break

if fromBlock + blockChunkSize.uint > currentLatestBlock.uint:
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())


let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
var handleBlockRes: bool
g.retryWrapper(handleBlockRes, "Failed to handle old blocks"):
await g.getAndHandleEvents(fromBlock, toBlock)
await sleepAsync(rpcDelay)
futs.add(g.getAndHandleEvents(fromBlock, toBlock))
if futs.len >= maxFutures or toBlock == currentLatestBlock:
await g.batchAwaitBlockHandlingFuture(futs)
futs = newSeq[Future[bool]]()
fromBlock = toBlock + 1
except CatchableError:
raise newException(
ValueError,
CatchableError,
"failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg(),
)

Expand Down

0 comments on commit c1a8b1e

Please sign in to comment.