Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BN, add timeout for sync workers which are waiting in queue. #5831

Open
wants to merge 11 commits into
base: unstable
Choose a base branch
from
6 changes: 4 additions & 2 deletions AllTests-mainnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ OK: 7/7 Fail: 0/7 Skip: 0/7
+ [SyncQueue#Backward] Pass through established limits test OK
+ [SyncQueue#Backward] Smoke test OK
+ [SyncQueue#Backward] Start and finish slots equal OK
+ [SyncQueue#Backward] SyncQueue worker timeout test OK
+ [SyncQueue#Backward] Two full requests success/fail OK
+ [SyncQueue#Backward] getRewindPoint() test OK
+ [SyncQueue#Forward] Async unordered push test OK
Expand All @@ -883,14 +884,15 @@ OK: 7/7 Fail: 0/7 Skip: 0/7
+ [SyncQueue#Forward] Pass through established limits test OK
+ [SyncQueue#Forward] Smoke test OK
+ [SyncQueue#Forward] Start and finish slots equal OK
+ [SyncQueue#Forward] SyncQueue worker timeout test OK
+ [SyncQueue#Forward] Two full requests success/fail OK
+ [SyncQueue#Forward] getRewindPoint() test OK
+ [SyncQueue] checkResponse() test OK
+ [SyncQueue] contains() test OK
+ [SyncQueue] getLastNonEmptySlot() test OK
+ [SyncQueue] hasEndGap() test OK
```
OK: 24/24 Fail: 0/24 Skip: 0/24
OK: 26/26 Fail: 0/26 Skip: 0/26
## Type helpers
```diff
+ BeaconBlock OK
Expand Down Expand Up @@ -1033,4 +1035,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 9/9 Fail: 0/9 Skip: 0/9

---TOTAL---
OK: 690/695 Fail: 0/695 Skip: 5/695
OK: 692/697 Fail: 0/697 Skip: 5/697
30 changes: 30 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,33 @@
2024-07-29 v24.7.0
==================

Nimbus `v24.7.0` is a `low-urgency` release with beacon API improvements and fixes.

### Improvements

* Add support for publishBlindedBlockV2 beacon API endpoint:
https://github.com/status-im/nimbus-eth2/pull/6413

* Improve block proposal rewards in the absence of pre-aggregated sync contributions:
https://github.com/status-im/nimbus-eth2/pull/6384

### Fixes

* Fix SSZ decoding for beacon API publishBlock and publishBlockV2 endpoints:
https://github.com/status-im/nimbus-eth2/pull/6408

* Fix `statuses` parameter handling in postStateValidators beacon API endpoint:
https://github.com/status-im/nimbus-eth2/pull/6391

* Restore functioning Sepolia bootnodes, as previous bootnodes had gradually vanished:
https://github.com/status-im/nimbus-eth2/pull/6421

* Fix IP addresses returned by getNetworkIdentity beacon API endpoint:
https://github.com/status-im/nimbus-eth2/pull/6422

* Ensure Keymanager API fee recipient changes propagate to builder API relays:
https://github.com/status-im/nimbus-eth2/pull/6412

2024-06-24 v24.6.0
==================

Expand Down
2 changes: 2 additions & 0 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ proc initFullNode(
getFrontfillSlot, isWithinWeakSubjectivityPeriod,
dag.tail.slot, blockVerifier,
shutdownEvent = node.shutdownEvent,
workerBlockWaitTimeout = chronos.seconds(1),
flags = syncManagerFlags)
backfiller = newSyncManager[Peer, PeerId](
node.network.peerPool,
Expand All @@ -478,6 +479,7 @@ proc initFullNode(
getFrontfillSlot, isWithinWeakSubjectivityPeriod,
dag.backfill.slot, blockVerifier, maxHeadAge = 0,
shutdownEvent = node.shutdownEvent,
workerBlockWaitTimeout = chronos.seconds(1),
flags = syncManagerFlags)
router = (ref MessageRouter)(
processor: processor,
Expand Down
8 changes: 6 additions & 2 deletions beacon_chain/sync/sync_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type
syncStatus*: string
direction: SyncQueueKind
ident*: string
pendingWorkerBlockWaitTime*: Duration
flags: set[SyncManagerFlag]

SyncMoment* = object
Expand Down Expand Up @@ -109,7 +110,7 @@ proc initQueue[A, B](man: SyncManager[A, B]) =
man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(),
man.getLastSlot(), man.chunkSize,
man.getSafeSlot, man.blockVerifier,
1, man.ident)
1, man.ident, man.pendingWorkerBlockWaitTime)
of SyncQueueKind.Backward:
let
firstSlot = man.getFirstSlot()
Expand All @@ -122,7 +123,8 @@ proc initQueue[A, B](man: SyncManager[A, B]) =
firstSlot - 1'u64
man.queue = SyncQueue.init(A, man.direction, startSlot, lastSlot,
man.chunkSize, man.getSafeSlot,
man.blockVerifier, 1, man.ident)
man.blockVerifier, 1, man.ident,
man.pendingWorkerBlockWaitTime)

proc newSyncManager*[A, B](pool: PeerPool[A, B],
denebEpoch: Epoch,
Expand All @@ -139,6 +141,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
shutdownEvent: AsyncEvent,
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
chunkSize = uint64(SLOTS_PER_EPOCH),
workerBlockWaitTimeout = InfiniteDuration,
flags: set[SyncManagerFlag] = {},
ident = "main"
): SyncManager[A, B] =
Expand Down Expand Up @@ -166,6 +169,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
direction: direction,
shutdownEvent: shutdownEvent,
ident: ident,
pendingWorkerBlockWaitTime: workerBlockWaitTimeout,
flags: flags
)
res.initQueue()
Expand Down
77 changes: 58 additions & 19 deletions beacon_chain/sync/sync_queue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type
readyQueue: HeapQueue[SyncResult[T]]
rewind: Option[RewindPoint]
blockVerifier: BlockVerifier
pendingWorkerBlockWaitTime: Duration
ident*: string

chronicles.formatIt SyncQueueKind: toLowerAscii($it)
Expand Down Expand Up @@ -121,8 +122,9 @@ proc getShortMap*[T](req: SyncRequest[T],
res.add('|')
continue
if slot == data[cur].signed_block_header.message.slot:
for k in cur..<cur+MAX_BLOBS_PER_BLOCK:
if k >= lenu64(data) or slot != data[k].signed_block_header.message.slot:
for k in cur ..< cur + MAX_BLOBS_PER_BLOCK:
if k >= lenu64(data) or
slot != data[k].signed_block_header.message.slot:
res.add('|')
break
else:
Expand Down Expand Up @@ -198,7 +200,8 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
getSafeSlotCb: GetSlotCallback,
blockVerifier: BlockVerifier,
syncQueueSize: int = -1,
ident: string = "main"): SyncQueue[T] =
ident: string = "main",
pendingWorkerWaitTime = InfiniteDuration): SyncQueue[T] =
## Create new synchronization queue with parameters
##
## ``start`` and ``final`` are starting and final Slots.
Expand Down Expand Up @@ -268,6 +271,7 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
inpSlot: start,
outSlot: start,
blockVerifier: blockVerifier,
pendingWorkerBlockWaitTime: pendingWorkerWaitTime,
ident: ident
)

Expand Down Expand Up @@ -312,18 +316,22 @@ proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) =
if not(item.future.finished()):
item.future.complete()

proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async: (raises: [CancelledError]).} =
proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.
async: (raises: [CancelledError]).} =
## Create new waiter and wait for completion from `wakeupWaiters()`.
let waitfut = Future[void].Raising([CancelledError]).init("SyncQueue.waitForChanges")
let waititem = SyncWaiter(future: waitfut)
let
waitfut =
Future[void].Raising([CancelledError]).init("SyncQueue.waitForChanges")
waititem = SyncWaiter(future: waitfut)
sq.waiters.add(waititem)
try:
await waitfut
return waititem.reset
finally:
sq.waiters.delete(sq.waiters.find(waititem))

proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async: (raises: [CancelledError]).} =
proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.
async: (raises: [CancelledError]).} =
## This procedure will perform wakeupWaiters(true) and blocks until last
## waiter will be awakened.
var waitChanges = sq.waitForChanges()
Expand All @@ -334,7 +342,8 @@ proc clearAndWakeup*[T](sq: SyncQueue[T]) =
sq.pending.clear()
sq.wakeupWaiters(true)

proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async: (raises: [CancelledError]).} =
proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.
async: (raises: [CancelledError]).} =
## Perform reset of all the blocked waiters in SyncQueue.
##
## We adding one more waiter to the waiters sequence and
Expand Down Expand Up @@ -547,8 +556,9 @@ func getOpt(blobs: Opt[seq[BlobSidecars]], i: int): Opt[BlobSidecars] =
else:
Opt.none(BlobSidecars)

iterator blocks[T](sq: SyncQueue[T],
sr: SyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[BlobSidecars]) =
iterator blocks[T](
sq: SyncQueue[T],
sr: SyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[BlobSidecars]) =
case sq.kind
of SyncQueueKind.Forward:
for i in countup(0, len(sr.data) - 1):
Expand Down Expand Up @@ -578,6 +588,24 @@ proc notInRange[T](sq: SyncQueue[T], sr: SyncRequest[T]): bool =
of SyncQueueKind.Backward:
(sq.queueSize > 0) and (sr.lastSlot < sq.outSlot)

func chunksCount*[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
## Returns number of chunks between current queue position and request's
## position.
case sq.kind
of SyncQueueKind.Forward:
(sr.slot - sq.outSlot) div sq.chunkSize
of SyncQueueKind.Backward:
(sq.outSlot - sr.lastSlot) div sq.chunkSize

func expectedTime[T](sq: SyncQueue[T], sr: SyncRequest[T]): Duration =
## Returns expected wait time of specific pending request.
## SR3 SR2 SR1 SR0 <CURRENT QUEUE POSITION>
## SR3 expected pending time would be
## <CHUNK SIZE> * <SR2 .. SR0 CHUNKS COUNT> * <BLOCK WAIT TIME>
nanoseconds(
int64(sq.chunkSize * sq.chunksCount(sr)) *
sq.pendingWorkerBlockWaitTime.nanoseconds)
Comment on lines +605 to +607
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the value if every single request takes the maximum time to complete a single time.

  • reality could be longer, e.g., when there are retries of earlier sync requests
  • reality could be faster, e.g., if the time is computed while there are many prior requests, but then those prior requests complete quickly

I wonder if a simpler mechanism with a static, e.g., 30sec timeout, could also mitigate the risk of getting stuck. It would take a bit longer to unstuck than the current solution, but is simpler to reason about.

Alternatively, to get it fully correct may involve having to re-schedule the timeouts whenever a prior request completes.


func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
## Compute the number of slots covered by a given `SyncRequest` that are
## already known and, hence, no longer relevant for sync progression.
Expand Down Expand Up @@ -607,11 +635,14 @@ func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
# Entire request is still relevant.
0

proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ref ForkedSignedBeaconBlock],
blobs: Opt[seq[BlobSidecars]],
maybeFinalized: bool = false,
processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} =
proc push*[T](
sq: SyncQueue[T],
sr: SyncRequest[T],
data: seq[ref ForkedSignedBeaconBlock],
blobs: Opt[seq[BlobSidecars]],
maybeFinalized: bool = false,
processingCb: ProcessingCallback = nil
) {.async: (raises: [CancelledError]).} =
logScope:
sync_ident = sq.ident
topics = "syncman"
Expand All @@ -632,8 +663,18 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
# all pending `push` requests if `request.slot` not in range.
while true:
if sq.notInRange(sr):
let reset = await sq.waitForChanges()
if reset:
let
resetWorker =
if sq.pendingWorkerBlockWaitTime == InfiniteDuration:
await sq.waitForChanges()
else:
try:
await sq.waitForChanges().wait(sq.expectedTime(sr))
except AsyncTimeoutError:
# Moving request to debts queue without checking pending list.
sq.debtsQueue.push(sr)
return
if resetWorker:
# SyncQueue reset happens. We are exiting to wake up sync-worker.
return
else:
Expand Down Expand Up @@ -686,10 +727,8 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
# Nim versions, remove workaround and move `res` into for loop
res: Result[void, VerifierError]

var i=0
for blk, blb in sq.blocks(item):
res = await sq.blockVerifier(blk[], blb, maybeFinalized)
inc(i)

if res.isOk():
goodBlock = some(blk[].slot)
Expand Down
2 changes: 1 addition & 1 deletion beacon_chain/version.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const
"Copyright (c) 2019-" & compileYear & " Status Research & Development GmbH"

versionMajor* = 24
versionMinor* = 6
versionMinor* = 7
versionBuild* = 0

versionBlob* = "stateofus" # Single word - ends up in the default graffiti
Expand Down
70 changes: 67 additions & 3 deletions tests/test_sync_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import std/[strutils, sequtils]
import unittest2
import chronos
import chronos, chronos/unittest2/asynctests
import ../beacon_chain/gossip_processing/block_processor,
../beacon_chain/sync/sync_manager,
../beacon_chain/spec/forks
Expand Down Expand Up @@ -1135,8 +1135,6 @@ suite "SyncManager test suite":
check:
groupedRes3.isErr()



test "[SyncQueue#Forward] getRewindPoint() test":
let aq = newAsyncQueue[BlockEntry]()
block:
Expand Down Expand Up @@ -1208,3 +1206,69 @@ suite "SyncManager test suite":
let safeSlot = getSafeSlot()
for i in countdown(1023, 0):
check queue.getRewindPoint(Slot(i), safeSlot) == safeSlot

template workerTimeoutTest(startSlot, finishSlot: Slot,
kind: SyncQueueKind) =
let aq = newAsyncQueue[BlockEntry]()
var
chain = createChain(startSlot, finishSlot)
queue =
case kind
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, kind, startSlot, finishSlot,
1'u64, getStaticSlotCb(startSlot),
collector(aq), 1, "worker-timeout-test",
100.milliseconds)
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, kind, finishSlot, startSlot,
1'u64, getStaticSlotCb(finishSlot),
collector(aq), 1, "worker-timeout-test",
100.milliseconds)

