From 06865ce9e18d5d59fc9b099d0e5a58a38ae8a900 Mon Sep 17 00:00:00 2001
From: Jonathan <94441036+zeapoz@users.noreply.github.com>
Date: Thu, 19 Oct 2023 11:11:00 +0200
Subject: [PATCH] feat: long polling (#26)

* chore: clippy lints

* feat: long polling

* ref: make sure sink is dropped and all tasks awaited

* chore: fmt
---
 src/cli.rs        |   3 ++
 src/l1_fetcher.rs | 122 ++++++++++++++++++++++++++++------------------
 src/main.rs       |  14 ++++--
 3 files changed, 88 insertions(+), 51 deletions(-)

diff --git a/src/cli.rs b/src/cli.rs
index 67ae120..af068f5 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -16,6 +16,9 @@ pub struct L1FetcherOptions {
     /// The number of blocks to process from Ethereum.
     #[arg(long)]
     pub block_count: Option<u64>,
+    /// If present, don't poll for new blocks after reaching the end.
+    #[arg(long)]
+    pub disable_polling: bool,
 }
 
 #[derive(Subcommand)]
diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs
index 58fbf08..ade1e3d 100644
--- a/src/l1_fetcher.rs
+++ b/src/l1_fetcher.rs
@@ -19,8 +19,10 @@ use crate::{
     types::{CommitBlockInfoV1, ParseError},
 };
 
-/// MAX_RETRIES is the maximum number of retries on failed L1 call.
+/// `MAX_RETRIES` is the maximum number of retries on failed L1 call.
 const MAX_RETRIES: u8 = 5;
+/// The interval, in seconds, at which to poll for new blocks.
+const LONG_POLLING_INTERVAL_S: u64 = 120;
 
 #[allow(clippy::enum_variant_names)]
 #[derive(Error, Debug)]
@@ -53,11 +55,13 @@ impl L1Fetcher {
         })
     }
 
+    #[allow(clippy::too_many_lines)]
     pub async fn fetch(
         &self,
         sink: mpsc::Sender<CommitBlockInfoV1>,
         start_block: Option<U64>,
         end_block: Option<U64>,
+        mut disable_polling: bool,
     ) -> Result<()> {
         // Start fetching from the `GENESIS_BLOCK` unless the `start_block` argument is supplied,
         // in which case, start from that instead. If no argument was supplied and a state snapshot
@@ -77,15 +81,6 @@ impl L1Fetcher {
             };
         }
 
-        let end_block_number = end_block.unwrap_or(
-            self.provider
-                .get_block(BlockNumber::Latest)
-                .await?
-                .unwrap()
-                .number
-                .unwrap(),
-        );
-
         let event = self.contract.events_by_name("BlockCommit")?[0].clone();
         let function = self.contract.functions_by_name("commitBlocks")?[0].clone();
 
@@ -96,7 +91,7 @@ impl L1Fetcher {
         // - BlockCommit event filter.
         // - Referred L1 block fetch.
         // - Calldata parsing.
-        tokio::spawn({
+        let tx_handle = tokio::spawn({
             let provider = self.provider.clone();
             async move {
                 while let Some(hash) = hash_rx.recv().await {
@@ -115,7 +110,7 @@ impl L1Fetcher {
             }
         });
 
-        tokio::spawn(async move {
+        let parse_handle = tokio::spawn(async move {
             while let Some(calldata) = calldata_rx.recv().await {
                 let blocks = match parse_calldata(&function, &calldata) {
                     Ok(blks) => blks,
@@ -131,47 +126,80 @@ impl L1Fetcher {
             }
         });
 
-        let mut latest_l2_block_number = U256::zero();
-        while current_l1_block_number <= end_block_number {
-            // Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`].
-            // TODO: Filter by executed blocks too.
-            let filter = Filter::new()
-                .address(ZK_SYNC_ADDR.parse::<Address>()?)
-                .topic0(event.signature())
-                .from_block(current_l1_block_number)
-                .to_block(current_l1_block_number + BLOCK_STEP);
-
-            // Grab all relevant logs.
-            let logs =
-                L1Fetcher::retry_call(|| self.provider.get_logs(&filter), L1FetchError::GetLogs)
-                    .await?;
-
-            for log in logs {
-                // log.topics:
-                // topics[1]: L2 block number.
-                // topics[2]: L2 block hash.
-                // topics[3]: L2 commitment.
-
-                let new_l2_block_number = U256::from_big_endian(log.topics[1].as_fixed_bytes());
-                if new_l2_block_number <= latest_l2_block_number {
-                    continue;
-                }
+        {
+            // NOTE: The channel should close once it goes out of scope, so we move it here.
+            let hash_tx = hash_tx;
+            let mut latest_l2_block_number = U256::zero();
 
-                if let Some(tx_hash) = log.transaction_hash {
-                    hash_tx.send(tx_hash).await?;
+            // If an `end_block` was supplied, we shouldn't poll for newer blocks.
+            if end_block.is_some() {
+                disable_polling = true;
+            }
+
+            let end_block_number = end_block.unwrap_or(
+                self.provider
+                    .get_block(BlockNumber::Latest)
+                    .await?
+                    .unwrap()
+                    .number
+                    .unwrap(),
+            );
+
+            loop {
+                if disable_polling && current_l1_block_number > end_block_number {
+                    tracing::info!("Successfully reached end block. Shutting down...");
+                    break;
                 }
 
-                latest_l2_block_number = new_l2_block_number;
-            }
+                // Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`].
+                // TODO: Filter by executed blocks too.
+                let filter = Filter::new()
+                    .address(ZK_SYNC_ADDR.parse::<Address>()?)
+                    .topic0(event.signature())
+                    .from_block(current_l1_block_number)
+                    .to_block(current_l1_block_number + BLOCK_STEP);
+
+                // Grab all relevant logs.
+                if let Ok(logs) =
+                    L1Fetcher::retry_call(|| self.provider.get_logs(&filter), L1FetchError::GetLogs)
+                        .await
+                {
+                    for log in logs {
+                        // log.topics:
+                        // topics[1]: L2 block number.
+                        // topics[2]: L2 block hash.
+                        // topics[3]: L2 commitment.
+
+                        let new_l2_block_number =
+                            U256::from_big_endian(log.topics[1].as_fixed_bytes());
+                        if new_l2_block_number <= latest_l2_block_number {
+                            continue;
+                        }
+
+                        if let Some(tx_hash) = log.transaction_hash {
+                            hash_tx.send(tx_hash).await?;
+                        }
+
+                        latest_l2_block_number = new_l2_block_number;
+                    }
+                } else {
+                    tokio::time::sleep(Duration::from_secs(LONG_POLLING_INTERVAL_S)).await;
+                    continue;
+                };
 
-        // Store our current L1 block number so we can resume if the process exits.
-        if let Some(snapshot) = &self.snapshot {
-            snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
-        }
+                // Store our current L1 block number so we can resume if the process exits.
+                if let Some(snapshot) = &self.snapshot {
+                    snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
+                }
 
-        // Increment current block index.
-        current_l1_block_number += BLOCK_STEP.into();
+                // Increment current block index.
+                current_l1_block_number += BLOCK_STEP.into();
+            }
         }
 
+        tx_handle.await?;
+        parse_handle.await?;
+
         Ok(())
     }
 
@@ -179,7 +207,7 @@ impl L1Fetcher {
     where
         Fut: Future<Output = Result<T, ProviderError>>,
     {
-        for attempt in 1..MAX_RETRIES + 1 {
+        for attempt in 1..=MAX_RETRIES {
             match callback().await {
                 Ok(x) => return Ok(x),
                 Err(e) => {
diff --git a/src/main.rs b/src/main.rs
index b819a68..4c4a3b5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -77,6 +77,7 @@ async fn main() -> Result<()> {
                 start_block,
                 block_step: _,
                 block_count: _,
+                disable_polling,
             },
         } => {
             let snapshot = Arc::new(Mutex::new(StateSnapshot::default()));
@@ -85,11 +86,14 @@ async fn main() -> Result<()> {
             let processor = TreeProcessor::new(db_path, snapshot.clone()).await?;
             let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
 
-            tokio::spawn(async move {
+            let processor_handle = tokio::spawn(async move {
                 processor.run(rx).await;
             });
 
-            fetcher.fetch(tx, Some(U64([start_block])), None).await?;
+            fetcher
+                .fetch(tx, Some(U64([start_block])), None, disable_polling)
+                .await?;
+            processor_handle.await?;
         }
         ReconstructSource::File { file } => {
             let snapshot = Arc::new(Mutex::new(StateSnapshot::default()));
@@ -120,6 +124,7 @@ async fn main() -> Result<()> {
                 start_block,
                 block_step: _,
                 block_count,
+                disable_polling,
             },
             file,
         } => {
@@ -127,15 +132,16 @@ async fn main() -> Result<()> {
             let processor = JsonSerializationProcessor::new(Path::new(&file))?;
             let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
 
-            tokio::spawn(async move {
+            let processor_handle = tokio::spawn(async move {
                 processor.run(rx).await;
             });
 
             let end_block = block_count.map(|n| U64([start_block + n]));
 
             fetcher
-                .fetch(tx, Some(U64([start_block])), end_block)
+                .fetch(tx, Some(U64([start_block])), end_block, disable_polling)
                 .await?;
+            processor_handle.await?;
         }
         Command::Query {
            query,
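
The diff above turns the bounded `while` into a `loop` that can poll past the chain tip, backs off on a failed `get_logs` call instead of aborting, closes the calldata channel by dropping the sender, and awaits the worker handles so queued work is drained before exit. Below is a minimal, self-contained sketch of that poll/back-off/shutdown shape, using only tokio; it is not part of the patch, and `fetch_batch`, `POLL_INTERVAL_S`, and the constant values are illustrative stand-ins, not this crate's actual API.

use std::time::Duration;

const POLL_INTERVAL_S: u64 = 1; // short interval so the example terminates quickly
const BLOCK_STEP: u64 = 128;

// Hypothetical stand-in for an L1 log query that can fail transiently
// (e.g. when asked for blocks past the current tip).
async fn fetch_batch(from: u64, tip: u64) -> Result<Vec<u64>, ()> {
    if from > tip {
        return Err(());
    }
    Ok((from..from + BLOCK_STEP).collect())
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u64>(5);

    // Downstream consumer, analogous to the processor task spawned in main.rs.
    let consumer = tokio::spawn(async move {
        while let Some(block) = rx.recv().await {
            println!("processing block {block}");
        }
    });

    let disable_polling = true;
    let end = 384u64;
    let mut current = 0u64;

    {
        // Move the sender into this scope so it is dropped when the loop
        // exits; that closes the channel and lets the consumer drain and stop.
        let tx = tx;
        'fetch: loop {
            if disable_polling && current > end {
                break;
            }
            match fetch_batch(current, end).await {
                Ok(blocks) => {
                    for b in blocks {
                        // A send error means the consumer is gone; stop producing.
                        if tx.send(b).await.is_err() {
                            break 'fetch;
                        }
                    }
                }
                Err(()) => {
                    // Back off and retry the same range instead of advancing the cursor.
                    tokio::time::sleep(Duration::from_secs(POLL_INTERVAL_S)).await;
                    continue;
                }
            }
            current += BLOCK_STEP;
        }
    }

    // Await the consumer so buffered blocks are processed before exit.
    consumer.await.unwrap();
}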