Fc module various base tree admin updates #2895

Merged
merged 7 commits into from
Dec 2, 2024
219 changes: 131 additions & 88 deletions nimbus/core/chain/forked_chain.nim
@@ -38,6 +38,8 @@ type
header: Header

CanonicalDesc = object
## Designates a `header` entry on a `CursorDesc` sub-chain named
## `cursorDesc`, identified by `cursorHash == cursorDesc.hash`.
cursorHash: Hash32
header: Header

@@ -59,13 +61,31 @@ const
BaseDistance = 128

# ------------------------------------------------------------------------------
# Private
# Private helpers
# ------------------------------------------------------------------------------
template shouldNotKeyError(body: untyped) =

template shouldNotKeyError(info: string, body: untyped) =
try:
body
except KeyError as exc:
raiseAssert exc.msg
raiseAssert info & ": name=" & $exc.name & " msg=" & exc.msg
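# Illustrative usage (not part of this change; `someCaller` and `h` are
# made-up names): plain table indexing such as `c.blocks[h]` may raise
# `KeyError`, so lookups that are expected to always succeed are wrapped as
#
#   shouldNotKeyError "someCaller":
#     let hdr = c.blocks[h].blk.header
#
# which turns an unexpected miss into an `AssertionDefect` carrying the
# given context string.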

proc deleteLineage(c: ForkedChainRef; top: Hash32) =
## Starting at argument `top`, delete all entries from `c.blocks[]` along
## the ancestor chain.
##
var parent = top
while true:
c.blocks.withValue(parent, val):
let w = parent
parent = val.blk.header.parentHash
c.blocks.del(w)
continue
break

# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------

proc processBlock(c: ForkedChainRef,
parent: Header,
@@ -135,11 +155,16 @@ func updateCursor(c: ForkedChainRef,

c.cursorHeader = header
c.cursorHash = header.blockHash
c.blocks[c.cursorHash] = BlockDesc(
blk: blk,
receipts: move(receipts)
)
c.updateCursorHeads(c.cursorHash, header)

c.blocks.withValue(c.cursorHash, val):
# Block exists already, so only update the receipts
val.receipts = receipts
do:
# New block => update head
c.blocks[c.cursorHash] = BlockDesc(
blk: blk,
receipts: move(receipts))
c.updateCursorHeads(c.cursorHash, header)

proc validateBlock(c: ForkedChainRef,
parent: Header,
@@ -169,7 +194,7 @@ proc replaySegment*(c: ForkedChainRef, target: Hash32) =
prevHash = target
chain = newSeq[Block]()

shouldNotKeyError:
shouldNotKeyError "replaySegment(target)":
while prevHash != c.baseHash:
chain.add c.blocks[prevHash].blk
prevHash = chain[^1].header.parentHash
@@ -193,7 +218,7 @@ proc replaySegment(c: ForkedChainRef,
prevHash = target
chain = newSeq[Block]()

shouldNotKeyError:
shouldNotKeyError "replaySegment(target,parent)":
while prevHash != parentHash:
chain.add c.blocks[prevHash].blk
prevHash = chain[^1].header.parentHash
@@ -210,7 +235,7 @@ proc writeBaggage(c: ForkedChainRef, target: Hash32) =
template header(): Header =
blk.blk.header

shouldNotKeyError:
shouldNotKeyError "writeBaggage":
var prevHash = target
var count = 0'u64
while prevHash != c.baseHash:
@@ -237,67 +262,65 @@ func updateBase(c: ForkedChainRef,
newBaseHash: Hash32,
newBaseHeader: Header,
canonicalCursorHash: Hash32) =
var cursorHeadsLen = c.cursorHeads.len
# Remove obsolete chains, example:
# -- A1 - A2 - A3 -- D5 - D6
# / /
# base - B1 - B2 - [B3] - B4
# \
# --- C3 - C4
# If base move to B3, both A and C will be removed
# but not D

for i in 0..<cursorHeadsLen:
if c.cursorHeads[i].forkJunction <= newBaseHeader.number and
c.cursorHeads[i].hash != canonicalCursorHash:
var prevHash = c.cursorHeads[i].hash
while prevHash != c.baseHash:
c.blocks.withValue(prevHash, val) do:
let rmHash = prevHash
prevHash = val.blk.header.parentHash
c.blocks.del(rmHash)
do:
# Older chain segments have been deleted
# by a previous head
break
c.cursorHeads.del(i)
# If we use `c.cursorHeads.len` in the for loop,
# the sequence length will not be updated
dec cursorHeadsLen
## Remove obsolete chains, example:
##
## A1 - A2 - A3 D5 - D6
## / /
## base - B1 - B2 - [B3] - B4 - B5
## \ \
## C2 - C3 E4 - E5
##
## where `B5` is the `canonicalCursor` head. When the `base` is moved to
## position `[B3]`, both chains `A` and `C` will be removed but not so for
## `D` and `E`, and chain `B` will be curtailed below `B4`.
##
var newCursorHeads: seq[CursorDesc] # Will become new `c.cursorHeads`
for ch in c.cursorHeads:
if newBaseHeader.number < ch.forkJunction:
# In the example, this would be chain `D` or `E`.
newCursorHeads.add ch

elif ch.hash == canonicalCursorHash:
# In the example, this would be chain `B`.
newCursorHeads.add CursorDesc(
hash: ch.hash,
forkJunction: newBaseHeader.number + 1)

else:
# In the example, this would be either chain `A` or `C`.
c.deleteLineage ch.hash

# Clean up in-memory blocks starting from newBase backwards, while blocks
# from newBase+1 up to the canonicalCursor head are kept
# (e.g. B4 onwards in the example above)
var prevHash = newBaseHash
while prevHash != c.baseHash:
c.blocks.withValue(prevHash, val) do:
let rmHash = prevHash
prevHash = val.blk.header.parentHash
c.blocks.del(rmHash)
do:
# Older chain segments have been deleted
# by a previous head
break
c.deleteLineage newBaseHash

# Implied deletion of chain heads (if any)
c.cursorHeads.swap newCursorHeads

c.baseHeader = newBaseHeader
c.baseHash = newBaseHash

func findCanonicalHead(c: ForkedChainRef,
hash: Hash32): Result[CanonicalDesc, string] =
## Find the `cursor` arc that contains the block referred to by the
## argument `hash`.
if hash == c.baseHash:
# The cursorHash here should not be used for the next step
# because it does not point to any active chain
return ok(CanonicalDesc(cursorHash: c.baseHash, header: c.baseHeader))

shouldNotKeyError:
# Find which chain the hash belongs to
for cursor in c.cursorHeads:
var prevHash = cursor.hash
while prevHash != c.baseHash:
let header = c.blocks[prevHash].blk.header
if prevHash == hash:
return ok(CanonicalDesc(cursorHash: cursor.hash, header: header))
prevHash = header.parentHash
for ch in c.cursorHeads:
var top = ch.hash
while true:
c.blocks.withValue(top, val):
if ch.forkJunction <= val.blk.header.number:
if top == hash:
return ok CanonicalDesc(cursorHash: ch.hash, header: val.blk.header)
if ch.forkJunction < val.blk.header.number:
top = val.blk.header.parentHash
continue
break

err("Block hash is not part of any active chain")

@@ -307,7 +330,7 @@ func canonicalChain(c: ForkedChainRef,
if hash == c.baseHash:
return ok(c.baseHeader)

shouldNotKeyError:
shouldNotKeyError "canonicalChain":
var prevHash = headHash
while prevHash != c.baseHash:
var header = c.blocks[prevHash].blk.header
@@ -318,34 +341,63 @@ func calculateNewBase(c: ForkedChainRef,
err("Block hash not in canonical chain")

func calculateNewBase(c: ForkedChainRef,
finalizedHeader: Header,
finalized: BlockNumber,
headHash: Hash32,
headHeader: Header): BaseDesc =
head: CanonicalDesc): BaseDesc =
## Search backwards, starting at `head`, for an entry with block number
## `finalized` on the arc (or leg) terminating at `head`.
##
## It is required that `finalized` is within the `head` arc, i.e.
## `cursorHead.forkJunction <= finalized` for the relevant `cursorHead`.
##
## The `finalized` entry might be moved backwards so that a minimum
## distance applies.
##
## Discussion:
## If the distance `finalized..head` is too short, `finalized` will be
## adjusted backwards and may fall outside the `head` arc. In that case,
## the base remains unmoved.
##
# It's important to have base at least `baseDistance` behind head
# so we can answer state queries about history that deep.

let targetNumber = min(finalizedHeader.number,
max(headHeader.number, c.baseDistance) - c.baseDistance)
let target = min(finalized,
max(head.header.number, c.baseDistance) - c.baseDistance)
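# Worked example (illustrative numbers, not from this change): with
# `c.baseDistance = 128`, `head.header.number = 1000` and `finalized = 950`,
# the result is `target = min(950, max(1000, 128) - 128) = 872`, i.e. the
# base is never moved closer than `baseDistance` blocks behind the head.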

# The distance is less than `baseDistance`, don't move the base
if targetNumber <= c.baseHeader.number + c.baseDistance:
if target <= c.baseHeader.number + c.baseDistance:
return BaseDesc(hash: c.baseHash, header: c.baseHeader)

shouldNotKeyError:
var prevHash = headHash
while prevHash != c.baseHash:
var header = c.blocks[prevHash].blk.header
if header.number == targetNumber:
return BaseDesc(hash: prevHash, header: move(header))
prevHash = header.parentHash
# Verify that `target` does not fall outside the `head` arc. It is
# assumed that `finalized` is within the `head` arc.
if target < finalized:
block verifyTarget:
# Find the cursor relative to the `head` argument
for ch in c.cursorHeads:
if ch.hash == head.cursorHash:
if ch.forkJunction <= target:
break verifyTarget
break
# Nothing to do here
return BaseDesc(hash: c.baseHash, header: c.baseHeader)

var prevHash = headHash
while true:
c.blocks.withValue(prevHash, val):
# Note that `cursorHead.forkJunction <= target`
if target == val.blk.header.number:
return BaseDesc(hash: prevHash, header: val.blk.header)
prevHash = val.blk.header.parentHash
continue
break

doAssert(false, "Unreachable code")
doAssert(false, "Unreachable code, finalized block outside cursor arc")

func trimCanonicalChain(c: ForkedChainRef,
head: CanonicalDesc,
headHash: Hash32) =
# Maybe the current active chain is longer than the canonical chain
shouldNotKeyError:
shouldNotKeyError "trimCanonicalChain":
var prevHash = head.cursorHash
while prevHash != c.baseHash:
let header = c.blocks[prevHash].blk.header
@@ -517,8 +569,7 @@ proc forkChoice*(c: ForkedChainRef,
# Finalized block must be part of canonical chain
let finalizedHeader = ?c.canonicalChain(finalizedHash, headHash)

let newBase = c.calculateNewBase(
finalizedHeader, headHash, head.header)
let newBase = c.calculateNewBase(finalizedHeader.number, headHash, head)

if newBase.hash == c.baseHash:
# The base is not updated but the cursor maybe need update
@@ -645,15 +696,7 @@ func memoryTransaction*(c: ForkedChainRef, txHash: Hash32): Opt[Transaction] =
proc latestBlock*(c: ForkedChainRef): Block =
c.blocks.withValue(c.cursorHash, val) do:
return val.blk
do:
result = c.db.getEthBlock(c.cursorHash).expect("cursorBlock exists")
if c.cursorHash != c.baseHash:
# This can happen if the block pointed to by cursorHash is not loaded yet
c.blocks[c.cursorHash] = BlockDesc(
blk: result,
receipts: c.db.getReceipts(result.header.receiptsRoot).
expect("receipts exists"),
)
c.db.getEthBlock(c.cursorHash).expect("cursorBlock exists")

proc headerByNumber*(c: ForkedChainRef, number: BlockNumber): Result[Header, string] =
if number > c.cursorHeader.number:
@@ -668,7 +711,7 @@ proc headerByNumber*(c: ForkedChainRef, number: BlockNumber): Result[Header, str
if number < c.baseHeader.number:
return c.db.getBlockHeader(number)

shouldNotKeyError:
shouldNotKeyError "headerByNumber":
var prevHash = c.cursorHeader.parentHash
while prevHash != c.baseHash:
let header = c.blocks[prevHash].blk.header
@@ -702,7 +745,7 @@ proc blockByNumber*(c: ForkedChainRef, number: BlockNumber): Result[Block, strin
if number < c.baseHeader.number:
return c.db.getEthBlock(number)

shouldNotKeyError:
shouldNotKeyError "blockByNumber":
var prevHash = c.cursorHash
while prevHash != c.baseHash:
c.blocks.withValue(prevHash, item):
@@ -713,7 +756,7 @@ proc blockByNumber*(c: ForkedChainRef, number: BlockNumber): Result[Block, strin

func blockFromBaseTo*(c: ForkedChainRef, number: BlockNumber): seq[Block] =
# Return blocks in reverse order
shouldNotKeyError:
shouldNotKeyError "blockFromBaseTo":
var prevHash = c.cursorHash
while prevHash != c.baseHash:
c.blocks.withValue(prevHash, item):
@@ -725,7 +768,7 @@ func isCanonical*(c: ForkedChainRef, blockHash: Hash32): bool =
if blockHash == c.baseHash:
return true

shouldNotKeyError:
shouldNotKeyError "isCanonical":
var prevHash = c.cursorHash
while prevHash != c.baseHash:
c.blocks.withValue(prevHash, item):
@@ -745,7 +788,7 @@ proc isCanonicalAncestor*(c: ForkedChainRef,
if c.baseHeader.number < c.cursorHeader.number:
# The current canonical chain in memory is headed by
# cursorHeader
shouldNotKeyError:
shouldNotKeyError "isCanonicalAncestor":
var prevHash = c.cursorHeader.parentHash
while prevHash != c.baseHash:
var header = c.blocks[prevHash].blk.header
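
A recurring idiom in this change is `std/tables.withValue`, which replaces plain `[]` indexing so that a missing key falls through to an explicit branch instead of raising `KeyError`. The sketch below is a minimal, standalone illustration of the lineage walk used by `deleteLineage` above; it is not part of the PR, and the `Node` type and string keys are made-up stand-ins for `BlockDesc` and block hashes.

```nim
import std/tables

type Node = object
  parent: string             # stand-in for `blk.header.parentHash`

var blocks = {
  "c3": Node(parent: "b2"),
  "b2": Node(parent: "a1"),
  "a1": Node(parent: "genesis")}.toTable

proc deleteLineage(top: string) =
  ## Delete `top` and all of its ancestors from `blocks`.
  var cur = top
  while true:
    blocks.withValue(cur, val):
      let rmKey = cur
      cur = val.parent       # step to the parent before deleting
      blocks.del(rmKey)
      continue               # keep walking down the lineage
    break                    # lookup missed: nothing left to delete

deleteLineage "c3"
doAssert blocks.len == 0     # "genesis" was never an entry, so the table is empty
```

The two-branch form `withValue(key, val): ... do: ...`, as used in `updateCursor` and `latestBlock`, additionally runs the `do:` block when the key is absent.
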
14 changes: 1 addition & 13 deletions nimbus/sync/beacon/TODO.md
@@ -8,19 +8,7 @@

See issue [#2816](https://github.com/status-im/nimbus-eth1/issues/2816)

### 2. Some assert

Error: unhandled exception: key not found: 0x441a0f..027bc96a [AssertionDefect]

which happened on several `holesky` tests immediately after logging something like

NTC 2024-10-31 21:37:34.728 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=044d22843cbe baseNumber=2646764 baseHash=21ec11c1deac

or from another machine with literally the same exception text (but the stack-trace differs)

NTC 2024-10-31 21:58:07.616 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=9cbcc52953a8 baseNumber=2646857 baseHash=9db5c2ac537b

### 3. Mem overflow possible on small breasted systems
### 2. Mem overflow possible on small breasted systems

Running the exe client, a 1.5G response message was observed (on my 8G test system this kills the program as it already has 80% mem load). It happens while syncing holesky at around block #184160 and is reproducible on the 8G system but not yet on an 80G system.

2 changes: 1 addition & 1 deletion nimbus/sync/beacon/worker.nim
@@ -87,7 +87,7 @@ proc start*(buddy: BeaconBuddyRef; info: static[string]): bool =
proc stop*(buddy: BeaconBuddyRef; info: static[string]) =
## Clean up this peer
if not buddy.ctx.hibernate: debug info & ": release peer", peer=buddy.peer,
ctrl=buddy.ctrl.state, nInvocations=buddy.only.nMultiLoop,
ctrl=buddy.ctrl.state, nLaps=buddy.only.nMultiLoop,
lastIdleGap=buddy.only.multiRunIdle.toStr
buddy.stopBuddy()
