From 501f05abaa2de0aa127bbe0f1673c2dacc7ce72b Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 20 Mar 2024 14:44:41 +0900 Subject: [PATCH] feat: improve parallelisation with buffered streams --- sn_client/src/api.rs | 2 +- sn_client/src/audit/spend_dag_building.rs | 54 +++++++++-------------- sn_client/src/lib.rs | 2 + 3 files changed, 25 insertions(+), 33 deletions(-) diff --git a/sn_client/src/api.rs b/sn_client/src/api.rs index 6d7833bdd3..e8180347ec 100644 --- a/sn_client/src/api.rs +++ b/sn_client/src/api.rs @@ -906,7 +906,7 @@ impl Client { Err(err) => { warn!("Invalid signed spend got from network for {address:?}: {err:?}."); Err(Error::CouldNotVerifyTransfer(format!( - "Verifiation failed for spent at {address:?} with error {err:?}" + "Verification failed for spent at {address:?} with error {err:?}" ))) } } diff --git a/sn_client/src/audit/spend_dag_building.rs b/sn_client/src/audit/spend_dag_building.rs index 0520087c43..70dff80260 100644 --- a/sn_client/src/audit/spend_dag_building.rs +++ b/sn_client/src/audit/spend_dag_building.rs @@ -9,7 +9,7 @@ use super::{Client, SpendDag}; use crate::{Error, Result}; -use futures::future::join_all; +use futures::{future::join_all, StreamExt}; use sn_networking::{GetRecordError, NetworkError}; use sn_transfers::{SignedSpend, SpendAddress, WalletError, WalletResult}; use std::collections::BTreeSet; @@ -49,8 +49,7 @@ impl Client { while !txs_to_follow.is_empty() { let mut next_gen_tx = BTreeSet::new(); - // gather all descendants in parallel - let mut tasks = vec![]; + // list up all descendants let mut addrs = vec![]; for descendant_tx in txs_to_follow.iter() { let descendant_tx_hash = descendant_tx.hash(); @@ -60,40 +59,33 @@ impl Client { .map(|output| output.unique_pubkey); let addrs_to_follow = descendant_keys.map(|k| SpendAddress::from_unique_pubkey(&k)); info!("Gen {gen} - Following descendant Tx : {descendant_tx_hash:?}"); - - let tasks_for_this_descendant: Vec<_> = addrs_to_follow - .clone() - .map(|a| 
self.get_spend_from_network(a)) - .collect(); - tasks.extend(tasks_for_this_descendant); addrs.extend(addrs_to_follow); } - // wait for tasks to complete + // get all spends in parallel + let mut stream = futures::stream::iter(addrs.clone()) + .map(|a| async move { (self.get_spend_from_network(a).await, a) }) + .buffer_unordered(crate::MAX_CONCURRENT_TASKS); info!( - "Gen {gen} - Getting {} spends from {} txs", - tasks.len(), - txs_to_follow.len() + "Gen {gen} - Getting {} spends from {} txs in batches of: {}", + addrs.len(), + txs_to_follow.len(), + crate::MAX_CONCURRENT_TASKS, ); - let spends_res = join_all(tasks).await.into_iter().collect::<Vec<_>>(); - info!("Gen {gen} - Got those {} spends", spends_res.len()); - // insert spends in the dag - for res in spends_res.iter().zip(addrs) { + // insert spends in the dag as they are collected + while let Some((res, addr)) = stream.next().await { match res { - (Ok(spend), addr) => { + Ok(spend) => { dag.insert(addr, spend.clone()); next_gen_tx.insert(spend.spend.spent_tx.clone()); } - ( - Err(Error::Network(NetworkError::GetRecordError( - GetRecordError::RecordNotFound, - ))), - addr, - ) => { + Err(Error::Network(NetworkError::GetRecordError( + GetRecordError::RecordNotFound, + ))) => { info!("Reached UTXO at {addr:?}"); } - (Err(err), addr) => { + Err(err) => { error!("Could not verify transfer at {addr:?}: {err:?}"); } } @@ -252,16 +244,14 @@ impl Client { trace!("Gathering spend DAG from utxos..."); let utxos = dag.get_utxos(); - let tasks: Vec<_> = utxos - .iter() + let mut stream = futures::stream::iter(utxos.into_iter()) .map(|utxo| { - debug!("Launching task to gather DAG from utxo: {:?}", utxo); - self.spend_dag_build_from(*utxo) + debug!("Queuing task to gather DAG from utxo: {:?}", utxo); + self.spend_dag_build_from(utxo) }) - .collect(); - let sub_dags = join_all(tasks).await; + .buffer_unordered(crate::MAX_CONCURRENT_TASKS); - for res in sub_dags { + while let Some(res) = stream.next().await { dag.merge(res?); }
diff --git a/sn_client/src/lib.rs b/sn_client/src/lib.rs index cb5bec2b0e..00bcd5e4ea 100644 --- a/sn_client/src/lib.rs +++ b/sn_client/src/lib.rs @@ -32,6 +32,8 @@ pub use sn_protocol as protocol; pub use sn_registers as registers; pub use sn_transfers as transfers; +const MAX_CONCURRENT_TASKS: usize = 1024; + pub use self::{ audit::{DagError, SpendDag, SpendDagGet, SpendFault}, error::Error,