Skip to content

Commit

Permalink
replace validator Bloom filter with partial bucket sort (#6469)
Browse files Browse the repository at this point in the history
  • Loading branch information
tersec authored Aug 2, 2024
1 parent 6a57697 commit 8333365
Show file tree
Hide file tree
Showing 16 changed files with 453 additions and 293 deletions.
6 changes: 3 additions & 3 deletions AllTests-mainnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -933,10 +933,10 @@ OK: 6/6 Fail: 0/6 Skip: 0/6
+ Dynamic validator set: updateDynamicValidators() test OK
```
OK: 4/4 Fail: 0/4 Skip: 0/4
## ValidatorPubKey Bloom filter
## ValidatorPubKey bucket sort
```diff
+ incremental construction with no false positives/negatives OK
+ one-shot construction with no false positives/negatives OK
+ incremental construction OK
+ one-shot construction OK
```
OK: 2/2 Fail: 0/2 Skip: 0/2
## Zero signature sanity checks
Expand Down
49 changes: 0 additions & 49 deletions beacon_chain/bloomfilter.nim

This file was deleted.

110 changes: 51 additions & 59 deletions beacon_chain/spec/state_transition_block.nim
Original file line number Diff line number Diff line change
Expand Up @@ -275,48 +275,20 @@ proc process_attester_slashing*(

ok((proposer_reward, cur_exit_queue_info))

func findValidatorIndex*(state: ForkyBeaconState, pubkey: ValidatorPubKey):
Opt[ValidatorIndex] =
# This linear scan is unfortunate, but should be fairly fast as we do a simple
# byte comparison of the key. The alternative would be to build a Table, but
# given that each block can hold no more than 16 deposits, it's slower to
# build the table and use it for lookups than to scan it like this.
# Once we have a reusable, long-lived cache, this should be revisited
#
# For deposit processing purposes, two broad cases exist, either
#
# (a) someone has deposited all 32 required ETH as a single transaction,
# in which case the index doesn't yet exist so the search order does
# not matter so long as it's generally in an order memory controller
# prefetching can predict; or
#
# (b) the deposit has been split into multiple parts, typically not far
# apart from each other, such that on average one would expect this
# validator index to be nearer the maximal than minimal index.
#
# countdown() infinite-loops if the lower bound with uint32 is 0, so
# shift indices by 1, which avoids triggering unsigned wraparound.
for vidx in countdown(state.validators.len.uint32, 1):
if state.validators.asSeq[vidx - 1].pubkey == pubkey:
return Opt[ValidatorIndex].ok((vidx - 1).ValidatorIndex)

from ".."/bloomfilter import
PubkeyBloomFilter, constructBloomFilter, incl, mightContain
from ".."/validator_bucket_sort import
BucketSortedValidators, add, findValidatorIndex, sortValidatorBuckets

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.0/specs/phase0/beacon-chain.md#deposits
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.0/specs/electra/beacon-chain.md#updated--apply_deposit
proc apply_deposit(
cfg: RuntimeConfig, state: var ForkyBeaconState,
bloom_filter: var PubkeyBloomFilter, deposit_data: DepositData,
flags: UpdateFlags): Result[void, cstring] =
bucketSortedValidators: var BucketSortedValidators,
deposit_data: DepositData, flags: UpdateFlags): Result[void, cstring] =
let
pubkey = deposit_data.pubkey
amount = deposit_data.amount
index =
if bloom_filter.mightContain(pubkey):
findValidatorIndex(state, pubkey)
else:
Opt.none(ValidatorIndex)
index = findValidatorIndex(
state.validators.asSeq, bucketSortedValidators, pubkey)

if index.isSome():
# Increase balance by deposit amount
Expand Down Expand Up @@ -358,14 +330,15 @@ proc apply_deposit(
return err("apply_deposit: too many validators (current_epoch_participation)")
if not state.inactivity_scores.add(0'u64):
return err("apply_deposit: too many validators (inactivity_scores)")
let new_vidx = state.validators.lenu64 - 1
when typeof(state).kind >= ConsensusFork.Electra:
debugComment "check hashlist add return"

# [New in Electra:EIP7251]
discard state.pending_balance_deposits.add PendingBalanceDeposit(
index: state.validators.lenu64 - 1, amount: amount)
index: new_vidx, amount: amount)
doAssert state.validators.len == state.balances.len
bloom_filter.incl pubkey
bucketSortedValidators.add new_vidx.ValidatorIndex
else:
# Deposits may come with invalid signatures - in that case, they are not
# turned into a validator but still get processed to keep the deposit
Expand All @@ -378,7 +351,8 @@ proc apply_deposit(
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.0/specs/phase0/beacon-chain.md#deposits
proc process_deposit*(
cfg: RuntimeConfig, state: var ForkyBeaconState,
bloom_filter: var PubkeyBloomFilter, deposit: Deposit, flags: UpdateFlags):
bucketSortedValidators: var BucketSortedValidators,
deposit: Deposit, flags: UpdateFlags):
Result[void, cstring] =
## Process an Eth1 deposit, registering a validator or increasing its balance.

Expand All @@ -395,20 +369,21 @@ proc process_deposit*(
# Deposits must be processed in order
state.eth1_deposit_index += 1

apply_deposit(cfg, state, bloom_filter, deposit.data, flags)
apply_deposit(cfg, state, bucketSortedValidators, deposit.data, flags)

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.3/specs/electra/beacon-chain.md#new-process_deposit_request
func process_deposit_request*(
cfg: RuntimeConfig, state: var electra.BeaconState,
bloom_filter: var PubkeyBloomFilter, deposit_request: DepositRequest,
bucketSortedValidators: var BucketSortedValidators,
deposit_request: DepositRequest,
flags: UpdateFlags): Result[void, cstring] =
# Set deposit request start index
if state.deposit_requests_start_index ==
UNSET_DEPOSIT_REQUESTS_START_INDEX:
state.deposit_requests_start_index = deposit_request.index

apply_deposit(
cfg, state, bloom_filter, DepositData(
cfg, state, bucketSortedValidators, DepositData(
pubkey: deposit_request.pubkey,
withdrawal_credentials: deposit_request.withdrawal_credentials,
amount: deposit_request.amount,
Expand Down Expand Up @@ -510,6 +485,7 @@ proc process_bls_to_execution_change*(
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.3/specs/electra/beacon-chain.md#new-process_withdrawal_request
func process_withdrawal_request*(
cfg: RuntimeConfig, state: var electra.BeaconState,
bucketSortedValidators: BucketSortedValidators,
withdrawal_request: WithdrawalRequest, cache: var StateCache) =
let
amount = withdrawal_request.amount
Expand All @@ -523,7 +499,9 @@ func process_withdrawal_request*(
let
request_pubkey = withdrawal_request.validator_pubkey
# Verify pubkey exists
index = findValidatorIndex(state, request_pubkey).valueOr:
index = findValidatorIndex(
state.validators.asSeq, bucketSortedValidators,
request_pubkey).valueOr:
return
validator = state.validators.item(index)

Expand Down Expand Up @@ -591,6 +569,7 @@ func process_withdrawal_request*(
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.3/specs/electra/beacon-chain.md#new-process_consolidation_request
proc process_consolidation_request*(
cfg: RuntimeConfig, state: var electra.BeaconState,
bucketSortedValidators: BucketSortedValidators,
consolidation_request: ConsolidationRequest,
cache: var StateCache) =
# If the pending consolidations queue is full, consolidation requests are
Expand All @@ -606,11 +585,14 @@ proc process_consolidation_request*(

let
# Verify pubkeys exists
source_index =
findValidatorIndex(state, consolidation_request.source_pubkey).valueOr:
source_index = findValidatorIndex(
state.validators.asSeq, bucketSortedValidators,
consolidation_request.source_pubkey).valueOr:
return
target_index =
findValidatorIndex(state, consolidation_request.target_pubkey).valueOr:
findValidatorIndex(
state.validators.asSeq, bucketSortedValidators,
consolidation_request.target_pubkey).valueOr:
return

# Verify that source != target, so a consolidation cannot be used as an exit.
Expand Down Expand Up @@ -698,12 +680,26 @@ proc process_operations(

# It costs a full validator set scan to construct these values; only do so if
# there will be some kind of exit.
var exit_queue_info =
if body.proposer_slashings.len + body.attester_slashings.len +
body.voluntary_exits.len > 0:
get_state_exit_queue_info(state)
else:
default(ExitQueueInfo) # not used
# TODO Electra doesn't use exit_queue_info, don't calculate
var
exit_queue_info =
if body.proposer_slashings.len + body.attester_slashings.len +
body.voluntary_exits.len > 0:
get_state_exit_queue_info(state)
else:
default(ExitQueueInfo) # not used
bsv_use =
when typeof(body).kind >= ConsensusFork.Electra:
body.deposits.len + body.execution_payload.deposit_requests.len +
body.execution_payload.withdrawal_requests.len +
body.execution_payload.consolidation_requests.len > 0
else:
body.deposits.len > 0
bsv =
if bsv_use:
sortValidatorBuckets(state.validators.asSeq)
else:
nil # this is a logic error, effectively assert

for op in body.proposer_slashings:
let (proposer_slashing_reward, new_exit_queue_info) =
Expand All @@ -718,10 +714,8 @@ proc process_operations(
for op in body.attestations:
operations_rewards.attestations +=
? process_attestation(state, op, flags, base_reward_per_increment, cache)
if body.deposits.len > 0:
let bloom_filter = constructBloomFilter(state.validators.asSeq)
for op in body.deposits:
? process_deposit(cfg, state, bloom_filter[], op, flags)
for op in body.deposits:
? process_deposit(cfg, state, bsv[], op, flags)
for op in body.voluntary_exits:
exit_queue_info = ? process_voluntary_exit(
cfg, state, op, flags, exit_queue_info, cache)
Expand All @@ -731,15 +725,13 @@ proc process_operations(

when typeof(body).kind >= ConsensusFork.Electra:
for op in body.execution_payload.deposit_requests:
debugComment "combine with previous Bloom filter construction"
let bloom_filter = constructBloomFilter(state.validators.asSeq)
? process_deposit_request(cfg, state, bloom_filter[], op, {})
? process_deposit_request(cfg, state, bsv[], op, {})
for op in body.execution_payload.withdrawal_requests:
# [New in Electra:EIP7002:7251]
process_withdrawal_request(cfg, state, op, cache)
process_withdrawal_request(cfg, state, bsv[], op, cache)
for op in body.execution_payload.consolidation_requests:
# [New in Electra:EIP7251]
process_consolidation_request(cfg, state, op, cache)
process_consolidation_request(cfg, state, bsv[], op, cache)

ok(operations_rewards)

Expand Down
90 changes: 90 additions & 0 deletions beacon_chain/validator_bucket_sort.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# beacon_chain
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import std/typetraits
import "."/spec/crypto
from "."/spec/datatypes/base import Validator, ValidatorIndex, pubkey, `==`

const
BUCKET_BITS = 9 # >= 13 gets slow to construct
NUM_BUCKETS = 1 shl BUCKET_BITS

type
# `newSeqUninitialized` requires its type to be SomeNumber
IntValidatorIndex = distinctBase ValidatorIndex

BucketSortedValidators* = object
bucketSorted*: seq[IntValidatorIndex]
bucketUpperBounds: array[NUM_BUCKETS, uint] # avoids over/underflow checks
extraItems*: seq[ValidatorIndex]

template getBucketNumber(h: ValidatorPubKey): uint =
# This assumes https://en.wikipedia.org/wiki/Avalanche_effect for uniform
# distribution across pubkeys. ValidatorPubKey specifically satisfies this
# criterion. If required, can look at more input bytes, but ultimately it
# doesn't affect correctness, only speed.

# Otherwise need more than 2 bytes of input
static: doAssert BUCKET_BITS <= 16

const BUCKET_MASK = (NUM_BUCKETS - 1)
((h.blob[0] * 256 + h.blob[1]) and BUCKET_MASK)

func sortValidatorBuckets*(validators: openArray[Validator]):
ref BucketSortedValidators {.noinline.} =
var bucketSizes: array[NUM_BUCKETS, uint]
for validator in validators:
inc bucketSizes[getBucketNumber(validator.pubkey)]

var
bucketInsertPositions: array[NUM_BUCKETS, uint]
accum: uint
for i, s in bucketSizes:
accum += s
bucketInsertPositions[i] = accum
doAssert accum == validators.len.uint
let res = (ref BucketSortedValidators)(
bucketSorted: newSeqUninitialized[IntValidatorIndex](validators.len),
bucketUpperBounds: bucketInsertPositions)

for i, validator in validators:
let insertPos =
addr bucketInsertPositions[getBucketNumber(validator.pubkey)]
dec insertPos[]
res.bucketSorted[insertPos[]] = i.IntValidatorIndex

doAssert bucketInsertPositions[0] == 0
for i in 1 ..< NUM_BUCKETS:
doAssert res.bucketUpperBounds[i - 1] == bucketInsertPositions[i]

res

func add*(
bucketSortedValidators: var BucketSortedValidators,
validatorIndex: ValidatorIndex) =
bucketSortedValidators.extraItems.add validatorIndex

func findValidatorIndex*(
validators: openArray[Validator], bsv: BucketSortedValidators,
pubkey: ValidatorPubKey): Opt[ValidatorIndex] =
for validatorIndex in bsv.extraItems:
if validators[validatorIndex.distinctBase].pubkey == pubkey:
return Opt.some validatorIndex.ValidatorIndex
let
bucketNumber = getBucketNumber(pubkey)
lowerBounds =
if bucketNumber == 0:
0'u
else:
bsv.bucketUpperBounds[bucketNumber - 1]

for i in lowerBounds ..< bsv.bucketUpperBounds[bucketNumber]:
if validators[bsv.bucketSorted[i]].pubkey == pubkey:
return Opt.some bsv.bucketSorted[i].ValidatorIndex
Opt.none ValidatorIndex
12 changes: 9 additions & 3 deletions ncli/ncli_common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ func collectFromAttestations(
rewardsAndPenalties[index].inclusion_delay =
some(inclusionDelay.uint64)

from ".."/beacon_chain/validator_bucket_sort import
findValidatorIndex, sortValidatorBuckets

proc collectFromDeposits(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
Expand All @@ -414,9 +417,12 @@ proc collectFromDeposits(
cfg: RuntimeConfig) =
withStateAndBlck(forkedState, forkedBlock):
for deposit in forkyBlck.message.body.deposits:
let pubkey = deposit.data.pubkey
let amount = deposit.data.amount
var index = findValidatorIndex(forkyState.data, pubkey)
let
pubkey = deposit.data.pubkey
amount = deposit.data.amount
var index = findValidatorIndex(
forkyState.data.validators.asSeq, sortValidatorBuckets(
forkyState.data.validators.asSeq)[], pubkey)
if index.isNone:
if pubkey in pubkeyToIndex:
try:
Expand Down
Loading

0 comments on commit 8333365

Please sign in to comment.