Skip to content

Commit

Permalink
dropping transaction - size limit is properly obeyed now
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk committed Dec 4, 2024
1 parent 6cca272 commit 3b17a16
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ where
/// are reduced to single result. Refer to `reduce_multiview_result` for more details.
async fn submit_at(
&self,
_: <Self::Block as BlockT>::Hash,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xts: Vec<TransactionFor<Self>>,
) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
Expand All @@ -657,6 +657,21 @@ where
return Ok(mempool_results.into_iter().map(|r| r.map(|r| r.hash)).collect::<Vec<_>>())
}

//todo: review + test maybe?
let retries = mempool_results
.into_iter()
.zip(xts.clone())
.map(|(result, xt)| async move {
match result {
Err(TxPoolApiError::ImmediatelyDropped) =>
self.attempt_transaction_replacement(at, source, false, xt).await,
result @ _ => result,
}
})
.collect::<Vec<_>>();

let mempool_results = futures::future::join_all(retries).await;

let to_be_submitted = mempool_results
.iter()
.zip(xts)
Expand Down Expand Up @@ -721,7 +736,7 @@ where
log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count());
let xt = Arc::from(xt);

let InsertionInfo { hash: xt_hash, source: timed_source } =
let InsertionInfo { hash: xt_hash, source: timed_source, .. } =
match self.mempool.push_watched(source, xt.clone()) {
Ok(result) => result,
Err(TxPoolApiError::ImmediatelyDropped) =>
Expand Down Expand Up @@ -1333,33 +1348,31 @@ where
)
.await;

// 3. check if most_recent_view contains a transaction with lower priority (actually worse -
// do we want to check timestamp too - no: see #4609?)
// Would be perfect to choose transaction with lowest the number of dependant txs in its
// subtree.
if let Some(worst_tx_hash) =
best_view.pool.validated_pool().find_transaction_with_lower_prio(validated_tx)
{
if let Some(priority) = validated_tx.priority() {
// 3. check if mempool contains a transaction with lower priority (actually worse - do
// we want to check timestamp too - no: see #4609?) Would be perfect to choose
// transaction with lowest the number of dependant txs in its subtree.
// 4. if yes - remove worse transaction from mempool and add new one.
log::trace!(target: LOG_TARGET, "found candidate for removal: {worst_tx_hash:?} replaced by {tx_hash:?}");
let insertion_info =
self.mempool.try_replace_transaction(xt, source, watched, worst_tx_hash)?;

// 5. notify listner
self.view_store
.listener
.transaction_dropped(DroppedTransaction::new_enforced_by_limts(worst_tx_hash));

// 6. remove transaction from the view_store
self.view_store.remove_transaction_subtree(
worst_tx_hash,
|listener, removed_tx_hash| {
listener.limits_enforced(&removed_tx_hash);
},
);
self.mempool.try_replace_transaction(xt, priority, source, watched)?;

for worst_hash in &insertion_info.removed {
log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}");
// 5. notify listner
self.view_store
.listener
.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));

// 6. remove transaction from the view_store
self.view_store.remove_transaction_subtree(
*worst_hash,
|listener, removed_tx_hash| {
listener.limits_enforced(&removed_tx_hash);
},
);
}

// 8. add to pending_replacements - make sure it will not sneak back via cloned view

// 9. subemit new one to the view, this will be done upon in the caller
return Ok(insertion_info)
}
Expand Down
139 changes: 123 additions & 16 deletions substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ use crate::{
};
use futures::FutureExt;
use itertools::Itertools;
use sc_transaction_pool_api::TransactionSource;
use sc_transaction_pool_api::{TransactionPriority, TransactionSource};
use sp_blockchain::HashAndNumber;
use sp_runtime::{
traits::Block as BlockT,
transaction_validity::{InvalidTransaction, TransactionValidityError},
};
use std::{
cmp::Ordering,
collections::HashMap,
sync::{atomic, atomic::AtomicU64, Arc},
sync::{
atomic::{self, AtomicU64},
Arc,
},
time::Instant,
};

