From 7c49f09cdca23171495621068ee1f017ebc17d70 Mon Sep 17 00:00:00 2001 From: Bo Wu Date: Mon, 2 Oct 2023 13:55:51 -0700 Subject: [PATCH] use gcs for reading replay-verify --- .github/workflows/replay-verify.yaml | 12 +- .../workflows/workflow-run-replay-verify.yaml | 16 +-- Cargo.lock | 5 +- Cargo.toml | 2 +- storage/backup/backup-cli/Cargo.toml | 3 +- .../src/coordinators/replay_verify.rs | 119 +++++++++++------- .../backup/backup-cli/src/metadata/cache.rs | 37 +++--- storage/backup/backup-cli/src/storage/mod.rs | 2 +- storage/db-tool/src/replay_verify.rs | 62 ++++++--- testsuite/replay_verify.py | 99 +++++++-------- testsuite/replay_verify_run_local.py | 11 +- testsuite/smoke-test/src/storage.rs | 5 +- 12 files changed, 210 insertions(+), 163 deletions(-) diff --git a/.github/workflows/replay-verify.yaml b/.github/workflows/replay-verify.yaml index f3de1f7be8143..54adf111a7f07 100644 --- a/.github/workflows/replay-verify.yaml +++ b/.github/workflows/replay-verify.yaml @@ -56,11 +56,11 @@ jobs: with: GIT_SHA: ${{ inputs.GIT_SHA }} # replay-verify config - BUCKET: aptos-testnet-backup-2223d95b + BUCKET: aptos-testnet-backup-b7b1ad7a SUB_DIR: e1 HISTORY_START: 250000000 # TODO: We need an exhaustive list of txns_to_skip before we can set this to 0. TXNS_TO_SKIP: 46874937 151020059 409163615 409163669 409163708 409163774 409163845 409163955 409164059 409164191 414625832 - BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/s5cmd-public.yaml + BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/gcs.yaml # workflow config RUNS_ON: "high-perf-docker-with-local-ssd" TIMEOUT_MINUTES: 840 @@ -76,11 +76,11 @@ jobs: with: GIT_SHA: ${{ inputs.GIT_SHA }} # replay-verify config - BUCKET: aptos-mainnet-backup-backup-831a69a8 + BUCKET: aptos-mainnet-backup-backup-e098483d SUB_DIR: e1 HISTORY_START: 0 TXNS_TO_SKIP: 12253479 12277499 148358668 - BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/s5cmd-public.yaml + BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/gcs.yaml # workflow config RUNS_ON: "high-perf-docker-with-local-ssd" TIMEOUT_MINUTES: 300 @@ -93,11 +93,11 @@ jobs: with: GIT_SHA: ${{ github.event.pull_request.head.sha }} # replay-verify config - BUCKET: aptos-testnet-backup-2223d95b + BUCKET: aptos-testnet-backup-b7b1ad7a SUB_DIR: e1 HISTORY_START: 250000000 # TODO: We need an exhaustive list of txns_to_skip before we can set this to 0. TXNS_TO_SKIP: 46874937 151020059 409163615 409163669 409163708 409163774 409163845 409163955 409164059 409164191 414625832 - BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/s5cmd-public.yaml + BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/gcs.yaml # workflow config RUNS_ON: "high-perf-docker-with-local-ssd" TIMEOUT_MINUTES: 120 # increase test replay timeout to capture more flaky errors diff --git a/.github/workflows/workflow-run-replay-verify.yaml b/.github/workflows/workflow-run-replay-verify.yaml index e07855ca4ad71..764d5984ddf0e 100644 --- a/.github/workflows/workflow-run-replay-verify.yaml +++ b/.github/workflows/workflow-run-replay-verify.yaml @@ -95,17 +95,11 @@ jobs: with: GIT_CREDENTIALS: ${{ secrets.GIT_CREDENTIALS }} - - name: Install AWS CLI - shell: bash - run: | - scripts/dev_setup.sh -b -i awscli - echo "${HOME}/bin/" >> $GITHUB_PATH # default INSTALL_DIR to path - - - name: Install s5cmd - shell: bash - run: | - scripts/dev_setup.sh -b -i s5cmd - echo "${HOME}/bin/" >> $GITHUB_PATH # default INSTALL_DIR to path + - name: Install GCloud SDK + uses: "google-github-actions/setup-gcloud@62d4898025f6041e16b1068643bfc5a696863587" # pin@v1 + with: + version: ">= 418.0.0" + install_components: "kubectl,gke-gcloud-auth-plugin" - name: Build CLI binaries in release mode shell: bash diff --git a/Cargo.lock b/Cargo.lock index 1ff44eef2a962..d8b76cbd49bcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -533,6 +533,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml 0.8.26", + "thiserror", "tokio", "tokio-io-timeout", "tokio-stream", @@ -13998,9 +13999,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.9" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" dependencies = [ "futures-core", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 9f9141989f716..c070115c77247 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -632,7 +632,7 @@ tokio = { version = "1.21.0", features = ["full"] } tokio-io-timeout = "1.2.0" tokio-metrics = "0.1.0" tokio-retry = "0.3.0" -tokio-stream = "0.1.8" +tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-test = "0.4.1" tokio-util = { version = "0.7.2", features = ["compat", "codec"] } toml = "0.7.4" diff --git a/storage/backup/backup-cli/Cargo.toml b/storage/backup/backup-cli/Cargo.toml index 24ea011e09abe..d4b83ef52af9c 100644 --- a/storage/backup/backup-cli/Cargo.toml +++ b/storage/backup/backup-cli/Cargo.toml @@ -49,9 +49,10 @@ reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true } tokio-io-timeout = { workspace = true } -tokio-stream = { workspace = true } +tokio-stream = { workspace = true, features = ["fs"] } tokio-util = { workspace = true } [dev-dependencies] diff --git a/storage/backup/backup-cli/src/coordinators/replay_verify.rs b/storage/backup/backup-cli/src/coordinators/replay_verify.rs index b89d77c0ee7a0..7dc36d38f5e82 100644 --- a/storage/backup/backup-cli/src/coordinators/replay_verify.rs +++ b/storage/backup/backup-cli/src/coordinators/replay_verify.rs @@ -12,14 +12,28 @@ use crate::{ storage::BackupStorage, utils::{GlobalRestoreOptions, RestoreRunMode, TrustedWaypointOpt}, }; -use anyhow::{bail, ensure, Result}; +use anyhow::Result; use aptos_db::backup::restore_handler::RestoreHandler; use aptos_executor_types::VerifyExecutionMode; use aptos_logger::prelude::*; use aptos_types::{on_chain_config::TimedFeatureOverride, transaction::Version}; use aptos_vm::AptosVM; use std::sync::Arc; +use thiserror::Error; +#[derive(Debug, Error)] +pub enum ReplayError { + #[error("Txn mismatch error")] + TxnMismatch, + #[error("Other Replay error {0}")] + OtherError(String), +} + +impl From for ReplayError { + fn from(error: anyhow::Error) -> Self { + ReplayError::OtherError(error.to_string()) + } +} pub struct ReplayVerifyCoordinator { storage: Arc, metadata_cache_opt: MetadataCacheOpt, @@ -60,9 +74,8 @@ impl ReplayVerifyCoordinator { }) } - pub async fn run(self) -> Result<()> { + pub async fn run(self) -> Result<(), ReplayError> { info!("ReplayVerify coordinator started."); - let ret = self.run_impl().await; if let Err(e) = &ret { @@ -77,7 +90,7 @@ impl ReplayVerifyCoordinator { ret } - async fn run_impl(self) -> Result<()> { + async fn run_impl(self) -> Result<(), ReplayError> { AptosVM::set_concurrency_level_once(self.replay_concurrency_level); AptosVM::set_timed_feature_override(TimedFeatureOverride::Replay); @@ -87,45 +100,56 @@ impl ReplayVerifyCoordinator { self.concurrent_downloads, ) .await?; - ensure!( - self.start_version <= self.end_version, - "start_version should precede end_version." - ); + if self.start_version > self.end_version { + return Err(ReplayError::OtherError(format!( + "start_version {} should precede end_version {}.", + self.start_version, self.end_version + ))); + } let run_mode = Arc::new(RestoreRunMode::Restore { restore_handler: self.restore_handler, }); - let next_txn_version = run_mode.get_next_expected_transaction_version()?; - let (state_snapshot, replay_transactions_from_version) = if next_txn_version != 0 { - // DB is already in workable state - info!( - next_txn_version = next_txn_version, - "DB already has non-empty State DB.", - ); - (None, next_txn_version) - } else if let Some(version) = run_mode.get_in_progress_state_kv_snapshot()? { + let mut next_txn_version = run_mode.get_next_expected_transaction_version()?; + let (state_snapshot, snapshot_version) = if let Some(version) = + run_mode.get_in_progress_state_kv_snapshot()? + { info!( version = version, "Found in progress state snapshot restore", ); - (Some(metadata_view.expect_state_snapshot(version)?), version) - } else if self.start_version == 0 { - (None, 0) + ( + Some(metadata_view.expect_state_snapshot(version)?), + Some(version), + ) + } else if let Some(snapshot) = metadata_view.select_state_snapshot(self.start_version)? { + let snapshot_version = snapshot.version; + info!( + "Found state snapshot backup at epoch {}, will replay from version {}.", + snapshot.epoch, + snapshot_version + 1 + ); + (Some(snapshot), Some(snapshot_version)) } else { - let state_snapshot = metadata_view.select_state_snapshot(self.start_version - 1)?; - let replay_transactions_from_version = - state_snapshot.as_ref().map(|b| b.version + 1).unwrap_or(0); - (state_snapshot, replay_transactions_from_version) + (None, None) }; - ensure!( - next_txn_version <= self.start_version, - "DB version is already beyond start_version requested.", - ); + + let skip_snapshot: bool = + snapshot_version.is_none() || next_txn_version > snapshot_version.unwrap(); + if skip_snapshot { + info!( + next_txn_version = next_txn_version, + snapshot_version = snapshot_version, + "found in progress replay and skip the state snapshot restore", + ); + } + + next_txn_version = std::cmp::max(next_txn_version, snapshot_version.map_or(0, |v| v + 1)); let transactions = metadata_view.select_transaction_backups( // transaction info at the snapshot must be restored otherwise the db will be confused // about the latest version after snapshot is restored. - replay_transactions_from_version.saturating_sub(1), + next_txn_version.saturating_sub(1), self.end_version, )?; let global_opt = GlobalRestoreOptions { @@ -135,21 +159,22 @@ impl ReplayVerifyCoordinator { concurrent_downloads: self.concurrent_downloads, replay_concurrency_level: 0, // won't replay, doesn't matter }; - - if let Some(backup) = state_snapshot { - StateSnapshotRestoreController::new( - StateSnapshotRestoreOpt { - manifest_handle: backup.manifest, - version: backup.version, - validate_modules: self.validate_modules, - restore_mode: Default::default(), - }, - global_opt.clone(), - Arc::clone(&self.storage), - None, /* epoch_history */ - ) - .run() - .await?; + if !skip_snapshot { + if let Some(backup) = state_snapshot { + StateSnapshotRestoreController::new( + StateSnapshotRestoreOpt { + manifest_handle: backup.manifest, + version: backup.version, + validate_modules: self.validate_modules, + restore_mode: Default::default(), + }, + global_opt.clone(), + Arc::clone(&self.storage), + None, /* epoch_history */ + ) + .run() + .await?; + } } let txn_manifests = transactions.into_iter().map(|b| b.manifest).collect(); @@ -158,8 +183,8 @@ impl ReplayVerifyCoordinator { self.storage, txn_manifests, None, - Some((replay_transactions_from_version, false)), /* replay_from_version */ - None, /* epoch_history */ + Some((next_txn_version, false)), /* replay_from_version */ + None, /* epoch_history */ self.verify_execution_mode.clone(), None, ) @@ -167,7 +192,7 @@ impl ReplayVerifyCoordinator { .await?; if self.verify_execution_mode.seen_error() { - bail!("Seen replay errors, check out logs.") + Err(ReplayError::TxnMismatch) } else { Ok(()) } diff --git a/storage/backup/backup-cli/src/metadata/cache.rs b/storage/backup/backup-cli/src/metadata/cache.rs index 64fc9259e81cb..a04ba7bd97883 100644 --- a/storage/backup/backup-cli/src/metadata/cache.rs +++ b/storage/backup/backup-cli/src/metadata/cache.rs @@ -13,7 +13,6 @@ use aptos_logger::prelude::*; use aptos_temppath::TempPath; use async_trait::async_trait; use clap::Parser; -use futures::stream::poll_fn; use std::{ collections::{HashMap, HashSet}, path::{Path, PathBuf}, @@ -24,7 +23,7 @@ use tokio::{ fs::{create_dir_all, read_dir, remove_file, OpenOptions}, io::{AsyncRead, AsyncReadExt}, }; -use tokio_stream::StreamExt; +use tokio_stream::{wrappers::ReadDirStream, StreamExt}; #[derive(Clone, Parser)] pub struct MetadataCacheOpt { @@ -99,25 +98,19 @@ pub async fn sync_and_load( create_dir_all(&cache_dir).await.err_notes(&cache_dir)?; // create if not present already // List cached metadata files. - let mut dir = read_dir(&cache_dir).await.err_notes(&cache_dir)?; - let local_entries = poll_fn(|ctx| { - ::std::task::Poll::Ready(match futures::ready!(dir.poll_next_entry(ctx)) { - Ok(Some(entry)) => Some(Ok(entry)), - Ok(None) => None, - Err(err) => Some(Err(err)), + let dir = read_dir(&cache_dir).await.err_notes(&cache_dir)?; + let local_hashes_vec: Vec = ReadDirStream::new(dir) + .filter_map(|entry| match entry { + Ok(e) => { + let path = e.path(); + let file_name = path.file_name()?.to_str()?; + Some(file_name.to_string()) + }, + Err(_) => None, }) - }) - .collect::>>() - .await?; - let local_hashes = local_entries - .iter() - .map(|e| { - e.file_name() - .into_string() - .map_err(|s| anyhow!("into_string() failed for file name {:?}", s)) - }) - .collect::>>()?; - + .collect() + .await; + let local_hashes: HashSet<_> = local_hashes_vec.into_iter().collect(); // List remote metadata files. let mut remote_file_handles = storage.list_metadata_files().await?; if remote_file_handles.is_empty() { @@ -164,7 +157,9 @@ pub async fn sync_and_load( download_file(storage_ref, file_handle, &local_tmp_file).await?; // rename to target file only if successful; stale tmp file caused by failure will be // reclaimed on next run - tokio::fs::rename(local_tmp_file, local_file).await?; + tokio::fs::rename(local_tmp_file.clone(), local_file) + .await + .err_notes(local_tmp_file)?; info!( file_handle = file_handle, processed = i + 1, diff --git a/storage/backup/backup-cli/src/storage/mod.rs b/storage/backup/backup-cli/src/storage/mod.rs index f2106aa08356b..79ef6fd8de749 100644 --- a/storage/backup/backup-cli/src/storage/mod.rs +++ b/storage/backup/backup-cli/src/storage/mod.rs @@ -209,7 +209,7 @@ impl StorageOpt { } } -#[derive(Parser)] +#[derive(Parser, Clone, Debug)] #[clap(group( ArgGroup::new("storage") .required(true) diff --git a/storage/db-tool/src/replay_verify.rs b/storage/db-tool/src/replay_verify.rs index 52a59f33468f9..937e94255c9f7 100644 --- a/storage/db-tool/src/replay_verify.rs +++ b/storage/db-tool/src/replay_verify.rs @@ -1,9 +1,9 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use anyhow::Result; +use anyhow::{bail, Result}; use aptos_backup_cli::{ - coordinators::replay_verify::ReplayVerifyCoordinator, + coordinators::replay_verify::{ReplayError, ReplayVerifyCoordinator}, metadata::cache::MetadataCacheOpt, storage::DBToolStorageOpt, utils::{ConcurrentDownloadsOpt, ReplayConcurrencyLevelOpt, RocksdbOpt, TrustedWaypointOpt}, @@ -14,6 +14,7 @@ use aptos_config::config::{ }; use aptos_db::{AptosDB, GetRestoreHandler}; use aptos_executor_types::VerifyExecutionMode; +use aptos_logger::info; use aptos_types::transaction::Version; use clap::Parser; use std::{path::PathBuf, sync::Arc}; @@ -58,6 +59,8 @@ pub struct Opt { lazy_quit: bool, } +const RETRY_ATTEMPT: u8 = 5; + impl Opt { pub async fn run(self) -> Result<()> { let restore_handler = Arc::new(AptosDB::open( @@ -70,19 +73,46 @@ impl Opt { DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, )?) .get_restore_handler(); - ReplayVerifyCoordinator::new( - self.storage.init_storage().await?, - self.metadata_cache_opt, - self.trusted_waypoints_opt, - self.concurrent_downloads.get(), - self.replay_concurrency_level.get(), - restore_handler, - self.start_version.unwrap_or(0), - self.end_version.unwrap_or(Version::MAX), - self.validate_modules, - VerifyExecutionMode::verify_except(self.txns_to_skip).set_lazy_quit(self.lazy_quit), - )? - .run() - .await + let mut attempt = 0; + while attempt < RETRY_ATTEMPT { + let ret = ReplayVerifyCoordinator::new( + self.storage.clone().init_storage().await?, + self.metadata_cache_opt.clone(), + self.trusted_waypoints_opt.clone(), + self.concurrent_downloads.get(), + self.replay_concurrency_level.get(), + restore_handler.clone(), + self.start_version.unwrap_or(0), + self.end_version.unwrap_or(Version::MAX), + self.validate_modules, + VerifyExecutionMode::verify_except(self.txns_to_skip.clone()) + .set_lazy_quit(self.lazy_quit), + )? + .run() + .await; + match ret { + Err(e) => match e { + ReplayError::TxnMismatch => { + info!("ReplayVerify coordinator exiting with Txn output mismatch error."); + break; + }, + _ => { + info!( + "ReplayVerify coordinator retrying with attempt {}.", + attempt + ); + }, + }, + _ => { + info!("ReplayVerify coordinator succeeded"); + return Ok(()); + }, + } + attempt += 1; + } + bail!( + "ReplayVerify coordinator failed after {} attempts.", + RETRY_ATTEMPT + ) } } diff --git a/testsuite/replay_verify.py b/testsuite/replay_verify.py index 5fbaa85ee0070..66d8b83d462fd 100755 --- a/testsuite/replay_verify.py +++ b/testsuite/replay_verify.py @@ -7,9 +7,10 @@ import subprocess import shutil import sys -import math from multiprocessing import Pool, freeze_support from typing import Tuple +from collections import deque + from verify_core.common import clear_artifacts, query_backup_latest_version @@ -29,7 +30,7 @@ # 2. meanwhile, the oncall should delete the old ranges that are beyond 300M window that we want to scan # -testnet_runner_mapping = [ +TESTNET_RANGES = [ [250000000, 255584106], [255584107, 271874718], [271874719, 300009463], @@ -49,10 +50,10 @@ [640_000_001, sys.maxsize], ] -mainnet_runner_mapping = [ - [0, 50_000_000], - [50_000_001, 100_000_000], - [100_000_001, 110_000_000], +MAINNET_RANGES = [ + [0, 40_000_000], + [40_000_001, 95_000_000], + [95_000_001, 110_000_000], [110_000_001, 150_000_000], [150_000_001, 170_000_000], [170_000_001, 180_000_000], @@ -64,26 +65,12 @@ [230_000_001, 240_000_000], [240_000_001, 250_000_000], [250_000_001, 260_000_000], - [260_000_001, 270_000_000], - [270_000_001, 278_000_000], - [278_000_001, sys.maxsize], + [260_000_001, 275_000_000], + [275_000_001, 285_000_000], + [285_000_001, sys.maxsize], ] -def replay_with_retry(*args): - MAX_RETRIES = 3 - attempt = 0 - while attempt < MAX_RETRIES: - (n, return_code) = replay_verify_partition(*args) - if return_code == 0: - return (n, return_code) - elif attempt < MAX_RETRIES - 1: - print(f"Attempt {attempt + 1} failed for arguments {args}. Retrying.") - attempt += 1 - else: - return (n, return_code) - - def replay_verify_partition( n: int, N: int, @@ -92,7 +79,7 @@ def replay_verify_partition( latest_version: int, txns_to_skip: Tuple[int], backup_config_template_path: str, -) -> Tuple[int, int]: +) -> Tuple[int, int, bytes]: """ Run replay-verify for a partition of the backup, returning a tuple of the (partition number, return code) @@ -105,15 +92,17 @@ def replay_verify_partition( backup_config_template_path: path to the backup config template """ end = history_start + n * per_partition - if n == N - 1 and end < latest_version: + if n == N and end < latest_version: end = latest_version start = end - per_partition partition_name = f"run_{n}_{start}_{end}" print(f"[partition {n}] spawning {partition_name}") - os.mkdir(partition_name) - shutil.copytree("metadata-cache", f"{partition_name}/metadata-cache") + if not os.path.exists(partition_name): + os.mkdir(partition_name) + # the metadata cache is shared across partitions and downloaded when querying the latest version. + shutil.copytree("metadata-cache", f"{partition_name}/metadata-cache") txns_to_skip_args = [f"--txns-to-skip={txn}" for txn in txns_to_skip] @@ -124,7 +113,7 @@ def replay_verify_partition( "replay-verify", *txns_to_skip_args, "--concurrent-downloads", - "2", + "8", "--replay-concurrency-level", "2", "--metadata-cache-dir", @@ -140,19 +129,20 @@ def replay_verify_partition( backup_config_template_path, ], stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # redirect stderr to stdout ) if process.stdout is None: raise Exception(f"[partition {n}] stdout is None") + last_lines = deque(maxlen=10) for line in iter(process.stdout.readline, b""): print(f"[partition {n}] {line}", flush=True) - - # set the returncode + last_lines.append(line) process.communicate() - return (n, process.returncode) + return (n, process.returncode, b"\n".join(last_lines)) -def main(): +def main(runner_no=None, runner_cnt=None, start_version=None, end_version=None): # collect all required ENV variables REQUIRED_ENVS = [ "BUCKET", @@ -164,13 +154,17 @@ def main(): if not all(env in os.environ for env in REQUIRED_ENVS): raise Exception("Missing required ENV variables") - (runner_no, runner_cnt) = ( - (int(sys.argv[1]), int(sys.argv[2])) if len(sys.argv) > 2 else (None, None) + + # the runner may have small overlap at the boundary to prevent missing any transactions + runner_mapping = ( + TESTNET_RANGES if "testnet" in os.environ["BUCKET"] else MAINNET_RANGES ) - # by default we only run one job + + # by default we only have 1 runner if runner_no is None or runner_cnt is None: runner_no = 0 runner_cnt = 1 + runner_mapping = [[runner_mapping[0][0], runner_mapping[-1][1]]] assert ( runner_no >= 0 and runner_no < runner_cnt @@ -186,36 +180,34 @@ def main(): if "aws" in config and shutil.which("aws") is None: raise Exception("Missing required AWS CLI for pulling backup data from S3") - if os.environ.get("REUSE_BACKUP_ARTIFACTS", "true") == "true": + if os.environ.get("REUSE_BACKUP_ARTIFACTS", "true") != "true": print("[main process] clearing existing backup artifacts") clear_artifacts() else: print("[main process] skipping clearing backup artifacts") - LATEST_VERSION = query_backup_latest_version(BACKUP_CONFIG_TEMPLATE_PATH) - - # the runner may have small overlap at the boundary to prevent missing any transactions - runner_mapping = ( - testnet_runner_mapping - if "testnet" in os.environ["BUCKET"] - else mainnet_runner_mapping - ) - assert runner_cnt == len( runner_mapping ), "runner_cnt must match the number of runners in the mapping" runner_start = runner_mapping[runner_no][0] runner_end = runner_mapping[runner_no][1] + latest_version = query_backup_latest_version(BACKUP_CONFIG_TEMPLATE_PATH) if runner_no == runner_cnt - 1: - runner_end = LATEST_VERSION + runner_end = latest_version + if runner_end is None: + raise Exception("Failed to query latest version from backup") print("runner start %d end %d" % (runner_start, runner_end)) + if start_version is not None and end_version is not None: + runner_start = start_version + runner_end = end_version + # run replay-verify in parallel N = 16 PER_PARTITION = (runner_end - runner_start) // N with Pool(N) as p: all_partitions = p.starmap( - replay_with_retry, + replay_verify_partition, [ ( n, @@ -226,17 +218,19 @@ def main(): TXNS_TO_SKIP, BACKUP_CONFIG_TEMPLATE_PATH, ) - for n in range(1, N) + for n in range(1, N + 1) ], ) print("[main process] finished") err = False - for partition_num, return_code in all_partitions: + for partition_num, return_code, msg in all_partitions: if return_code != 0: print("======== ERROR ========") - print(f"ERROR: partition {partition_num} failed (exit {return_code})") + print( + f"ERROR: partition {partition_num} failed with exit status {return_code}, {msg})" + ) err = True if err: @@ -245,4 +239,7 @@ def main(): if __name__ == "__main__": freeze_support() - main() + (runner_no, runner_cnt) = ( + (int(sys.argv[1]), int(sys.argv[2])) if len(sys.argv) > 2 else (None, None) + ) + main(runner_no, runner_cnt) diff --git a/testsuite/replay_verify_run_local.py b/testsuite/replay_verify_run_local.py index 6a337bd4e1fab..64aeee413b66c 100755 --- a/testsuite/replay_verify_run_local.py +++ b/testsuite/replay_verify_run_local.py @@ -18,11 +18,12 @@ def local_setup(): # Take these from the expected replay verify run envs = { "TIMEOUT_MINUTES": "5", - "BUCKET": "aptos-testnet-backup-2223d95b", + "BUCKET": "aptos-testnet-backup-b7b1ad7a", "SUB_DIR": "e1", "HISTORY_START": "350000000", - "TXNS_TO_SKIP": "46874937 151020059", - "BACKUP_CONFIG_TEMPLATE_PATH": "terraform/helm/fullnode/files/backup/s3-public.yaml", + "TXNS_TO_SKIP": "0", # 46874937 151020059 should be excluded + "BACKUP_CONFIG_TEMPLATE_PATH": "terraform/helm/fullnode/files/backup/gcs.yaml", + "REUSE_BACKUP_ARTIFACTS": "true", } # build backup tools @@ -44,4 +45,6 @@ def local_setup(): if __name__ == "__main__": local_setup() - replay_verify.main() + replay_verify.main( + runner_no=None, runner_cnt=None, start_version=40000000, end_version=200000000 + ) diff --git a/testsuite/smoke-test/src/storage.rs b/testsuite/smoke-test/src/storage.rs index 1b5c010e6361f..268e91fd004ce 100644 --- a/testsuite/smoke-test/src/storage.rs +++ b/testsuite/smoke-test/src/storage.rs @@ -241,8 +241,9 @@ fn replay_verify(backup_path: &Path, trusted_waypoints: &[Waypoint]) { .unwrap(); assert!( replay.status.success(), - "{}", - std::str::from_utf8(&replay.stderr).unwrap() + "{}, {}", + std::str::from_utf8(&replay.stderr).unwrap(), + std::str::from_utf8(&replay.stdout).unwrap(), ); info!(