From 3b17a169800fad6e76cc4290ca7bc3b03ba15e8b Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Tue, 3 Dec 2024 19:17:33 +0100 Subject: [PATCH] dropping transaction - size limit is properly obeyed now --- .../fork_aware_txpool/fork_aware_txpool.rs | 63 ++++---- .../src/fork_aware_txpool/tx_mem_pool.rs | 139 ++++++++++++++++-- .../src/fork_aware_txpool/view_store.rs | 5 +- .../transaction-pool/src/graph/tracked_map.rs | 10 ++ .../src/graph/validated_pool.rs | 24 +-- .../transaction-pool/tests/fatp_prios.rs | 16 +- 6 files changed, 193 insertions(+), 64 deletions(-) diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index f25f837d31ef..6b8a6be22458 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -643,7 +643,7 @@ where /// are reduced to single result. Refer to `reduce_multiview_result` for more details. async fn submit_at( &self, - _: ::Hash, + at: ::Hash, source: TransactionSource, xts: Vec>, ) -> Result, Self::Error>>, Self::Error> { @@ -657,6 +657,21 @@ where return Ok(mempool_results.into_iter().map(|r| r.map(|r| r.hash)).collect::>()) } + //todo: review + test maybe? + let retries = mempool_results + .into_iter() + .zip(xts.clone()) + .map(|(result, xt)| async move { + match result { + Err(TxPoolApiError::ImmediatelyDropped) => + self.attempt_transaction_replacement(at, source, false, xt).await, + result @ _ => result, + } + }) + .collect::>(); + + let mempool_results = futures::future::join_all(retries).await; + let to_be_submitted = mempool_results .iter() .zip(xts) @@ -721,7 +736,7 @@ where log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count()); let xt = Arc::from(xt); - let InsertionInfo { hash: xt_hash, source: timed_source } = + let InsertionInfo { hash: xt_hash, source: timed_source, .. } = match self.mempool.push_watched(source, xt.clone()) { Ok(result) => result, Err(TxPoolApiError::ImmediatelyDropped) => @@ -1333,33 +1348,31 @@ where ) .await; - // 3. check if most_recent_view contains a transaction with lower priority (actually worse - - // do we want to check timestamp too - no: see #4609?) - // Would be perfect to choose transaction with lowest the number of dependant txs in its - // subtree. - if let Some(worst_tx_hash) = - best_view.pool.validated_pool().find_transaction_with_lower_prio(validated_tx) - { + if let Some(priority) = validated_tx.priority() { + // 3. check if mempool contains a transaction with lower priority (actually worse - do + // we want to check timestamp too - no: see #4609?) Would be perfect to choose + // transaction with lowest the number of dependant txs in its subtree. // 4. if yes - remove worse transaction from mempool and add new one. - log::trace!(target: LOG_TARGET, "found candidate for removal: {worst_tx_hash:?} replaced by {tx_hash:?}"); let insertion_info = - self.mempool.try_replace_transaction(xt, source, watched, worst_tx_hash)?; - - // 5. notify listner - self.view_store - .listener - .transaction_dropped(DroppedTransaction::new_enforced_by_limts(worst_tx_hash)); - - // 6. 
remove transaction from the view_store - self.view_store.remove_transaction_subtree( - worst_tx_hash, - |listener, removed_tx_hash| { - listener.limits_enforced(&removed_tx_hash); - }, - ); + self.mempool.try_replace_transaction(xt, priority, source, watched)?; + + for worst_hash in &insertion_info.removed { + log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}"); + // 5. notify listner + self.view_store + .listener + .transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash)); + + // 6. remove transaction from the view_store + self.view_store.remove_transaction_subtree( + *worst_hash, + |listener, removed_tx_hash| { + listener.limits_enforced(&removed_tx_hash); + }, + ); + } // 8. add to pending_replacements - make sure it will not sneak back via cloned view - // 9. subemit new one to the view, this will be done upon in the caller return Ok(insertion_info) } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index 96a1a80fe761..c851af111e32 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -38,15 +38,19 @@ use crate::{ }; use futures::FutureExt; use itertools::Itertools; -use sc_transaction_pool_api::TransactionSource; +use sc_transaction_pool_api::{TransactionPriority, TransactionSource}; use sp_blockchain::HashAndNumber; use sp_runtime::{ traits::Block as BlockT, transaction_validity::{InvalidTransaction, TransactionValidityError}, }; use std::{ + cmp::Ordering, collections::HashMap, - sync::{atomic, atomic::AtomicU64, Arc}, + sync::{ + atomic::{self, AtomicU64}, + Arc, + }, time::Instant, }; @@ -80,6 +84,9 @@ where source: TimedTransactionSource, /// When the transaction was revalidated, used to periodically revalidate the mem pool buffer. validated_at: AtomicU64, + /// Priority of transaction at some block. It is assumed it will not be changed often. + //todo: Option is needed here. This means lock. So maybe +AtomicBool? + priority: AtomicU64, //todo: we need to add future / ready status at finalized block. //If future transactions are stuck in tx_mem_pool (due to limits being hit), we need a means // to replace them somehow with newly coming transactions. @@ -112,12 +119,23 @@ where Self::new(true, source, tx, bytes) } - /// Creates a new instance of wrapper for a transaction. + /// Creates a new instance of wrapper for a transaction with no priority. fn new( watched: bool, source: TransactionSource, tx: ExtrinsicFor, bytes: usize, + ) -> Self { + Self::new_with_priority(watched, source, tx, bytes, 0) + } + + /// Creates a new instance of wrapper for a transaction with given priority. + fn new_with_priority( + watched: bool, + source: TransactionSource, + tx: ExtrinsicFor, + bytes: usize, + priority: TransactionPriority, ) -> Self { Self { watched, @@ -125,6 +143,7 @@ where source: TimedTransactionSource::from_transaction_source(source, true), validated_at: AtomicU64::new(0), bytes, + priority: AtomicU64::new(priority), } } @@ -139,6 +158,11 @@ where pub(crate) fn source(&self) -> TimedTransactionSource { self.source.clone() } + + /// Returns the priority of the transaction. 
+ pub(crate) fn priority(&self) -> TransactionPriority { + self.priority.load(atomic::Ordering::Relaxed) + } } impl Size for Arc> @@ -198,11 +222,15 @@ where pub(super) struct InsertionInfo { pub(super) hash: Hash, pub(super) source: TimedTransactionSource, + pub(super) removed: Vec, } impl InsertionInfo { fn new(hash: Hash, source: TimedTransactionSource) -> Self { - Self { hash, source } + Self::new_with_removed(hash, source, Default::default()) + } + fn new_with_removed(hash: Hash, source: TimedTransactionSource, removed: Vec) -> Self { + Self { hash, source, removed } } } @@ -286,14 +314,9 @@ where &self, hash: ExtrinsicHash, tx: TxInMemPool, - tx_to_be_removed: Option>, ) -> Result>, sc_transaction_pool_api::error::Error> { let mut transactions = self.transactions.write(); - tx_to_be_removed.inspect(|hash| { - transactions.remove(hash); - }); - let bytes = self.transactions.bytes(); let result = match ( @@ -314,6 +337,85 @@ where result } + /// Attempts to insert a new transaction in the memory pool and drop some worse existing + /// transactions. + /// + /// This operation will not overflow the limit of the mempool. It means that cumulative + /// size of removed transactions will be equal (or greated) then size of newly inserted + /// transaction. + /// + /// Returns a `Result` containing `InsertionInfo` if the new transaction is successfully + /// inserted; otherwise, returns an appropriate error indicating the failure. + pub(super) fn try_insert_with_dropping( + &self, + hash: ExtrinsicHash, + new_tx: TxInMemPool, + ) -> Result>, sc_transaction_pool_api::error::Error> { + let mut transactions = self.transactions.write(); + + if transactions.contains_key(&hash) { + return Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash))); + } + + let mut sorted = + transactions.iter().map(|(h, v)| (h.clone(), v.clone())).collect::>(); + + // When pushing higher prio transaction, we need to find a number of lower prio txs, such + // that the sum of their bytes is ge then size of new tx. Otherwise we could overflow size + // limits. Naive way to do it - rev-sort by priority and eat the tail. 
+ + // reverse (lowest prio last) + sorted.sort_by(|(_, a), (_, b)| match b.priority().cmp(&a.priority()) { + Ordering::Equal => match (a.source.timestamp, b.source.timestamp) { + (Some(a), Some(b)) => b.cmp(&a), + _ => Ordering::Equal, + }, + ordering @ _ => ordering, + }); + + let required_size = new_tx.bytes; + let mut total_size = 0usize; + let mut to_be_removed = vec![]; + + while total_size < required_size { + let Some((worst_hash, worst_tx)) = sorted.pop() else { + return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped); + }; + + if worst_tx.priority() >= new_tx.priority() { + return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped); + } + + total_size += worst_tx.bytes; + to_be_removed.push(worst_hash); + } + + let source = new_tx.source(); + transactions.insert(hash, Arc::from(new_tx)); + for worst_hash in &to_be_removed { + transactions.remove(worst_hash); + } + debug_assert!(!self.is_limit_exceeded(transactions.len(), self.transactions.bytes())); + + Ok(InsertionInfo::new_with_removed(hash, source, to_be_removed)) + + // let result = match ( + // self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes), + // transactions.contains_key(&hash), + // ) { + // (false, false) => { + // let source = tx.source(); + // transactions.insert(hash, Arc::from(tx)); + // Ok(InsertionInfo::new(hash, source)) + // }, + // (_, true) => + // Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash))), + // (true, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped), + // }; + // log::trace!(target: LOG_TARGET, "[{:?}] mempool::try_insert: {:?}", hash, + // result.as_ref().map(|r| r.hash)); + } + /// Attempts to replace an existing transaction in the memory pool with a new one. /// /// Returns a `Result` containing `InsertionInfo` if the new transaction is successfully @@ -321,15 +423,14 @@ where pub(super) fn try_replace_transaction( &self, new_xt: ExtrinsicFor, + priority: TransactionPriority, source: TransactionSource, watched: bool, - replaced_tx_hash: ExtrinsicHash, ) -> Result>, sc_transaction_pool_api::error::Error> { let (hash, length) = self.api.hash_and_length(&new_xt); - self.try_insert( + self.try_insert_with_dropping( hash, - TxInMemPool::new(watched, source, new_xt, length), - Some(replaced_tx_hash), + TxInMemPool::new_with_priority(watched, source, new_xt, length, priority), ) } @@ -347,7 +448,7 @@ where .iter() .map(|xt| { let (hash, length) = self.api.hash_and_length(&xt); - self.try_insert(hash, TxInMemPool::new_unwatched(source, xt.clone(), length), None) + self.try_insert(hash, TxInMemPool::new_unwatched(source, xt.clone(), length)) }) .collect::>(); result @@ -361,7 +462,7 @@ where xt: ExtrinsicFor, ) -> Result>, sc_transaction_pool_api::error::Error> { let (hash, length) = self.api.hash_and_length(&xt); - self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length), None) + self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length)) } /// Clones and returns a `HashMap` of references to all unwatched transactions in the memory @@ -493,7 +594,11 @@ where } pub(super) fn update_transaction(&self, outcome: &ViewStoreSubmitOutcome) { - // todo!() + if let Some(priority) = outcome.priority() { + if let Some(tx) = self.transactions.write().get_mut(&outcome.hash()) { + tx.priority.store(priority, atomic::Ordering::Relaxed); + } + } } } @@ -650,4 +755,6 @@ mod tx_mem_pool_tests { sc_transaction_pool_api::error::Error::ImmediatelyDropped )); } + + //add some test for try_insert_with_dropping 
} diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index 4ae54e99264c..89bf47cc3895 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -262,7 +262,7 @@ where log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err); Err(err) }, - None => Ok(ViewStoreSubmitOutcome::new_no_priority(tx_hash)), + None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None)), Some(Ok(r)) => Ok(r.into()), } } @@ -320,8 +320,7 @@ where }, Some(Ok(result)) => Ok(ViewStoreSubmitOutcome::from(result).with_watcher(external_watcher)), - None => - Ok(ViewStoreSubmitOutcome::new_no_priority(tx_hash).with_watcher(external_watcher)), + None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None).with_watcher(external_watcher)), } } diff --git a/substrate/client/transaction-pool/src/graph/tracked_map.rs b/substrate/client/transaction-pool/src/graph/tracked_map.rs index 6c3bbbf34b55..818c1ebe544f 100644 --- a/substrate/client/transaction-pool/src/graph/tracked_map.rs +++ b/substrate/client/transaction-pool/src/graph/tracked_map.rs @@ -173,6 +173,16 @@ where pub fn len(&mut self) -> usize { self.inner_guard.len() } + + /// Returns an iterator over all values. + pub fn values(&self) -> std::collections::hash_map::Values { + self.inner_guard.values() + } + + /// Returns an iterator over all key-value pairs. + pub fn iter(&self) -> Iter<'_, K, V> { + self.inner_guard.iter() + } } #[cfg(test)] diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index 720a80698538..0c0317750f85 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -76,6 +76,14 @@ impl ValidatedTransaction { valid_till: at.saturated_into::().saturating_add(validity.longevity), }) } + + /// Returns priority for valid transaction, None if transaction is not valid. + pub fn priority(&self) -> Option { + match self { + ValidatedTransaction::Valid(base::Transaction { priority, .. }) => Some(*priority), + _ => None, + } + } } /// A type of validated transaction stored in the validated pool. @@ -107,8 +115,9 @@ pub struct BaseSubmitOutcome { hash: ExtrinsicHash, /// A transaction watcher. This is `Some` for `submit_and_watch` and `None` for `submit`. watcher: Option, - /// The priority of the transaction. Defaults to zero if unknown. - priority: TransactionPriority, + + /// The priority of the transaction. Defaults to None if unknown. + priority: Option, } /// Type alias to outcome of submission to `ValidatedPool`. @@ -117,15 +126,10 @@ pub type ValidatedPoolSubmitOutcome = impl BaseSubmitOutcome { /// Creates a new instance with given hash and priority. - pub fn new(hash: ExtrinsicHash, priority: TransactionPriority) -> Self { + pub fn new(hash: ExtrinsicHash, priority: Option) -> Self { Self { hash, priority, watcher: None } } - /// Creates a new instance with given hash and zeroed priority. - pub fn new_no_priority(hash: ExtrinsicHash) -> Self { - Self { hash, priority: 0u64, watcher: None } - } - /// Sets the transaction watcher. pub fn with_watcher(mut self, watcher: W) -> Self { self.watcher = Some(watcher); @@ -133,7 +137,7 @@ impl BaseSubmitOutcome { } /// Provides priority of submitted transaction. 
-	pub fn priority(&self) -> TransactionPriority {
+	pub fn priority(&self) -> Option {
 		self.priority
 	}
 
@@ -283,7 +287,7 @@ impl ValidatedPool {
 				let mut listener = self.listener.write();
 				fire_events(&mut *listener, &imported);
-				Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), priority))
+				Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority)))
 			},
 			ValidatedTransaction::Invalid(hash, err) => {
 				log::trace!(target: LOG_TARGET, "[{:?}] ValidatedPool::submit_one invalid: {:?}", hash, err);
diff --git a/substrate/client/transaction-pool/tests/fatp_prios.rs b/substrate/client/transaction-pool/tests/fatp_prios.rs
index 3c7db35323d8..abd8ebec2296 100644
--- a/substrate/client/transaction-pool/tests/fatp_prios.rs
+++ b/substrate/client/transaction-pool/tests/fatp_prios.rs
@@ -282,10 +282,8 @@ fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted() {
 	api.set_priority(&xt4, 5);
 	api.set_priority(&xt5, 6);
 
-	let _xt0_watcher =
-		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
-	let _xt1_watcher =
-		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
 
 	assert_pool_status!(header01.hash(), &pool, 2, 0);
 	assert_eq!(pool.mempool_len().1, 2);
@@ -313,13 +311,11 @@ fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted() {
 	// let header04 = api.push_block_with_parent(header03.hash(), vec![], true);
 	// block_on(pool.maintain(new_best_block_event(&pool, Some(header03.hash()), header04.hash())));
 
-	let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(2).collect::>();
-	assert_eq!(xt2_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
-	let xt3_status = futures::executor::block_on_stream(xt3_watcher).take(2).collect::>();
-	assert_eq!(xt3_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
 
-	assert_ready_iterator!(header01.hash(), pool, [xt1, xt0]);
-	assert_ready_iterator!(header02.hash(), pool, []);
+	assert_ready_iterator!(header01.hash(), pool, []);
+	assert_ready_iterator!(header02.hash(), pool, [xt3, xt2]);
 	assert_ready_iterator!(header03.hash(), pool, [xt5, xt4]);
 }
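
Editor's note, not part of the patch: the `submit_at` hunk above maps every `ImmediatelyDropped` result from the mempool to a call to `attempt_transaction_replacement` for the corresponding extrinsic, and awaits all retries together with `join_all`. Below is a minimal standalone sketch of that retry shape using the `futures` crate; `SubmitError`, `attempt_replacement` and `retry_dropped` are illustrative stand-ins, not APIs from this crate.

use futures::future::join_all;

#[derive(Debug, PartialEq)]
enum SubmitError {
    Dropped,
}

// Stand-in for `attempt_transaction_replacement`: pretend replacement succeeds
// only for even-numbered transactions.
async fn attempt_replacement(tx: u32) -> Result<u32, SubmitError> {
    if tx % 2 == 0 {
        Ok(tx)
    } else {
        Err(SubmitError::Dropped)
    }
}

// Retry every result that failed as "dropped", awaiting all retries together.
async fn retry_dropped(
    results: Vec<Result<u32, SubmitError>>,
    txs: Vec<u32>,
) -> Vec<Result<u32, SubmitError>> {
    let retries = results.into_iter().zip(txs).map(|(result, tx)| async move {
        match result {
            Err(SubmitError::Dropped) => attempt_replacement(tx).await,
            other => other,
        }
    });
    join_all(retries).await
}

fn main() {
    let out = futures::executor::block_on(retry_dropped(
        vec![Ok(1), Err(SubmitError::Dropped), Err(SubmitError::Dropped)],
        vec![1, 2, 3],
    ));
    assert_eq!(out, vec![Ok(1), Ok(2), Err(SubmitError::Dropped)]);
}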
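
Editor's note, not part of the patch: `try_insert_with_dropping` makes room for a new transaction by sorting the mempool entries so that the lowest-priority (and, on ties, oldest) entry is considered first, then evicting entries until the freed bytes cover the incoming transaction's size, and refusing with `ImmediatelyDropped` whenever the next candidate's priority is not strictly lower than the new transaction's. The sketch below replays that selection rule on simplified, hypothetical types (`Entry`, `select_evictions`); the real code operates on `TxInMemPool` entries under the mempool write lock.

use std::cmp::Ordering;

struct Entry {
    hash: u64,
    priority: u64,
    timestamp: u64, // submission order; larger = newer
    bytes: usize,
}

/// Returns the hashes to evict so that `new_bytes` fit, or `None` if that would
/// require dropping an entry whose priority is not strictly lower than `new_priority`.
fn select_evictions(pool: &[Entry], new_priority: u64, new_bytes: usize) -> Option<Vec<u64>> {
    let mut sorted: Vec<&Entry> = pool.iter().collect();
    // Highest priority first; for equal priority, newer first. The entry popped
    // from the tail is therefore the lowest-priority, oldest transaction.
    sorted.sort_by(|a, b| match b.priority.cmp(&a.priority) {
        Ordering::Equal => b.timestamp.cmp(&a.timestamp),
        ordering => ordering,
    });

    let mut freed = 0usize;
    let mut evicted = Vec::new();
    while freed < new_bytes {
        let worst = sorted.pop()?; // pool exhausted -> cannot make room
        if worst.priority >= new_priority {
            return None; // never evict an equal-or-higher-priority transaction
        }
        freed += worst.bytes;
        evicted.push(worst.hash);
    }
    Some(evicted)
}

fn main() {
    let pool = vec![
        Entry { hash: 1, priority: 2, timestamp: 10, bytes: 300 },
        Entry { hash: 2, priority: 3, timestamp: 11, bytes: 200 },
        Entry { hash: 3, priority: 2, timestamp: 12, bytes: 150 },
    ];
    // A priority-4 transaction of 400 bytes evicts the two priority-2 entries,
    // oldest first (hashes 1 and 3), freeing 450 bytes in total.
    assert_eq!(select_evictions(&pool, 4, 400), Some(vec![1, 3]));
    // A priority-2 transaction cannot displace equal-priority entries.
    assert_eq!(select_evictions(&pool, 2, 100), None);
}

Evicting only strictly-lower-priority entries is what guarantees the `ImmediatelyDropped` error path when the mempool is already full of equal-or-higher-priority transactions, and accumulating freed bytes until they cover the new size is what keeps the size limit from being overflowed by the insertion.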