Skip to content

Commit

Permalink
ref: decouple state saves from tree inserts (#31)
Browse files Browse the repository at this point in the history
* ref: decouple state saves from tree inserts

* chore: cargo fmt
  • Loading branch information
zeapoz authored Oct 20, 2023
1 parent ebed5d7 commit ef16d49
Showing 1 changed file with 27 additions and 19 deletions.
46 changes: 27 additions & 19 deletions src/processor/tree/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod query_tree;
mod tree_wrapper;

use std::{path::PathBuf, sync::Arc};
use std::{io, path::PathBuf, sync::Arc};

use async_trait::async_trait;
use ethers::types::H256;
Expand Down Expand Up @@ -50,31 +50,39 @@ impl TreeProcessor<'static> {
snapshot,
})
}

pub async fn write_state(&self) -> Result<(), io::Error> {
let snapshot = self.snapshot.lock().await;
// Write the current state to a file.
let state_file_path = self.db_path.join(STATE_FILE_NAME);
snapshot.write(&state_file_path)
}
}

#[async_trait]
impl Processor for TreeProcessor<'static> {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
while let Some(block) = rx.recv().await {
let mut snapshot = self.snapshot.lock().await;
// Check if we've already processed this block.
if snapshot.latest_l2_block_number >= block.block_number {
tracing::debug!(
"Block {} has already been processed, skipping.",
block.block_number
);
continue;
}
loop {
if let Some(block) = rx.recv().await {
let mut snapshot = self.snapshot.lock().await;
// Check if we've already processed this block.
if snapshot.latest_l2_block_number >= block.block_number {
tracing::debug!(
"Block {} has already been processed, skipping.",
block.block_number
);
continue;
}

self.tree.insert_block(&block);
self.tree.insert_block(&block);

// Update snapshot values.
snapshot.latest_l2_block_number = block.block_number;
snapshot.index_to_key_map = self.tree.index_to_key_map.clone();

// Write the current state to a file.
let state_file_path = self.db_path.join(STATE_FILE_NAME);
snapshot.write(&state_file_path).unwrap();
// Update snapshot values.
snapshot.latest_l2_block_number = block.block_number;
snapshot.index_to_key_map = self.tree.index_to_key_map.clone();
} else {
self.write_state().await.unwrap();
break;
}
}
}
}

0 comments on commit ef16d49

Please sign in to comment.