Sequencer better DA fork handling & minor improvements #1718

Open
wants to merge 25 commits into base: nightly
Changes from 20 commits

25 commits:
f410c08
remove unnecesary to_vec
yaziciahmet Jan 16, 2025
988147a
remove unnecessary match
yaziciahmet Jan 16, 2025
7702f53
rustier way of error handling
yaziciahmet Jan 16, 2025
1244194
rename var
yaziciahmet Jan 16, 2025
53480f8
dont track l1 height separately
yaziciahmet Jan 16, 2025
6c0760c
panic if da block sender channel is closed
yaziciahmet Jan 16, 2025
0db5932
track and log last sent l1 height from da block monitor
yaziciahmet Jan 16, 2025
51250c7
lint
yaziciahmet Jan 16, 2025
1a5d0dd
get da block header as ref
yaziciahmet Jan 16, 2025
e6de8bd
remove ref
yaziciahmet Jan 16, 2025
04f10e3
remove todo
yaziciahmet Jan 16, 2025
dbdc959
rename
yaziciahmet Jan 16, 2025
4fd1f2d
Merge branch 'nightly' into yaziciahmet/sequencer-better-l1-block-tra…
yaziciahmet Jan 16, 2025
6fb2887
remove warn log
yaziciahmet Jan 16, 2025
c42f5a0
set l1 data if not missed blocks
yaziciahmet Jan 17, 2025
52fc96e
ensure no da forks on sequencer restart
yaziciahmet Jan 19, 2025
7b37068
Merge branch 'nightly' into yaziciahmet/sequencer-better-l1-block-tra…
yaziciahmet Jan 20, 2025
fe98334
Check forks on da block monitor
yaziciahmet Jan 20, 2025
32916d3
Merge branch 'nightly' into yaziciahmet/sequencer-better-l1-block-tra…
yaziciahmet Jan 20, 2025
b5d55ed
simplify assert messages
yaziciahmet Jan 20, 2025
3efe45f
Merge branch 'nightly' into yaziciahmet/sequencer-better-l1-block-tra…
yaziciahmet Jan 20, 2025
06aba79
Merge branch 'nightly' into yaziciahmet/sequencer-better-l1-block-tra…
yaziciahmet Jan 20, 2025
c499156
Merge branch 'nightly' into yaziciahmet/sequencer-better-l1-block-tra…
yaziciahmet Jan 20, 2025
b9c031a
Merge branch 'nightly' into yaziciahmet/sequencer-better-l1-block-tra…
jfldde Jan 23, 2025
faf935d
Merge branch 'nightly' into yaziciahmet/sequencer-better-l1-block-tra…
yaziciahmet Jan 23, 2025
156 changes: 96 additions & 60 deletions crates/sequencer/src/runner.rs
@@ -1,8 +1,8 @@
use std::cmp;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::vec;

use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{Address, Bytes, TxHash};
@@ -254,7 +254,7 @@ where
>,
pub_key: &[u8],
prestate: ProverStorage<SnapshotManager>,
da_block_header: <<Da as DaService>::Spec as DaSpec>::BlockHeader,
da_block_header: &<<Da as DaService>::Spec as DaSpec>::BlockHeader,
soft_confirmation_info: HookSoftConfirmationInfo,
l2_block_mode: L2BlockMode,
) -> anyhow::Result<(Vec<RlpEvmTransaction>, Vec<TxHash>)> {
@@ -273,7 +273,7 @@ where
match self.stf.begin_soft_confirmation(
pub_key,
&mut working_set_to_discard,
&da_block_header,
da_block_header,
&soft_confirmation_info,
) {
Ok(_) => {
@@ -415,12 +415,12 @@ where

async fn produce_l2_block(
&mut self,
da_block: <Da as DaService>::FilteredBlock,
da_block_header: &<<Da as DaService>::Spec as DaSpec>::BlockHeader,
l1_fee_rate: u128,
l2_block_mode: L2BlockMode,
) -> anyhow::Result<(u64, u64, StateDiff)> {
let start = Instant::now();
let da_height = da_block.header().height();
let da_height = da_block_header.height();
let (l2_height, l1_height) = match self
.ledger_db
.get_head_soft_confirmation()
@@ -451,9 +451,9 @@ where

let soft_confirmation_info = HookSoftConfirmationInfo {
l2_height,
da_slot_height: da_block.header().height(),
da_slot_hash: da_block.header().hash().into(),
da_slot_txs_commitment: da_block.header().txs_commitment().into(),
da_slot_height: da_block_header.height(),
da_slot_hash: da_block_header.hash().into(),
da_slot_txs_commitment: da_block_header.txs_commitment().into(),
pre_state_root: self.state_root.clone().as_ref().to_vec(),
deposit_data: deposit_data.clone(),
current_spec: active_fork_spec,
@@ -468,7 +468,7 @@ where
.map_err(Into::<anyhow::Error>::into)?;
debug!(
"Applying soft confirmation on DA block: {}",
hex::encode(da_block.header().hash().into())
hex::encode(da_block_header.hash().into())
);

let evm_txs = self.get_best_transactions()?;
@@ -481,16 +481,15 @@ where
evm_txs,
&pub_key,
prestate.clone(),
da_block.header().clone(),
da_block_header,
soft_confirmation_info.clone(),
l2_block_mode,
)
.await?;

let prestate = self
.storage_manager
.create_storage_on_l2_height(l2_height)
.map_err(Into::<anyhow::Error>::into)?;
.create_storage_on_l2_height(l2_height)?;

let checkpoint =
StateCheckpoint::with_witness(prestate.clone(), Default::default(), Default::default());
@@ -500,7 +499,7 @@ where
match self.stf.begin_soft_confirmation(
&pub_key,
&mut working_set,
da_block.header(),
da_block_header,
&soft_confirmation_info,
) {
Ok(_) => {
@@ -526,16 +525,15 @@ where
&txs_new,
&mut working_set,
)
// TODO: handle this error
.expect("dry_run_transactions should have already checked this");
}

// create the unsigned batch with the txs, then sign the soft confirmation
let unsigned_batch = UnsignedSoftConfirmation::new(
l2_height,
da_block.header().height(),
da_block.header().hash().into(),
da_block.header().txs_commitment().into(),
da_block_header.height(),
da_block_header.hash().into(),
da_block_header.txs_commitment().into(),
&txs,
&txs_new,
deposit_data.clone(),
@@ -602,11 +600,11 @@ where

// connect L1 and L2 height
self.ledger_db.extend_l2_range_of_l1_slot(
SlotNumber(da_block.header().height()),
SlotNumber(da_block_header.height()),
SoftConfirmationNumber(l2_height),
)?;

let l1_height = da_block.header().height();
let l1_height = da_block_header.height();
info!(
"New block #{}, DA #{}, Tx count: #{}",
l2_height, l1_height, evm_txs_count,
@@ -642,7 +640,7 @@ where

Ok((
l2_height,
da_block.header().height(),
da_block_header.height(),
soft_confirmation_result.state_diff,
))
}
@@ -675,17 +673,15 @@ where
}

let (mut last_finalized_block, mut l1_fee_rate) =
match get_da_block_data(self.da_service.clone()).await {
Ok(l1_data) => l1_data,
Err(e) => {
error!("{}", e);
return Err(e);
}
};
let mut last_finalized_height = last_finalized_block.header().height();
get_da_block_data(self.da_service.clone()).await?;
let last_finalized_height = last_finalized_block.header().height();

let mut last_used_l1_height = match self.ledger_db.get_head_soft_confirmation() {
Ok(Some((_, sb))) => sb.da_slot_height,
Ok(Some((_, sb))) => {
ensure_no_da_forks(self.da_service.clone(), sb.da_slot_height, sb.da_slot_hash)
.await;
sb.da_slot_height
}
Ok(None) => last_finalized_height, // starting for the first time
Err(e) => {
return Err(anyhow!("previous L1 height: {}", e));
@@ -718,6 +714,8 @@ where
da_height_update_tx,
self.config.da_update_interval_ms,
cancellation_token,
last_finalized_height,
last_finalized_block.hash(),
)
});

@@ -736,16 +734,17 @@ where
tokio::select! {
// Receive updates from DA layer worker.
l1_data = da_height_update_rx.recv() => {
let l1_data = l1_data.expect("Da block channel closed");
// Stop receiving updates from DA layer until we have caught up.
if missed_da_blocks_count > 0 {
continue;
}
if let Some(l1_data) = l1_data {
(last_finalized_block, l1_fee_rate) = l1_data;
last_finalized_height = last_finalized_block.header().height();

missed_da_blocks_count = self.da_blocks_missed(last_finalized_height, last_used_l1_height);
}
(last_finalized_block, l1_fee_rate) = l1_data;
let last_finalized_height = last_finalized_block.header().height();

missed_da_blocks_count = self.da_blocks_missed(last_finalized_height, last_used_l1_height);

SEQUENCER_METRICS.current_l1_block.set(last_finalized_height as f64);
},
// If sequencer is in test mode, it will build a block every time it receives a message
@@ -761,7 +760,7 @@ where
missed_da_blocks_count = 0;
}

match self.produce_l2_block(last_finalized_block.clone(), l1_fee_rate, L2BlockMode::NotEmpty).await {
match self.produce_l2_block(last_finalized_block.header(), l1_fee_rate, L2BlockMode::NotEmpty).await {
Ok((l2_height, l1_block_number, state_diff)) => {
last_used_l1_height = l1_block_number;

@@ -781,8 +780,6 @@ where
// last_finalized_block. If there are missed DA blocks, we start producing
// empty blocks at ~2 second rate, 1 L2 block per respective missed DA block
// until we know we caught up with L1.
let da_block = last_finalized_block.clone();

if missed_da_blocks_count > 0 {
if let Err(e) = self.process_missed_da_blocks(missed_da_blocks_count, last_used_l1_height, l1_fee_rate).await {
error!("Sequencer error: {}", e);
@@ -793,7 +790,7 @@ where
}


match self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::NotEmpty).await {
match self.produce_l2_block(last_finalized_block.header(), l1_fee_rate, L2BlockMode::NotEmpty).await {
Ok((l2_height, l1_block_number, state_diff)) => {
last_used_l1_height = l1_block_number;

@@ -982,7 +979,7 @@ where
pub async fn restore_mempool(&self) -> Result<(), anyhow::Error> {
let mempool_txs = self.ledger_db.get_mempool_txs()?;
for (_, tx) in mempool_txs {
let recovered = recover_raw_transaction(Bytes::from(tx.as_slice().to_vec()))?;
let recovered = recover_raw_transaction(Bytes::from(tx))?;
let pooled_tx = EthPooledTransaction::from_pooled(recovered);

let _ = self.mempool.add_external_transaction(pooled_tx).await?;
@@ -1051,7 +1048,7 @@ where
.await?;

debug!("Created an empty L2 for L1={}", needed_da_block_height);
self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty)
self.produce_l2_block(da_block.header(), l1_fee_rate, L2BlockMode::Empty)
.await?;
}

@@ -1084,6 +1081,8 @@ async fn da_block_monitor<Da>(
sender: mpsc::Sender<L1Data<Da>>,
loop_interval: u64,
cancellation_token: CancellationToken,
mut last_da_height: u64,
mut last_da_hash: [u8; 32],
) where
Da: DaService,
{
@@ -1094,15 +1093,40 @@ async fn da_block_monitor<Da>(
return;
}
l1_data = get_da_block_data(da_service.clone()) => {
let l1_data = match l1_data {
let (l1_block, l1_fee_rate) = match l1_data {
Ok(l1_data) => l1_data,
Err(e) => {
error!("Could not fetch L1 data, {}", e);
continue;
}
};

let _ = sender.send(l1_data).await;
let header = l1_block.header();
match header.height().cmp(&last_da_height) {
cmp::Ordering::Less => panic!("DA blockchain moved in reverse"),
cmp::Ordering::Equal => assert_eq!(
last_da_hash,
header.hash().into(),
"Detected fork on DA block {}",
last_da_height,
),
cmp::Ordering::Greater => {
if header.height() == last_da_height + 1 {
assert_eq!(
last_da_hash,
header.prev_hash().into(),
"Detected fork on DA block {}",
last_da_height,
);
} else {
ensure_no_da_forks(da_service.clone(), last_da_height, last_da_hash).await;
}
last_da_height = header.height();
last_da_hash = header.hash().into();
}
}

let _ = sender.send((l1_block, l1_fee_rate)).await;

sleep(Duration::from_millis(loop_interval)).await;
},
@@ -1114,31 +1138,43 @@ async fn get_da_block_data<Da>(da_service: Arc<Da>) -> anyhow::Result<L1Data<Da>
where
Da: DaService,
{
let last_finalized_height = match da_service.get_last_finalized_block_header().await {
Ok(header) => header.height(),
Err(e) => {
return Err(anyhow!("Finalized L1 height: {}", e));
}
};
let last_finalized_height = da_service
.get_last_finalized_block_header()
.await
.map_err(|e| anyhow!("Failed to fetch finalized block header: {e}"))?
.height();

let last_finalized_block = match da_service.get_block_at(last_finalized_height).await {
Ok(block) => block,
Err(e) => {
return Err(anyhow!("Finalized L1 block: {}", e));
}
};
let last_finalized_block = da_service
.get_block_at(last_finalized_height)
.await
.map_err(|e| anyhow!("Failed to fetch finalized l1 block: {e}"))?;

debug!(
"Sequencer: last finalized L1 height: {:?}",
last_finalized_height
);

let l1_fee_rate = match da_service.get_fee_rate().await {
Ok(fee_rate) => fee_rate,
Err(e) => {
return Err(anyhow!("L1 fee rate: {}", e));
}
};
let l1_fee_rate = da_service
.get_fee_rate()
.await
.map_err(|e| anyhow!("Failed to fetch l1 fee rate: {e}"))?;

Ok((last_finalized_block, l1_fee_rate))
}

async fn ensure_no_da_forks<Da: DaService>(

Contributor:
Do we need to implement this for all other nodes? They also follow the wrong fork, and being at the wrong DA block might lead them to misbehave.

I am not totally sure, but this is something to check for.


Contributor Author:
It seems to me that there is nothing to do for the other nodes in this PR specifically:

  • light client prover: light client proof execution will fail due to header chain verification (prev_hash will not be equal), hence it won't continue
  • batch prover: there won't be new seq comms since the sequencer will be stopped, so no new batch proofs. We will have to manually roll back to the start of the fork anyway, so it doesn't matter if the batch prover stays up without producing any proof.
  • full node: similar logic to the batch prover; no new seq comms, and we will have to roll back manually anyway.

The remaining work seems to be blocked by the rollback feature for our nodes. Once we have proper rollback functionality, we can detect forks in all node types, roll back to the beginning of the fork, and continue from that point without any manual intervention.

da_service: Arc<Da>,
da_block_height: u64,
expected_da_block_hash: [u8; 32],
) {
let da_block = da_service
.get_block_at(da_block_height)
.await
.expect("Should not continue if we cannot ensure there are no DA forks");
assert_eq!(
da_block.header().hash(),
expected_da_block_hash.into(),
"Detected fork on DA block {}",
da_block_height,
);
}
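
Following up on the review thread above: once rollback support lands, the other node types could run a similar startup check without panicking. Below is a minimal sketch, not part of this PR: `detect_da_fork_on_restart` is a hypothetical helper, the import paths mirror the `sov_rollup_interface` traits this file appears to use and may need adjusting, and the `DaService` bounds are assumed to be the same as in `ensure_no_da_forks`.

```rust
use std::sync::Arc;

use anyhow::{anyhow, bail};
use sov_rollup_interface::da::{BlockHeaderTrait, SlotData};
use sov_rollup_interface::services::da::DaService;

/// Hypothetical, non-panicking variant of `ensure_no_da_forks` for other node types.
/// Instead of asserting, it reports the fork so the caller can trigger a rollback
/// to the last common DA block once that functionality exists.
async fn detect_da_fork_on_restart<Da: DaService>(
    da_service: Arc<Da>,
    last_processed_height: u64,
    last_processed_hash: [u8; 32],
) -> anyhow::Result<()> {
    let da_block = da_service
        .get_block_at(last_processed_height)
        .await
        .map_err(|e| anyhow!("Failed to fetch DA block {last_processed_height}: {e}"))?;

    if da_block.header().hash() != last_processed_hash.into() {
        // The block we last processed is no longer canonical: a fork happened at or
        // below this height, and the node should roll back before continuing.
        bail!("Detected fork on DA block {last_processed_height}");
    }

    Ok(())
}
```

A node's startup path could call this with the height and hash of the last DA slot it processed and, once rollback exists, map the error to a rollback instead of an abort.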