Expand Down Expand Up @@ -80,6 +84,9 @@ where
source: TimedTransactionSource,
/// When the transaction was revalidated, used to periodically revalidate the mem pool buffer.
validated_at: AtomicU64,
/// Priority of transaction at some block. It is assumed it will not be changed often.
//todo: Option is needed here. This means lock. So maybe +AtomicBool?
priority: AtomicU64,
//todo: we need to add future / ready status at finalized block.
//If future transactions are stuck in tx_mem_pool (due to limits being hit), we need a means
// to replace them somehow with newly coming transactions.
Expand Down Expand Up @@ -112,19 +119,31 @@ where
Self::new(true, source, tx, bytes)
}

/// Creates a new instance of wrapper for a transaction.
/// Creates a new instance of wrapper for a transaction with no priority.
fn new(
watched: bool,
source: TransactionSource,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
) -> Self {
Self::new_with_priority(watched, source, tx, bytes, 0)
}

/// Creates a new instance of wrapper for a transaction with given priority.
fn new_with_priority(
watched: bool,
source: TransactionSource,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
priority: TransactionPriority,
) -> Self {
Self {
watched,
tx,
source: TimedTransactionSource::from_transaction_source(source, true),
validated_at: AtomicU64::new(0),
bytes,
priority: AtomicU64::new(priority),
}
}

Expand All @@ -139,6 +158,11 @@ where
pub(crate) fn source(&self) -> TimedTransactionSource {
self.source.clone()
}

/// Returns the priority of the transaction.
pub(crate) fn priority(&self) -> TransactionPriority {
self.priority.load(atomic::Ordering::Relaxed)
}
}

impl<ChainApi, Block> Size for Arc<TxInMemPool<ChainApi, Block>>
Expand Down Expand Up @@ -198,11 +222,15 @@ where
pub(super) struct InsertionInfo<Hash> {
pub(super) hash: Hash,
pub(super) source: TimedTransactionSource,
pub(super) removed: Vec<Hash>,
}

impl<Hash> InsertionInfo<Hash> {
fn new(hash: Hash, source: TimedTransactionSource) -> Self {
Self { hash, source }
Self::new_with_removed(hash, source, Default::default())
}
fn new_with_removed(hash: Hash, source: TimedTransactionSource, removed: Vec<Hash>) -> Self {
Self { hash, source, removed }
}
}

Expand Down Expand Up @@ -286,14 +314,9 @@ where
&self,
hash: ExtrinsicHash<ChainApi>,
tx: TxInMemPool<ChainApi, Block>,
tx_to_be_removed: Option<ExtrinsicHash<ChainApi>>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
let mut transactions = self.transactions.write();

tx_to_be_removed.inspect(|hash| {
transactions.remove(hash);
});

let bytes = self.transactions.bytes();

let result = match (
Expand All @@ -314,22 +337,100 @@ where
result
}

/// Attempts to insert a new transaction in the memory pool and drop some worse existing
/// transactions.
///
/// This operation will not overflow the limit of the mempool. It means that cumulative
/// size of removed transactions will be equal (or greated) then size of newly inserted
/// transaction.
///
/// Returns a `Result` containing `InsertionInfo` if the new transaction is successfully
/// inserted; otherwise, returns an appropriate error indicating the failure.
pub(super) fn try_insert_with_dropping(
&self,
hash: ExtrinsicHash<ChainApi>,
new_tx: TxInMemPool<ChainApi, Block>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
let mut transactions = self.transactions.write();

if transactions.contains_key(&hash) {
return Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)));
}

let mut sorted =
transactions.iter().map(|(h, v)| (h.clone(), v.clone())).collect::<Vec<_>>();

// When pushing higher prio transaction, we need to find a number of lower prio txs, such
// that the sum of their bytes is ge then size of new tx. Otherwise we could overflow size
// limits. Naive way to do it - rev-sort by priority and eat the tail.

// reverse (lowest prio last)
sorted.sort_by(|(_, a), (_, b)| match b.priority().cmp(&a.priority()) {
Ordering::Equal => match (a.source.timestamp, b.source.timestamp) {
(Some(a), Some(b)) => b.cmp(&a),
_ => Ordering::Equal,
},
ordering @ _ => ordering,
});

let required_size = new_tx.bytes;
let mut total_size = 0usize;
let mut to_be_removed = vec![];

