From 370b0dc8bd40bb1647f8f239bfe20dd36ab663bd Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Wed, 7 Aug 2024 16:58:56 +0530 Subject: [PATCH 1/3] save commit, decouples reconstruction and broadcasting --- .../gossip_processing/eth2_processor.nim | 99 +++++++++-------- beacon_chain/nimbus_beacon_node.nim | 105 +++++++++++++++++- beacon_chain/spec/eip7594_helpers.nim | 97 ++++++++-------- beacon_chain/spec/network.nim | 5 - beacon_chain/sync/sync_protocol.nim | 2 +- 5 files changed, 200 insertions(+), 108 deletions(-) diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 86eeea94fb..05ad16951c 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -446,60 +446,61 @@ proc checkForPotentialDoppelganger( quitDoppelganger() #TODO: need to revamp `recover_blobs` and rewrite this -# proc processDataColumnReconstruction*( -# self: ref Eth2Processor, -# node: Eth2Node, -# signed_block: deneb.SignedBeaconBlock | -# electra.SignedBeaconBlock): -# Future[ValidationRes] {.async: (raises: [CancelledError]).} = +proc processDataColumnReconstruction*( + self: ref Eth2Processor, + node: Eth2Node, + signed_block: deneb.SignedBeaconBlock | + electra.SignedBeaconBlock): + Future[ValidationRes] {.async: (raises: [CancelledError]).} = -# let -# dag = self.dag -# root = signed_block.root -# custodiedColumnIndices = get_custody_columns( -# node.nodeId, -# CUSTODY_REQUIREMENT) + let + dag = self.dag + root = signed_block.root + custodiedColumnIndices = get_custody_columns( + node.nodeId, + CUSTODY_REQUIREMENT) -# var -# data_column_sidecars: seq[DataColumnSidecar] -# columnsOk = true -# storedColumns: seq[ColumnIndex] + var + data_column_sidecars: seq[DataColumnSidecar] + columnsOk = true + storedColumns: seq[ColumnIndex] -# # Loading the data columns from the database -# for custody_column in custodiedColumnIndices.get: -# let data_column = DataColumnSidecar.new() -# if not dag.db.getDataColumnSidecar(root, custody_column, data_column[]): -# columnsOk = false -# break -# data_column_sidecars.add data_column[] -# storedColumns.add data_column.index - -# if columnsOk: -# debug "Loaded data column for reconstruction" - -# # storedColumn number is less than the NUMBER_OF_COLUMNS -# # then reconstruction is not possible, and if all the data columns -# # are already stored then we do not need to reconstruct at all -# if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS: -# return ok() -# else: -# return errIgnore ("DataColumnSidecar: Reconstruction error!") - -# # Recover blobs from saved data column sidecars -# let recovered_blobs = recover_blobs(data_column_sidecars, storedColumns.len, signed_block) -# if not recovered_blobs.isOk: -# return errIgnore ("Error recovering blobs from data columns") - -# # Reconstruct data column sidecars from recovered blobs -# let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_blobs.get) - -# for data_column in data_column_sidecars: -# if data_column.index notin custodiedColumnIndices.get: -# continue + # Loading the data columns from the database + for custody_column in custodiedColumnIndices.get: + let data_column = DataColumnSidecar.new() + if not dag.db.getDataColumnSidecar(root, custody_column, data_column[]): + columnsOk = false + break + data_column_sidecars.add data_column[] + storedColumns.add data_column.index + + if columnsOk: + debug "Loaded data column for reconstruction" + + # storedColumn number is less than the NUMBER_OF_COLUMNS + # then reconstruction is not possible, and if all the data columns + # are already stored then we do not need to reconstruct at all + if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS: + return ok() + else: + return errIgnore ("DataColumnSidecar: Reconstruction error!") + + # Recover blobs from saved data column sidecars + let recovered_cps = recover_cells_and_proofs(data_column_sidecars, storedColumns.len, signed_block) + if not recovered_cps.isOk: + return errIgnore ("Error recovering cells and proofs from data columns") + + # Reconstruct data column sidecars from recovered blobs + let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_cps.get) + + for data_column in data_column_sidecars: + if data_column.index notin custodiedColumnIndices.get: + continue -# dag.db.putDataColumnSidecar(data_column) + dag.db.putDataColumnSidecar(data_column) + notice "Data Column Reconstructed and Saved Successfully" -# ok() + ok() proc processAttestation*( self: ref Eth2Processor, src: MsgSource, diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index daf536ab47..6d847cc311 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -19,7 +19,7 @@ import ./networking/[topic_params, network_metadata_downloads, eth2_network], ./rpc/[rest_api, state_ttl_cache], ./spec/datatypes/[altair, bellatrix, phase0], - ./spec/[deposit_snapshots, engine_authentication, weak_subjectivity], + ./spec/[deposit_snapshots, eip7594_helpers, engine_authentication, weak_subjectivity], ./sync/[sync_protocol, light_client_protocol], ./validators/[keystore_management, beacon_validators], "."/[ @@ -30,6 +30,7 @@ when defined(posix): import system/ansi_c from ./spec/datatypes/deneb import SignedBeaconBlock +from ./spec/datatypes/electra import SignedBeaconBlock from libp2p/protocols/pubsub/gossipsub @@ -1471,10 +1472,112 @@ proc pruneDataColumns(node: BeaconNode, slot: Slot) = count = count + 1 debug "pruned data columns", count, dataColumnPruneEpoch +proc tryReconstructingDataColumns* (self: BeaconNode, + signed_block: deneb.SignedBeaconBlock | + electra.SignedBeaconBlock): Result[void, string] = + # Checks whether the data columns can be reconstructed + # or not from the recovery matrix + + let localCustodySubnetCount = + if self.config.subscribeAllSubnets: + DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64 + else: + CUSTODY_REQUIREMENT + + let + db = self.db + root = signed_block.root + custodiedColumnIndices = get_custody_columns( + self.network.nodeId, + localCustodySubnetCount) + + var + data_column_sidecars: seq[DataColumnSidecar] + columnsOk = true + storedColumns: seq[ColumnIndex] + + # Loading the data columns from the database + for custody_column in custodiedColumnIndices.get: + let data_column = DataColumnSidecar.new() + if not db.getDataColumnSidecar(root, custody_column, data_column[]): + columnsOk = false + break + data_column_sidecars.add data_column[] + storedColumns.add data_column.index + + if columnsOk: + debug "Loaded data column for reconstruction" + + # storedColumn number is less than the NUMBER_OF_COLUMNS + # then reconstruction is not possible, and if all the data columns + # are already stored then we do not need to reconstruct at all + if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS: + return ok() + else: + return errIgnore ("DataColumnSidecar: Reconstruction error!") + + # Recover blobs from saved data column sidecars + let recovered_cps = recover_cells_and_proofs(data_column_sidecars, storedColumns.len, signed_block) + if not recovered_cps.isOk: + return errIgnore ("Error recovering cells and proofs from data columns") + + # Reconstruct data column sidecars from recovered blobs + let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_cps.get) + + for data_column in data_column_sidecars: + if data_column.index notin custodiedColumnIndices.get: + continue + + db.putDataColumnSidecar(data_column) + notice "Data Column Reconstructed and Saved Successfully" + + ok() + +proc reconstructAndSendDataColumns*(node: BeaconNode, slot: Slot) {.async.} = + let db = node.db + var blckid: BlockId + let blck = node.dag.getForkedBlock(blckid).valueOr: return + when typeof(blck).kind < ConsensusFork.Deneb: return + else: + let res = node.tryReconstructingDataColumns(blck) + let custody_columns = get_custody_columns( + router.network.nodeId, + CUSTODY_REQUIREMENT) + var + data_column_sidecars: DataColumnSidecars + columnsOk = true + + for custody_column in custody_columns.get: + let data_column = DataColumnSidecar.new() + if not db.getDataColumnSidecar( + blck.root, custody_column, data_column[]): + columnsOk = false + debug "Issue with loading reconstructed data columns" + break + data_column_sidecars.add data_column + + var das_workers = newSeq[Future[SendResult]](len(data_column_sidecars)) + for i in 0.. Date: Wed, 7 Aug 2024 21:12:09 +0530 Subject: [PATCH 2/3] save progress --- beacon_chain/nimbus_beacon_node.nim | 62 ++++++++++++++++----------- beacon_chain/spec/eip7594_helpers.nim | 6 +-- 2 files changed, 39 insertions(+), 29 deletions(-) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 6d847cc311..fc87042bc1 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1473,8 +1473,8 @@ proc pruneDataColumns(node: BeaconNode, slot: Slot) = debug "pruned data columns", count, dataColumnPruneEpoch proc tryReconstructingDataColumns* (self: BeaconNode, - signed_block: deneb.SignedBeaconBlock | - electra.SignedBeaconBlock): Result[void, string] = + signed_block: ForkedTrustedSignedBeaconBlock): + Result[void, string] = # Checks whether the data columns can be reconstructed # or not from the recovery matrix @@ -1491,6 +1491,10 @@ proc tryReconstructingDataColumns* (self: BeaconNode, self.network.nodeId, localCustodySubnetCount) + if not selfReconstructDataColumns(custodiedColumnIndices.lenu64): + # No need to reconstruct and broadcast + ok() + var data_column_sidecars: seq[DataColumnSidecar] columnsOk = true @@ -1533,34 +1537,40 @@ proc tryReconstructingDataColumns* (self: BeaconNode, ok() -proc reconstructAndSendDataColumns*(node: BeaconNode, slot: Slot) {.async.} = - let db = node.db - var blckid: BlockId - let blck = node.dag.getForkedBlock(blckid).valueOr: return - when typeof(blck).kind < ConsensusFork.Deneb: return - else: - let res = node.tryReconstructingDataColumns(blck) - let custody_columns = get_custody_columns( - router.network.nodeId, - CUSTODY_REQUIREMENT) - var - data_column_sidecars: DataColumnSidecars - columnsOk = true - - for custody_column in custody_columns.get: - let data_column = DataColumnSidecar.new() - if not db.getDataColumnSidecar( - blck.root, custody_column, data_column[]): - columnsOk = false - debug "Issue with loading reconstructed data columns" - break - data_column_sidecars.add data_column +proc reconstructAndSendDataColumns*(node: BeaconNode) {.async.} = + let + db = node.db + root = node.dag.head.root + + let blck = getForkedBlock(db, root).valueOr: return + withBlck(blck): + when typeof(forkyBlck).kind < ConsensusFork.Deneb: return + else: + let res = node.tryReconstructingDataColumns(blck) + if not res.isOk(): + return + + let custody_columns = get_custody_columns( + node.network.nodeId, + CUSTODY_REQUIREMENT) + var + data_column_sidecars: DataColumnSidecars + columnsOk = true + + for custody_column in custody_columns.get: + let data_column = DataColumnSidecar.new() + if not db.getDataColumnSidecar( + root, custody_column, data_column[]): + columnsOk = false + debug "Issue with loading reconstructed data columns" + break + data_column_sidecars.add data_column var das_workers = newSeq[Future[SendResult]](len(data_column_sidecars)) for i in 0.. Date: Thu, 8 Aug 2024 12:47:38 +0530 Subject: [PATCH 3/3] add: reconstruction event loop, previous reconstruction related cleanups --- .../gossip_processing/eth2_processor.nim | 57 ------------------- beacon_chain/nimbus_beacon_node.nim | 15 ++--- beacon_chain/spec/crypto.nim | 5 +- beacon_chain/spec/eip7594_helpers.nim | 29 ++++++++-- beacon_chain/validators/message_router.nim | 50 ---------------- 5 files changed, 33 insertions(+), 123 deletions(-) diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 05ad16951c..61348760f0 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -445,63 +445,6 @@ proc checkForPotentialDoppelganger( attestation = shortLog(attestation) quitDoppelganger() -#TODO: need to revamp `recover_blobs` and rewrite this -proc processDataColumnReconstruction*( - self: ref Eth2Processor, - node: Eth2Node, - signed_block: deneb.SignedBeaconBlock | - electra.SignedBeaconBlock): - Future[ValidationRes] {.async: (raises: [CancelledError]).} = - - let - dag = self.dag - root = signed_block.root - custodiedColumnIndices = get_custody_columns( - node.nodeId, - CUSTODY_REQUIREMENT) - - var - data_column_sidecars: seq[DataColumnSidecar] - columnsOk = true - storedColumns: seq[ColumnIndex] - - # Loading the data columns from the database - for custody_column in custodiedColumnIndices.get: - let data_column = DataColumnSidecar.new() - if not dag.db.getDataColumnSidecar(root, custody_column, data_column[]): - columnsOk = false - break - data_column_sidecars.add data_column[] - storedColumns.add data_column.index - - if columnsOk: - debug "Loaded data column for reconstruction" - - # storedColumn number is less than the NUMBER_OF_COLUMNS - # then reconstruction is not possible, and if all the data columns - # are already stored then we do not need to reconstruct at all - if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS: - return ok() - else: - return errIgnore ("DataColumnSidecar: Reconstruction error!") - - # Recover blobs from saved data column sidecars - let recovered_cps = recover_cells_and_proofs(data_column_sidecars, storedColumns.len, signed_block) - if not recovered_cps.isOk: - return errIgnore ("Error recovering cells and proofs from data columns") - - # Reconstruct data column sidecars from recovered blobs - let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_cps.get) - - for data_column in data_column_sidecars: - if data_column.index notin custodiedColumnIndices.get: - continue - - dag.db.putDataColumnSidecar(data_column) - notice "Data Column Reconstructed and Saved Successfully" - - ok() - proc processAttestation*( self: ref Eth2Processor, src: MsgSource, attestation: phase0.Attestation | electra.Attestation, subnet_id: SubnetId, diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index fc87042bc1..43ba6caf32 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1473,7 +1473,8 @@ proc pruneDataColumns(node: BeaconNode, slot: Slot) = debug "pruned data columns", count, dataColumnPruneEpoch proc tryReconstructingDataColumns* (self: BeaconNode, - signed_block: ForkedTrustedSignedBeaconBlock): + signed_block: deneb.TrustedSignedBeaconBlock | + electra.TrustedSignedBeaconBlock): Result[void, string] = # Checks whether the data columns can be reconstructed # or not from the recovery matrix @@ -1491,10 +1492,6 @@ proc tryReconstructingDataColumns* (self: BeaconNode, self.network.nodeId, localCustodySubnetCount) - if not selfReconstructDataColumns(custodiedColumnIndices.lenu64): - # No need to reconstruct and broadcast - ok() - var data_column_sidecars: seq[DataColumnSidecar] columnsOk = true @@ -1518,17 +1515,17 @@ proc tryReconstructingDataColumns* (self: BeaconNode, if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS: return ok() else: - return errIgnore ("DataColumnSidecar: Reconstruction error!") + return err("DataColumnSidecar: Reconstruction error!") # Recover blobs from saved data column sidecars let recovered_cps = recover_cells_and_proofs(data_column_sidecars, storedColumns.len, signed_block) if not recovered_cps.isOk: - return errIgnore ("Error recovering cells and proofs from data columns") + return err("Error recovering cells and proofs from data columns") # Reconstruct data column sidecars from recovered blobs let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_cps.get) - for data_column in data_column_sidecars: + for data_column in reconstructedDataColumns.get: if data_column.index notin custodiedColumnIndices.get: continue @@ -1546,7 +1543,7 @@ proc reconstructAndSendDataColumns*(node: BeaconNode) {.async.} = withBlck(blck): when typeof(forkyBlck).kind < ConsensusFork.Deneb: return else: - let res = node.tryReconstructingDataColumns(blck) + let res = node.tryReconstructingDataColumns(forkyblck) if not res.isOk(): return diff --git a/beacon_chain/spec/crypto.nim b/beacon_chain/spec/crypto.nim index 09798978e1..f608a92151 100644 --- a/beacon_chain/spec/crypto.nim +++ b/beacon_chain/spec/crypto.nim @@ -75,7 +75,7 @@ type BlsCurveType* = ValidatorPrivKey | ValidatorPubKey | ValidatorSig - BlsResult*[T] = Result[T, cstring] + BlsResult*[T] = Result[T, cstring] TrustedSig* = object data* {.align: 16.}: array[RawSigSize, byte] @@ -414,6 +414,9 @@ func toHex*(x: CookedPubKey): string = func `$`*(x: CookedPubKey): string = $(x.toPubKey()) +func toValidatorSig*(x: TrustedSig): ValidatorSig = + ValidatorSig(blob: x.data) + func toValidatorSig*(x: CookedSig): ValidatorSig = ValidatorSig(blob: blscurve.Signature(x).exportRaw()) diff --git a/beacon_chain/spec/eip7594_helpers.nim b/beacon_chain/spec/eip7594_helpers.nim index 110192ab4b..7613e7f3c4 100644 --- a/beacon_chain/spec/eip7594_helpers.nim +++ b/beacon_chain/spec/eip7594_helpers.nim @@ -169,13 +169,14 @@ proc recover_cells_and_proofs*( cell = column[blobIdx] # Transform the cell as a ckzg cell - var ckzgCell: Cell + var ckzgCell: array[BYTES_PER_CELL, byte] for i in 0 ..< int(FIELD_ELEMENTS_PER_CELL): var start = 32 * i for j in 0 ..< 32: - ckzgCell[start + j] = cell[start+j] + var inter = cell.bytes + ckzgCell[start + j] = inter[start+j].byte - ckzgCells.add(ckzgCell) + ckzgCells.add(KzgCell(bytes: ckzgCell)) # Recovering the cells and proofs let recovered_cells_and_proofs = recoverCellsAndKzgProofs(cell_ids, ckzgCells) @@ -184,7 +185,23 @@ proc recover_cells_and_proofs*( ok(recovered_cps) -proc compute_signed_block_header(signed_block: deneb.SignedBeaconBlock | +proc compute_signed_block_header(signed_block: deneb.TrustedSignedBeaconBlock | + electra.TrustedSignedBeaconBlock): + SignedBeaconBlockHeader = + let blck = signed_block.message + let block_header = BeaconBlockHeader( + slot: blck.slot, + proposer_index: blck.proposer_index, + parent_root: blck.parent_root, + state_root: blck.state_root, + body_root: hash_tree_root(blck.body) + ) + result = SignedBeaconBlockHeader( + message: block_header, + signature: signed_block.signature.toValidatorSig + ) + +proc compute_signed_block_header(signed_block: deneb.SignedBeaconBlock | electra.SignedBeaconBlock): SignedBeaconBlockHeader = let blck = signed_block.message @@ -201,8 +218,8 @@ proc compute_signed_block_header(signed_block: deneb.SignedBeaconBlock | ) # https://github.com/ethereum/consensus-specs/blob/bb8f3caafc92590cdcf2d14974adb602db9b5ca3/specs/_features/eip7594/das-core.md#get_data_column_sidecars -proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock | - electra.SignedBeaconBlock, +proc get_data_column_sidecars*(signed_block: deneb.TrustedSignedBeaconBlock | + electra.TrustedSignedBeaconBlock, cellsAndProofs: seq[CellsAndProofs]): Result[seq[DataColumnSidecar], string] = # Given a signed block and the cells/proofs associated with each blob diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 3c83fc8761..44ce411e48 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -220,56 +220,6 @@ proc routeSignedBeaconBlock*( signature = shortLog(blck.signature) ok(blockRef) -proc routeReconstructedDataColumns*( - router: ref MessageRouter, - blck: ForkySignedBeaconBlock): - Future[SendResult] {.async: (raises: [CancelledError]).} = - - ## Process reconstructing the data columns and broadcast once done - block: - when typeof(blck).kind >= ConsensusFork.Deneb: - let res = await router[].processor.processDataColumnReconstruction( - router[].network, blck) - - if not res.isGoodForSending: - warn "Issue sending reconstructed data columns" - return err(res.error()[1]) - - let custody_columns = get_custody_columns( - router.network.nodeId, - CUSTODY_REQUIREMENT) - - var - data_column_sidecars: DataColumnSidecars - columnsOk = true - - for custody_column in custody_columns.get: - let data_column = DataColumnSidecar.new() - if not router[].processor.dag.db.getDataColumnSidecar( - blck.root, custody_column, data_column[]): - columnsOk = false - debug "Issue with loading reconstructed data columns" - break - data_column_sidecars.add data_column - - var das_workers = newSeq[Future[SendResult]](len(data_column_sidecars)) - for i in 0..