From 1ced43236f86926146f077b5bb1d6fa810d777bf Mon Sep 17 00:00:00 2001 From: Jonathan <94441036+zeapoz@users.noreply.github.com> Date: Mon, 22 Apr 2024 11:42:32 +0200 Subject: [PATCH] feat: import snapshots without header file present (#89) * feat: import snapshots without header file present If a snapshot header file is not found within the specified snapshot folder, the importer will instead try to generate one using the filenames of the other contents as a base. * feat: infer header based on regex patterns * tweak: snapshot importer extend by chunk --- Cargo.lock | 5 +- Cargo.toml | 1 + src/processor/snapshot/importer.rs | 78 ++++++++++++++++++++++++++++-- src/processor/tree/tree_wrapper.rs | 16 +++--- 4 files changed, 87 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2128815..c25d455 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3623,9 +3623,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.3" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", @@ -4451,6 +4451,7 @@ dependencies = [ "hex", "indexmap", "primitive-types", + "regex", "rocksdb", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index dca3786..2ceb4ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ eyre = "0.6.8" hex = "0.4.3" indexmap = { version = "2.0.2" } primitive-types = "0.12.2" +regex = "1.10.4" rocksdb = "0.21" serde = { version = "1.0.189", features = ["derive"] } serde_json = { version = "1.0.107", features = ["std"] } diff --git a/src/processor/snapshot/importer.rs b/src/processor/snapshot/importer.rs index e3f9556..c8478e5 100644 --- a/src/processor/snapshot/importer.rs +++ b/src/processor/snapshot/importer.rs @@ -1,20 +1,28 @@ use std::{ - fs, + fs::{self, DirEntry}, path::{Path, PathBuf}, 
sync::Arc, }; +use ethers::types::U64; use eyre::Result; +use regex::{Captures, Regex}; use state_reconstruct_fetcher::constants::storage::INNER_DB_NAME; use state_reconstruct_storage::{ reconstruction::ReconstructionDatabase, - types::{Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk}, + types::{ + Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk, + SnapshotStorageLogsChunkMetadata, + }, }; use tokio::sync::Mutex; use super::{SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME}; use crate::processor::tree::tree_wrapper::TreeWrapper; +const SNAPSHOT_CHUNK_REGEX: &str = r"snapshot_l1_batch_(\d*)_storage_logs_part_\d*.proto.gzip"; +const FACTORY_DEPS_REGEX: &str = r"snapshot_l1_batch_(\d*)_factory_deps.proto.gzip"; + pub struct SnapshotImporter { // The path of the directory where snapshot chunks are stored. directory: PathBuf, @@ -44,8 +52,12 @@ impl SnapshotImporter { fn read_header(&self) -> Result<SnapshotHeader> { let header_path = self.directory.join(SNAPSHOT_HEADER_FILE_NAME); - let header_string = fs::read_to_string(header_path)?; - let header: SnapshotHeader = serde_json::from_str(&header_string)?; + + let header = if let Ok(string) = fs::read_to_string(header_path) { + serde_json::from_str(&string)? + } else { + self.infer_header_from_file_names()? + }; Ok(header) } @@ -81,4 +93,62 @@ impl SnapshotImporter { } Ok(chunks) } + + fn infer_header_from_file_names(&self) -> Result<SnapshotHeader> { + let snapshot_chunk_re = Regex::new(SNAPSHOT_CHUNK_REGEX)?; + let factory_deps_re = Regex::new(FACTORY_DEPS_REGEX)?; + + let mut l1_batch_number = None; + let mut storage_logs_chunks = Vec::new(); + let mut factory_deps_filepath = String::new(); + + // Closure to make sure that every file name contains the same l1 batch number, assigning + // one if it is currently set to [`None`]. 
+ let mut process_l1_number = |caps: Captures| { + let number: u64 = caps[1].parse().expect("capture was not a number"); + match l1_batch_number { + Some(num) => assert_eq!(num, U64::from(number)), + None => l1_batch_number = Some(U64::from(number)), + } + }; + + // Read files and sort them by path name. + let mut files: Vec<_> = self + .directory + .read_dir()? + .map(|f| f.expect("read file error")) + .collect(); + files.sort_by_key(DirEntry::path); + + let mut chunk_id = 0; + for file in files { + let file_name = file + .file_name() + .to_str() + .expect("invalid filename") + .to_string(); + + if let Some(caps) = snapshot_chunk_re.captures(&file_name) { + process_l1_number(caps); + + // Add the storage log parts filename to header. + let chunk_metadata = SnapshotStorageLogsChunkMetadata { + chunk_id, + filepath: file_name, + }; + storage_logs_chunks.push(chunk_metadata); + chunk_id += 1; + } else if let Some(caps) = factory_deps_re.captures(&file_name) { + process_l1_number(caps); + factory_deps_filepath = file_name; + } + } + + Ok(SnapshotHeader { + l1_batch_number: l1_batch_number.expect("no l1 batch number found"), + storage_logs_chunks, + factory_deps_filepath, + ..Default::default() + }) + } } diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs index 295cda2..545cd0b 100644 --- a/src/processor/tree/tree_wrapper.rs +++ b/src/processor/tree/tree_wrapper.rs @@ -126,9 +126,10 @@ impl TreeWrapper { chunks: Vec, l1_batch_number: U64, ) -> Result<()> { - let mut tree_entries = Vec::new(); - + let mut total_tree_entries = 0; for (i, chunk) in chunks.iter().enumerate() { + let mut tree_entries = Vec::new(); + tracing::info!("Importing chunk {}/{}...", i + 1, chunks.len()); for log in &chunk.storage_logs { @@ -140,14 +141,15 @@ impl TreeWrapper { .expect("cannot add key"); } + total_tree_entries += tree_entries.len(); + self.tree.extend(tree_entries); + tracing::info!("Chunk {} was succesfully imported!", i + 1); } - 
tracing::info!("Extending merkle tree with imported storage logs..."); - let num_tree_entries = tree_entries.len(); - self.tree.extend(tree_entries); - - tracing::info!("Succesfully imported snapshot containing {num_tree_entries} storage logs!",); + tracing::info!( + "Succesfully imported snapshot containing {total_tree_entries} storage logs!", + ); let db = self.inner_db.lock().await; db.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?;