diff --git a/nimbus/sync/beacon/worker/blocks_staged.nim b/nimbus/sync/beacon/worker/blocks_staged.nim index e0abe4a707..510ca7ad84 100644 --- a/nimbus/sync/beacon/worker/blocks_staged.nim +++ b/nimbus/sync/beacon/worker/blocks_staged.nim @@ -173,6 +173,9 @@ proc blocksStagedCollect*( # so that `async` can capture that properly. blk = (ref BlocksForImport)() + # Flag, not to reset error count + haveError = false + # nFetchBodiesRequest while true: # Extract bottom range interval and fetch/stage it @@ -196,15 +199,20 @@ proc blocksStagedCollect*( # Throw away first time block fetch data. Keep other data for a # partially assembled list. if nBlkBlocks == 0: - buddy.only.nBdyRespErrors.inc + buddy.only.nBdyProcErrors.inc + haveError = true - if (1 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped) or - fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors: + if ((0 < buddy.only.nBdyRespErrors or + 0 < buddy.only.nBdyProcErrors) and buddy.ctrl.stopped) or + fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors or + fetchBodiesProcessErrThresholdCount < buddy.only.nBdyProcErrors: # Make sure that this peer does not immediately reconnect buddy.ctrl.zombie = true + trace info & ": current block list discarded", peer, iv, ivReq, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nBdyRespErrors + bdyErrors=buddy.bdyErrors + ctx.blocksUnprocCommit(iv.len, iv) # At this stage allow a task switch so that some other peer might try # to work on the currently returned interval. 
@@ -240,8 +248,13 @@ proc blocksStagedCollect*( raiseAssert info & ": duplicate key on staged queue iv=" & $iv qItem.data = blk[] + # Reset block process errors (not too many consecutive failures this time) + if not haveError: + buddy.only.nBdyProcErrors = 0 + trace info & ": staged blocks", peer, bottomBlock=iv.minPt.bnStr, - nBlocks=blk.blocks.len, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state + nBlocks=blk.blocks.len, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state, + bdyErrors=buddy.bdyErrors # Update, so it can be followed nicely ctx.updateMetrics() diff --git a/nimbus/sync/beacon/worker/blocks_staged/bodies.nim b/nimbus/sync/beacon/worker/blocks_staged/bodies.nim index 68009e4668..d7aa187a76 100644 --- a/nimbus/sync/beacon/worker/blocks_staged/bodies.nim +++ b/nimbus/sync/beacon/worker/blocks_staged/bodies.nim @@ -22,6 +22,9 @@ import # Public functions # ------------------------------------------------------------------------------ +func bdyErrors*(buddy: BeaconBuddyRef): string = + $buddy.only.nBdyRespErrors & "/" & $buddy.only.nBdyProcErrors + proc fetchRegisterError*(buddy: BeaconBuddyRef) = buddy.only.nBdyRespErrors.inc if fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors: @@ -39,8 +42,7 @@ proc bodiesFetch*( start = Moment.now() nReq = blockHashes.len - trace trEthSendSendingGetBlockBodies, peer, nReq, - nRespErrors=buddy.only.nBdyRespErrors + trace trEthSendSendingGetBlockBodies, peer, nReq, bdyErrors=buddy.bdyErrors var resp: Option[blockBodiesObj] try: @@ -48,7 +50,7 @@ proc bodiesFetch*( except CatchableError as e: buddy.fetchRegisterError() `info` info & " error", peer, nReq, elapsed=(Moment.now() - start).toStr, - error=($e.name), msg=e.msg, nRespErrors=buddy.only.nBdyRespErrors + error=($e.name), msg=e.msg, bdyErrors=buddy.bdyErrors return err() let elapsed = Moment.now() - start @@ -57,8 +59,7 @@ proc bodiesFetch*( if resp.isNone or buddy.ctrl.stopped: buddy.fetchRegisterError() trace trEthRecvReceivedBlockBodies, peer, nReq, 
nResp=0, - elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nBdyRespErrors + elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, bdyErrors=buddy.bdyErrors return err() let b: seq[BlockBody] = resp.get.blocks @@ -78,8 +79,7 @@ proc bodiesFetch*( buddy.only.nBdyRespErrors = 0 # reset error count trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len, - elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nBdyRespErrors + elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, bdyErrors=buddy.bdyErrors return ok(b) diff --git a/nimbus/sync/beacon/worker/headers_staged.nim b/nimbus/sync/beacon/worker/headers_staged.nim index 80696688c3..7fb934cfb9 100644 --- a/nimbus/sync/beacon/worker/headers_staged.nim +++ b/nimbus/sync/beacon/worker/headers_staged.nim @@ -132,6 +132,9 @@ proc headersStagedCollect*( # so that `async` can capture that properly. lhc = (ref LinkedHChain)(parentHash: topLink) + # Flag, not to reset error count + haveError = false + while true: # Extract top range interval and fetch/stage it let @@ -151,15 +154,17 @@ proc headersStagedCollect*( # Throw away opportunistic data (or first time header fetch.) Keep # other data for a partially assembled list. 
if isOpportunistic or nLhcHeaders == 0: - buddy.only.nHdrRespErrors.inc + buddy.only.nHdrProcErrors.inc + haveError = true - if (0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped) or - fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors: + if ((0 < buddy.only.nHdrRespErrors or + 0 < buddy.only.nHdrProcErrors) and buddy.ctrl.stopped) or + fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors or + fetchHeadersProcessErrThresholdCount < buddy.only.nHdrProcErrors: # Make sure that this peer does not immediately reconnect buddy.ctrl.zombie = true trace info & ": current header list discarded", peer, iv, ivReq, - isOpportunistic, - ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nHdrRespErrors + isOpportunistic, ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors ctx.headersUnprocCommit(iv.len, iv) # At this stage allow a task switch so that some other peer might try # to work on the currently returned interval. @@ -195,9 +200,13 @@ proc headersStagedCollect*( raiseAssert info & ": duplicate key on staged queue iv=" & $iv qItem.data = lhc[] - trace info & ": staged a list of headers", peer, - topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len, - nStaged=ctx.hdr.staged.len, isOpportunistic, ctrl=buddy.ctrl.state + # Reset header process errors (not too many consecutive failures this time) + if not haveError: + buddy.only.nHdrProcErrors = 0 + + trace info & ": staged a list of headers", peer, topBlock=iv.maxPt.bnStr, + nHeaders=lhc.revHdrs.len, nStaged=ctx.hdr.staged.len, isOpportunistic, + ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors return true diff --git a/nimbus/sync/beacon/worker/headers_staged/headers.nim b/nimbus/sync/beacon/worker/headers_staged/headers.nim index aabf335a50..82f8d9845b 100644 --- a/nimbus/sync/beacon/worker/headers_staged/headers.nim +++ b/nimbus/sync/beacon/worker/headers_staged/headers.nim @@ -28,6 +28,14 @@ proc registerError(buddy: BeaconBuddyRef) = if fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors: 
buddy.ctrl.zombie = true # abandon slow peer +# ------------------------------------------------------------------------------ +# Public debugging & logging helpers +# ------------------------------------------------------------------------------ + +func hdrErrors*(buddy: BeaconBuddyRef): string = + $buddy.only.nHdrRespErrors & "/" & $buddy.only.nHdrProcErrors + + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -63,7 +71,7 @@ proc headersFetchReversed*( start = Moment.now() trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq, - nReq=req.maxResults, useHash, nRespErrors=buddy.only.nHdrRespErrors + nReq=req.maxResults, useHash, hdrErrors=buddy.hdrErrors # Fetch headers from peer var resp: Option[blockHeadersObj] @@ -78,7 +86,7 @@ proc headersFetchReversed*( buddy.registerError() `info` info & " error", peer, ivReq, nReq=req.maxResults, useHash, elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg, - nRespErrors=buddy.only.nHdrRespErrors + hdrErrors=buddy.hdrErrors return err() let elapsed = Moment.now() - start @@ -88,7 +96,7 @@ proc headersFetchReversed*( buddy.registerError() trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, nResp=0, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nHdrRespErrors + hdrErrors=buddy.hdrErrors return err() let h: seq[Header] = resp.get.headers @@ -96,7 +104,7 @@ proc headersFetchReversed*( buddy.registerError() trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, nResp=h.len, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nHdrRespErrors + hdrErrors=buddy.hdrErrors return err() # Ban an overly slow peer for a while when seen in a row. 
Also there is a @@ -109,8 +117,7 @@ proc headersFetchReversed*( trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len, - elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, - nRespErrors=buddy.only.nHdrRespErrors + elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors return ok(h) diff --git a/nimbus/sync/beacon/worker_config.nim b/nimbus/sync/beacon/worker_config.nim index f774f1ed00..ba75723004 100644 --- a/nimbus/sync/beacon/worker_config.nim +++ b/nimbus/sync/beacon/worker_config.nim @@ -60,6 +60,10 @@ const ## exceeds this threshold for more than `fetchHeadersReqThresholdCount` ## times in a row, then this peer will be banned for a while. + fetchHeadersProcessErrThresholdCount* = 3 + ## Similar to `fetchHeadersReqErrThresholdCount` but for the later part + ## when errors occur while block headers are queued and further processed. + fetchHeadersReqMinResponsePC* = 10 ## Some peers only returned one header at a time. If these peers sit on a ## farm, they might collectively slow down the download process. So this @@ -96,6 +100,9 @@ const fetchBodiesReqErrThresholdCount* = 3 ## Similar to `fetchHeadersReqThreshold*` + fetchBodiesProcessErrThresholdCount* = 3 + ## Similar to `fetchHeadersProcessErrThresholdCount`. 
+ fetchBodiesReqMinResponsePC* = 10 ## Similar to `fetchHeadersReqMinResponsePC` diff --git a/nimbus/sync/beacon/worker_desc.nim b/nimbus/sync/beacon/worker_desc.nim index 5f23ab8e7d..ff191ec014 100644 --- a/nimbus/sync/beacon/worker_desc.nim +++ b/nimbus/sync/beacon/worker_desc.nim @@ -126,8 +126,10 @@ type BeaconBuddyData* = object ## Local descriptor data extension - nHdrRespErrors*: int ## Number of errors/slow responses in a row - nBdyRespErrors*: int ## Ditto for bodies + nHdrRespErrors*: uint8 ## Number of errors/slow responses in a row + nHdrProcErrors*: uint8 ## Number of post processing errors + nBdyRespErrors*: uint8 ## Ditto for bodies (errors/slow responses) + nBdyProcErrors*: uint8 ## Ditto for bodies (post processing errors) # Debugging and logging. nMultiLoop*: int ## Number of runs