let
p1 = SomeTPeer()
p2 = SomeTPeer()
p3 = SomeTPeer()
p4 = SomeTPeer()
r11 {.used.} = queue.pop(finishSlot, p1)
r12 = queue.pop(finishSlot, p2)
r13 = queue.pop(finishSlot, p3)
r14 = queue.pop(finishSlot, p4)
f12 = queue.push(
r12, chain.getSlice(Slot(0), r12), Opt.none(seq[BlobSidecars]))
f13 = queue.push(
r13, chain.getSlice(Slot(0), r13), Opt.none(seq[BlobSidecars]))
f14 = queue.push(
r14, chain.getSlice(Slot(0), r14), Opt.none(seq[BlobSidecars]))

check:
f12.finished() == false
f13.finished() == false
f14.finished() == false

# Current timeout value is 100.milliseconds per block.
await sleepAsync(400.milliseconds)

check:
f12.finished() == true
f13.finished() == true
f14.finished() == true

let
r22 = queue.pop(finishSlot, p2)
r23 = queue.pop(finishSlot, p3)
r24 = queue.pop(finishSlot, p4)

check:
r22.slot == r12.slot
r22.count == r12.count
r23.slot == r13.slot
r23.count == r13.count
r24.slot == r14.slot
r24.count == r14.count
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and, also, if r11 fails (incomplete download, failed validation and so on), and p1 goes away due to the corresponding descore for failing to provide correct data, a different peer will eventually pick up r11.

otherwise, r12/r13/r14 would just get stuck again and again (as before).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep in old version r12, r13 and r14 will stuck waiting for peer to appear which should provide r11 again.


asyncTest "[SyncQueue#Forward] SyncQueue worker timeout test":
workerTimeoutTest(Slot(0), Slot(200), SyncQueueKind.Forward)

asyncTest "[SyncQueue#Backward] SyncQueue worker timeout test":
workerTimeoutTest(Slot(0), Slot(200), SyncQueueKind.Backward)