diff --git a/src/processor/tree/mod.rs b/src/processor/tree/mod.rs index 9fa34e0..902aede 100644 --- a/src/processor/tree/mod.rs +++ b/src/processor/tree/mod.rs @@ -1,7 +1,7 @@ pub mod query_tree; mod tree_wrapper; -use std::{path::PathBuf, sync::Arc}; +use std::{io, path::PathBuf, sync::Arc}; use async_trait::async_trait; use ethers::types::H256; @@ -50,31 +50,39 @@ impl TreeProcessor<'static> { snapshot, }) } + + pub async fn write_state(&self) -> Result<(), io::Error> { + let snapshot = self.snapshot.lock().await; + // Write the current state to a file. + let state_file_path = self.db_path.join(STATE_FILE_NAME); + snapshot.write(&state_file_path) + } } #[async_trait] impl Processor for TreeProcessor<'static> { async fn run(mut self, mut rx: mpsc::Receiver) { - while let Some(block) = rx.recv().await { - let mut snapshot = self.snapshot.lock().await; - // Check if we've already processed this block. - if snapshot.latest_l2_block_number >= block.block_number { - tracing::debug!( - "Block {} has already been processed, skipping.", - block.block_number - ); - continue; - } + loop { + if let Some(block) = rx.recv().await { + let mut snapshot = self.snapshot.lock().await; + // Check if we've already processed this block. + if snapshot.latest_l2_block_number >= block.block_number { + tracing::debug!( + "Block {} has already been processed, skipping.", + block.block_number + ); + continue; + } - self.tree.insert_block(&block); + self.tree.insert_block(&block); - // Update snapshot values. - snapshot.latest_l2_block_number = block.block_number; - snapshot.index_to_key_map = self.tree.index_to_key_map.clone(); - - // Write the current state to a file. - let state_file_path = self.db_path.join(STATE_FILE_NAME); - snapshot.write(&state_file_path).unwrap(); + // Update snapshot values. + snapshot.latest_l2_block_number = block.block_number; + snapshot.index_to_key_map = self.tree.index_to_key_map.clone(); + } else { + self.write_state().await.unwrap(); + break; + } } } }