From 4eed13d213aa50d52f67304459b73599bf0063c8 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Mon, 2 Dec 2024 17:49:53 +0000 Subject: [PATCH] Treating header/body process error different from response errors why: Error handling becomes active not until some consecutive failures appear. As both types of errors may interleave (i.g. no response errors) the counter reset for one type might affect the other. By doing it wrong, a peer might send repeatedly a bogus block so locking in the syncer in an endless loop. --- nimbus/sync/beacon/worker/blocks_staged.nim | 23 +++++++++++++---- .../beacon/worker/blocks_staged/bodies.nim | 14 +++++------ nimbus/sync/beacon/worker/headers_staged.nim | 25 +++++++++++++------ .../beacon/worker/headers_staged/headers.nim | 19 +++++++++----- nimbus/sync/beacon/worker_config.nim | 7 ++++++ nimbus/sync/beacon/worker_desc.nim | 6 +++-- 6 files changed, 66 insertions(+), 28 deletions(-) diff --git a/nimbus/sync/beacon/worker/blocks_staged.nim b/nimbus/sync/beacon/worker/blocks_staged.nim index e0abe4a707..510ca7ad84 100644 --- a/nimbus/sync/beacon/worker/blocks_staged.nim +++ b/nimbus/sync/beacon/worker/blocks_staged.nim @@ -173,6 +173,9 @@ proc blocksStagedCollect*( # so that `async` can capture that properly. blk = (ref BlocksForImport)() + # Flag, not to reset error count + haveError = false + # nFetchBodiesRequest while true: # Extract bottom range interval and fetch/stage it @@ -196,15 +199,20 @@ proc blocksStagedCollect*( # Throw away first time block fetch data. Keep other data for a # partially assembled list. if nBlkBlocks == 0: - buddy.only.nBdyRespErrors.inc + buddy.only.nBdyProcErrors.inc + haveError = true - if (1 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped) or - fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors: + if ((0 < buddy.only.nBdyRespErrors or + 0 < buddy.only.nBdyProcErrors) and buddy.ctrl.stopped) or + fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors or + fetchBodiesProcessErrThresholdCount < buddy.only.nBdyProcErrors: # Make sure that this peer does not immediately reconnect buddy.ctrl.zombie = true + trace info & ": current block list discarded", peer, iv, ivReq, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nBdyRespErrors + bdyErrors=buddy.bdyErrors + ctx.blocksUnprocCommit(iv.len, iv) # At this stage allow a task switch so that some other peer might try # to work on the currently returned interval. @@ -240,8 +248,13 @@ proc blocksStagedCollect*( raiseAssert info & ": duplicate key on staged queue iv=" & $iv qItem.data = blk[] + # Reset block process errors (not too many consecutive failures this time) + if not haveError: + buddy.only.nBdyProcErrors = 0 + trace info & ": staged blocks", peer, bottomBlock=iv.minPt.bnStr, - nBlocks=blk.blocks.len, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state + nBlocks=blk.blocks.len, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state, + bdyErrors=buddy.bdyErrors # Update, so it can be followed nicely ctx.updateMetrics() diff --git a/nimbus/sync/beacon/worker/blocks_staged/bodies.nim b/nimbus/sync/beacon/worker/blocks_staged/bodies.nim index 68009e4668..d7aa187a76 100644 --- a/nimbus/sync/beacon/worker/blocks_staged/bodies.nim +++ b/nimbus/sync/beacon/worker/blocks_staged/bodies.nim @@ -22,6 +22,9 @@ import # Public functions # ------------------------------------------------------------------------------ +func bdyErrors*(buddy: BeaconBuddyRef): string = + $buddy.only.nBdyRespErrors & "/" & $buddy.only.nBdyProcErrors + proc fetchRegisterError*(buddy: BeaconBuddyRef) = buddy.only.nBdyRespErrors.inc if fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors: @@ -39,8 +42,7 @@ proc bodiesFetch*( start = Moment.now() nReq = blockHashes.len - trace trEthSendSendingGetBlockBodies, peer, nReq, - nRespErrors=buddy.only.nBdyRespErrors + trace trEthSendSendingGetBlockBodies, peer, nReq, bdyErrors=buddy.bdyErrors var resp: Option[blockBodiesObj] try: @@ -48,7 +50,7 @@ proc bodiesFetch*( except CatchableError as e: buddy.fetchRegisterError() `info` info & " error", peer, nReq, elapsed=(Moment.now() - start).toStr, - error=($e.name), msg=e.msg, nRespErrors=buddy.only.nBdyRespErrors + error=($e.name), msg=e.msg, bdyErrors=buddy.bdyErrors return err() let elapsed = Moment.now() - start @@ -57,8 +59,7 @@ proc bodiesFetch*( if resp.isNone or buddy.ctrl.stopped: buddy.fetchRegisterError() trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=0, - elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nBdyRespErrors + elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, bdyErrors=buddy.bdyErrors return err() let b: seq[BlockBody] = resp.get.blocks @@ -78,8 +79,7 @@ proc bodiesFetch*( buddy.only.nBdyRespErrors = 0 # reset error count trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len, - elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nBdyRespErrors + elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, bdyErrors=buddy.bdyErrors return ok(b) diff --git a/nimbus/sync/beacon/worker/headers_staged.nim b/nimbus/sync/beacon/worker/headers_staged.nim index 80696688c3..7fb934cfb9 100644 --- a/nimbus/sync/beacon/worker/headers_staged.nim +++ b/nimbus/sync/beacon/worker/headers_staged.nim @@ -132,6 +132,9 @@ proc headersStagedCollect*( # so that `async` can capture that properly. lhc = (ref LinkedHChain)(parentHash: topLink) + # Flag, not to reset error count + haveError = false + while true: # Extract top range interval and fetch/stage it let @@ -151,15 +154,17 @@ proc headersStagedCollect*( # Throw away opportunistic data (or first time header fetch.) Keep # other data for a partially assembled list. if isOpportunistic or nLhcHeaders == 0: - buddy.only.nHdrRespErrors.inc + buddy.only.nHdrProcErrors.inc + haveError = true - if (0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped) or - fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors: + if ((0 < buddy.only.nHdrRespErrors or + 0 < buddy.only.nHdrProcErrors) and buddy.ctrl.stopped) or + fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors or + fetchHeadersProcessErrThresholdCount < buddy.only.nHdrProcErrors: # Make sure that this peer does not immediately reconnect buddy.ctrl.zombie = true trace info & ": current header list discarded", peer, iv, ivReq, - isOpportunistic, - ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nHdrRespErrors + isOpportunistic, ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors ctx.headersUnprocCommit(iv.len, iv) # At this stage allow a task switch so that some other peer might try # to work on the currently returned interval. @@ -195,9 +200,13 @@ proc headersStagedCollect*( raiseAssert info & ": duplicate key on staged queue iv=" & $iv qItem.data = lhc[] - trace info & ": staged a list of headers", peer, - topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len, - nStaged=ctx.hdr.staged.len, isOpportunistic, ctrl=buddy.ctrl.state + # Reset header process errors (not too many consecutive failures this time) + if not haveError: + buddy.only.nHdrProcErrors = 0 + + trace info & ": staged a list of headers", peer, topBlock=iv.maxPt.bnStr, + nHeaders=lhc.revHdrs.len, nStaged=ctx.hdr.staged.len, isOpportunistic, + ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors return true diff --git a/nimbus/sync/beacon/worker/headers_staged/headers.nim b/nimbus/sync/beacon/worker/headers_staged/headers.nim index aabf335a50..82f8d9845b 100644 --- a/nimbus/sync/beacon/worker/headers_staged/headers.nim +++ b/nimbus/sync/beacon/worker/headers_staged/headers.nim @@ -28,6 +28,14 @@ proc registerError(buddy: BeaconBuddyRef) = if fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors: buddy.ctrl.zombie = true # abandon slow peer +# ------------------------------------------------------------------------------ +# Public debugging & logging helpers +# ------------------------------------------------------------------------------ + +func hdrErrors*(buddy: BeaconBuddyRef): string = + $buddy.only.nHdrRespErrors & "/" & $buddy.only.nHdrProcErrors + + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -63,7 +71,7 @@ proc headersFetchReversed*( start = Moment.now() trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq, - nReq=req.maxResults, useHash, nRespErrors=buddy.only.nHdrRespErrors + nReq=req.maxResults, useHash, hdrErrors=buddy.hdrErrors # Fetch headers from peer var resp: Option[blockHeadersObj] @@ -78,7 +86,7 @@ proc headersFetchReversed*( buddy.registerError() `info` info & " error", peer, ivReq, nReq=req.maxResults, useHash, elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg, - nRespErrors=buddy.only.nHdrRespErrors + hdrErrors=buddy.hdrErrors return err() let elapsed = Moment.now() - start @@ -88,7 +96,7 @@ proc headersFetchReversed*( buddy.registerError() trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, nResp=0, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nHdrRespErrors + hdrErrors=buddy.hdrErrors return err() let h: seq[Header] = resp.get.headers @@ -96,7 +104,7 @@ proc headersFetchReversed*( buddy.registerError() trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, nResp=h.len, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nHdrRespErrors + hdrErrors=buddy.hdrErrors return err() # Ban an overly slow peer for a while when seen in a row. Also there is a @@ -109,8 +117,7 @@ proc headersFetchReversed*( trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len, - elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nHdrRespErrors + elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors return ok(h) diff --git a/nimbus/sync/beacon/worker_config.nim b/nimbus/sync/beacon/worker_config.nim index f774f1ed00..ba75723004 100644 --- a/nimbus/sync/beacon/worker_config.nim +++ b/nimbus/sync/beacon/worker_config.nim @@ -60,6 +60,10 @@ const ## exceeds this threshold for more than `fetchHeadersReqThresholdCount` ## times in a row, then this peer will be banned for a while. + fetchHeadersProcessErrThresholdCount* = 3 + ## Similar to `fetchHeadersReqErrThresholdCount` but for the later part + ## when errors occur while block headers are queued and further processed. + fetchHeadersReqMinResponsePC* = 10 ## Some peers only returned one header at a time. If these peers sit on a ## farm, they might collectively slow down the download process. So this @@ -96,6 +100,9 @@ const fetchBodiesReqErrThresholdCount* = 3 ## Similar to `fetchHeadersReqThreshold*` + fetchBodiesProcessErrThresholdCount* = 3 + ## Similar to `fetchHeadersProcessErrThresholdCount`. + fetchBodiesReqMinResponsePC* = 10 ## Similar to `fetchHeadersReqMinResponsePC` diff --git a/nimbus/sync/beacon/worker_desc.nim b/nimbus/sync/beacon/worker_desc.nim index 5f23ab8e7d..ff191ec014 100644 --- a/nimbus/sync/beacon/worker_desc.nim +++ b/nimbus/sync/beacon/worker_desc.nim @@ -126,8 +126,10 @@ type BeaconBuddyData* = object ## Local descriptor data extension - nHdrRespErrors*: int ## Number of errors/slow responses in a row - nBdyRespErrors*: int ## Ditto for bodies + nHdrRespErrors*: uint8 ## Number of errors/slow responses in a row + nHdrProcErrors*: uint8 ## Number of post processing errors + nBdyRespErrors*: uint8 ## Ditto for bodies + nBdyProcErrors*: uint8 # Debugging and logging. nMultiLoop*: int ## Number of runs