ref: abstract away protobuf from snapshot #84

Merged · 1 commit · Apr 11, 2024
106 changes: 16 additions & 90 deletions src/processor/snapshot/exporter.rs
@@ -1,23 +1,16 @@
use std::{
io::Write,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};

use bytes::BytesMut;
use ethers::types::U64;
use eyre::Result;
use flate2::{write::GzEncoder, Compression};
use prost::Message;

use super::{
database::{self, SnapshotDB},
types::{self, SnapshotFactoryDependency, SnapshotHeader},
types::{SnapshotFactoryDependency, SnapshotHeader},
DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME,
};

pub mod protobuf {
include!(concat!(env!("OUT_DIR"), "/protobuf.rs"));
}
use crate::processor::snapshot::types::{
Proto, SnapshotFactoryDependencies, SnapshotStorageLogsChunk, SnapshotStorageLogsChunkMetadata,
};

pub struct SnapshotExporter {
basedir: PathBuf,
@@ -68,26 +61,15 @@ impl SnapshotExporter {
fn export_factory_deps(&self, header: &mut SnapshotHeader) -> Result<()> {
tracing::info!("Exporting factory dependencies...");

let mut buf = BytesMut::new();

let storage_logs = self.database.cf_handle(database::FACTORY_DEPS).unwrap();
let mut iterator = self
.database
.iterator_cf(storage_logs, rocksdb::IteratorMode::Start);

let mut factory_deps = protobuf::SnapshotFactoryDependencies::default();
let mut factory_deps = SnapshotFactoryDependencies::default();
while let Some(Ok((_, bs))) = iterator.next() {
let factory_dep: SnapshotFactoryDependency = bincode::deserialize(&bs)?;
factory_deps
.factory_deps
.push(protobuf::SnapshotFactoryDependency {
bytecode: Some(factory_dep.bytecode),
});
}

let fd_len = factory_deps.encoded_len();
if buf.capacity() < fd_len {
buf.reserve(fd_len - buf.capacity());
factory_deps.factory_deps.push(factory_dep);
}

let path = self.basedir.join(format!(
@@ -100,83 +82,44 @@ impl SnapshotExporter {
.into_string()
.expect("path to string");

// Serialize chunk.
factory_deps.encode(&mut buf)?;

let outfile = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)?;

// Wrap in gzip compression before writing.
let mut encoder = GzEncoder::new(outfile, Compression::default());
encoder.write_all(&buf)?;
encoder.finish()?;

factory_deps.encode(&path)?;
tracing::info!("All factory dependencies were successfully serialized!");
Ok(())
}

fn export_storage_logs(&self, chunk_size: u64, header: &mut SnapshotHeader) -> Result<()> {
tracing::info!("Exporting storage logs...");

let mut buf = BytesMut::new();
let mut chunk_id = 0;

let num_logs = self.database.get_last_repeated_key_index()?;
tracing::info!("Found {num_logs} logs.");

let total_num_chunks = (num_logs / chunk_size) + 1;

let index_to_key_map = self.database.cf_handle(database::INDEX_TO_KEY_MAP).unwrap();
let mut iterator = self
.database
.iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);

let mut has_more = true;

while has_more {
let total_num_chunks = (num_logs / chunk_size) + 1;
for chunk_id in 0..total_num_chunks {
tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, total_num_chunks);

let mut chunk = protobuf::SnapshotStorageLogsChunk {
storage_logs: vec![],
};

let mut chunk = SnapshotStorageLogsChunk::default();
for _ in 0..chunk_size {
if let Some(Ok((_, key))) = iterator.next() {
if let Ok(Some(entry)) = self.database.get_storage_log(key.as_ref()) {
let pb = protobuf::SnapshotStorageLog {
account_address: None,
storage_key: Some(key.to_vec()),
storage_value: Some(entry.value.0.to_vec()),
l1_batch_number_of_initial_write: Some(
entry.l1_batch_number_of_initial_write.as_u32(),
),
enumeration_index: Some(entry.enumeration_index),
};

chunk.storage_logs.push(pb);
chunk.storage_logs.push(entry);
}
} else {
has_more = false;
break;
}
}

// Ensure that write buffer has enough capacity.
let chunk_len = chunk.encoded_len();
if buf.capacity() < chunk_len {
buf.reserve(chunk_len - buf.capacity());
}

let path = &self.basedir.join(format!(
let path = self.basedir.join(format!(
"snapshot_l1_batch_{}_storage_logs_part_{:0>4}.proto.gzip",
header.l1_batch_number, chunk_id
));

header
.storage_logs_chunks
.push(types::SnapshotStorageLogsChunkMetadata {
.push(SnapshotStorageLogsChunkMetadata {
chunk_id,
filepath: path
.clone()
@@ -185,25 +128,8 @@ impl SnapshotExporter {
.expect("path to string"),
});

// Serialize chunk.
chunk.encode(&mut buf)?;

let outfile = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)?;

// Wrap in gzip compression before writing.
let mut encoder = GzEncoder::new(outfile, Compression::default());
encoder.write_all(&buf)?;
encoder.finish()?;

// Clear $tmp buffer.
buf.truncate(0);

chunk.encode(&path)?;
tracing::info!("Chunk {} was successfully serialized!", chunk_id + 1);
chunk_id += 1;
}

tracing::info!("All storage logs were successfully serialized!");
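The exporter now delegates serialization to an `encode` method on the snapshot types themselves (and the importer below to a matching `decode`), so the gzip and protobuf plumbing no longer lives at the call sites. The `Proto` trait that provides these methods is defined in `types.rs`, which is not part of this diff; the following is only a minimal sketch of what it might look like, inferred from the call sites above — the names and signatures are assumptions.

```rust
// Hypothetical sketch of the `Proto` trait implied by the call sites
// `factory_deps.encode(&path)?` and `SnapshotStorageLogsChunk::decode(&bytes)?`.
// The real definition lives in src/processor/snapshot/types.rs (not shown here).
use std::{
    fs,
    io::{Read, Write},
    path::Path,
};

use eyre::Result;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use prost::Message;

pub trait Proto: Sized {
    /// The prost-generated message this snapshot type maps to.
    type ProtoStruct: Message + Default;

    /// Convert the native type into its protobuf representation.
    fn to_proto(&self) -> Self::ProtoStruct;

    /// Convert the protobuf representation back into the native type.
    fn from_proto(proto: Self::ProtoStruct) -> Result<Self>;

    /// Encode to protobuf and write the result out gzip-compressed.
    fn encode(&self, path: impl AsRef<Path>) -> Result<()> {
        let buf = self.to_proto().encode_to_vec();
        let outfile = fs::File::create(path)?;
        let mut encoder = GzEncoder::new(outfile, Compression::default());
        encoder.write_all(&buf)?;
        encoder.finish()?;
        Ok(())
    }

    /// Decompress gzip-compressed bytes and decode them into the native type.
    fn decode(bytes: &[u8]) -> Result<Self> {
        let mut decompressed = Vec::new();
        GzDecoder::new(bytes).read_to_end(&mut decompressed)?;
        let proto = Self::ProtoStruct::decode(&decompressed[..])?;
        Self::from_proto(proto)
    }
}
```

With default methods like these, each snapshot type would only have to supply the conversions to and from its prost-generated counterpart.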
24 changes: 3 additions & 21 deletions src/processor/snapshot/importer.rs
@@ -1,19 +1,15 @@
use std::{
fs,
io::Read,
path::{Path, PathBuf},
sync::Arc,
};

use eyre::Result;
use flate2::read::GzDecoder;
use prost::Message;
use state_reconstruct_fetcher::{constants::storage::INNER_DB_NAME, database::InnerDB};
use tokio::sync::Mutex;

use super::{
exporter::protobuf::{SnapshotFactoryDependencies, SnapshotStorageLogsChunk},
types::SnapshotHeader,
types::{Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk},
SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME,
};
use crate::processor::tree::tree_wrapper::TreeWrapper;
@@ -59,13 +55,7 @@ impl SnapshotImporter {
header.l1_batch_number, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX
));
let bytes = fs::read(factory_deps_path)?;
let mut decoder = GzDecoder::new(&bytes[..]);

let mut decompressed_bytes = Vec::new();
decoder.read_to_end(&mut decompressed_bytes)?;

let factory_deps = SnapshotFactoryDependencies::decode(&decompressed_bytes[..])?;
Ok(factory_deps)
SnapshotFactoryDependencies::decode(&bytes)
}

fn read_storage_logs_chunks(
@@ -85,15 +75,7 @@ impl SnapshotImporter {
.directory
.join(path.file_name().expect("path has no file name"));
let bytes = fs::read(factory_deps_path)?;
let mut decoder = GzDecoder::new(&bytes[..]);

let mut decompressed_bytes = Vec::new();
decoder.read_to_end(&mut decompressed_bytes)?;

// TODO: It would be nice to avoid the intermediary step of decoding. Something like
// implementing a method on the types::* that does it automatically. Will improve
// readabitly for the export code too as a bonus.
let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&decompressed_bytes[..])?;
let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&bytes)?;
chunks.push(storage_logs_chunk);
}
Ok(chunks)
6 changes: 5 additions & 1 deletion src/processor/snapshot/mod.rs
@@ -3,9 +3,9 @@ use std::{fs, path::PathBuf, str::FromStr};
pub mod database;
pub mod exporter;
pub mod importer;
pub mod types;

mod bytecode;
mod types;

use async_trait::async_trait;
use blake2::{Blake2s256, Digest};
@@ -28,6 +28,10 @@ pub const DEFAULT_DB_PATH: &str = "snapshot_db";
pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json";
pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip";

pub mod protobuf {
include!(concat!(env!("OUT_DIR"), "/protobuf.rs"));
}

pub struct SnapshotBuilder {
database: SnapshotDB,
}
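With the generated `protobuf` module now exposed from `mod.rs`, the per-type conversions that used to be written inline in the exporter presumably move into `types.rs` as trait impls. Purely for illustration, a hypothetical impl for `SnapshotFactoryDependencies` could mirror the old field mapping like this — the actual impl is not shown in this diff and may differ.

```rust
// Hypothetical impl as it might appear in types.rs, mirroring the field
// mapping that used to be inlined in exporter.rs. `Proto`,
// `SnapshotFactoryDependencies`, and `SnapshotFactoryDependency` are assumed
// to be defined earlier in the same file, and `bytecode` is assumed to be a
// plain byte vector on both the native and the protobuf side.
use eyre::{eyre, Result};

use super::protobuf;

impl Proto for SnapshotFactoryDependencies {
    type ProtoStruct = protobuf::SnapshotFactoryDependencies;

    fn to_proto(&self) -> Self::ProtoStruct {
        Self::ProtoStruct {
            factory_deps: self
                .factory_deps
                .iter()
                .map(|dep| protobuf::SnapshotFactoryDependency {
                    bytecode: Some(dep.bytecode.clone()),
                })
                .collect(),
        }
    }

    fn from_proto(proto: Self::ProtoStruct) -> Result<Self> {
        let factory_deps = proto
            .factory_deps
            .into_iter()
            .map(|dep| {
                Ok(SnapshotFactoryDependency {
                    bytecode: dep.bytecode.ok_or_else(|| eyre!("missing bytecode"))?,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self { factory_deps })
    }
}
```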