diff --git a/Cargo.lock b/Cargo.lock index dd74ea32642cf1..118962b386ad05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6362,6 +6362,7 @@ dependencies = [ "csv", "dashmap", "futures 0.3.29", + "hex", "histogram", "itertools", "log", diff --git a/accounts-db/src/transaction_results.rs b/accounts-db/src/transaction_results.rs index 46e59cd4c60b51..7d426b5bbdd7ea 100644 --- a/accounts-db/src/transaction_results.rs +++ b/accounts-db/src/transaction_results.rs @@ -68,7 +68,7 @@ impl TransactionExecutionResult { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TransactionExecutionDetails { pub status: transaction::Result<()>, pub log_messages: Option>, @@ -81,7 +81,7 @@ pub struct TransactionExecutionDetails { pub accounts_data_len_delta: i64, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum DurableNonceFee { Valid(u64), Invalid, @@ -109,7 +109,7 @@ impl DurableNonceFee { /// transaction instruction pub type InnerInstructions = Vec; -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct InnerInstruction { pub instruction: CompiledInstruction, /// Invocation stack height of this instruction. Instruction stack height diff --git a/core/src/validator.rs b/core/src/validator.rs index 33e4aaf2a862a3..75e86e61f78738 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -2168,6 +2168,7 @@ fn initialize_rpc_transaction_history_services( let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root())); let (transaction_status_sender, transaction_status_receiver) = unbounded(); let transaction_status_sender = Some(TransactionStatusSender { + bank_hash: false, sender: transaction_status_sender, }); let transaction_status_service = Some(TransactionStatusService::new( diff --git a/ledger-tool/Cargo.toml b/ledger-tool/Cargo.toml index c64dfa07e91a91..cb2f0ed04840e6 100644 --- a/ledger-tool/Cargo.toml +++ b/ledger-tool/Cargo.toml @@ -18,6 +18,7 @@ csv = { workspace = true } dashmap = { workspace = true } futures = { workspace = true } histogram = { workspace = true } +hex = { workspace = true } itertools = { workspace = true } log = { workspace = true } num_cpus = { workspace = true } diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 292aee2e1ee391..648dd53cb8cd0a 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -68,6 +68,7 @@ pub fn load_and_process_ledger( blockstore: Arc, process_options: ProcessOptions, snapshot_archive_path: Option, + transaction_status_sender: Option, incremental_snapshot_archive_path: Option, ) -> Result<(Arc>, Option), String> { let bank_snapshots_dir = if blockstore.is_primary_access() { @@ -326,12 +327,13 @@ pub fn load_and_process_ledger( ); ( Some(TransactionStatusSender { + bank_hash: false, sender: transaction_status_sender, }), Some(transaction_status_service), ) } else { - (None, None) + (transaction_status_sender, None) }; let result = blockstore_processor::process_blockstore_from_root( @@ -352,6 +354,8 @@ pub fn load_and_process_ledger( accounts_hash_verifier.join().unwrap(); if let Some(service) = transaction_status_service { service.join().unwrap(); + } else if let Some(transaction_status_sender) = transaction_status_sender { + drop(transaction_status_sender.sender); } result diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index ac8404edd53dbb..3a91eb2b62aa4b 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -19,6 +19,7 @@ use { solana_accounts_db::{ accounts::Accounts, accounts_db::CalcAccountsHashDataSource, accounts_index::ScanConfig, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, + transaction_results::TransactionExecutionDetails, }, solana_clap_utils::{ hidden_unless_forced, @@ -44,7 +45,7 @@ use { AccessType, BlockstoreRecoveryMode, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_FIFO, }, - blockstore_processor::ProcessOptions, + blockstore_processor::{ProcessOptions, TransactionStatusSender}, shred::Shred, use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup}, }, @@ -1068,6 +1069,12 @@ fn get_access_type(process_options: &ProcessOptions) -> AccessType { #[cfg(not(target_env = "msvc"))] use jemallocator::Jemalloc; +use solana_ledger::blockstore_processor::TransactionStatusMessage; +use { + serde::Deserialize, + solana_ledger::blockstore_processor::ProcessSlotCallback, + std::{collections::VecDeque, sync::Mutex}, +}; #[cfg(not(target_env = "msvc"))] #[global_allocator] @@ -1716,7 +1723,22 @@ fn main() { information that went into computing the completed bank's bank hash. \ The file will be written within /bank_hash_details/", ), - ), + ) + .arg( + Arg::with_name("verify_slots") + .long("verify-slots") + .takes_value(true) + .value_name("FILENAME") + .help("Record slots to new file or verify slots match contents of existing file.") + ) + .arg( + Arg::with_name("verify_slots_details") + .long("verify-slots-details") + .possible_values(&["none", "bank", "tx", "bank-tx"]) + .default_value("none") + .takes_value(true) + .help("In the slot recording, include bank and tx details or not") + ) ) .subcommand( SubCommand::with_name("graph") @@ -2431,6 +2453,7 @@ fn main() { Arc::new(blockstore), process_options, snapshot_archive_path, + None, incremental_snapshot_archive_path, ) { Ok((bank_forks, ..)) => { @@ -2517,6 +2540,7 @@ fn main() { Arc::new(blockstore), process_options, snapshot_archive_path, + None, incremental_snapshot_archive_path, ) { Ok((bank_forks, ..)) => { @@ -2722,6 +2746,127 @@ fn main() { ); } + let (include_bank, include_tx) = + match arg_matches.value_of("verify_slots_details").unwrap() { + "none" => (false, false), + "bank" => (true, false), + "tx" => (false, true), + "bank-tx" => (true, true), + _ => unreachable!(), + }; + + #[derive(Serialize, Deserialize)] + struct TransactionBatch { + bank_hash: String, + transactions: Vec, + } + + #[derive(Serialize, Deserialize)] + struct Transaction { + accounts: Vec, + instructions: Vec, + is_simple_vote_tx: bool, + execution_results: Option, + } + + #[derive(Serialize, Deserialize)] + struct Instruction { + program_id: String, + accounts: Vec, + data: String, + } + + #[derive(Serialize, Deserialize)] + struct Slot { + slot: Slot, + hash: String, + #[serde(skip_serializing_if = "Option::is_none")] + details: Option, + #[serde(skip_serializing_if = "Vec::is_empty")] + transactions: Vec, + } + + let (slot_callback, record_slots_file, recorded_slots) = if let Some(filename) = + arg_matches.value_of_os("verify_slots") + { + let filename = Path::new(filename); + + if filename.exists() { + let file = File::open(filename).unwrap_or_else(|err| { + eprintln!("Unable to read file: {}: {err:#}", filename.display()); + exit(1); + }); + + let slots: Arc>> = Arc::new(Mutex::new( + serde_json::from_reader(file).unwrap_or_else(|err| { + eprintln!("Error loading slots file: {err:#}"); + exit(1); + }), + )); + + let slot_callback = Arc::new(move |bank: &Bank| { + let Slot { + slot: expected_slot, + hash: expected_hash, + .. + } = slots.lock().unwrap().pop_front().unwrap(); + if bank.slot() != expected_slot + || bank.hash().to_string() != expected_hash + { + error!("Expected slot: {expected_slot} hash: {expected_hash} got slot: {} hash: {}", + bank.slot(), bank.hash()); + } + }); + + (Some(slot_callback as ProcessSlotCallback), None, None) + } else { + let file = File::create(filename).unwrap_or_else(|err| { + eprintln!("Unable to write to file: {}: {:#}", filename.display(), err); + exit(1); + }); + + let slot_hashes = Arc::new(Mutex::new(Vec::new())); + + let slot_callback = Arc::new({ + let slot_hashes = Arc::clone(&slot_hashes); + move |bank: &Bank| { + let details = include_bank.then_some( + bank_hash_details::BankHashDetails::try_from(bank).unwrap(), + ); + + slot_hashes.lock().unwrap().push(Slot { + slot: bank.slot(), + hash: bank.hash().to_string(), + details, + transactions: Vec::new(), + }); + } + }); + + ( + Some(slot_callback as ProcessSlotCallback), + Some(file), + Some(slot_hashes), + ) + } + } else { + (None, None, None) + }; + + let (sender, receiver) = if include_tx { + let (sender, receiver) = crossbeam_channel::unbounded(); + + ( + Some(TransactionStatusSender { + bank_hash: true, + sender, + }), + Some(receiver), + ) + } else { + (None, None) + }; + let process_options = ProcessOptions { new_hard_forks: hardforks_of(arg_matches, "hard_forks"), run_verification: !(arg_matches.is_present("skip_poh_verify") @@ -2749,6 +2894,7 @@ fn main() { use_snapshot_archives_at_startup::cli::NAME, UseSnapshotArchivesAtStartup ), + slot_callback, ..ProcessOptions::default() }; let print_accounts_stats = arg_matches.is_present("print_accounts_stats"); @@ -2763,12 +2909,14 @@ fn main() { force_update_to_open, enforce_ulimit_nofile, ); + let (bank_forks, ..) = load_and_process_ledger( arg_matches, &genesis_config, Arc::new(blockstore), process_options, snapshot_archive_path, + sender, incremental_snapshot_archive_path, ) .unwrap_or_else(|err| { @@ -2787,6 +2935,93 @@ fn main() { }) .ok(); } + + if let Some(recorded_slots_file) = record_slots_file { + if let Ok(mut recorded_slots) = recorded_slots.clone().unwrap().lock() { + if let Some(recv) = receiver { + for tsm in recv { + if let TransactionStatusMessage::Batch(batch) = tsm { + let slot = batch.bank.slot(); + + assert_eq!( + batch.transactions.len(), + batch.execution_results.len() + ); + + let transactions = batch + .transactions + .iter() + .enumerate() + .map(|(no, tx)| { + let message = tx.message(); + + let accounts: Vec = message + .account_keys() + .iter() + .map(|acc| acc.to_string()) + .collect(); + + let instructions = message + .instructions() + .iter() + .map(|ix| { + let program_id = accounts + [ix.program_id_index as usize] + .clone(); + + let accounts = ix + .accounts + .iter() + .map(|idx| accounts[*idx as usize].clone()) + .collect(); + + let data = hex::encode(&ix.data); + + Instruction { + program_id, + accounts, + data, + } + }) + .collect(); + + let execution_results = + batch.execution_results[no].clone(); + + let is_simple_vote_tx = tx.is_simple_vote_transaction(); + + Transaction { + accounts, + instructions, + is_simple_vote_tx, + execution_results, + } + }) + .collect(); + + let batch = TransactionBatch { + bank_hash: batch.bank_hash.unwrap().to_string(), + transactions, + }; + + if let Some(recorded_slot) = + recorded_slots.iter_mut().find(|f| f.slot == slot) + { + recorded_slot.transactions.push(batch); + } + } + } + } + } + + // writing the json file ends up with a syscall for each number, comma, indentation etc. + // use BufWriter to speed things up + + let writer = std::io::BufWriter::new(recorded_slots_file); + + serde_json::to_writer_pretty(writer, &recorded_slots.unwrap()).unwrap(); + } + exit_signal.store(true, Ordering::Relaxed); system_monitor_service.join().unwrap(); } @@ -2827,6 +3062,7 @@ fn main() { Arc::new(blockstore), process_options, snapshot_archive_path, + None, incremental_snapshot_archive_path, ) { Ok((bank_forks, ..)) => { @@ -3013,6 +3249,7 @@ fn main() { blockstore.clone(), process_options, snapshot_archive_path, + None, incremental_snapshot_archive_path, ) { Ok((bank_forks, starting_snapshot_hashes)) => { @@ -3408,6 +3645,7 @@ fn main() { Arc::new(blockstore), process_options, snapshot_archive_path, + None, incremental_snapshot_archive_path, ) .unwrap_or_else(|err| { @@ -3502,6 +3740,7 @@ fn main() { Arc::new(blockstore), process_options, snapshot_archive_path, + None, incremental_snapshot_archive_path, ) { Ok((bank_forks, ..)) => { diff --git a/ledger-tool/src/program.rs b/ledger-tool/src/program.rs index 616dcabbc431f5..08ce8096fadf37 100644 --- a/ledger-tool/src/program.rs +++ b/ledger-tool/src/program.rs @@ -125,6 +125,7 @@ fn load_blockstore(ledger_path: &Path, arg_matches: &ArgMatches<'_>) -> Arc; +/// Callback for accessing bank state after each slot is confirmed while +/// processing the blockstore +pub type ProcessSlotCallback = Arc; #[derive(Default, Clone)] pub struct ProcessOptions { @@ -683,6 +684,7 @@ pub struct ProcessOptions { pub run_verification: bool, pub full_leader_cache: bool, pub halt_at_slot: Option, + pub slot_callback: Option, pub new_hard_forks: Option>, pub debug_keys: Option>>, pub account_indexes: AccountSecondaryIndexes, @@ -1808,6 +1810,11 @@ fn process_single_slot( result? } bank.freeze(); // all banks handled by this routine are created from complete slots + + if let Some(slot_callback) = &opts.slot_callback { + slot_callback(bank); + } + if blockstore.is_primary_access() { blockstore.insert_bank_hash(bank.slot(), bank.hash(), false); } @@ -1830,10 +1837,12 @@ pub struct TransactionStatusBatch { pub token_balances: TransactionTokenBalancesSet, pub rent_debits: Vec, pub transaction_indexes: Vec, + pub bank_hash: Option, } #[derive(Clone)] pub struct TransactionStatusSender { + pub bank_hash: bool, pub sender: Sender, } @@ -1850,6 +1859,8 @@ impl TransactionStatusSender { ) { let slot = bank.slot(); + let hash = self.bank_hash.then(|| bank.hash_internal_state()); + if let Err(e) = self .sender .send(TransactionStatusMessage::Batch(TransactionStatusBatch { @@ -1866,6 +1877,7 @@ impl TransactionStatusSender { token_balances, rent_debits, transaction_indexes, + bank_hash: hash, })) { trace!( @@ -4404,6 +4416,7 @@ pub mod tests { let (transaction_status_sender, transaction_status_receiver) = crossbeam_channel::unbounded(); let transaction_status_sender = TransactionStatusSender { + bank_hash: false, sender: transaction_status_sender, }; diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 0a92a4d031e9ef..7a23b563240188 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -4604,6 +4604,7 @@ pub fn populate_blockstore_for_tests( entries, Some( &solana_ledger::blockstore_processor::TransactionStatusSender { + bank_hash: false, sender: transaction_status_sender, }, ), diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index d36efbc6c4fb7f..f9b87de50a2419 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -73,6 +73,7 @@ impl TransactionStatusService { token_balances, rent_debits, transaction_indexes, + .. }) => { let slot = bank.slot(); for ( diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index e21caddcd5a764..ded76d2865e4dd 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -6956,7 +6956,7 @@ impl Bank { /// Hash the `accounts` HashMap. This represents a validator's interpretation /// of the delta of the ledger since the last vote and up to now - fn hash_internal_state(&self) -> Hash { + pub fn hash_internal_state(&self) -> Hash { let slot = self.slot(); let ignore = (!self.is_partitioned_rewards_feature_enabled() && (self diff --git a/runtime/src/bank/bank_hash_details.rs b/runtime/src/bank/bank_hash_details.rs index 6b40e7aef6e4ba..443ec896322fac 100644 --- a/runtime/src/bank/bank_hash_details.rs +++ b/runtime/src/bank/bank_hash_details.rs @@ -22,7 +22,7 @@ use { }; #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] -pub(crate) struct BankHashDetails { +pub struct BankHashDetails { /// client version pub version: String, pub account_data_encoding: String, @@ -101,7 +101,7 @@ impl TryFrom<&Bank> for BankHashDetails { // Wrap the Vec<...> so we can implement custom Serialize/Deserialize traits on the wrapper type #[derive(Clone, Debug, Eq, PartialEq)] -pub(crate) struct BankHashAccounts { +pub struct BankHashAccounts { pub accounts: Vec, }