Skip to content

Commit

Permalink
Unify concurrent and non-concurrent verification with ability to cont…
Browse files Browse the repository at this point in the history
…rol concurrency
  • Loading branch information
nazar-pc committed Oct 14, 2023
1 parent 920fb27 commit dd57c36
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 121 deletions.
2 changes: 1 addition & 1 deletion substrate/client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ sp-consensus = { path = "../../../primitives/consensus/common" }
sp-core = { path = "../../../primitives/core" }
sp-runtime = { path = "../../../primitives/runtime" }
sp-state-machine = { path = "../../../primitives/state-machine" }
tokio = "1.32.0"
tokio = { version = "1.32.0", features = ["macros"] }

[dev-dependencies]
sp-test-primitives = { path = "../../../primitives/test-primitives" }
21 changes: 11 additions & 10 deletions substrate/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use log::{debug, trace};
use std::{
fmt,
future::Future,
num::NonZeroUsize,
ops::Deref,
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -129,16 +130,16 @@ pub struct IncomingBlock<B: BlockT> {
/// Verify a justification of a block
#[async_trait::async_trait]
pub trait Verifier<B: BlockT>: Send + Sync {
/// Whether verifier supports stateless verification.
/// How many blocks can be verified concurrently.
///
/// Stateless verification means that verification on blocks can be done in arbitrary order,
/// doesn't expect parent block to be imported first, etc.
/// Defaults to 1, which means blocks are verified sequentially, one at a time.
///
/// Verifiers that support stateless verification can verify multiple blocks concurrently,
/// significantly improving sync speed.
fn supports_stateless_verification(&self) -> bool {
// Unless re-defined by verifier is assumed to not support stateless verification.
false
/// value higher than one means verification on blocks can be done in arbitrary order,
/// doesn't expect parent block to be imported first, etc. This significantly improves sync
/// speed by leveraging multiple CPU cores. Good value here is to make concurrency equal to
/// number of CPU cores available.
fn verification_concurrency(&self) -> NonZeroUsize {
NonZeroUsize::new(1).expect("Not zero; qed")
}

/// Verify the given block data and return the `BlockImportParams` to
Expand All @@ -150,8 +151,8 @@ impl<Block> Verifier<Block> for Arc<dyn Verifier<Block>>
where
Block: BlockT,
{
fn supports_stateless_verification(&self) -> bool {
(**self).supports_stateless_verification()
fn verification_concurrency(&self) -> NonZeroUsize {
(**self).verification_concurrency()
}

fn verify<'life0, 'async_trait>(
Expand Down
145 changes: 35 additions & 110 deletions substrate/client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use sp_runtime::{
Justification, Justifications,
};
use std::{
num::NonZeroUsize,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -250,18 +251,15 @@ async fn block_import_process<B: BlockT>(
},
};

let res = if verifier.supports_stateless_verification() {
import_many_blocks_with_stateless_verification(
&mut block_import,
origin,
blocks,
&verifier,
metrics.clone(),
)
.await
} else {
import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await
};
let res = import_many_blocks_with_verification_concurrency(
&mut block_import,
origin,
blocks,
&verifier,
metrics.clone(),
verifier.verification_concurrency(),
)
.await;

result_sender.blocks_processed(res.imported, res.block_count, res.results);
}
Expand Down Expand Up @@ -404,98 +402,15 @@ struct ImportManyBlocksResult<B: BlockT> {
/// This will yield after each imported block once, to ensure that other futures can
/// be called as well.
///
/// For verifiers that support stateless verification use
/// [`import_many_blocks_with_stateless_verification`] for better performance.
async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
import_handle: &mut SharedBlockImport<B>,
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: &V,
metrics: Option<Metrics>,
) -> ImportManyBlocksResult<B> {
let count = blocks.len();

let blocks_range = match (
blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};

trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range);

let mut imported = 0;
let mut results = vec![];
let mut has_error = false;
let mut blocks = blocks.into_iter();

// Blocks in the response/drain should be in ascending order.
loop {
// Is there any block left to import?
let block = match blocks.next() {
Some(b) => b,
None => {
// No block left to import, success!
return ImportManyBlocksResult { block_count: count, imported, results }
},
};

let block_number = block.header.as_ref().map(|h| *h.number());
let block_hash = block.hash;
let import_result = if has_error {
Err(BlockImportError::Cancelled)
} else {
let verification_fut = verify_single_block_metered(
import_handle,
blocks_origin,
block,
verifier,
false,
metrics.as_ref(),
);
match verification_fut.await {
Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status),
Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => {
// The actual import.
import_single_block_metered(import_handle, import_parameters, metrics.as_ref())
.await
},
Err(e) => Err(e),
}
};

if let Some(metrics) = metrics.as_ref() {
metrics.report_import::<B>(&import_result);
}

if import_result.is_ok() {
trace!(
target: LOG_TARGET,
"Block imported successfully {:?} ({})",
block_number,
block_hash,
);
imported += 1;
} else {
has_error = true;
}

results.push((import_result, block_hash));

Yield::new().await
}
}

/// The same as [`import_many_blocks()`]`, but for verifiers that support stateless verification of
/// blocks (use [`Verifier::supports_stateless_verification()`]).
async fn import_many_blocks_with_stateless_verification<B: BlockT>(
/// When verification concurrency is set to value higher than 1, block verification will happen in
/// parallel to block import, reducing overall time required.
async fn import_many_blocks_with_verification_concurrency<B: BlockT>(
import_handle: &mut SharedBlockImport<B>,
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: &Arc<dyn Verifier<B>>,
metrics: Option<Metrics>,
verification_concurrency: NonZeroUsize,
) -> ImportManyBlocksResult<B> {
let count = blocks.len();

Expand All @@ -512,11 +427,8 @@ async fn import_many_blocks_with_stateless_verification<B: BlockT>(

let has_error = Arc::new(AtomicBool::new(false));

// Blocks in the response/drain should be in ascending order.
let mut verified_blocks = blocks
.into_iter()
.enumerate()
.map(|(index, block)| {
let verify_block_task =
|index, block: IncomingBlock<B>, import_handle: &SharedBlockImport<B>| {
let import_handle = import_handle.clone();
let verifier = Arc::clone(verifier);
let metrics = metrics.clone();
Expand Down Expand Up @@ -551,7 +463,14 @@ async fn import_many_blocks_with_stateless_verification<B: BlockT>(

(block_number, block_hash, result)
}
})
};

// Blocks in the response/drain should be in ascending order.
let mut blocks_to_verify = blocks.into_iter().enumerate();
let mut verified_blocks = blocks_to_verify
.by_ref()
.take(verification_concurrency.get())
.map(|(index, block)| verify_block_task(index, block, import_handle))
.collect::<FuturesOrdered<_>>();

let mut imported = 0;
Expand Down Expand Up @@ -589,6 +508,11 @@ async fn import_many_blocks_with_stateless_verification<B: BlockT>(

results.push((import_result, block_hash));

// Add more blocks into verification queue if there are any
if let Some((index, block)) = blocks_to_verify.next() {
verified_blocks.push_back(verify_block_task(index, block, import_handle));
}

Yield::new().await
}

Expand Down Expand Up @@ -632,7 +556,7 @@ mod tests {
},
import_queue::Verifier,
};
use futures::{executor::block_on, Future};
use futures::Future;
use sp_test_primitives::{Block, BlockNumber, Hash, Header};

#[async_trait::async_trait]
Expand Down Expand Up @@ -716,8 +640,8 @@ mod tests {
}
}

#[test]
fn prioritizes_finality_work_over_block_import() {
#[tokio::test]
async fn prioritizes_finality_work_over_block_import() {
let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);

let (worker, finality_sender, block_import_sender) = BlockImportWorker::new(
Expand Down Expand Up @@ -789,7 +713,7 @@ mod tests {
let justification3 = import_justification();

// we poll the worker until we have processed 9 events
block_on(futures::future::poll_fn(|cx| {
futures::future::poll_fn(|cx| {
while link.events.len() < 9 {
match Future::poll(Pin::new(&mut worker), cx) {
Poll::Pending => {},
Expand All @@ -800,7 +724,8 @@ mod tests {
}

Poll::Ready(())
}));
})
.await;

// all justification tasks must be done before any block import work
assert_eq!(
Expand Down

0 comments on commit dd57c36

Please sign in to comment.