From 6be12884cd1d9d823ddf595289414345bf8e9e74 Mon Sep 17 00:00:00 2001 From: Jonathan <94441036+zeapoz@users.noreply.github.com> Date: Tue, 16 Jul 2024 16:14:18 +0200 Subject: [PATCH] chore: snapshot format update (#108) * fix: protobuf * chore: renaming fields * chore: adjust snapshots to always chunk in 10s * chore: serialize values as numbers not strings * chore(snapshots): l1 batch number offset * chore: serde rename all * doc: update comment --- .gitignore | 2 ++ Cargo.lock | 12 +++++++ Cargo.toml | 5 +-- proto/snapshot.proto | 23 -------------- src/cli.rs | 6 ++-- src/main.rs | 4 +-- src/processor/snapshot/exporter.rs | 26 +++++++++------- src/processor/snapshot/importer.rs | 28 +++++++---------- src/processor/snapshot/mod.rs | 12 ++++--- src/processor/tree/tree_wrapper.rs | 3 +- state-reconstruct-storage/Cargo.toml | 1 + .../proto/snapshot.proto | 10 ++++-- state-reconstruct-storage/src/lib.rs | 2 ++ state-reconstruct-storage/src/snapshot.rs | 13 ++++++++ state-reconstruct-storage/src/types.rs | 31 +++++++++++++------ 15 files changed, 99 insertions(+), 79 deletions(-) delete mode 100644 proto/snapshot.proto diff --git a/.gitignore b/.gitignore index c707bd6..ef3a4a2 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,5 @@ StateSnapshot.json # Markdown linting rules. .markdownlint.json + +.DS_Store diff --git a/Cargo.lock b/Cargo.lock index f9a5297..58fbcda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4195,6 +4195,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "serde_repr" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.53", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -4490,6 +4501,7 @@ dependencies = [ "prost-build", "rocksdb", "serde", + "serde_repr", "thiserror", "zkevm_opcode_defs 1.3.2 (git+https://github.com/matter-labs/era-zkevm_opcode_defs.git)", ] diff --git a/Cargo.toml b/Cargo.toml index 2560aa1..f2287dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,10 +5,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [workspace] -members = [ - "state-reconstruct-fetcher", - "state-reconstruct-storage", -] +members = ["state-reconstruct-fetcher", "state-reconstruct-storage"] [dependencies] async-trait = "0.1.74" diff --git a/proto/snapshot.proto b/proto/snapshot.proto deleted file mode 100644 index 3a656ef..0000000 --- a/proto/snapshot.proto +++ /dev/null @@ -1,23 +0,0 @@ -syntax = "proto3"; - -package protobuf; - -message SnapshotStorageLogsChunk { - repeated SnapshotStorageLog storage_logs = 1; -} - -message SnapshotStorageLog { - optional bytes account_address = 1; // required; H160 - optional bytes storage_key = 2; // required; H256 - optional bytes storage_value = 3; // required; H256 - optional uint32 l1_batch_number_of_initial_write = 4; // required - optional uint64 enumeration_index = 5; // required -} - -message SnapshotFactoryDependencies { - repeated SnapshotFactoryDependency factory_deps = 1; -} - -message SnapshotFactoryDependency { - optional bytes bytecode = 1; // required -} diff --git a/src/cli.rs b/src/cli.rs index a4bbea6..f5c5380 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -109,9 +109,9 @@ pub enum Command { /// The path to the storage solution. #[arg(short, long, default_value = snapshot::DEFAULT_DB_PATH)] db_path: Option, - /// Number of storage logs to stuff into one chunk. - #[arg(short, long, default_value_t = snapshot::DEFAULT_CHUNK_SIZE)] - chunk_size: usize, + /// Number of chunks to split storage chunks into. + #[arg(short, long, default_value_t = snapshot::DEFAULT_NUM_CHUNKS)] + num_chunks: usize, /// The directory to export the snapshot files to. directory: String, }, diff --git a/src/main.rs b/src/main.rs index 3c7d1fc..78c544c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -181,13 +181,13 @@ async fn main() -> Result<()> { } Command::ExportSnapshot { db_path, - chunk_size, + num_chunks, directory, } => { let export_path = Path::new(&directory); std::fs::create_dir_all(export_path)?; let exporter = SnapshotExporter::new(export_path, db_path)?; - exporter.export_snapshot(chunk_size)?; + exporter.export_snapshot(num_chunks)?; tracing::info!("Succesfully exported snapshot files to \"{directory}\"!"); } diff --git a/src/processor/snapshot/exporter.rs b/src/processor/snapshot/exporter.rs index 73df1ce..ce7e0b3 100644 --- a/src/processor/snapshot/exporter.rs +++ b/src/processor/snapshot/exporter.rs @@ -1,8 +1,8 @@ use std::path::{Path, PathBuf}; -use chrono::offset::Utc; use ethers::types::U256; use eyre::Result; +use state_reconstruct_fetcher::constants::ethereum::GENESIS_BLOCK; use state_reconstruct_storage::{ snapshot::SnapshotDatabase, snapshot_columns, @@ -36,15 +36,19 @@ impl SnapshotExporter { }) } - pub fn export_snapshot(&self, chunk_size: usize) -> Result<()> { - let l1_batch_number = self.database.get_latest_l1_batch_number()?; + pub fn export_snapshot(&self, num_chunks: usize) -> Result<()> { + let latest_l1_batch_number = self.database.get_latest_l1_batch_number()?; + // L1 batch number is calculated from the batch number where the + // DiamondProxy contract was deployed (`GENESIS_BLOCK`). + let l1_batch_number = latest_l1_batch_number - GENESIS_BLOCK; + let l2_batch_number = self.database.get_latest_l2_batch_number()?; let mut header = SnapshotHeader { - l1_batch_number, - generated_at: Utc::now(), + l1_batch_number: l1_batch_number.as_u64(), + miniblock_number: l2_batch_number.as_u64(), ..Default::default() }; - self.export_storage_logs(chunk_size, &mut header)?; + self.export_storage_logs(num_chunks, &mut header)?; self.export_factory_deps(&mut header)?; let path = self.basedir.join(SNAPSHOT_HEADER_FILE_NAME); @@ -91,7 +95,7 @@ impl SnapshotExporter { Ok(()) } - fn export_storage_logs(&self, chunk_size: usize, header: &mut SnapshotHeader) -> Result<()> { + fn export_storage_logs(&self, num_chunks: usize, header: &mut SnapshotHeader) -> Result<()> { tracing::info!("Exporting storage logs..."); let num_logs = self.database.get_last_repeated_key_index()?; @@ -102,9 +106,9 @@ impl SnapshotExporter { .database .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start); - let total_num_chunks = (num_logs / chunk_size as u64) + 1; - for chunk_id in 0..total_num_chunks { - tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, total_num_chunks); + let chunk_size = num_logs / num_chunks as u64; + for chunk_id in 0..num_chunks { + tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, num_chunks); let mut chunk = SnapshotStorageLogsChunk::default(); for _ in 0..chunk_size { @@ -125,7 +129,7 @@ impl SnapshotExporter { header .storage_logs_chunks .push(SnapshotStorageLogsChunkMetadata { - chunk_id, + chunk_id: chunk_id as u64, filepath: path .clone() .into_os_string() diff --git a/src/processor/snapshot/importer.rs b/src/processor/snapshot/importer.rs index a466bd3..140a55f 100644 --- a/src/processor/snapshot/importer.rs +++ b/src/processor/snapshot/importer.rs @@ -6,13 +6,14 @@ use std::{ use ethers::types::U64; use eyre::Result; use regex::{Captures, Regex}; +use state_reconstruct_fetcher::constants::ethereum::GENESIS_BLOCK; use state_reconstruct_storage::types::{ Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk, SnapshotStorageLogsChunkMetadata, }; use tokio::sync::mpsc::{self, Sender}; -use super::{SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME}; +use super::SNAPSHOT_HEADER_FILE_NAME; use crate::processor::tree::tree_wrapper::TreeWrapper; const SNAPSHOT_CHUNK_REGEX: &str = r"snapshot_l1_batch_(\d*)_storage_logs_part_\d*.proto.gzip"; @@ -31,23 +32,25 @@ impl SnapshotImporter { pub async fn run(self, db_path: &Path) -> Result<()> { let (tx, rx) = mpsc::channel(1); - let header = self.read_header()?; - let _factory_deps = self.read_factory_deps(&header)?; + let header = self.read_header().expect("failed to read header filepath"); + let _factory_deps = + Self::read_factory_deps(&header).expect("failed to read factory deps filepath"); // Read storage logs async sending each read one into the tree to process. tokio::spawn({ let header = header.clone(); async move { - self.read_storage_logs_chunks_async(&header, tx) + Self::read_storage_logs_chunks_async(&header, tx) .await .expect("failed to read storage_logs_chunks"); } }); + let l1_batch_number = header.l1_batch_number + GENESIS_BLOCK; let mut tree = TreeWrapper::new_snapshot_wrapper(db_path) .await .expect("can't create tree"); - tree.restore_from_snapshot(rx, header.l1_batch_number) + tree.restore_from_snapshot(rx, U64::from(l1_batch_number)) .await?; Ok(()) @@ -65,17 +68,13 @@ impl SnapshotImporter { Ok(header) } - fn read_factory_deps(&self, header: &SnapshotHeader) -> Result { - let factory_deps_path = self.directory.join(format!( - "snapshot_l1_batch_{}_{}", - header.l1_batch_number, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX - )); + fn read_factory_deps(header: &SnapshotHeader) -> Result { + let factory_deps_path = header.factory_deps_filepath.clone(); let bytes = fs::read(factory_deps_path)?; SnapshotFactoryDependencies::decode(&bytes) } async fn read_storage_logs_chunks_async( - &self, header: &SnapshotHeader, tx: Sender, ) -> Result<()> { @@ -88,10 +87,7 @@ impl SnapshotImporter { let total_chunks = filepaths.len(); for (i, path) in filepaths.into_iter().enumerate() { - let factory_deps_path = self - .directory - .join(path.file_name().expect("path has no file name")); - let bytes = fs::read(factory_deps_path)?; + let bytes = fs::read(path)?; let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&bytes)?; tracing::info!("Read chunk {}/{}, processing...", i + 1, total_chunks); tx.send(storage_logs_chunk).await?; @@ -151,7 +147,7 @@ impl SnapshotImporter { } Ok(SnapshotHeader { - l1_batch_number: l1_batch_number.expect("no l1 batch number found"), + l1_batch_number: l1_batch_number.expect("no l1 batch number found").as_u64(), storage_logs_chunks, factory_deps_filepath, ..Default::default() diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs index db2d346..0eee951 100644 --- a/src/processor/snapshot/mod.rs +++ b/src/processor/snapshot/mod.rs @@ -15,7 +15,7 @@ use state_reconstruct_fetcher::{ }; use state_reconstruct_storage::{ bytecode, - types::{MiniblockNumber, SnapshotFactoryDependency, SnapshotStorageLog}, + types::{SnapshotFactoryDependency, SnapshotStorageLog}, }; use tokio::sync::mpsc; @@ -24,7 +24,7 @@ use super::Processor; pub const DEFAULT_DB_PATH: &str = "snapshot_db"; pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json"; pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip"; -pub const DEFAULT_CHUNK_SIZE: usize = 1_000_000; +pub const DEFAULT_NUM_CHUNKS: usize = 10; pub struct SnapshotBuilder { database: SnapshotDatabase, @@ -73,7 +73,6 @@ impl Processor for SnapshotBuilder { .insert_storage_log(&mut SnapshotStorageLog { key: *key, value, - miniblock_number_of_initial_write: U64::from(0), l1_batch_number_of_initial_write: U64::from( block.l1_block_number.unwrap_or(0), ), @@ -121,6 +120,10 @@ impl Processor for SnapshotBuilder { .expect("failed to save factory dep"); } + let _ = self + .database + .set_latest_l2_batch_number(block.l2_block_number); + if let Some(number) = block.l1_block_number { let _ = self.database.set_latest_l1_batch_number(number); }; @@ -202,7 +205,7 @@ fn reconstruct_genesis_state(database: &mut SnapshotDatabase, path: &str) -> Res tracing::trace!("Have {} unique keys in the tree", key_set.len()); - for (address, key, value, miniblock_number) in batched { + for (address, key, value, _miniblock_number) in batched { let derived_key = derive_final_address_for_params(&address, &key); let mut tmp = [0u8; 32]; value.to_big_endian(&mut tmp); @@ -213,7 +216,6 @@ fn reconstruct_genesis_state(database: &mut SnapshotDatabase, path: &str) -> Res database.insert_storage_log(&mut SnapshotStorageLog { key, value, - miniblock_number_of_initial_write: MiniblockNumber::from(miniblock_number), l1_batch_number_of_initial_write: U64::from(ethereum::GENESIS_BLOCK), enumeration_index: 0, })?; diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs index 99503e2..6465e08 100644 --- a/src/processor/tree/tree_wrapper.rs +++ b/src/processor/tree/tree_wrapper.rs @@ -19,7 +19,6 @@ use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper, TreeEntry}; use zksync_storage::{RocksDB, RocksDBOptions}; use super::RootHash; -use crate::processor::snapshot::DEFAULT_CHUNK_SIZE; #[derive(Error, Debug)] pub enum TreeError { @@ -148,7 +147,7 @@ impl TreeWrapper { async move { let mut inner_db = inner_db.lock().await; while let Some(chunk) = rx.recv().await { - let mut tree_entries = Vec::with_capacity(DEFAULT_CHUNK_SIZE); + let mut tree_entries = Vec::new(); for log in &chunk.storage_logs { tree_entries.push(TreeEntry::new( diff --git a/state-reconstruct-storage/Cargo.toml b/state-reconstruct-storage/Cargo.toml index 2160f28..f2db894 100644 --- a/state-reconstruct-storage/Cargo.toml +++ b/state-reconstruct-storage/Cargo.toml @@ -13,6 +13,7 @@ ethers = "1.0.2" eyre = "0.6.8" flate2 = "1.0.28" serde = { version = "1.0.189", features = ["derive"] } +serde_repr = "0.1.19" prost = "0.12.4" rocksdb = "0.21.0" thiserror = "1.0.50" diff --git a/state-reconstruct-storage/proto/snapshot.proto b/state-reconstruct-storage/proto/snapshot.proto index 3a656ef..1e2ae0e 100644 --- a/state-reconstruct-storage/proto/snapshot.proto +++ b/state-reconstruct-storage/proto/snapshot.proto @@ -1,14 +1,18 @@ syntax = "proto3"; -package protobuf; +package zksync.types; message SnapshotStorageLogsChunk { repeated SnapshotStorageLog storage_logs = 1; } message SnapshotStorageLog { - optional bytes account_address = 1; // required; H160 - optional bytes storage_key = 2; // required; H256 + // `account_address` and `storage_key` fields are obsolete and are not used in the new snapshot format; + // `hashed_key` is used instead. The fields are retained for now to support recovery from old snapshots. + optional bytes account_address = 1; // optional; H160 + optional bytes storage_key = 2; // optional; H256 + optional bytes hashed_key = 6; // optional; H256 + optional bytes storage_value = 3; // required; H256 optional uint32 l1_batch_number_of_initial_write = 4; // required optional uint64 enumeration_index = 5; // required diff --git a/state-reconstruct-storage/src/lib.rs b/state-reconstruct-storage/src/lib.rs index 61e4d45..f56fbfc 100644 --- a/state-reconstruct-storage/src/lib.rs +++ b/state-reconstruct-storage/src/lib.rs @@ -26,6 +26,8 @@ pub mod snapshot_columns { pub const LAST_REPEATED_KEY_INDEX: &str = "SNAPSHOT_LAST_REPEATED_KEY_INDEX"; /// The latest l1 block number that was processed. pub const LATEST_L1_BATCH: &str = "SNAPSHOT_LATEST_L1_BATCH"; + /// The latest l2 block number that was processed. + pub const LATEST_L2_BATCH: &str = "SNAPSHOT_LATEST_L2_BATCH"; } // NOTE: This is moved here as a temporary measure to resolve a cyclic dependency issue. diff --git a/state-reconstruct-storage/src/snapshot.rs b/state-reconstruct-storage/src/snapshot.rs index a26f2ac..5f5c430 100644 --- a/state-reconstruct-storage/src/snapshot.rs +++ b/state-reconstruct-storage/src/snapshot.rs @@ -36,6 +36,8 @@ impl SnapshotDatabase { KEY_TO_INDEX_MAP, snapshot_columns::STORAGE_LOGS, snapshot_columns::FACTORY_DEPS, + snapshot_columns::LATEST_L1_BATCH, + snapshot_columns::LATEST_L2_BATCH, ], )?; @@ -56,6 +58,8 @@ impl SnapshotDatabase { KEY_TO_INDEX_MAP, snapshot_columns::STORAGE_LOGS, snapshot_columns::FACTORY_DEPS, + snapshot_columns::LATEST_L1_BATCH, + snapshot_columns::LATEST_L2_BATCH, ], false, )?; @@ -159,6 +163,15 @@ impl SnapshotDatabase { self.set_metadata_value(snapshot_columns::LATEST_L1_BATCH, number) } + pub fn get_latest_l2_batch_number(&self) -> Result { + self.get_metadata_value(snapshot_columns::LATEST_L2_BATCH) + .map(U64::from) + } + + pub fn set_latest_l2_batch_number(&self, number: u64) -> Result<()> { + self.set_metadata_value(snapshot_columns::LATEST_L2_BATCH, number) + } + pub fn get_last_repeated_key_index(&self) -> Result { self.get_metadata_value(snapshot_columns::LAST_REPEATED_KEY_INDEX) } diff --git a/state-reconstruct-storage/src/types.rs b/state-reconstruct-storage/src/types.rs index 49c83af..e0c4806 100644 --- a/state-reconstruct-storage/src/types.rs +++ b/state-reconstruct-storage/src/types.rs @@ -4,12 +4,12 @@ use std::{ }; use bytes::BytesMut; -use chrono::{offset::Utc, DateTime}; use ethers::types::{H256, U256, U64}; use eyre::Result; use flate2::{read::GzDecoder, write::GzEncoder, Compression}; use prost::Message; use serde::{Deserialize, Serialize}; +use serde_repr::{Deserialize_repr, Serialize_repr}; use super::bytecode; @@ -20,7 +20,7 @@ pub type StorageKey = U256; pub type StorageValue = H256; pub mod protobuf { - include!(concat!(env!("OUT_DIR"), "/protobuf.rs")); + include!(concat!(env!("OUT_DIR"), "/zksync.types.rs")); } pub trait Proto { @@ -73,19 +73,31 @@ pub trait Proto { } } +/// Version of snapshot influencing the format of data stored in GCS. +#[derive(Clone, Default, Debug, Serialize_repr, Deserialize_repr)] +#[repr(u16)] +pub enum SnapshotVersion { + /// Initial snapshot version. Keys in storage logs are stored as `(address, key)` pairs. + Version0 = 0, + /// Snapshot version made compatible with L1 recovery. Differs from `Version0` by including + /// hashed keys in storage logs instead of `(address, key)` pairs. + #[default] + Version1 = 1, +} + #[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct SnapshotHeader { - pub l1_batch_number: L1BatchNumber, - pub miniblock_number: MiniblockNumber, + pub version: SnapshotVersion, + pub l1_batch_number: u64, + pub miniblock_number: u64, // ordered by chunk_id pub storage_logs_chunks: Vec, pub factory_deps_filepath: String, - // Following `L1BatchWithMetadata` type doesn't have definition. Ignoring. - //pub last_l1_batch_with_metadata: L1BatchWithMetadata, - pub generated_at: DateTime, } #[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct SnapshotStorageLogsChunkMetadata { pub chunk_id: u64, // can be either a gs or filesystem path @@ -133,7 +145,6 @@ impl Proto for SnapshotStorageLogsChunk { pub struct SnapshotStorageLog { pub key: StorageKey, pub value: StorageValue, - pub miniblock_number_of_initial_write: MiniblockNumber, pub l1_batch_number_of_initial_write: L1BatchNumber, pub enumeration_index: u64, } @@ -147,7 +158,8 @@ impl Proto for SnapshotStorageLog { Self::ProtoStruct { account_address: None, - storage_key: Some(key.to_vec()), + storage_key: None, + hashed_key: Some(key.to_vec()), storage_value: Some(self.value.as_bytes().to_vec()), l1_batch_number_of_initial_write: Some(self.l1_batch_number_of_initial_write.as_u32()), enumeration_index: Some(self.enumeration_index), @@ -159,7 +171,6 @@ impl Proto for SnapshotStorageLog { Ok(Self { key: U256::from_big_endian(proto.storage_key()), value: StorageValue::from(&value_bytes), - miniblock_number_of_initial_write: U64::from(0), l1_batch_number_of_initial_write: proto.l1_batch_number_of_initial_write().into(), enumeration_index: proto.enumeration_index(), })