diff --git a/core/src/banking_stage/transaction_scheduler/mod.rs b/core/src/banking_stage/transaction_scheduler/mod.rs index 65ece5fee6a8a1..5a3ab0c06ded5d 100644 --- a/core/src/banking_stage/transaction_scheduler/mod.rs +++ b/core/src/banking_stage/transaction_scheduler/mod.rs @@ -4,6 +4,7 @@ mod in_flight_tracker; pub(crate) mod prio_graph_scheduler; pub(crate) mod scheduler_controller; pub(crate) mod scheduler_error; +mod scheduler_metrics; mod thread_aware_account_locks; mod transaction_id_generator; mod transaction_priority_id; diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index a5c0fa134f5369..7d9a70931b4410 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -3,7 +3,9 @@ use { super::{ - prio_graph_scheduler::PrioGraphScheduler, scheduler_error::SchedulerError, + prio_graph_scheduler::PrioGraphScheduler, + scheduler_error::SchedulerError, + scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics}, transaction_id_generator::TransactionIdGenerator, transaction_state::SanitizedTransactionTTL, transaction_state_container::TransactionStateContainer, @@ -17,7 +19,6 @@ use { TOTAL_BUFFERED_PACKETS, }, crossbeam_channel::RecvTimeoutError, - itertools::MinMaxResult, solana_cost_model::cost_model::CostModel, solana_measure::measure_us, solana_program_runtime::compute_budget_processor::process_compute_budget_instructions, @@ -25,7 +26,7 @@ use { solana_sdk::{ clock::MAX_PROCESSING_AGE, feature_set::include_loaded_accounts_data_size_in_fee_calculation, fee::FeeBudgetLimits, - saturating_add_assign, timing::AtomicInterval, transaction::SanitizedTransaction, + saturating_add_assign, transaction::SanitizedTransaction, }, solana_svm::transaction_error_metrics::TransactionErrorMetrics, std::{ @@ -437,229 +438,6 @@ impl SchedulerController { } } -#[derive(Default)] -struct SchedulerCountMetrics { - interval: AtomicInterval, - - /// Number of packets received. - num_received: usize, - /// Number of packets buffered. - num_buffered: usize, - - /// Number of transactions scheduled. - num_scheduled: usize, - /// Number of transactions that were unschedulable. - num_unschedulable: usize, - /// Number of transactions that were filtered out during scheduling. - num_schedule_filtered_out: usize, - /// Number of completed transactions received from workers. - num_finished: usize, - /// Number of transactions that were retryable. - num_retryable: usize, - - /// Number of transactions that were immediately dropped on receive. - num_dropped_on_receive: usize, - /// Number of transactions that were dropped due to sanitization failure. - num_dropped_on_sanitization: usize, - /// Number of transactions that were dropped due to failed lock validation. - num_dropped_on_validate_locks: usize, - /// Number of transactions that were dropped due to failed transaction - /// checks during receive. - num_dropped_on_receive_transaction_checks: usize, - /// Number of transactions that were dropped due to clearing. - num_dropped_on_clear: usize, - /// Number of transactions that were dropped due to age and status checks. - num_dropped_on_age_and_status: usize, - /// Number of transactions that were dropped due to exceeded capacity. - num_dropped_on_capacity: usize, - /// Min prioritization fees in the transaction container - min_prioritization_fees: u64, - /// Max prioritization fees in the transaction container - max_prioritization_fees: u64, -} - -impl SchedulerCountMetrics { - fn maybe_report_and_reset(&mut self, should_report: bool) { - const REPORT_INTERVAL_MS: u64 = 1000; - if self.interval.should_update(REPORT_INTERVAL_MS) { - if should_report { - self.report(); - } - self.reset(); - } - } - - fn report(&self) { - datapoint_info!( - "banking_stage_scheduler_counts", - ("num_received", self.num_received, i64), - ("num_buffered", self.num_buffered, i64), - ("num_scheduled", self.num_scheduled, i64), - ("num_unschedulable", self.num_unschedulable, i64), - ( - "num_schedule_filtered_out", - self.num_schedule_filtered_out, - i64 - ), - ("num_finished", self.num_finished, i64), - ("num_retryable", self.num_retryable, i64), - ("num_dropped_on_receive", self.num_dropped_on_receive, i64), - ( - "num_dropped_on_sanitization", - self.num_dropped_on_sanitization, - i64 - ), - ( - "num_dropped_on_validate_locks", - self.num_dropped_on_validate_locks, - i64 - ), - ( - "num_dropped_on_receive_transaction_checks", - self.num_dropped_on_receive_transaction_checks, - i64 - ), - ("num_dropped_on_clear", self.num_dropped_on_clear, i64), - ( - "num_dropped_on_age_and_status", - self.num_dropped_on_age_and_status, - i64 - ), - ("num_dropped_on_capacity", self.num_dropped_on_capacity, i64), - ("min_priority", self.get_min_priority(), i64), - ("max_priority", self.get_max_priority(), i64) - ); - } - - fn has_data(&self) -> bool { - self.num_received != 0 - || self.num_buffered != 0 - || self.num_scheduled != 0 - || self.num_unschedulable != 0 - || self.num_schedule_filtered_out != 0 - || self.num_finished != 0 - || self.num_retryable != 0 - || self.num_dropped_on_receive != 0 - || self.num_dropped_on_sanitization != 0 - || self.num_dropped_on_validate_locks != 0 - || self.num_dropped_on_receive_transaction_checks != 0 - || self.num_dropped_on_clear != 0 - || self.num_dropped_on_age_and_status != 0 - || self.num_dropped_on_capacity != 0 - } - - fn reset(&mut self) { - self.num_received = 0; - self.num_buffered = 0; - self.num_scheduled = 0; - self.num_unschedulable = 0; - self.num_schedule_filtered_out = 0; - self.num_finished = 0; - self.num_retryable = 0; - self.num_dropped_on_receive = 0; - self.num_dropped_on_sanitization = 0; - self.num_dropped_on_validate_locks = 0; - self.num_dropped_on_receive_transaction_checks = 0; - self.num_dropped_on_clear = 0; - self.num_dropped_on_age_and_status = 0; - self.num_dropped_on_capacity = 0; - self.min_prioritization_fees = u64::MAX; - self.max_prioritization_fees = 0; - } - - pub fn update_priority_stats(&mut self, min_max_fees: MinMaxResult) { - // update min/max priority - match min_max_fees { - itertools::MinMaxResult::NoElements => { - // do nothing - } - itertools::MinMaxResult::OneElement(e) => { - self.min_prioritization_fees = e; - self.max_prioritization_fees = e; - } - itertools::MinMaxResult::MinMax(min, max) => { - self.min_prioritization_fees = min; - self.max_prioritization_fees = max; - } - } - } - - pub fn get_min_priority(&self) -> u64 { - // to avoid getting u64::max recorded by metrics / in case of edge cases - if self.min_prioritization_fees != u64::MAX { - self.min_prioritization_fees - } else { - 0 - } - } - - pub fn get_max_priority(&self) -> u64 { - self.max_prioritization_fees - } -} - -#[derive(Default)] -struct SchedulerTimingMetrics { - interval: AtomicInterval, - /// Time spent making processing decisions. - decision_time_us: u64, - /// Time spent receiving packets. - receive_time_us: u64, - /// Time spent buffering packets. - buffer_time_us: u64, - /// Time spent filtering transactions during scheduling. - schedule_filter_time_us: u64, - /// Time spent scheduling transactions. - schedule_time_us: u64, - /// Time spent clearing transactions from the container. - clear_time_us: u64, - /// Time spent cleaning expired or processed transactions from the container. - clean_time_us: u64, - /// Time spent receiving completed transactions. - receive_completed_time_us: u64, -} - -impl SchedulerTimingMetrics { - fn maybe_report_and_reset(&mut self, should_report: bool) { - const REPORT_INTERVAL_MS: u64 = 1000; - if self.interval.should_update(REPORT_INTERVAL_MS) { - if should_report { - self.report(); - } - self.reset(); - } - } - - fn report(&self) { - datapoint_info!( - "banking_stage_scheduler_timing", - ("decision_time_us", self.decision_time_us, i64), - ("receive_time_us", self.receive_time_us, i64), - ("buffer_time_us", self.buffer_time_us, i64), - ("schedule_filter_time_us", self.schedule_filter_time_us, i64), - ("schedule_time_us", self.schedule_time_us, i64), - ("clear_time_us", self.clear_time_us, i64), - ("clean_time_us", self.clean_time_us, i64), - ( - "receive_completed_time_us", - self.receive_completed_time_us, - i64 - ) - ); - } - - fn reset(&mut self) { - self.decision_time_us = 0; - self.receive_time_us = 0; - self.buffer_time_us = 0; - self.schedule_filter_time_us = 0; - self.schedule_time_us = 0; - self.clear_time_us = 0; - self.clean_time_us = 0; - self.receive_completed_time_us = 0; - } -} - #[cfg(test)] mod tests { use { diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs b/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs new file mode 100644 index 00000000000000..2ab86bd684e4b4 --- /dev/null +++ b/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs @@ -0,0 +1,224 @@ +use {itertools::MinMaxResult, solana_sdk::timing::AtomicInterval}; + +#[derive(Default)] +pub struct SchedulerCountMetrics { + interval: AtomicInterval, + + /// Number of packets received. + pub num_received: usize, + /// Number of packets buffered. + pub num_buffered: usize, + + /// Number of transactions scheduled. + pub num_scheduled: usize, + /// Number of transactions that were unschedulable. + pub num_unschedulable: usize, + /// Number of transactions that were filtered out during scheduling. + pub num_schedule_filtered_out: usize, + /// Number of completed transactions received from workers. + pub num_finished: usize, + /// Number of transactions that were retryable. + pub num_retryable: usize, + + /// Number of transactions that were immediately dropped on receive. + pub num_dropped_on_receive: usize, + /// Number of transactions that were dropped due to sanitization failure. + pub num_dropped_on_sanitization: usize, + /// Number of transactions that were dropped due to failed lock validation. + pub num_dropped_on_validate_locks: usize, + /// Number of transactions that were dropped due to failed transaction + /// checks during receive. + pub num_dropped_on_receive_transaction_checks: usize, + /// Number of transactions that were dropped due to clearing. + pub num_dropped_on_clear: usize, + /// Number of transactions that were dropped due to age and status checks. + pub num_dropped_on_age_and_status: usize, + /// Number of transactions that were dropped due to exceeded capacity. + pub num_dropped_on_capacity: usize, + /// Min prioritization fees in the transaction container + pub min_prioritization_fees: u64, + /// Max prioritization fees in the transaction container + pub max_prioritization_fees: u64, +} + +impl SchedulerCountMetrics { + pub fn maybe_report_and_reset(&mut self, should_report: bool) { + const REPORT_INTERVAL_MS: u64 = 1000; + if self.interval.should_update(REPORT_INTERVAL_MS) { + if should_report { + self.report(); + } + self.reset(); + } + } + + fn report(&self) { + datapoint_info!( + "banking_stage_scheduler_counts", + ("num_received", self.num_received, i64), + ("num_buffered", self.num_buffered, i64), + ("num_scheduled", self.num_scheduled, i64), + ("num_unschedulable", self.num_unschedulable, i64), + ( + "num_schedule_filtered_out", + self.num_schedule_filtered_out, + i64 + ), + ("num_finished", self.num_finished, i64), + ("num_retryable", self.num_retryable, i64), + ("num_dropped_on_receive", self.num_dropped_on_receive, i64), + ( + "num_dropped_on_sanitization", + self.num_dropped_on_sanitization, + i64 + ), + ( + "num_dropped_on_validate_locks", + self.num_dropped_on_validate_locks, + i64 + ), + ( + "num_dropped_on_receive_transaction_checks", + self.num_dropped_on_receive_transaction_checks, + i64 + ), + ("num_dropped_on_clear", self.num_dropped_on_clear, i64), + ( + "num_dropped_on_age_and_status", + self.num_dropped_on_age_and_status, + i64 + ), + ("num_dropped_on_capacity", self.num_dropped_on_capacity, i64), + ("min_priority", self.get_min_priority(), i64), + ("max_priority", self.get_max_priority(), i64) + ); + } + + pub fn has_data(&self) -> bool { + self.num_received != 0 + || self.num_buffered != 0 + || self.num_scheduled != 0 + || self.num_unschedulable != 0 + || self.num_schedule_filtered_out != 0 + || self.num_finished != 0 + || self.num_retryable != 0 + || self.num_dropped_on_receive != 0 + || self.num_dropped_on_sanitization != 0 + || self.num_dropped_on_validate_locks != 0 + || self.num_dropped_on_receive_transaction_checks != 0 + || self.num_dropped_on_clear != 0 + || self.num_dropped_on_age_and_status != 0 + || self.num_dropped_on_capacity != 0 + } + + fn reset(&mut self) { + self.num_received = 0; + self.num_buffered = 0; + self.num_scheduled = 0; + self.num_unschedulable = 0; + self.num_schedule_filtered_out = 0; + self.num_finished = 0; + self.num_retryable = 0; + self.num_dropped_on_receive = 0; + self.num_dropped_on_sanitization = 0; + self.num_dropped_on_validate_locks = 0; + self.num_dropped_on_receive_transaction_checks = 0; + self.num_dropped_on_clear = 0; + self.num_dropped_on_age_and_status = 0; + self.num_dropped_on_capacity = 0; + self.min_prioritization_fees = u64::MAX; + self.max_prioritization_fees = 0; + } + + pub fn update_priority_stats(&mut self, min_max_fees: MinMaxResult) { + // update min/max priority + match min_max_fees { + itertools::MinMaxResult::NoElements => { + // do nothing + } + itertools::MinMaxResult::OneElement(e) => { + self.min_prioritization_fees = e; + self.max_prioritization_fees = e; + } + itertools::MinMaxResult::MinMax(min, max) => { + self.min_prioritization_fees = min; + self.max_prioritization_fees = max; + } + } + } + + pub fn get_min_priority(&self) -> u64 { + // to avoid getting u64::max recorded by metrics / in case of edge cases + if self.min_prioritization_fees != u64::MAX { + self.min_prioritization_fees + } else { + 0 + } + } + + pub fn get_max_priority(&self) -> u64 { + self.max_prioritization_fees + } +} + +#[derive(Default)] +pub struct SchedulerTimingMetrics { + interval: AtomicInterval, + /// Time spent making processing decisions. + pub decision_time_us: u64, + /// Time spent receiving packets. + pub receive_time_us: u64, + /// Time spent buffering packets. + pub buffer_time_us: u64, + /// Time spent filtering transactions during scheduling. + pub schedule_filter_time_us: u64, + /// Time spent scheduling transactions. + pub schedule_time_us: u64, + /// Time spent clearing transactions from the container. + pub clear_time_us: u64, + /// Time spent cleaning expired or processed transactions from the container. + pub clean_time_us: u64, + /// Time spent receiving completed transactions. + pub receive_completed_time_us: u64, +} + +impl SchedulerTimingMetrics { + pub fn maybe_report_and_reset(&mut self, should_report: bool) { + const REPORT_INTERVAL_MS: u64 = 1000; + if self.interval.should_update(REPORT_INTERVAL_MS) { + if should_report { + self.report(); + } + self.reset(); + } + } + + fn report(&self) { + datapoint_info!( + "banking_stage_scheduler_timing", + ("decision_time_us", self.decision_time_us, i64), + ("receive_time_us", self.receive_time_us, i64), + ("buffer_time_us", self.buffer_time_us, i64), + ("schedule_filter_time_us", self.schedule_filter_time_us, i64), + ("schedule_time_us", self.schedule_time_us, i64), + ("clear_time_us", self.clear_time_us, i64), + ("clean_time_us", self.clean_time_us, i64), + ( + "receive_completed_time_us", + self.receive_completed_time_us, + i64 + ) + ); + } + + fn reset(&mut self) { + self.decision_time_us = 0; + self.receive_time_us = 0; + self.buffer_time_us = 0; + self.schedule_filter_time_us = 0; + self.schedule_time_us = 0; + self.clear_time_us = 0; + self.clean_time_us = 0; + self.receive_completed_time_us = 0; + } +}