while total_size < required_size {
let Some((worst_hash, worst_tx)) = sorted.pop() else {
return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
};

if worst_tx.priority() >= new_tx.priority() {
return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
}

total_size += worst_tx.bytes;
to_be_removed.push(worst_hash);
}

let source = new_tx.source();
transactions.insert(hash, Arc::from(new_tx));
for worst_hash in &to_be_removed {
transactions.remove(worst_hash);
}
debug_assert!(!self.is_limit_exceeded(transactions.len(), self.transactions.bytes()));

Ok(InsertionInfo::new_with_removed(hash, source, to_be_removed))

// let result = match (
// self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
// transactions.contains_key(&hash),
// ) {
// (false, false) => {
// let source = tx.source();
// transactions.insert(hash, Arc::from(tx));
// Ok(InsertionInfo::new(hash, source))
// },
// (_, true) =>
// Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash))),
// (true, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped),
// };
// log::trace!(target: LOG_TARGET, "[{:?}] mempool::try_insert: {:?}", hash,
// result.as_ref().map(|r| r.hash));
}

/// Attempts to replace an existing transaction in the memory pool with a new one.
///
/// Returns a `Result` containing `InsertionInfo` if the new transaction is successfully
/// inserted; otherwise, returns an appropriate error indicating the failure.
pub(super) fn try_replace_transaction(
&self,
new_xt: ExtrinsicFor<ChainApi>,
priority: TransactionPriority,
source: TransactionSource,
watched: bool,
replaced_tx_hash: ExtrinsicHash<ChainApi>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
let (hash, length) = self.api.hash_and_length(&new_xt);
self.try_insert(
self.try_insert_with_dropping(
hash,
TxInMemPool::new(watched, source, new_xt, length),
Some(replaced_tx_hash),
TxInMemPool::new_with_priority(watched, source, new_xt, length, priority),
)
}

Expand All @@ -347,7 +448,7 @@ where
.iter()
.map(|xt| {
let (hash, length) = self.api.hash_and_length(&xt);
self.try_insert(hash, TxInMemPool::new_unwatched(source, xt.clone(), length), None)
self.try_insert(hash, TxInMemPool::new_unwatched(source, xt.clone(), length))
})
.collect::<Vec<_>>();
result
Expand All @@ -361,7 +462,7 @@ where
xt: ExtrinsicFor<ChainApi>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
let (hash, length) = self.api.hash_and_length(&xt);
self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length), None)
self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length))
}

/// Clones and returns a `HashMap` of references to all unwatched transactions in the memory
Expand Down Expand Up @@ -493,7 +594,11 @@ where
}

pub(super) fn update_transaction(&self, outcome: &ViewStoreSubmitOutcome<ChainApi>) {
// todo!()
if let Some(priority) = outcome.priority() {
if let Some(tx) = self.transactions.write().get_mut(&outcome.hash()) {
tx.priority.store(priority, atomic::Ordering::Relaxed);
}
}
}
}

Expand Down Expand Up @@ -650,4 +755,6 @@ mod tx_mem_pool_tests {
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
}

//add some test for try_insert_with_dropping
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ where
log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err);
Err(err)
},
None => Ok(ViewStoreSubmitOutcome::new_no_priority(tx_hash)),
None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None)),
Some(Ok(r)) => Ok(r.into()),
}
}
Expand Down Expand Up @@ -320,8 +320,7 @@ where
},
Some(Ok(result)) =>
Ok(ViewStoreSubmitOutcome::from(result).with_watcher(external_watcher)),
None =>
Ok(ViewStoreSubmitOutcome::new_no_priority(tx_hash).with_watcher(external_watcher)),
None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None).with_watcher(external_watcher)),
}
}

Expand Down
10 changes: 10 additions & 0 deletions substrate/client/transaction-pool/src/graph/tracked_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ where
pub fn len(&mut self) -> usize {
self.inner_guard.len()
}

/// Returns an iterator over all values.
pub fn values(&self) -> std::collections::hash_map::Values<K, V> {
self.inner_guard.values()
}

/// Returns an iterator over all key-value pairs.
pub fn iter(&self) -> Iter<'_, K, V> {
self.inner_guard.iter()
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 3b17a16

Please sign in to comment.