Skip to content

Commit

Permalink
Treating header/body process error different from response errors
Browse files Browse the repository at this point in the history
why:
  Error handling does not become active until some consecutive failures
  appear. As both types of errors may interleave (e.g. no response
  errors), the counter reset for one type might affect the other.

  If this is done wrong, a peer might repeatedly send a bogus block,
  thereby locking the syncer in an endless loop.
  • Loading branch information
mjfh committed Dec 16, 2024
1 parent 76b640b commit 4eed13d
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 28 deletions.
23 changes: 18 additions & 5 deletions nimbus/sync/beacon/worker/blocks_staged.nim
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ proc blocksStagedCollect*(
# so that `async` can capture that properly.
blk = (ref BlocksForImport)()

# Flag: when set, the block process error count must not be reset
haveError = false

# nFetchBodiesRequest
while true:
# Extract bottom range interval and fetch/stage it
Expand All @@ -196,15 +199,20 @@ proc blocksStagedCollect*(
# Throw away first time block fetch data. Keep other data for a
# partially assembled list.
if nBlkBlocks == 0:
buddy.only.nBdyRespErrors.inc
buddy.only.nBdyProcErrors.inc
haveError = true

if (1 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped) or
fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors:
if ((0 < buddy.only.nBdyRespErrors or
0 < buddy.only.nBdyProcErrors) and buddy.ctrl.stopped) or
fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors or
fetchBodiesProcessErrThresholdCount < buddy.only.nBdyProcErrors:
# Make sure that this peer does not immediately reconnect
buddy.ctrl.zombie = true

trace info & ": current block list discarded", peer, iv, ivReq,
nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nBdyRespErrors
bdyErrors=buddy.bdyErrors

ctx.blocksUnprocCommit(iv.len, iv)
# At this stage allow a task switch so that some other peer might try
# to work on the currently returned interval.
Expand Down Expand Up @@ -240,8 +248,13 @@ proc blocksStagedCollect*(
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
qItem.data = blk[]

# Reset block process errors (not too many consecutive failures this time)
if not haveError:
buddy.only.nBdyProcErrors = 0

trace info & ": staged blocks", peer, bottomBlock=iv.minPt.bnStr,
nBlocks=blk.blocks.len, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state
nBlocks=blk.blocks.len, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state,
bdyErrors=buddy.bdyErrors

# Update, so it can be followed nicely
ctx.updateMetrics()
Expand Down
14 changes: 7 additions & 7 deletions nimbus/sync/beacon/worker/blocks_staged/bodies.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import
# Public functions
# ------------------------------------------------------------------------------

func bdyErrors*(buddy: BeaconBuddyRef): string =
  ## Logging helper: renders the two block body error counters of `buddy`
  ## as a single string of the form `<nBdyRespErrors>/<nBdyProcErrors>`.
  result = $buddy.only.nBdyRespErrors
  result.add("/")
  result.add($buddy.only.nBdyProcErrors)

proc fetchRegisterError*(buddy: BeaconBuddyRef) =
buddy.only.nBdyRespErrors.inc
if fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors:
Expand All @@ -39,16 +42,15 @@ proc bodiesFetch*(
start = Moment.now()
nReq = blockHashes.len

trace trEthSendSendingGetBlockBodies, peer, nReq,
nRespErrors=buddy.only.nBdyRespErrors
trace trEthSendSendingGetBlockBodies, peer, nReq, bdyErrors=buddy.bdyErrors

var resp: Option[blockBodiesObj]
try:
resp = await peer.getBlockBodies(blockHashes)
except CatchableError as e:
buddy.fetchRegisterError()
`info` info & " error", peer, nReq, elapsed=(Moment.now() - start).toStr,
error=($e.name), msg=e.msg, nRespErrors=buddy.only.nBdyRespErrors
error=($e.name), msg=e.msg, bdyErrors=buddy.bdyErrors
return err()

let elapsed = Moment.now() - start
Expand All @@ -57,8 +59,7 @@ proc bodiesFetch*(
if resp.isNone or buddy.ctrl.stopped:
buddy.fetchRegisterError()
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=0,
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nBdyRespErrors
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, bdyErrors=buddy.bdyErrors
return err()

let b: seq[BlockBody] = resp.get.blocks
Expand All @@ -78,8 +79,7 @@ proc bodiesFetch*(
buddy.only.nBdyRespErrors = 0 # reset error count

trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len,
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nBdyRespErrors
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, bdyErrors=buddy.bdyErrors

return ok(b)

Expand Down
25 changes: 17 additions & 8 deletions nimbus/sync/beacon/worker/headers_staged.nim
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ proc headersStagedCollect*(
# so that `async` can capture that properly.
lhc = (ref LinkedHChain)(parentHash: topLink)

# Flag: when set, the header process error count must not be reset
haveError = false

while true:
# Extract top range interval and fetch/stage it
let
Expand All @@ -151,15 +154,17 @@ proc headersStagedCollect*(
# Throw away opportunistic data (or first time header fetch.) Keep
# other data for a partially assembled list.
if isOpportunistic or nLhcHeaders == 0:
buddy.only.nHdrRespErrors.inc
buddy.only.nHdrProcErrors.inc
haveError = true

if (0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped) or
fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors:
if ((0 < buddy.only.nHdrRespErrors or
0 < buddy.only.nHdrProcErrors) and buddy.ctrl.stopped) or
fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors or
fetchHeadersProcessErrThresholdCount < buddy.only.nHdrProcErrors:
# Make sure that this peer does not immediately reconnect
buddy.ctrl.zombie = true
trace info & ": current header list discarded", peer, iv, ivReq,
isOpportunistic,
ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nHdrRespErrors
isOpportunistic, ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors
ctx.headersUnprocCommit(iv.len, iv)
# At this stage allow a task switch so that some other peer might try
# to work on the currently returned interval.
Expand Down Expand Up @@ -195,9 +200,13 @@ proc headersStagedCollect*(
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
qItem.data = lhc[]

trace info & ": staged a list of headers", peer,
topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len,
nStaged=ctx.hdr.staged.len, isOpportunistic, ctrl=buddy.ctrl.state
# Reset header process errors (not too many consecutive failures this time)
if not haveError:
buddy.only.nHdrProcErrors = 0

trace info & ": staged a list of headers", peer, topBlock=iv.maxPt.bnStr,
nHeaders=lhc.revHdrs.len, nStaged=ctx.hdr.staged.len, isOpportunistic,
ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors

return true

Expand Down
19 changes: 13 additions & 6 deletions nimbus/sync/beacon/worker/headers_staged/headers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ proc registerError(buddy: BeaconBuddyRef) =
if fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors:
buddy.ctrl.zombie = true # abandon slow peer

# ------------------------------------------------------------------------------
# Public debugging & logging helpers
# ------------------------------------------------------------------------------

func hdrErrors*(buddy: BeaconBuddyRef): string =
  ## Logging helper: renders the two header error counters of `buddy`
  ## as a single string of the form `<nHdrRespErrors>/<nHdrProcErrors>`.
  result = $buddy.only.nHdrRespErrors
  result.add("/")
  result.add($buddy.only.nHdrProcErrors)


# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -63,7 +71,7 @@ proc headersFetchReversed*(
start = Moment.now()

trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq,
nReq=req.maxResults, useHash, nRespErrors=buddy.only.nHdrRespErrors
nReq=req.maxResults, useHash, hdrErrors=buddy.hdrErrors

# Fetch headers from peer
var resp: Option[blockHeadersObj]
Expand All @@ -78,7 +86,7 @@ proc headersFetchReversed*(
buddy.registerError()
`info` info & " error", peer, ivReq, nReq=req.maxResults, useHash,
elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg,
nRespErrors=buddy.only.nHdrRespErrors
hdrErrors=buddy.hdrErrors
return err()

let elapsed = Moment.now() - start
Expand All @@ -88,15 +96,15 @@ proc headersFetchReversed*(
buddy.registerError()
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
nResp=0, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nHdrRespErrors
hdrErrors=buddy.hdrErrors
return err()

let h: seq[Header] = resp.get.headers
if h.len == 0 or ivReq.len < h.len.uint64:
buddy.registerError()
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
nResp=h.len, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nHdrRespErrors
hdrErrors=buddy.hdrErrors
return err()

# Ban an overly slow peer for a while when seen in a row. Also there is a
Expand All @@ -109,8 +117,7 @@ proc headersFetchReversed*(

trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len,
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nHdrRespErrors
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors

return ok(h)

Expand Down
7 changes: 7 additions & 0 deletions nimbus/sync/beacon/worker_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ const
## exceeds this threshold for more than `fetchHeadersReqErrThresholdCount`
## times in a row, then this peer will be banned for a while.

fetchHeadersProcessErrThresholdCount* = 3
## Similar to `fetchHeadersReqErrThresholdCount` but for the later part
## when errors occur while block headers are queued and further processed.

fetchHeadersReqMinResponsePC* = 10
## Some peers only returned one header at a time. If these peers sit on a
## farm, they might collectively slow down the download process. So this
Expand Down Expand Up @@ -96,6 +100,9 @@ const
fetchBodiesReqErrThresholdCount* = 3
## Similar to `fetchHeadersReqThreshold*`

fetchBodiesProcessErrThresholdCount* = 3
## Similar to `fetchHeadersProcessErrThresholdCount`.

fetchBodiesReqMinResponsePC* = 10
## Similar to `fetchHeadersReqMinResponsePC`

Expand Down
6 changes: 4 additions & 2 deletions nimbus/sync/beacon/worker_desc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ type

BeaconBuddyData* = object
## Local descriptor data extension
nHdrRespErrors*: int ## Number of errors/slow responses in a row
nBdyRespErrors*: int ## Ditto for bodies
nHdrRespErrors*: uint8 ## Number of errors/slow responses in a row
nHdrProcErrors*: uint8 ## Number of post processing errors
nBdyRespErrors*: uint8 ## Ditto for bodies
nBdyProcErrors*: uint8

# Debugging and logging.
nMultiLoop*: int ## Number of runs
Expand Down

0 comments on commit 4eed13d

Please sign in to comment.