feat: improve parallelisation with buffered streams
grumbach authored and RolandSherwin committed Mar 20, 2024
1 parent e04ec65 commit 501f05a
Showing 3 changed files with 25 additions and 33 deletions.
sn_client/src/api.rs: 2 changes (1 addition & 1 deletion)
@@ -906,7 +906,7 @@ impl Client {
             Err(err) => {
                 warn!("Invalid signed spend got from network for {address:?}: {err:?}.");
                 Err(Error::CouldNotVerifyTransfer(format!(
-                    "Verifiation failed for spent at {address:?} with error {err:?}"
+                    "Verification failed for spent at {address:?} with error {err:?}"
                 )))
             }
         }
sn_client/src/audit/spend_dag_building.rs: 54 changes (22 additions & 32 deletions)
@@ -9,7 +9,7 @@
 use super::{Client, SpendDag};
 use crate::{Error, Result};

-use futures::future::join_all;
+use futures::{future::join_all, StreamExt};
 use sn_networking::{GetRecordError, NetworkError};
 use sn_transfers::{SignedSpend, SpendAddress, WalletError, WalletResult};
 use std::collections::BTreeSet;
@@ -49,8 +49,7 @@ impl Client {
         while !txs_to_follow.is_empty() {
             let mut next_gen_tx = BTreeSet::new();

-            // gather all descendants in parallel
-            let mut tasks = vec![];
+            // list up all descendants
+            let mut addrs = vec![];
             for descendant_tx in txs_to_follow.iter() {
                 let descendant_tx_hash = descendant_tx.hash();
@@ -60,40 +59,33 @@ impl Client {
                     .map(|output| output.unique_pubkey);
                 let addrs_to_follow = descendant_keys.map(|k| SpendAddress::from_unique_pubkey(&k));
                 info!("Gen {gen} - Following descendant Tx : {descendant_tx_hash:?}");

-                let tasks_for_this_descendant: Vec<_> = addrs_to_follow
-                    .clone()
-                    .map(|a| self.get_spend_from_network(a))
-                    .collect();
-                tasks.extend(tasks_for_this_descendant);
+                addrs.extend(addrs_to_follow);
             }

-            // wait for tasks to complete
+            // get all spends in parallel
+            let mut stream = futures::stream::iter(addrs.clone())
+                .map(|a| async move { (self.get_spend_from_network(a).await, a) })
+                .buffer_unordered(crate::MAX_CONCURRENT_TASKS);
             info!(
-                "Gen {gen} - Getting {} spends from {} txs",
-                tasks.len(),
-                txs_to_follow.len()
+                "Gen {gen} - Getting {} spends from {} txs in batches of: {}",
+                addrs.len(),
+                txs_to_follow.len(),
+                crate::MAX_CONCURRENT_TASKS,
             );
-            let spends_res = join_all(tasks).await.into_iter().collect::<Vec<_>>();
-            info!("Gen {gen} - Got those {} spends", spends_res.len());

-            // insert spends in the dag
-            for res in spends_res.iter().zip(addrs) {
+            // insert spends in the dag as they are collected
+            while let Some((res, addr)) = stream.next().await {
                 match res {
-                    (Ok(spend), addr) => {
+                    Ok(spend) => {
                         dag.insert(addr, spend.clone());
                         next_gen_tx.insert(spend.spend.spent_tx.clone());
                     }
-                    (
-                        Err(Error::Network(NetworkError::GetRecordError(
-                            GetRecordError::RecordNotFound,
-                        ))),
-                        addr,
-                    ) => {
+                    Err(Error::Network(NetworkError::GetRecordError(
+                        GetRecordError::RecordNotFound,
+                    ))) => {
                         info!("Reached UTXO at {addr:?}");
                     }
-                    (Err(err), addr) => {
+                    Err(err) => {
                         error!("Could not verify transfer at {addr:?}: {err:?}");
                     }
                 }
@@ -252,16 +244,14 @@ impl Client {
         trace!("Gathering spend DAG from utxos...");
         let utxos = dag.get_utxos();

-        let tasks: Vec<_> = utxos
-            .iter()
+        let mut stream = futures::stream::iter(utxos.into_iter())
             .map(|utxo| {
-                debug!("Launching task to gather DAG from utxo: {:?}", utxo);
-                self.spend_dag_build_from(*utxo)
+                debug!("Queuing task to gather DAG from utxo: {:?}", utxo);
+                self.spend_dag_build_from(utxo)
             })
-            .collect();
-        let sub_dags = join_all(tasks).await;
+            .buffer_unordered(crate::MAX_CONCURRENT_TASKS);

-        for res in sub_dags {
+        while let Some(res) = stream.next().await {
             dag.merge(res?);
         }

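Both hunks above replace a collect-then-`join_all` pattern with a buffered stream. The sketch below is not code from this repository; it illustrates the same pattern with placeholders (`fetch_spend` standing in for `get_spend_from_network`, a `usize` address type, and a `tokio`/`futures` setup, which the assumption is the crate already has available). The two points it shows are that each stream item carries its input alongside its result, so a failure can still be reported against the address that produced it, and that `buffer_unordered` caps how many requests are in flight at once.

// Minimal sketch only: placeholder types and functions, not the crate's real API.
use futures::StreamExt;

async fn fetch_spend(addr: usize) -> Result<String, String> {
    // stand-in for `self.get_spend_from_network(addr)`
    Ok(format!("spend at {addr}"))
}

#[tokio::main]
async fn main() {
    const MAX_CONCURRENT_TASKS: usize = 1024; // mirrors the constant added in sn_client/src/lib.rs
    let addrs: Vec<usize> = (0..10).collect();

    // Pair every result with the address it came from, and cap concurrency.
    let mut stream = futures::stream::iter(addrs)
        .map(|a| async move { (fetch_spend(a).await, a) })
        .buffer_unordered(MAX_CONCURRENT_TASKS);

    // Results are handled as soon as they complete, rather than waiting for
    // the whole batch as `join_all` would.
    while let Some((res, addr)) = stream.next().await {
        match res {
            Ok(spend) => println!("got {spend}"),
            Err(err) => eprintln!("failed at address {addr}: {err}"),
        }
    }
}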
sn_client/src/lib.rs: 2 changes (2 additions & 0 deletions)
@@ -32,6 +32,8 @@ pub use sn_protocol as protocol;
 pub use sn_registers as registers;
 pub use sn_transfers as transfers;

+const MAX_CONCURRENT_TASKS: usize = 1024;
+
 pub use self::{
     audit::{DagError, SpendDag, SpendDagGet, SpendFault},
     error::Error,
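The practical effect of the new `MAX_CONCURRENT_TASKS` constant is that `buffer_unordered(1024)` never drives more than 1024 futures at a time, whereas `join_all` polls every future in the batch at once. The standalone demo below is not part of this commit; it uses an atomic counter (with a small placeholder limit) to show that the peak number of in-flight futures stays at or below the configured bound.

// Hypothetical demo of the concurrency bound; not from this repository.
use futures::StreamExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    const LIMIT: usize = 16; // small stand-in for MAX_CONCURRENT_TASKS
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    futures::stream::iter(0..1_000)
        .map(|i| {
            let in_flight = Arc::clone(&in_flight);
            let peak = Arc::clone(&peak);
            async move {
                // Track how many of these futures are running concurrently.
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                tokio::task::yield_now().await; // stand-in for a network round trip
                in_flight.fetch_sub(1, Ordering::SeqCst);
                i
            }
        })
        .buffer_unordered(LIMIT)
        .for_each(|_| async {})
        .await;

    // buffer_unordered enforces the cap; join_all gives no such bound.
    assert!(peak.load(Ordering::SeqCst) <= LIMIT);
    println!("peak in-flight futures: {}", peak.load(Ordering::SeqCst));
}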