encapsulated BlobHttpClient
vbar committed Mar 20, 2024
1 parent 177d7e1 commit 85d4138
Showing 5 changed files with 56 additions and 51 deletions.
32 changes: 32 additions & 0 deletions state-reconstruct-fetcher/src/blob_http_client.rs
@@ -0,0 +1,32 @@
+use eyre::Result;
+
+pub struct BlobHttpClient {
+    client: reqwest::Client,
+    url_base: String,
+}
+
+impl BlobHttpClient {
+    pub fn new(blob_url: String) -> Result<Self> {
+        let mut headers = reqwest::header::HeaderMap::new();
+        headers.insert(
+            "Accept",
+            reqwest::header::HeaderValue::from_static("application/json"),
+        );
+        let client = reqwest::Client::builder()
+            .default_headers(headers)
+            .build()?;
+        Ok(Self {
+            client,
+            url_base: blob_url,
+        })
+    }
+
+    pub fn format_url(&self, kzg_commitment: &[u8]) -> String {
+        format!("{}0x{}", self.url_base, hex::encode(kzg_commitment))
+    }
+
+    pub async fn retrieve_url(&self, url: &str) -> Result<reqwest::Response> {
+        let result = self.client.get(url).send().await?;
+        Ok(result)
+    }
+}
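
The new type simply packages the reqwest client together with the blob-archive base URL that previously had to be threaded through every call as a separate blobs_url argument. Below is a minimal usage sketch, not part of this commit: the endpoint URL and the zeroed commitment are placeholders, and in the fetcher the base URL comes from config.blobs_url.

use eyre::Result;
use state_reconstruct_fetcher::blob_http_client::BlobHttpClient;

#[tokio::main]
async fn main() -> Result<()> {
    // Placeholder endpoint; the fetcher passes its configured blobs_url here.
    let client = BlobHttpClient::new("https://example.org/blobs/".to_string())?;

    // A 48-byte KZG commitment, zeroed purely for illustration.
    let kzg_commitment = [0u8; 48];
    let url = client.format_url(&kzg_commitment);

    // Single attempt; get_blob() in types/v3.rs wraps this call in a retry loop.
    let response = client.retrieve_url(&url).await?;
    let body = response.text().await?;
    println!("fetched {} bytes of blob JSON", body.len());
    Ok(())
}
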
36 changes: 7 additions & 29 deletions state-reconstruct-fetcher/src/l1_fetcher.rs
@@ -14,6 +14,7 @@ use tokio::{
 use tokio_util::sync::CancellationToken;
 
 use crate::{
+    blob_http_client::BlobHttpClient,
     constants::ethereum::{BLOB_BLOCK, BLOCK_STEP, BOOJUM_BLOCK, GENESIS_BLOCK, ZK_SYNC_ADDR},
     database::InnerDB,
     metrics::L1Metrics,
@@ -449,8 +450,7 @@ impl L1Fetcher {
     ) -> Result<tokio::task::JoinHandle<Option<u64>>> {
         let metrics = self.metrics.clone();
         let contracts = self.contracts.clone();
-        let client = make_client()?;
-        let blobs_url = self.config.blobs_url.clone();
+        let client = BlobHttpClient::new(self.config.blobs_url.clone())?;
         Ok(tokio::spawn({
             async move {
                 let mut boojum_mode = false;
@@ -479,15 +479,7 @@
                     }
 
                     let blocks = loop {
-                        match parse_calldata(
-                            block_number,
-                            &function,
-                            &tx.input,
-                            &client,
-                            &blobs_url,
-                        )
-                        .await
-                        {
+                        match parse_calldata(block_number, &function, &tx.input, &client).await {
                             Ok(blks) => break blks,
                             Err(e) => match e {
                                 ParseError::BlobStorageError(_) => {
@@ -549,24 +541,11 @@
     }
 }
 
-fn make_client() -> Result<reqwest::Client> {
-    let mut headers = reqwest::header::HeaderMap::new();
-    headers.insert(
-        "Accept",
-        reqwest::header::HeaderValue::from_static("application/json"),
-    );
-    let client = reqwest::Client::builder()
-        .default_headers(headers)
-        .build()?;
-    Ok(client)
-}
-
 pub async fn parse_calldata(
     l1_block_number: u64,
     commit_blocks_fn: &Function,
     calldata: &[u8],
-    client: &reqwest::Client,
-    blobs_url: &str,
+    client: &BlobHttpClient,
 ) -> Result<Vec<CommitBlock>, ParseError> {
     let mut parsed_input = commit_blocks_fn
         .decode_input(&calldata[4..])
@@ -606,7 +585,7 @@ pub async fn parse_calldata(
 
     // Parse blocks using [`CommitBlockInfoV1`] or [`CommitBlockInfoV2`]
     let mut block_infos =
-        parse_commit_block_info(&new_blocks_data, l1_block_number, client, blobs_url).await?;
+        parse_commit_block_info(&new_blocks_data, l1_block_number, client).await?;
     // Supplement every `CommitBlock` element with L1 block number information.
     block_infos
         .iter_mut()
@@ -617,8 +596,7 @@
 async fn parse_commit_block_info(
     data: &abi::Token,
     l1_block_number: u64,
-    client: &reqwest::Client,
-    blobs_url: &str,
+    client: &BlobHttpClient,
 ) -> Result<Vec<CommitBlock>, ParseError> {
     let abi::Token::Array(data) = data else {
         return Err(ParseError::InvalidCommitBlockInfo(
@@ -630,7 +608,7 @@ async fn parse_commit_block_info(
     for d in data {
         let commit_block = {
             if l1_block_number >= BLOB_BLOCK {
-                CommitBlock::try_from_token_resolve(d, client, blobs_url).await?
+                CommitBlock::try_from_token_resolve(d, client).await?
             } else if l1_block_number >= BOOJUM_BLOCK {
                 CommitBlock::try_from_token::<V2>(d)?
             } else {
1 change: 1 addition & 0 deletions state-reconstruct-fetcher/src/lib.rs
@@ -1,5 +1,6 @@
 #![feature(array_chunks)]
 #![feature(iter_next_chunk)]
+pub mod blob_http_client;
 pub mod constants;
 pub mod database;
 pub mod l1_fetcher;
11 changes: 5 additions & 6 deletions state-reconstruct-fetcher/src/types/mod.rs
@@ -5,6 +5,7 @@ use serde_json_any_key::any_key_map;
 use thiserror::Error;
 
 use self::{v1::V1, v2::V2, v3::V3};
+use crate::blob_http_client::BlobHttpClient;
 
 // NOTE: We should probably make these more human-readable.
 pub mod common;
@@ -104,11 +105,10 @@ impl CommitBlock {
 
     pub async fn try_from_token_resolve<'a>(
         value: &'a abi::Token,
-        client: &reqwest::Client,
-        blobs_url: &str,
+        client: &BlobHttpClient,
     ) -> Result<Self, ParseError> {
         let commit_block_info = V3::try_from(value)?;
-        Self::from_commit_block_resolve(commit_block_info, client, blobs_url).await
+        Self::from_commit_block_resolve(commit_block_info, client).await
     }
 
     pub fn from_commit_block(block_type: CommitBlockInfo) -> Self {
@@ -170,10 +170,9 @@
 
     pub async fn from_commit_block_resolve(
         block: V3,
-        client: &reqwest::Client,
-        blobs_url: &str,
+        client: &BlobHttpClient,
     ) -> Result<Self, ParseError> {
-        let total_l2_to_l1_pubdata = block.parse_pubdata(client, blobs_url).await?;
+        let total_l2_to_l1_pubdata = block.parse_pubdata(client).await?;
         let mut initial_storage_changes = IndexMap::new();
         let mut repeated_storage_changes = IndexMap::new();
         let mut factory_deps = Vec::new();
27 changes: 11 additions & 16 deletions state-reconstruct-fetcher/src/types/v3.rs
@@ -11,7 +11,10 @@ use super::{
     common::{parse_compressed_state_diffs, read_next_n_bytes, ExtractedToken},
     L2ToL1Pubdata, ParseError,
 };
-use crate::constants::zksync::{L2_TO_L1_LOG_SERIALIZE_SIZE, PUBDATA_COMMITMENT_SIZE};
+use crate::{
+    blob_http_client::BlobHttpClient,
+    constants::zksync::{L2_TO_L1_LOG_SERIALIZE_SIZE, PUBDATA_COMMITMENT_SIZE},
+};
 
 /// `MAX_RETRIES` is the maximum number of retries on failed blob retrieval.
 const MAX_RETRIES: u8 = 5;
@@ -100,16 +103,13 @@ impl TryFrom<&abi::Token> for V3 {
 impl V3 {
     pub async fn parse_pubdata(
         &self,
-        client: &reqwest::Client,
-        blobs_url: &str,
+        client: &BlobHttpClient,
     ) -> Result<Vec<L2ToL1Pubdata>, ParseError> {
         let mut pointer = 0;
         let bytes = &self.pubdata_commitments[..];
         match self.pubdata_source {
             PubdataSource::Calldata => parse_pubdata_from_calldata(bytes, &mut pointer, true),
-            PubdataSource::Blob => {
-                parse_pubdata_from_blobs(bytes, &mut pointer, client, blobs_url).await
-            }
+            PubdataSource::Blob => parse_pubdata_from_blobs(bytes, &mut pointer, client).await,
         }
     }
 }
@@ -163,14 +163,13 @@ fn parse_pubdata_from_calldata(
 async fn parse_pubdata_from_blobs(
     bytes: &[u8],
     pointer: &mut usize,
-    client: &reqwest::Client,
-    blobs_url: &str,
+    client: &BlobHttpClient,
 ) -> Result<Vec<L2ToL1Pubdata>, ParseError> {
     let mut l = bytes.len() - *pointer;
     let mut blobs = Vec::new();
     while *pointer < l {
         let pubdata_commitment = &bytes[*pointer..*pointer + PUBDATA_COMMITMENT_SIZE];
-        let blob = get_blob(&pubdata_commitment[48..96], client, blobs_url).await?;
+        let blob = get_blob(&pubdata_commitment[48..96], client).await?;
         let mut blob_bytes = ethereum_4844_data_into_zksync_pubdata(&blob);
         blobs.append(&mut blob_bytes);
         *pointer += PUBDATA_COMMITMENT_SIZE;
@@ -186,14 +185,10 @@ async fn parse_pubdata_from_blobs(
     parse_pubdata_from_calldata(blobs_view, &mut pointer, false)
 }
 
-async fn get_blob(
-    kzg_commitment: &[u8],
-    client: &reqwest::Client,
-    blobs_url: &str,
-) -> Result<Vec<u8>, ParseError> {
-    let url = format!("{}0x{}", blobs_url, hex::encode(kzg_commitment));
+async fn get_blob(kzg_commitment: &[u8], client: &BlobHttpClient) -> Result<Vec<u8>, ParseError> {
+    let url = client.format_url(kzg_commitment);
     for attempt in 1..=MAX_RETRIES {
-        match client.get(url.clone()).send().await {
+        match client.retrieve_url(&url).await {
             Ok(response) => match response.text().await {
                 Ok(text) => match get_blob_data(&text) {
                     Ok(data) => {
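
For orientation, the call site above sits inside get_blob()'s bounded retry loop (MAX_RETRIES attempts). The sketch below is not the repository's code: it only illustrates that retry shape against the new client. The back-off interval, the eprintln! logging, and the plain String error are assumptions; the real function maps failures to ParseError variants such as BlobStorageError.

use std::time::Duration;

use state_reconstruct_fetcher::blob_http_client::BlobHttpClient;

const MAX_RETRIES: u8 = 5;
// Assumed back-off between attempts; the actual fetcher may use a different value.
const RETRY_INTERVAL_S: u64 = 10;

async fn fetch_blob_text(
    client: &BlobHttpClient,
    kzg_commitment: &[u8],
) -> Result<String, String> {
    let url = client.format_url(kzg_commitment);
    for attempt in 1..=MAX_RETRIES {
        match client.retrieve_url(&url).await {
            Ok(response) => match response.text().await {
                Ok(text) => return Ok(text),
                Err(e) => eprintln!("attempt {attempt}: failed to read blob response: {e}"),
            },
            Err(e) => eprintln!("attempt {attempt}: blob request failed: {e}"),
        }
        tokio::time::sleep(Duration::from_secs(RETRY_INTERVAL_S)).await;
    }
    Err(format!("no usable blob from {url} after {MAX_RETRIES} attempts"))
}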
