Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add: data column reconstruction and broadcast #6481

Merged
merged 3 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 0 additions & 56 deletions beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -445,62 +445,6 @@ proc checkForPotentialDoppelganger(
attestation = shortLog(attestation)
quitDoppelganger()

#TODO: need to revamp `recover_blobs` and rewrite this
# proc processDataColumnReconstruction*(
# self: ref Eth2Processor,
# node: Eth2Node,
# signed_block: deneb.SignedBeaconBlock |
# electra.SignedBeaconBlock):
# Future[ValidationRes] {.async: (raises: [CancelledError]).} =

# let
# dag = self.dag
# root = signed_block.root
# custodiedColumnIndices = get_custody_columns(
# node.nodeId,
# CUSTODY_REQUIREMENT)

# var
# data_column_sidecars: seq[DataColumnSidecar]
# columnsOk = true
# storedColumns: seq[ColumnIndex]

# # Loading the data columns from the database
# for custody_column in custodiedColumnIndices.get:
# let data_column = DataColumnSidecar.new()
# if not dag.db.getDataColumnSidecar(root, custody_column, data_column[]):
# columnsOk = false
# break
# data_column_sidecars.add data_column[]
# storedColumns.add data_column.index

# if columnsOk:
# debug "Loaded data column for reconstruction"

# # storedColumn number is less than the NUMBER_OF_COLUMNS
# # then reconstruction is not possible, and if all the data columns
# # are already stored then we do not need to reconstruct at all
# if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS:
# return ok()
# else:
# return errIgnore ("DataColumnSidecar: Reconstruction error!")

# # Recover blobs from saved data column sidecars
# let recovered_blobs = recover_blobs(data_column_sidecars, storedColumns.len, signed_block)
# if not recovered_blobs.isOk:
# return errIgnore ("Error recovering blobs from data columns")

# # Reconstruct data column sidecars from recovered blobs
# let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_blobs.get)

# for data_column in data_column_sidecars:
# if data_column.index notin custodiedColumnIndices.get:
# continue

# dag.db.putDataColumnSidecar(data_column)

# ok()

proc processAttestation*(
self: ref Eth2Processor, src: MsgSource,
attestation: phase0.Attestation | electra.Attestation, subnet_id: SubnetId,
Expand Down
112 changes: 111 additions & 1 deletion beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import
./networking/[topic_params, network_metadata_downloads, eth2_network],
./rpc/[rest_api, state_ttl_cache],
./spec/datatypes/[altair, bellatrix, phase0],
./spec/[deposit_snapshots, engine_authentication, weak_subjectivity],
./spec/[deposit_snapshots, eip7594_helpers, engine_authentication, weak_subjectivity],
./sync/[sync_protocol, light_client_protocol],
./validators/[keystore_management, beacon_validators],
"."/[
Expand All @@ -30,6 +30,7 @@ when defined(posix):
import system/ansi_c

from ./spec/datatypes/deneb import SignedBeaconBlock
from ./spec/datatypes/electra import SignedBeaconBlock

from
libp2p/protocols/pubsub/gossipsub
Expand Down Expand Up @@ -1471,10 +1472,119 @@ proc pruneDataColumns(node: BeaconNode, slot: Slot) =
count = count + 1
debug "pruned data columns", count, dataColumnPruneEpoch

proc tryReconstructingDataColumns* (self: BeaconNode,
signed_block: deneb.TrustedSignedBeaconBlock |
electra.TrustedSignedBeaconBlock):
Result[void, string] =
# Checks whether the data columns can be reconstructed
# or not from the recovery matrix

let localCustodySubnetCount =
if self.config.subscribeAllSubnets:
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
else:
CUSTODY_REQUIREMENT

let
db = self.db
root = signed_block.root
custodiedColumnIndices = get_custody_columns(
self.network.nodeId,
localCustodySubnetCount)

var
data_column_sidecars: seq[DataColumnSidecar]
columnsOk = true
storedColumns: seq[ColumnIndex]

# Loading the data columns from the database
for custody_column in custodiedColumnIndices.get:
let data_column = DataColumnSidecar.new()
if not db.getDataColumnSidecar(root, custody_column, data_column[]):
columnsOk = false
break
data_column_sidecars.add data_column[]
storedColumns.add data_column.index

if columnsOk:
debug "Loaded data column for reconstruction"

# storedColumn number is less than the NUMBER_OF_COLUMNS
# then reconstruction is not possible, and if all the data columns
# are already stored then we do not need to reconstruct at all
if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS:
return ok()
else:
return err("DataColumnSidecar: Reconstruction error!")

# Recover blobs from saved data column sidecars
let recovered_cps = recover_cells_and_proofs(data_column_sidecars, storedColumns.len, signed_block)
if not recovered_cps.isOk:
return err("Error recovering cells and proofs from data columns")

# Reconstruct data column sidecars from recovered blobs
let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_cps.get)

for data_column in reconstructedDataColumns.get:
if data_column.index notin custodiedColumnIndices.get:
continue

db.putDataColumnSidecar(data_column)
notice "Data Column Reconstructed and Saved Successfully"

ok()

proc reconstructAndSendDataColumns*(node: BeaconNode) {.async.} =
let
db = node.db
root = node.dag.head.root

let blck = getForkedBlock(db, root).valueOr: return
withBlck(blck):
when typeof(forkyBlck).kind < ConsensusFork.Deneb: return
else:
let res = node.tryReconstructingDataColumns(forkyblck)
if not res.isOk():
return

let custody_columns = get_custody_columns(
node.network.nodeId,
CUSTODY_REQUIREMENT)
var
data_column_sidecars: DataColumnSidecars
columnsOk = true

for custody_column in custody_columns.get:
let data_column = DataColumnSidecar.new()
if not db.getDataColumnSidecar(
root, custody_column, data_column[]):
columnsOk = false
debug "Issue with loading reconstructed data columns"
break
data_column_sidecars.add data_column

var das_workers = newSeq[Future[SendResult]](len(data_column_sidecars))
for i in 0..<data_column_sidecars.lenu64:
let subnet_id = compute_subnet_for_data_column_sidecar(i)
das_workers[i] =
node.network.broadcastDataColumnSidecar(subnet_id, data_column_sidecars[i][])
let allres = await allFinished(das_workers)
for i in 0..<allres.len:
let res = allres[i]
doAssert res.finished()
if res.failed():
notice "Reconstructed data columns not sent",
data_column = shortLog(data_column_sidecars[i][]), error = res.error[]
else:
notice "Reconstructed data columns sent",
data_column = shortLog(data_column_sidecars[i][])

proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# Things we do when slot processing has ended and we're about to wait for the
# next slot

await node.reconstructAndSendDataColumns()

# By waiting until close before slot end, ensure that preparation for next
# slot does not interfere with propagation of messages and with VC duties.
const endOffset = aggregateSlotOffset + nanos(
Expand Down
5 changes: 4 additions & 1 deletion beacon_chain/spec/crypto.nim
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type

BlsCurveType* = ValidatorPrivKey | ValidatorPubKey | ValidatorSig

BlsResult*[T] = Result[T, cstring]
BlsResult*[T] = Result[T, cstring]

TrustedSig* = object
data* {.align: 16.}: array[RawSigSize, byte]
Expand Down Expand Up @@ -414,6 +414,9 @@ func toHex*(x: CookedPubKey): string =
func `$`*(x: CookedPubKey): string =
$(x.toPubKey())

func toValidatorSig*(x: TrustedSig): ValidatorSig =
ValidatorSig(blob: x.data)

func toValidatorSig*(x: CookedSig): ValidatorSig =
ValidatorSig(blob: blscurve.Signature(x).exportRaw())

Expand Down
118 changes: 64 additions & 54 deletions beacon_chain/spec/eip7594_helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -136,61 +136,72 @@ proc recover_matrix*(partial_matrix: seq[MatrixEntry],

ok(extended_matrix)

## THIS METHOD IS DEPRECATED, WILL BE REMOVED ONCE ALPHA 4 IS RELEASED
# proc recover_blobs*(
# data_columns: seq[DataColumnSidecar],
# columnCount: int,
# blck: deneb.SignedBeaconBlock |
# electra.SignedBeaconBlock |
# ForkySignedBeaconBlock):
# Result[seq[KzgBlob], cstring] =

# # This helper recovers blobs from the data column sidecars
# if not (data_columns.len != 0):
# return err("DataColumnSidecar: Length should not be 0")

# var blobCount = data_columns[0].column.len
# for data_column in data_columns:
# if not (blobCount == data_column.column.len):
# return err ("DataColumns do not have the same length")

# var recovered_blobs = newSeqOfCap[KzgBlob](blobCount)

# for blobIdx in 0 ..< blobCount:
# var
# cell_ids = newSeqOfCap[CellID](columnCount)
# ckzgCells = newSeqOfCap[KzgCell](columnCount)
# THIS METHOD IS DEPRECATED, WILL BE REMOVED ONCE ALPHA 4 IS RELEASED
proc recover_cells_and_proofs*(
data_columns: seq[DataColumnSidecar],
columnCount: int,
blck: deneb.TrustedSignedBeaconBlock |
electra.TrustedSignedBeaconBlock |
ForkedTrustedSignedBeaconBlock):
Result[seq[CellsAndProofs], cstring] =

# This helper recovers blobs from the data column sidecars
if not (data_columns.len != 0):
return err("DataColumnSidecar: Length should not be 0")

var blobCount = data_columns[0].column.len
for data_column in data_columns:
if not (blobCount == data_column.column.len):
return err ("DataColumns do not have the same length")

var recovered_cps = newSeqOfCap[CellsAndProofs](blobCount)

for blobIdx in 0 ..< blobCount:
var
cell_ids = newSeqOfCap[CellID](columnCount)
ckzgCells = newSeqOfCap[KzgCell](columnCount)

# for data_column in data_columns:
# cell_ids.add(data_column.index)
for data_column in data_columns:
cell_ids.add(data_column.index)

# let
# column = data_column.column
# cell = column[blobIdx]
let
column = data_column.column
cell = column[blobIdx]

# # Transform the cell as a ckzg cell
# var ckzgCell: Cell
# for i in 0 ..< int(FIELD_ELEMENTS_PER_CELL):
# var start = 32 * i
# for j in 0 ..< 32:
# ckzgCell[start + j] = cell[start+j]
# Transform the cell as a ckzg cell
var ckzgCell: array[BYTES_PER_CELL, byte]
for i in 0 ..< int(FIELD_ELEMENTS_PER_CELL):
var start = 32 * i
for j in 0 ..< 32:
var inter = cell.bytes
ckzgCell[start + j] = inter[start+j].byte

# ckzgCells.add(ckzgCell)
ckzgCells.add(KzgCell(bytes: ckzgCell))

# # Recovering the blob
# let recovered_cells = recoverAllCells(cell_ids, ckzgCells)
# if not recovered_cells.isOk:
# return err ("Recovering all cells for blob failed")
# Recovering the cells and proofs
let recovered_cells_and_proofs = recoverCellsAndKzgProofs(cell_ids, ckzgCells)

# let recovered_blob_res = cellsToBlob(recovered_cells.get)
# if not recovered_blob_res.isOk:
# return err ("Cells to blob for blob failed")
recovered_cps.add(recovered_cells_and_proofs.get)

# recovered_blobs.add(recovered_blob_res.get)
ok(recovered_cps)

# ok(recovered_blobs)
proc compute_signed_block_header(signed_block: deneb.TrustedSignedBeaconBlock |
electra.TrustedSignedBeaconBlock):
SignedBeaconBlockHeader =
let blck = signed_block.message
let block_header = BeaconBlockHeader(
slot: blck.slot,
proposer_index: blck.proposer_index,
parent_root: blck.parent_root,
state_root: blck.state_root,
body_root: hash_tree_root(blck.body)
)
result = SignedBeaconBlockHeader(
message: block_header,
signature: signed_block.signature.toValidatorSig
)

proc compute_signed_block_header(signed_block: deneb.SignedBeaconBlock |
proc compute_signed_block_header(signed_block: deneb.SignedBeaconBlock |
electra.SignedBeaconBlock):
SignedBeaconBlockHeader =
let blck = signed_block.message
Expand All @@ -207,9 +218,9 @@ proc compute_signed_block_header(signed_block: deneb.SignedBeaconBlock |
)

# https://github.com/ethereum/consensus-specs/blob/bb8f3caafc92590cdcf2d14974adb602db9b5ca3/specs/_features/eip7594/das-core.md#get_data_column_sidecars
proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock |
electra.SignedBeaconBlock,
cellsAndProofs: CellsAndProofs):
proc get_data_column_sidecars*(signed_block: deneb.TrustedSignedBeaconBlock |
electra.TrustedSignedBeaconBlock,
cellsAndProofs: seq[CellsAndProofs]):
Result[seq[DataColumnSidecar], string] =
# Given a signed block and the cells/proofs associated with each blob
# in the block, assemble the sidecars which can be distributed to peers.
Expand All @@ -221,18 +232,17 @@ proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock |

var sidecars = newSeq[DataColumnSidecar](CELLS_PER_EXT_BLOB)

if cellsAndProofs.cells.len == 0 or
cellsAndProofs.proof.len == 0:
if cellsAndProofs.len == 0:
return ok(sidecars)

for column_index in 0..<NUMBER_OF_COLUMNS:
var
column_cells: DataColumn
column_proofs: KzgProofs
for i in 0..<cellsAndProofs.cells.len:
let check1 = column_cells.add(cellsAndProofs.cells[column_index])
for i in 0..<cellsAndProofs.len:
let check1 = column_cells.add(cellsAndProofs[column_index].cells)
doAssert check1 == true, "Issue fetching cell from CellsAndProofs"
let check2 = column_proofs.add(cellsAndProofs.proofs[column_index])
let check2 = column_proofs.add(cellsAndProofs[column_index].proofs)
doAssert check2 == true, "Issue fetching proof from CellsAndProofs"

var sidecar = DataColumnSidecar(
Expand Down
5 changes: 0 additions & 5 deletions beacon_chain/spec/network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,6 @@ iterator blobSidecarTopics*(forkDigest: ForkDigest): string =
yield getBlobSidecarTopic(forkDigest, subnet_id)


const
KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH* = 32
MAX_REQUEST_DATA_COLUMN_SIDECARS* = MAX_REQUEST_BLOCKS_DENEB * NUMBER_OF_COLUMNS
MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS* = 4096

func getDataColumnSidecarTopic*(forkDigest: ForkDigest,
subnet_id: uint64): string =
eth2Prefix(forkDigest) & "data_column_sidecar_" & $subnet_id & "/ssz_snappy"
Expand Down
2 changes: 1 addition & 1 deletion beacon_chain/sync/sync_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ p2pProtocol BeaconSync(version = 1,
reqCount: uint64,
reqColumns: List[ColumnIndex, NUMBER_OF_COLUMNS],
response: MultipleChunksResponse[
ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMN_SIDECARS)])
ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMNS)])
{.async, libp2pProtocol("data_column_sidecars_by_range", 1).} =

trace "got data columns range request", peer, startSlot,
Expand Down
Loading
Loading