feat: long polling (#26)
* chore: clippy lints

* feat: long polling

* ref: make sure sink is dropped and all tasks awaited

* chore: fmt
zeapoz authored Oct 19, 2023
1 parent f9e84e9 · commit 06865ce
Showing 3 changed files with 88 additions and 51 deletions.
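
The long polling added here reduces to a single loop: fetch the next batch of logs; once the chain head is reached, either shut down (when polling is disabled or an explicit end block was given) or sleep for `LONG_POLLING_INTERVAL_S` seconds and try again. A minimal sketch of that pattern, assuming a tokio runtime; `fetch_batch` and its error type are invented stand-ins, not code from this commit:

    use std::time::Duration;

    /// Polling interval used by this commit (`LONG_POLLING_INTERVAL_S`).
    const LONG_POLLING_INTERVAL_S: u64 = 120;

    /// Hypothetical stand-in for an L1 call such as `provider.get_logs`:
    /// yields the next batch, or `Err(())` when nothing new exists yet.
    async fn fetch_batch(from: u64) -> Result<Vec<u64>, ()> {
        if from < 5 {
            Ok(vec![from])
        } else {
            Err(())
        }
    }

    async fn run(mut current: u64, end_block: Option<u64>, disable_polling: bool) {
        loop {
            // With polling disabled, stop once the end block has been passed.
            // (The commit resolves a missing end block to the chain head first.)
            if disable_polling && end_block.is_some_and(|end| current > end) {
                println!("reached end block, shutting down");
                break;
            }
            match fetch_batch(current).await {
                Ok(batch) => current += batch.len() as u64,
                // Nothing new yet: back off for the polling interval, then retry.
                Err(()) => tokio::time::sleep(Duration::from_secs(LONG_POLLING_INTERVAL_S)).await,
            }
        }
    }

    #[tokio::main]
    async fn main() {
        run(0, Some(3), true).await;
    }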
src/cli.rs: 3 changes (3 additions, 0 deletions)
@@ -16,6 +16,9 @@ pub struct L1FetcherOptions {
     /// The number of blocks to process from Ethereum.
     #[arg(long)]
     pub block_count: Option<u64>,
+    /// If present, don't poll for new blocks after reaching the end.
+    #[arg(long)]
+    pub disable_polling: bool,
 }
 
 #[derive(Subcommand)]
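
With clap's derive API, a bare `bool` field annotated `#[arg(long)]` becomes a presence flag: it defaults to `false` and is set to `true` whenever `--disable-polling` appears on the command line, which is exactly the "If present" semantics documented above. A self-contained sketch, assuming clap v4 with the `derive` feature (the options struct is trimmed to two fields for brevity):

    use clap::Parser;

    #[derive(Parser)]
    struct L1FetcherOptions {
        /// The number of blocks to process from Ethereum.
        #[arg(long)]
        block_count: Option<u64>,
        /// If present, don't poll for new blocks after reaching the end.
        #[arg(long)]
        disable_polling: bool,
    }

    fn main() {
        // `--disable-polling` is a bare presence flag: omitted => false.
        let opts = L1FetcherOptions::parse_from(["prog", "--disable-polling"]);
        assert!(opts.disable_polling);
        assert!(opts.block_count.is_none());
    }

Omitting the flag keeps the new polling behaviour as the default.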
src/l1_fetcher.rs: 122 changes (75 additions, 47 deletions)
@@ -19,8 +19,10 @@ use crate::{
     types::{CommitBlockInfoV1, ParseError},
 };
 
-/// MAX_RETRIES is the maximum number of retries on failed L1 call.
+/// `MAX_RETRIES` is the maximum number of retries on failed L1 call.
 const MAX_RETRIES: u8 = 5;
+/// The interval in seconds in which to poll for new blocks.
+const LONG_POLLING_INTERVAL_S: u64 = 120;
 
 #[allow(clippy::enum_variant_names)]
 #[derive(Error, Debug)]
@@ -53,11 +55,13 @@ impl L1Fetcher {
         })
     }
 
+    #[allow(clippy::too_many_lines)]
     pub async fn fetch(
         &self,
         sink: mpsc::Sender<CommitBlockInfoV1>,
         start_block: Option<U64>,
         end_block: Option<U64>,
+        mut disable_polling: bool,
     ) -> Result<()> {
         // Start fetching from the `GENESIS_BLOCK` unless the `start_block` argument is supplied,
         // in which case, start from that instead. If no argument was supplied and a state snapshot
@@ -77,15 +81,6 @@ impl L1Fetcher {
             };
         }
 
-        let end_block_number = end_block.unwrap_or(
-            self.provider
-                .get_block(BlockNumber::Latest)
-                .await?
-                .unwrap()
-                .number
-                .unwrap(),
-        );
-
         let event = self.contract.events_by_name("BlockCommit")?[0].clone();
         let function = self.contract.functions_by_name("commitBlocks")?[0].clone();
 
@@ -96,7 +91,7 @@
         // - BlockCommit event filter.
         // - Referred L1 block fetch.
         // - Calldata parsing.
-        tokio::spawn({
+        let tx_handle = tokio::spawn({
             let provider = self.provider.clone();
             async move {
                 while let Some(hash) = hash_rx.recv().await {
@@ -115,7 +110,7 @@
             }
         });
 
-        tokio::spawn(async move {
+        let parse_handle = tokio::spawn(async move {
             while let Some(calldata) = calldata_rx.recv().await {
                 let blocks = match parse_calldata(&function, &calldata) {
                     Ok(blks) => blks,
@@ -131,55 +126,88 @@
             }
         });
 
-        let mut latest_l2_block_number = U256::zero();
-        while current_l1_block_number <= end_block_number {
-            // Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`].
-            // TODO: Filter by executed blocks too.
-            let filter = Filter::new()
-                .address(ZK_SYNC_ADDR.parse::<Address>()?)
-                .topic0(event.signature())
-                .from_block(current_l1_block_number)
-                .to_block(current_l1_block_number + BLOCK_STEP);
-
-            // Grab all relevant logs.
-            let logs =
-                L1Fetcher::retry_call(|| self.provider.get_logs(&filter), L1FetchError::GetLogs)
-                    .await?;
-            for log in logs {
-                // log.topics:
-                // topics[1]: L2 block number.
-                // topics[2]: L2 block hash.
-                // topics[3]: L2 commitment.
-
-                let new_l2_block_number = U256::from_big_endian(log.topics[1].as_fixed_bytes());
-                if new_l2_block_number <= latest_l2_block_number {
-                    continue;
-                }
-
-                if let Some(tx_hash) = log.transaction_hash {
-                    hash_tx.send(tx_hash).await?;
-                }
-
-                latest_l2_block_number = new_l2_block_number;
-            }
-
-            // Store our current L1 block number so we can resume if the process exits.
-            if let Some(snapshot) = &self.snapshot {
-                snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
-            }
-
-            // Increment current block index.
-            current_l1_block_number += BLOCK_STEP.into();
-        }
+        {
+            // NOTE: The channel should close once it goes out of scope we move it here.
+            let hash_tx = hash_tx;
+            let mut latest_l2_block_number = U256::zero();
+
+            // If an `end_block` was supplied we shouldn't poll for newer blocks.
+            if end_block.is_some() {
+                disable_polling = true;
+            }
+
+            let end_block_number = end_block.unwrap_or(
+                self.provider
+                    .get_block(BlockNumber::Latest)
+                    .await?
+                    .unwrap()
+                    .number
+                    .unwrap(),
+            );
+
+            loop {
+                if disable_polling && current_l1_block_number > end_block_number {
+                    tracing::info!("Successfully reached end block. Shutting down...");
+                    break;
+                }
+
+                // Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`].
+                // TODO: Filter by executed blocks too.
+                let filter = Filter::new()
+                    .address(ZK_SYNC_ADDR.parse::<Address>()?)
+                    .topic0(event.signature())
+                    .from_block(current_l1_block_number)
+                    .to_block(current_l1_block_number + BLOCK_STEP);
+
+                // Grab all relevant logs.
+                if let Ok(logs) =
+                    L1Fetcher::retry_call(|| self.provider.get_logs(&filter), L1FetchError::GetLogs)
+                        .await
+                {
+                    for log in logs {
+                        // log.topics:
+                        // topics[1]: L2 block number.
+                        // topics[2]: L2 block hash.
+                        // topics[3]: L2 commitment.
+
+                        let new_l2_block_number =
+                            U256::from_big_endian(log.topics[1].as_fixed_bytes());
+                        if new_l2_block_number <= latest_l2_block_number {
+                            continue;
+                        }
+
+                        if let Some(tx_hash) = log.transaction_hash {
+                            hash_tx.send(tx_hash).await?;
+                        }
+
+                        latest_l2_block_number = new_l2_block_number;
+                    }
+                } else {
+                    tokio::time::sleep(Duration::from_secs(LONG_POLLING_INTERVAL_S)).await;
+                    continue;
+                };
+
+                // Store our current L1 block number so we can resume if the process exits.
+                if let Some(snapshot) = &self.snapshot {
+                    snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
+                }
+
+                // Increment current block index.
+                current_l1_block_number += BLOCK_STEP.into();
+            }
+        }
+
+        tx_handle.await?;
+        parse_handle.await?;
 
         Ok(())
     }
 
     async fn retry_call<T, Fut>(callback: impl Fn() -> Fut, err: L1FetchError) -> Result<T>
     where
         Fut: Future<Output = Result<T, ProviderError>>,
     {
-        for attempt in 1..MAX_RETRIES + 1 {
+        for attempt in 1..=MAX_RETRIES {
             match callback().await {
                 Ok(x) => return Ok(x),
                 Err(e) => {
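
The refactor noted in the commit message ("make sure sink is dropped and all tasks awaited") leans on tokio's mpsc shutdown semantics: `recv()` returns `None` only once every `Sender` has been dropped, so moving `hash_tx` into the inner block guarantees the hash and parse tasks drain their channels and exit, which in turn lets `tx_handle.await?` and `parse_handle.await?` complete instead of hanging. A minimal sketch of the same pattern, independent of this codebase:

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<u64>(8);

        // Consumer: runs until all senders are dropped and `recv()` yields `None`.
        let handle = tokio::spawn(async move {
            while let Some(n) = rx.recv().await {
                println!("got {n}");
            }
            println!("channel closed, consumer exiting");
        });

        {
            // Move the sender into a narrower scope; when the scope ends, the
            // sender is dropped and the channel closes automatically.
            let tx = tx;
            for n in 0..3 {
                tx.send(n).await.unwrap();
            }
        }

        // Safe to await now: the consumer is guaranteed to observe the close.
        handle.await.unwrap();
    }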
src/main.rs: 14 changes (10 additions, 4 deletions)
@@ -77,6 +77,7 @@ async fn main() -> Result<()> {
                     start_block,
                     block_step: _,
                     block_count: _,
+                    disable_polling,
                 },
             } => {
                 let snapshot = Arc::new(Mutex::new(StateSnapshot::default()));
@@ -85,11 +86,14 @@ async fn main() -> Result<()> {
                 let processor = TreeProcessor::new(db_path, snapshot.clone()).await?;
                 let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
 
-                tokio::spawn(async move {
+                let processor_handle = tokio::spawn(async move {
                     processor.run(rx).await;
                 });
 
-                fetcher.fetch(tx, Some(U64([start_block])), None).await?;
+                fetcher
+                    .fetch(tx, Some(U64([start_block])), None, disable_polling)
+                    .await?;
+                processor_handle.await?;
             }
             ReconstructSource::File { file } => {
                 let snapshot = Arc::new(Mutex::new(StateSnapshot::default()));
@@ -120,22 +124,24 @@ async fn main() -> Result<()> {
                 start_block,
                 block_step: _,
                 block_count,
+                disable_polling,
             },
             file,
         } => {
             let fetcher = L1Fetcher::new(&http_url, None)?;
             let processor = JsonSerializationProcessor::new(Path::new(&file))?;
             let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
 
-            tokio::spawn(async move {
+            let processor_handle = tokio::spawn(async move {
                 processor.run(rx).await;
             });
 
             let end_block = block_count.map(|n| U64([start_block + n]));
 
             fetcher
-                .fetch(tx, Some(U64([start_block])), end_block)
+                .fetch(tx, Some(U64([start_block])), end_block, disable_polling)
                 .await?;
+            processor_handle.await?;
         }
         Command::Query {
             query,
