Beacon sync maintenance updates (#2958)
* Fix name after API change

why:
  Slipped through (debugging mode)

* Fine tuning error counters

why:
The previous operating mode was quite blunt and triggered on some
unnecessary conditions: error handling was invoked and the peer zombified
where one could have continued working with that peer.
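
(Not part of the commit: an illustrative Nim sketch of the threshold policy
these counters feed into, using hypothetical names and values.)

const
  respErrThreshold = 2   # illustrative only, not the real constants
  procErrThreshold = 2

type PeerErrors = object
  nRespErrors, nProcErrors: int

proc shouldZombify(e: PeerErrors; peerStopped: bool): bool =
  ## A single soft failure no longer drops the peer; it is only zombified
  ## once the counters pass a threshold (or it already stopped with errors.)
  ((0 < e.nRespErrors or 0 < e.nProcErrors) and peerStopped) or
    respErrThreshold < e.nRespErrors or
    procErrThreshold < e.nProcErrors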

* Provide `kvt` table API bypassing `FC`

details:
  Not a full bypass yet

why:
  As discussed on Discord:
Ideally, those would pass through fc as well, as thin wrappers around
the db calls, for now - later, we will probably see some policy involved
here and at that point, fc will be responsible for arbitrage between
sources (i.e. if an RPC source sends the block the syncer is syncing
while the syncer is working, fc is there to referee.)
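
(Not part of the commit: a minimal usage sketch of the new wrappers, assuming
the repo's module layout; `chain_kvt` itself is added further down.)

# Hypothetical syncer-side helper built only on the `fcKvt*` API.
import ../../core/chain/forked_chain   # assumed relative path; re-exports chain_kvt

proc cacheBlob(c: ForkedChainRef; key, blob: openArray[byte]): bool =
  ## Returns `true` if the blob could be cached and saved persistently.
  if c.fcKvtAvailable():            # `FC` has no transaction frame open
    discard c.fcKvtPut(key, blob)   # cache entry, marked for saving
    return c.fcKvtPersistent()      # flush cached `kvt` (and `Aristo`) data
  false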


* Apply `kvt` API from `FC` to beacon sync

* No need to use extra table for persistent header cache state record

why:
Slot zero (of the header cache) can do. This allows deleting that table
wholesale when needed once that feature is available.
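
(Not part of the commit: what this amounts to, using `beaconHeaderKey()` from
`storage_types.nim` as changed further down.)

import ../../../db/storage_types   # assumed relative path

# The sync-state record is keyed as the header-cache entry for block number
# zero, a slot the syncer never uses for a stashed header, so wiping the
# header cache wholesale also removes the state record.
let LhcStateKey = 0.beaconHeaderKey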

* Logger updates

details:
  + Lifting main header/block op logs from `trace` to `debug`
  + Set metrics update before nano-sleep (for task switch)
mjfh authored Dec 19, 2024
1 parent 55fd257 commit c801a11
Showing 7 changed files with 122 additions and 61 deletions.
3 changes: 2 additions & 1 deletion nimbus/core/chain/forked_chain.nim
@@ -19,11 +19,12 @@ import
../../evm/state,
../validate,
../executor/process_block,
./forked_chain/chain_desc
./forked_chain/[chain_desc, chain_kvt]

export
BlockDesc,
ForkedChainRef,
chain_kvt,
common,
core_db

72 changes: 72 additions & 0 deletions nimbus/core/chain/forked_chain/chain_kvt.nim
@@ -0,0 +1,72 @@
# Nimbus
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.

## Persistent kvt that ideally bypasses the `FC` logic. This can be used as a
## cache where in-memory storage would be much of a burden (e.g. all `mainnet`
## headers.)
##
## Currently, it is not always possible to store. But fortunately (for the
## syncer application) it works for some time at the beginning when the `FC`
## module is initialised and no block operation has been performed, yet.

{.push raises: [].}

import
pkg/results,
../../../common,
../../../db/core_db,
./chain_desc

proc fcKvtAvailable*(c: ForkedChainRef): bool =
## Returns `true` if `kvt` data can be saved persistently.
c.db.txFrameLevel() == 0

proc fcKvtPersistent*(c: ForkedChainRef): bool =
## Save cached `kvt` data if possible. This function has the side effect
## that it saves all cached db data including `Aristo` data (although there
## should not be any.)
##
if c.fcKvtAvailable():
c.db.persistent(c.db.getSavedStateBlockNumber()).isOkOr:
raiseAssert "fcKvtPersistent: persistent() failed: " & $$error
return true

proc fcKvtHasKey*(c: ForkedChainRef, key: openArray[byte]): bool =
## Check whether the argument `key` exists on the `kvt` table (i.e. `get()`
## would succeed.)
##
c.db.ctx.getKvt().hasKey(key)

proc fcKvtGet*(c: ForkedChainRef, key: openArray[byte]): Opt[seq[byte]] =
## Fetch data entry from `kvt` table.
##
var w = c.db.ctx.getKvt().get(key).valueOr:
return err()
ok(move w)

proc fcKvtPut*(c: ForkedChainRef, key, data: openArray[byte]): bool =
## Cache data on the `kvt` table marked for saving persistently. If the `kvt`
## table is unavailable, this function does nothing and returns `false`.
##
if c.fcKvtAvailable():
c.db.ctx.getKvt().put(key, data).isOkOr:
raiseAssert "fcKvtPut: put() failed: " & $$error
return true

proc fcKvtDel*(c: ForkedChainRef, key: openArray[byte]): bool =
## Cache key for deletion on the `kvt` table. If the `kvt` table is
## unavailable, this function does nothing and returns `false`.
##
if c.fcKvtAvailable():
c.db.ctx.getKvt().del(key).isOkOr:
raiseAssert "fcKvtDel: del() failed: " & $$error
return true

# End
2 changes: 1 addition & 1 deletion nimbus/db/aristo/aristo_api.nim
@@ -450,7 +450,7 @@ when AutoValidateApiHooks:
doAssert not api.hasStorageData.isNil

doAssert not api.isTop.isNil
doAssert not api.level.isNil
doAssert not api.txFrameLevel.isNil

doAssert not api.mergeAccountRecord.isNil
doAssert not api.mergeStorageData.isNil
8 changes: 1 addition & 7 deletions nimbus/db/storage_types.nim
@@ -24,8 +24,7 @@ type
dataDirId = 7
safeHash = 8
finalizedHash = 9
beaconState = 10
beaconHeader = 11
beaconHeader = 10

DbKey* = object
# The first byte stores the key type. The rest are key-specific values
@@ -87,11 +86,6 @@ func hashIndexKey*(hash: Hash32, index: uint16): HashIndexKey =
result[32] = byte(index and 0xFF)
result[33] = byte((index shl 8) and 0xFF)

func beaconStateKey*(u: uint8): DbKey =
result.data[0] = byte ord(beaconState)
result.data[1] = u
result.dataEndPos = 1

func beaconHeaderKey*(u: BlockNumber): DbKey =
result.data[0] = byte ord(beaconHeader)
doAssert sizeof(u) <= 32
20 changes: 11 additions & 9 deletions nimbus/sync/beacon/worker/blocks_staged.nim
@@ -104,7 +104,11 @@ proc fetchAndCheck(
blk.blocks[offset + n].uncles = bodies[n].uncles
blk.blocks[offset + n].withdrawals = bodies[n].withdrawals

return offset < blk.blocks.len.uint64
if offset < blk.blocks.len.uint64:
return true

buddy.only.nBdyProcErrors.inc
return false

# ------------------------------------------------------------------------------
# Public functions
@@ -195,13 +199,11 @@ proc blocksStagedCollect*(

# Fetch and extend staging record
if not await buddy.fetchAndCheck(ivReq, blk, info):
haveError = true

# Throw away first time block fetch data. Keep other data for a
# partially assembled list.
if nBlkBlocks == 0:
buddy.only.nBdyProcErrors.inc
haveError = true

if ((0 < buddy.only.nBdyRespErrors or
0 < buddy.only.nBdyProcErrors) and buddy.ctrl.stopped) or
fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors or
@@ -288,7 +290,7 @@ proc blocksStagedImport*(
nBlocks = qItem.data.blocks.len
iv = BnRange.new(qItem.key, qItem.key + nBlocks.uint64 - 1)

trace info & ": import blocks ..", iv, nBlocks,
debug info & ": import blocks ..", iv, nBlocks,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr

var maxImport = iv.maxPt
@@ -317,6 +319,9 @@
maxImport = ctx.chain.latestNumber()
break importLoop

# Update, so it can be followed nicely
ctx.updateMetrics()

# Allow pseudo/async thread switch.
try: await sleepAsync asyncThreadSwitchTimeSlot
except CancelledError: discard
@@ -325,9 +330,6 @@
maxImport = ctx.chain.latestNumber()
break importLoop

# Update, so it can be followed nicely
ctx.updateMetrics()

# Occasionally mark the chain finalized
if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
let
@@ -363,7 +365,7 @@
# Update, so it can be followed nicely
ctx.updateMetrics()

trace info & ": import done", iv, nBlocks, B=ctx.chain.baseNumber.bnStr,
debug info & ": import done", iv, nBlocks, B=ctx.chain.baseNumber.bnStr,
L=ctx.chain.latestNumber.bnStr, F=ctx.layout.final.bnStr
return true

71 changes: 32 additions & 39 deletions nimbus/sync/beacon/worker/db.nim
@@ -19,15 +19,19 @@ import
../worker_desc,
./headers_unproc

const
LhcStateKey = 1.beaconStateKey
let
LhcStateKey = 0.beaconHeaderKey

# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------

template kvtNotAvailable(info: static[string]): string =
info & ": kvt table not available (locked by FC module)"


proc fetchSyncStateLayout(ctx: BeaconCtxRef): Opt[SyncStateLayout] =
let data = ctx.db.ctx.getKvt().get(LhcStateKey.toOpenArray).valueOr:
let data = ctx.pool.chain.fcKvtGet(LhcStateKey.toOpenArray).valueOr:
return err()
try:
return ok(rlp.decode(data, SyncStateLayout))
@@ -42,29 +46,25 @@ proc deleteStaleHeadersAndState(
info: static[string];
) =
## Delete stale headers and state
let
kvt = ctx.db.ctx.getKvt()
stateNum = ctx.db.getSavedStateBlockNumber() # for persisting
let c = ctx.pool.chain
if not c.fcKvtAvailable():
trace kvtNotAvailable(info)
return

var bn = upTo
while 0 < bn and kvt.hasKey(beaconHeaderKey(bn).toOpenArray):
discard kvt.del(beaconHeaderKey(bn).toOpenArray)
while 0 < bn and c.fcKvtHasKey(beaconHeaderKey(bn).toOpenArray):
discard c.fcKvtDel(beaconHeaderKey(bn).toOpenArray)
bn.dec

# Occasionally persist the deleted headers. This will succeed if
# this function is called early enough after restart when there is
# no database transaction pending.
# Occasionally persist the deleted headers (so that the internal DB cache
# does not grow extra large.) This will succeed if this function is called
# early enough after restart when there is no database transaction pending.
if (upTo - bn) mod 8192 == 0:
ctx.db.persistent(stateNum).isOkOr:
debug info & ": cannot persist deleted sync headers", error=($$error)
# So be it, stop here.
return

# Delete persistent state, there will be no use of it anymore
discard kvt.del(LhcStateKey.toOpenArray)
ctx.db.persistent(stateNum).isOkOr:
debug info & ": cannot persist deleted sync headers", error=($$error)
return
discard c.fcKvtPersistent()

# Delete persistent state record, there will be no use of it anymore
discard c.fcKvtDel(LhcStateKey.toOpenArray)
discard c.fcKvtPersistent()

if bn < upTo:
debug info & ": deleted stale sync headers", iv=BnRange.new(bn+1,upTo)
@@ -75,17 +75,12 @@

proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
## Save chain layout to persistent db
let data = rlp.encode(ctx.layout)
ctx.db.ctx.getKvt().put(LhcStateKey.toOpenArray, data).isOkOr:
raiseAssert info & " put() failed: " & $$error

# While executing blocks there are frequent save cycles. Otherwise, an
# extra save request might help to pick up an interrupted sync session.
if ctx.db.txFrameLevel() == 0 and ctx.stash.len == 0:
let number = ctx.db.getSavedStateBlockNumber()
ctx.db.persistent(number).isOkOr:
raiseAssert info & " persistent() failed: " & $$error

let c = ctx.pool.chain
if c.fcKvtAvailable():
discard c.fcKvtPut(LhcStateKey.toOpenArray, rlp.encode(ctx.layout))
discard c.fcKvtPersistent()
else:
trace kvtNotAvailable(info)

proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
## Restore chain layout from persistent db. It returns `true` if a previous
@@ -165,18 +160,16 @@ proc dbHeadersStash*(
## ..
##
let
txFrameLevel = ctx.db.txFrameLevel()
c = ctx.pool.chain
last = first + revBlobs.len.uint64 - 1
if 0 < txFrameLevel:
if not c.fcKvtAvailable():
# Need to cache it because FCU has blocked writing through to disk.
for n,data in revBlobs:
ctx.stash[last - n.uint64] = data
else:
let kvt = ctx.db.ctx.getKvt()
for n,data in revBlobs:
let key = beaconHeaderKey(last - n.uint64)
kvt.put(key.toOpenArray, data).isOkOr:
raiseAssert info & ": put() failed: " & $$error
discard c.fcKvtPut(key.toOpenArray, data)

proc dbHeaderPeek*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Header] =
## Retrieve some stashed header.
@@ -189,7 +182,7 @@ proc dbHeaderPeek*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Header] =
# Use persistent storage next
let
key = beaconHeaderKey(num)
rc = ctx.db.ctx.getKvt().get(key.toOpenArray)
rc = ctx.pool.chain.fcKvtGet(key.toOpenArray)
if rc.isOk:
try:
return ok(rlp.decode(rc.value, Header))
@@ -206,7 +199,7 @@ proc dbHeaderUnstash*(ctx: BeaconCtxRef; bn: BlockNumber) =
ctx.stash.withValue(bn, _):
ctx.stash.del bn
return
discard ctx.db.ctx.getKvt().del(beaconHeaderKey(bn).toOpenArray)
discard ctx.pool.chain.fcKvtDel(beaconHeaderKey(bn).toOpenArray)

# ------------------------------------------------------------------------------
# End
7 changes: 3 additions & 4 deletions nimbus/sync/beacon/worker/headers_staged.nim
@@ -44,6 +44,7 @@ proc fetchAndCheck(
# While assembling a `LinkedHChainRef`, verify that the `revHeaders` list
# was sound, i.e. contiguous, linked, etc.
if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc):
buddy.only.nHdrProcErrors.inc
return false

return true
@@ -150,13 +151,11 @@ proc headersStagedCollect*(

# Fetch and extend chain record
if not await buddy.fetchAndCheck(ivReq, lhc, info):
haveError = true

# Throw away opportunistic data (or first time header fetch.) Keep
# other data for a partially assembled list.
if isOpportunistic or nLhcHeaders == 0:
buddy.only.nHdrProcErrors.inc
haveError = true

if ((0 < buddy.only.nHdrRespErrors or
0 < buddy.only.nHdrProcErrors) and buddy.ctrl.stopped) or
fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors or
@@ -252,7 +251,7 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =

result += qItem.data.revHdrs.len # count headers

trace info & ": stashed consecutive headers",
debug info & ": stashed consecutive headers",
nListsLeft=ctx.hdr.staged.len, nStashed=result

if headersStagedQueueLengthLwm < ctx.hdr.staged.len:
