Stream blocks during import
When running the import, blocks are currently loaded in batches into a
`seq` and then passed to the importer as such.

In reality, blocks are still processed one by one, so the batching offers
no performance advantage. It does, however, require the client to waste
memory, up to several GB, on the block sequence while the blocks wait to
be processed.

This PR introduces a persister that accepts these potentially large
blocks one by one and, at the same time, removes a number of redundant or
unnecessary copies, assignments and resets that were slowing down the
import process in general.
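
Roughly, the change amounts to the following (a hedged sketch, not code from this PR: `loadBlock`, `persistBlock`, `importBlocks` and the block range are hypothetical stand-ins around the `Block` type used in the diff below):

```nim
# Before: accumulate a whole batch in memory before handing it off.
var batch: seq[Block]
for blockNumber in startNumber .. endNumber:
  batch.add loadBlock(blockNumber) # every block of the batch stays resident
importBlocks(batch)                # blocks are still processed one by one here

# After: stream blocks one by one through a single reusable buffer.
var blk: Block
for blockNumber in startNumber .. endNumber:
  loadBlock(blockNumber, blk) # fills `blk` in place
  persistBlock(blk)           # at most one block resident at a time
```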
arnetheduck committed Dec 13, 2024
1 parent 650fec5 commit 0b2eb5c
Showing 14 changed files with 509 additions and 476 deletions.
12 changes: 8 additions & 4 deletions fluffy/database/era1_db.nim
@@ -59,15 +59,19 @@ proc new*(
): Era1DB =
Era1DB(path: path, network: network, accumulator: accumulator)

proc getEthBlock*(db: Era1DB, blockNumber: uint64): Result[Block, string] =
proc getEthBlock*(
db: Era1DB, blockNumber: uint64, res: var Block
): Result[void, string] =
let f = ?db.getEra1File(blockNumber.era)

f.getEthBlock(blockNumber)
f.getEthBlock(blockNumber, res)

proc getBlockTuple*(db: Era1DB, blockNumber: uint64): Result[BlockTuple, string] =
proc getBlockTuple*(
db: Era1DB, blockNumber: uint64, res: var BlockTuple
): Result[void, string] =
let f = ?db.getEra1File(blockNumber.era)

f.getBlockTuple(blockNumber)
f.getBlockTuple(blockNumber, res)

proc getAccumulator*(
db: Era1DB, blockNumber: uint64
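A minimal usage sketch (not part of the diff) of the new out-parameter API above, assuming an already-open `db: Era1DB` and a `blockNumber` within range; the error handling uses the same `results` idiom (`isOkOr` with the injected `error`) as the rest of the change:

```nim
var blk: Block
db.getEthBlock(blockNumber, blk).isOkOr:
  echo "failed to load block ", blockNumber, ": ", error
  return
# `blk` now holds the decoded block; the same variable can be passed to the
# next call so that only one block is resident at a time.
```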
98 changes: 55 additions & 43 deletions fluffy/eth_data/era1.nim
@@ -180,9 +180,10 @@ func offsetsLen(startNumber: uint64): int =
proc toCompressedRlpBytes(item: auto): seq[byte] =
snappy.encodeFramed(rlp.encode(item))

proc fromCompressedRlpBytes(bytes: openArray[byte], T: type): Result[T, string] =
proc fromCompressedRlpBytes[T](bytes: openArray[byte], v: var T): Result[void, string] =
try:
ok(rlp.decode(decodeFramed(bytes, checkIntegrity = false), T))
v = rlp.decode(decodeFramed(bytes, checkIntegrity = false), T)
ok()
except RlpError as e:
err("Invalid compressed RLP data for " & $T & ": " & e.msg)

@@ -300,32 +301,32 @@ proc skipRecord*(f: Era1File): Result[void, string] =

f[].handle.get().skipRecord()

proc getBlockHeader(f: Era1File): Result[headers.Header, string] =
proc getBlockHeader(f: Era1File, res: var headers.Header): Result[void, string] =
var bytes: seq[byte]

let header = ?f[].handle.get().readRecord(bytes)
if header.typ != CompressedHeader:
return err("Invalid era file: didn't find block header at index position")

fromCompressedRlpBytes(bytes, headers.Header)
fromCompressedRlpBytes(bytes, res)

proc getBlockBody(f: Era1File): Result[BlockBody, string] =
proc getBlockBody(f: Era1File, res: var BlockBody): Result[void, string] =
var bytes: seq[byte]

let header = ?f[].handle.get().readRecord(bytes)
if header.typ != CompressedBody:
return err("Invalid era file: didn't find block body at index position")

fromCompressedRlpBytes(bytes, BlockBody)
fromCompressedRlpBytes(bytes, res)

proc getReceipts(f: Era1File): Result[seq[Receipt], string] =
proc getReceipts(f: Era1File, res: var seq[Receipt]): Result[void, string] =
var bytes: seq[byte]

let header = ?f[].handle.get().readRecord(bytes)
if header.typ != CompressedReceipts:
return err("Invalid era file: didn't find receipts at index position")

fromCompressedRlpBytes(bytes, seq[Receipt])
fromCompressedRlpBytes(bytes, res)

proc getTotalDifficulty(f: Era1File): Result[UInt256, string] =
var bytes: seq[byte]
@@ -339,18 +340,25 @@ proc getTotalDifficulty(f: Era1File): Result[UInt256, string] =

ok(UInt256.fromBytesLE(bytes))

proc getNextEthBlock*(f: Era1File): Result[Block, string] =
proc getNextEthBlock*(f: Era1File, res: var Block): Result[void, string] =
doAssert not isNil(f) and f[].handle.isSome

var
header = ?getBlockHeader(f)
body = ?getBlockBody(f)
var body: BlockBody
?getBlockHeader(f, res.header)
?getBlockBody(f, body)

?skipRecord(f) # receipts
?skipRecord(f) # totalDifficulty

ok(Block.init(move(header), move(body)))
res.transactions = move(body.transactions)
res.uncles = move(body.uncles)
res.withdrawals = move(body.withdrawals)

ok()

proc getEthBlock*(f: Era1File, blockNumber: uint64): Result[Block, string] =
proc getEthBlock*(
f: Era1File, blockNumber: uint64, res: var Block
): Result[void, string] =
doAssert not isNil(f) and f[].handle.isSome
doAssert(
blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber,
@@ -361,20 +369,21 @@ proc getEthBlock*(f: Era1File, blockNumber: uint64): Result[Block, string] =

?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)

getNextEthBlock(f)
getNextEthBlock(f, res)

proc getNextBlockTuple*(f: Era1File): Result[BlockTuple, string] =
proc getNextBlockTuple*(f: Era1File, res: var BlockTuple): Result[void, string] =
doAssert not isNil(f) and f[].handle.isSome

let
blockHeader = ?getBlockHeader(f)
blockBody = ?getBlockBody(f)
receipts = ?getReceipts(f)
totalDifficulty = ?getTotalDifficulty(f)
?getBlockHeader(f, res.header)
?getBlockBody(f, res.body)
?getReceipts(f, res.receipts)
res.td = ?getTotalDifficulty(f)

ok((blockHeader, blockBody, receipts, totalDifficulty))
ok()

proc getBlockTuple*(f: Era1File, blockNumber: uint64): Result[BlockTuple, string] =
proc getBlockTuple*(
f: Era1File, blockNumber: uint64, res: var BlockTuple
): Result[void, string] =
doAssert not isNil(f) and f[].handle.isSome
doAssert(
blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber,
@@ -385,9 +394,11 @@ proc getBlockTuple*(f: Era1File, blockNumber: uint64): Result[BlockTuple, string] =

?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)

getNextBlockTuple(f)
getNextBlockTuple(f, res)

proc getBlockHeader*(f: Era1File, blockNumber: uint64): Result[headers.Header, string] =
proc getBlockHeader*(
f: Era1File, blockNumber: uint64, res: var headers.Header
): Result[void, string] =
doAssert not isNil(f) and f[].handle.isSome
doAssert(
blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber,
@@ -398,7 +409,7 @@ proc getBlockHeader*(f: Era1File, blockNumber: uint64): Result[headers.Header, string] =

?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)

getBlockHeader(f)
getBlockHeader(f, res)

proc getTotalDifficulty*(f: Era1File, blockNumber: uint64): Result[UInt256, string] =
doAssert not isNil(f) and f[].handle.isSome
@@ -445,13 +456,13 @@ proc buildAccumulator*(f: Era1File): Result[EpochRecordCached, string] =
endNumber = f.blockIdx.endNumber()

var headerRecords: seq[HeaderRecord]
var header: headers.Header
for blockNumber in startNumber .. endNumber:
let
blockHeader = ?f.getBlockHeader(blockNumber)
totalDifficulty = ?f.getTotalDifficulty(blockNumber)
?f.getBlockHeader(blockNumber, header)
let totalDifficulty = ?f.getTotalDifficulty(blockNumber)

headerRecords.add(
HeaderRecord(blockHash: blockHeader.rlpHash(), totalDifficulty: totalDifficulty)
HeaderRecord(blockHash: header.rlpHash(), totalDifficulty: totalDifficulty)
)

ok(EpochRecordCached.init(headerRecords))
@@ -462,25 +473,26 @@ proc verify*(f: Era1File): Result[Digest, string] =
endNumber = f.blockIdx.endNumber()

var headerRecords: seq[HeaderRecord]
var blockTuple: BlockTuple
for blockNumber in startNumber .. endNumber:
?f.getBlockTuple(blockNumber, blockTuple)
let
(blockHeader, blockBody, receipts, totalDifficulty) =
?f.getBlockTuple(blockNumber)

txRoot = calcTxRoot(blockBody.transactions)
ommershHash = rlpHash(blockBody.uncles)
txRoot = calcTxRoot(blockTuple.body.transactions)
ommershHash = rlpHash(blockTuple.body.uncles)

if blockHeader.txRoot != txRoot:
if blockTuple.header.txRoot != txRoot:
return err("Invalid transactions root")

if blockHeader.ommersHash != ommershHash:
if blockTuple.header.ommersHash != ommershHash:
return err("Invalid ommers hash")

if blockHeader.receiptsRoot != calcReceiptsRoot(receipts):
if blockTuple.header.receiptsRoot != calcReceiptsRoot(blockTuple.receipts):
return err("Invalid receipts root")

headerRecords.add(
HeaderRecord(blockHash: blockHeader.rlpHash(), totalDifficulty: totalDifficulty)
HeaderRecord(
blockHash: blockTuple.header.rlpHash(), totalDifficulty: blockTuple.td
)
)

let expectedRoot = ?f.getAccumulatorRoot()
@@ -496,17 +508,17 @@ iterator era1BlockHeaders*(f: Era1File): headers.Header =
startNumber = f.blockIdx.startNumber
endNumber = f.blockIdx.endNumber()

var header: headers.Header
for blockNumber in startNumber .. endNumber:
let header = f.getBlockHeader(blockNumber).valueOr:
raiseAssert("Failed to read block header: " & error)
f.getBlockHeader(blockNumber, header).expect("Header can be read")
yield header

iterator era1BlockTuples*(f: Era1File): BlockTuple =
let
startNumber = f.blockIdx.startNumber
endNumber = f.blockIdx.endNumber()

var blockTuple: BlockTuple
for blockNumber in startNumber .. endNumber:
let blockTuple = f.getBlockTuple(blockNumber).valueOr:
raiseAssert("Failed to read block tuple: " & error)
f.getBlockTuple(blockNumber, blockTuple).expect("Block tuple can be read")
yield blockTuple
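
Tying the pieces together, a hedged sketch (not part of the diff) of streaming every block of an open `Era1File` through one reused `Block`, in the spirit of the iterators above; `processBlock` is a hypothetical callback:

```nim
var blk: Block
for blockNumber in f.blockIdx.startNumber .. f.blockIdx.endNumber():
  f.getEthBlock(blockNumber, blk).isOkOr:
    echo "failed to read block ", blockNumber, ": ", error
    break
  processBlock(blk) # `blk` is overwritten on the next iteration
```

Since `getEthBlock` seeks to the block's offset and then calls `getNextEthBlock`, a consumer reading consecutive blocks could also call `getNextEthBlock` directly and skip the per-block seek.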
23 changes: 14 additions & 9 deletions fluffy/tools/portal_bridge/portal_bridge_history.nim
@@ -374,14 +374,15 @@ proc runBackfillLoopAuditMode(
rng = newRng()
db = Era1DB.new(era1Dir, "mainnet", loadAccumulator())

var blockTuple: BlockTuple
while true:
let
# Grab a random blockNumber to audit and potentially gossip
blockNumber = rng[].rand(network_metadata.mergeBlockNumber - 1).uint64
(header, body, receipts, _) = db.getBlockTuple(blockNumber).valueOr:
error "Failed to get block tuple", error, blockNumber
continue
blockHash = header.rlpHash()
db.getBlockTuple(blockNumber, blockTuple).isOkOr:
error "Failed to get block tuple", error, blockNumber
continue
let blockHash = blockTuple.header.rlpHash()

var headerSuccess, bodySuccess, receiptsSuccess = false

@@ -441,7 +442,7 @@ proc runBackfillLoopAuditMode(
error "Invalid hex for block body content", error = e.msg
break bodyBlock

validateBlockBodyBytes(content, header).isOkOr:
validateBlockBodyBytes(content, blockTuple.header).isOkOr:
error "Block body is invalid", error
break bodyBlock

@@ -469,7 +470,7 @@ proc runBackfillLoopAuditMode(
error "Invalid hex for block receipts content", error = e.msg
break receiptsBlock

validateReceiptsBytes(content, header.receiptsRoot).isOkOr:
validateReceiptsBytes(content, blockTuple.header.receiptsRoot).isOkOr:
error "Block receipts are invalid", error
break receiptsBlock

@@ -481,17 +482,21 @@
let
epochRecord = db.getAccumulator(blockNumber).valueOr:
raiseAssert "Failed to get accumulator from EraDB: " & error
headerWithProof = buildHeaderWithProof(header, epochRecord).valueOr:
headerWithProof = buildHeaderWithProof(blockTuple.header, epochRecord).valueOr:
raiseAssert "Failed to build header with proof: " & error

# gossip block header by hash
await bridge.gossipBlockHeader(blockHash, headerWithProof)
# gossip block header by number
await bridge.gossipBlockHeader(blockNumber, headerWithProof)
if not bodySuccess:
await bridge.gossipBlockBody(blockHash, PortalBlockBodyLegacy.fromBlockBody(body))
await bridge.gossipBlockBody(
blockHash, PortalBlockBodyLegacy.fromBlockBody(blockTuple.body)
)
if not receiptsSuccess:
await bridge.gossipReceipts(blockHash, PortalReceipts.fromReceipts(receipts))
await bridge.gossipReceipts(
blockHash, PortalReceipts.fromReceipts(blockTuple.receipts)
)

await sleepAsync(2.seconds)

1 change: 0 additions & 1 deletion hive_integration/nodocker/engine/node.nim
@@ -96,7 +96,6 @@ proc setBlock*(c: ChainRef; blk: Block): Result[void, string] =
let
vmState = c.getVmState(header).valueOr:
return err("no vmstate")
_ = vmState.parent.stateRoot # Check point
? vmState.processBlock(blk)

? c.db.persistHeader(