From 038cc7505d3595d0c6d89fb3fc3e1956c7c7c585 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 18 Oct 2023 11:32:56 +0200 Subject: [PATCH] Start updating light-base/network_service --- light-base/src/network_service.rs | 632 +++++++++++------------- light-base/src/network_service/tasks.rs | 28 +- 2 files changed, 314 insertions(+), 346 deletions(-) diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 99de11bf73..c39d3e1df4 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -55,10 +55,10 @@ use smoldot::{ header, informant::{BytesDisplay, HashDisplay}, libp2p::{connection, multiaddr::Multiaddr, peer_id::PeerId, peers}, - network::{protocol, service}, + network::{protocol, service2}, }; -pub use service::EncodedMerkleProof; +pub use service2::{EncodedMerkleProof, QueueNotificationError}; mod tasks; @@ -115,7 +115,7 @@ pub struct ConfigChain { pub struct NetworkService { /// Names of the various chains the network service connects to. Used only for logging /// purposes. - log_chain_names: Vec, + log_chain_names: hashbrown::HashMap, /// Channel to send messages to the background task. messages_tx: async_channel::Sender>, @@ -135,36 +135,50 @@ impl NetworkService { .map(|_| async_channel::bounded(16)) .unzip(); - let num_chains = config.chains.len(); - let mut chains = Vec::with_capacity(num_chains); - let mut log_chain_names = Vec::with_capacity(num_chains); + let mut log_chain_names = + hashbrown::HashMap::with_capacity_and_hasher(config.chains.len(), Default::default()); + + let network = service2::ChainNetwork::new(service2::Config { + chains_capacity: config.chains.len(), + connections_capacity: 32, + peers_capacity: 8, + max_addresses_per_peer: NonZeroUsize::new(5).unwrap(), + noise_key: config.noise_key, + handshake_timeout: Duration::from_secs(8), + randomness_seed: { + let mut seed = [0; 32]; + config.platform.fill_random_bytes(&mut seed); + seed + }, + }); for chain in config.chains { - chains.push(service::ChainConfig { - in_slots: 3, - out_slots: 4, - grandpa_protocol_config: if let Some(commit_finalized_height) = - chain.grandpa_protocol_finalized_block_height - { - // TODO: dummy values - Some(service::GrandpaState { - commit_finalized_height, - round_number: 1, - set_id: 0, - }) - } else { - None - }, - fork_id: chain.fork_id.clone(), - block_number_bytes: chain.block_number_bytes, - best_hash: chain.best_block.1, - best_number: chain.best_block.0, - genesis_hash: chain.genesis_block_hash, - role: protocol::Role::Light, - allow_inbound_block_requests: false, - }); - - log_chain_names.push(chain.log_name); + // TODO: can panic in case of duplicate chain, how do we handle that? 
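// ------------------------------------------------------------------------------------------
// Editorial sketch, not part of the patch: `service2` registers chains at runtime through
// `ChainNetwork::add_chain` and identifies them by an opaque `service2::ChainId` rather than
// by their index in a `Vec`, which is why `log_chain_names` becomes a
// `hashbrown::HashMap<ChainId, String>` instead of a `Vec<String>`. The helper below shows
// that registration pattern in isolation. Assumptions: `ChainNetwork` is written with a single
// `TNow` parameter for illustration, the map uses hashbrown's default hasher, and skipping a
// chain whose `add_chain` call fails (duplicate chain, see the TODO above) is an illustrative
// alternative to the `.unwrap()` used by the patch.
fn register_chains<TNow>(
    network: &mut service2::ChainNetwork<TNow>,
    chains: Vec<(service2::ChainConfig, String)>, // (chain configuration, log name) pairs
) -> hashbrown::HashMap<service2::ChainId, String> {
    let mut log_chain_names =
        hashbrown::HashMap::with_capacity_and_hasher(chains.len(), Default::default());
    for (chain_config, log_name) in chains {
        match network.add_chain(chain_config) {
            // `ChainId` is the handle used everywhere the service previously used
            // `chain_index: usize`.
            Ok(chain_id) => {
                log_chain_names.insert(chain_id, log_name);
            }
            // Duplicate chain definition: skip it instead of panicking.
            Err(_) => continue,
        }
    }
    log_chain_names
}
// ------------------------------------------------------------------------------------------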
+ let chain_id = network + .add_chain(service2::ChainConfig { + grandpa_protocol_config: if let Some(commit_finalized_height) = + chain.grandpa_protocol_finalized_block_height + { + // TODO: dummy values + Some(service2::GrandpaState { + commit_finalized_height, + round_number: 1, + set_id: 0, + }) + } else { + None + }, + fork_id: chain.fork_id.clone(), + block_number_bytes: chain.block_number_bytes, + best_hash: chain.best_block.1, + best_number: chain.best_block.0, + genesis_hash: chain.genesis_block_hash, + role: protocol::Role::Light, + allow_inbound_block_requests: false, + }) + .unwrap(); + + log_chain_names.insert(chain_id, chain.log_name); } let on_service_killed = event_listener::Event::new(); @@ -204,20 +218,7 @@ impl NetworkService { identify_agent_version: config.identify_agent_version, log_chain_names: log_chain_names.clone(), messages_tx: messages_tx.clone(), - network: service::ChainNetwork::new(service::Config { - now: config.platform.now(), - chains, - connections_capacity: 32, - peers_capacity: 8, - max_addresses_per_peer: NonZeroUsize::new(5).unwrap(), - noise_key: config.noise_key, - handshake_timeout: Duration::from_secs(8), - randomness_seed: { - let mut seed = [0; 32]; - config.platform.fill_random_bytes(&mut seed); - seed - }, - }), + network, platform: config.platform.clone(), event_senders: either::Left(event_senders), slots_assign_backoff: HashMap::with_capacity_and_hasher( @@ -277,10 +278,11 @@ impl NetworkService { /// Sends a blocks request to the given peer. // TODO: more docs + // TODO: unverified /!\ pub async fn blocks_request( self: Arc, target: PeerId, - chain_index: usize, + chain_id: service2::ChainId, config: protocol::BlocksRequestConfig, timeout: Duration, ) -> Result, BlocksRequestError> { @@ -289,7 +291,7 @@ impl NetworkService { self.messages_tx .send(ToBackground::StartBlocksRequest { target: target.clone(), - chain_index, + chain_id, config, timeout, result: tx, @@ -305,7 +307,7 @@ impl NetworkService { target: "network", "Connection({}) => BlocksRequest(chain={}, num_blocks={}, block_data_total_size={})", target, - self.log_chain_names[chain_index], + self.log_chain_names[&chain_id], blocks.len(), BytesDisplay(blocks.iter().fold(0, |sum, block| { let block_size = block.header.as_ref().map_or(0, |h| h.len()) + @@ -320,7 +322,7 @@ impl NetworkService { target: "network", "Connection({}) => BlocksRequest(chain={}, error={:?})", target, - self.log_chain_names[chain_index], + self.log_chain_names[&chain_id], err ); } @@ -328,11 +330,8 @@ impl NetworkService { if !log::log_enabled!(log::Level::Debug) { match &result { - Ok(_) - | Err(BlocksRequestError::NoConnection) - | Err(BlocksRequestError::Request(service::BlocksRequestError::EmptyResponse)) - | Err(BlocksRequestError::Request(service::BlocksRequestError::NotVerifiable)) => {} - Err(BlocksRequestError::Request(service::BlocksRequestError::Request(err))) + Ok(_) | Err(BlocksRequestError::NoConnection) => {} + Err(BlocksRequestError::Request(service2::BlocksRequestError::Request(err))) if !err.is_protocol_error() => {} Err(err) => { log::warn!( @@ -353,16 +352,16 @@ impl NetworkService { pub async fn grandpa_warp_sync_request( self: Arc, target: PeerId, - chain_index: usize, + chain_id: service2::ChainId, begin_hash: [u8; 32], timeout: Duration, - ) -> Result { + ) -> Result { let (tx, rx) = oneshot::channel(); self.messages_tx .send(ToBackground::StartWarpSyncRequest { target: target.clone(), - chain_index, + chain_id, begin_hash, timeout, result: tx, @@ -380,7 +379,7 @@ impl NetworkService { 
target: "network", "Connection({}) => WarpSyncRequest(chain={}, num_fragments={}, finished={:?})", target, - self.log_chain_names[chain_index], + self.log_chain_names[&chain_id], decoded.fragments.len(), decoded.is_finished, ); @@ -390,7 +389,7 @@ impl NetworkService { target: "network", "Connection({}) => WarpSyncRequest(chain={}, error={:?})", target, - self.log_chain_names[chain_index], + self.log_chain_names[&chain_id], err, ); } @@ -401,13 +400,13 @@ impl NetworkService { pub async fn set_local_best_block( &self, - chain_index: usize, + chain_id: service2::ChainId, best_hash: [u8; 32], best_number: u64, ) { self.messages_tx .send(ToBackground::SetLocalBestBlock { - chain_index, + chain_id, best_hash, best_number, }) @@ -417,12 +416,12 @@ impl NetworkService { pub async fn set_local_grandpa_state( &self, - chain_index: usize, - grandpa_state: service::GrandpaState, + chain_id: service2::ChainId, + grandpa_state: service2::GrandpaState, ) { self.messages_tx .send(ToBackground::SetLocalGrandpaState { - chain_index, + chain_id, grandpa_state, }) .await @@ -433,17 +432,17 @@ impl NetworkService { // TODO: more docs pub async fn storage_proof_request( self: Arc, - chain_index: usize, + chain_id: service2::ChainId, target: PeerId, // TODO: takes by value because of futures longevity issue config: protocol::StorageProofRequestConfig + Clone>>, timeout: Duration, - ) -> Result { + ) -> Result { let (tx, rx) = oneshot::channel(); self.messages_tx .send(ToBackground::StartStorageProofRequest { target: target.clone(), - chain_index, + chain_id, config: protocol::StorageProofRequestConfig { block_hash: config.block_hash, keys: config @@ -467,7 +466,7 @@ impl NetworkService { target: "network", "Connection({}) => StorageProofRequest(chain={}, total_size={})", target, - self.log_chain_names[chain_index], + self.log_chain_names[&chain_id], BytesDisplay(u64::try_from(decoded.len()).unwrap()), ); } @@ -476,7 +475,7 @@ impl NetworkService { target: "network", "Connection({}) => StorageProofRequest(chain={}, error={:?})", target, - self.log_chain_names[chain_index], + self.log_chain_names[&chain_id], err ); } @@ -491,7 +490,7 @@ impl NetworkService { // TODO: more docs pub async fn call_proof_request( self: Arc, - chain_index: usize, + chain_id: service2::ChainId, target: PeerId, // TODO: takes by value because of futures longevity issue config: protocol::CallProofRequestConfig<'_, impl Iterator>>, timeout: Duration, @@ -501,7 +500,7 @@ impl NetworkService { self.messages_tx .send(ToBackground::StartCallProofRequest { target: target.clone(), - chain_index, + chain_id, config: protocol::CallProofRequestConfig { block_hash: config.block_hash, method: config.method.into_owned().into(), @@ -526,7 +525,7 @@ impl NetworkService { target: "network", "Connection({}) => CallProofRequest({}, total_size: {})", target, - self.log_chain_names[chain_index], + self.log_chain_names[&chain_id], BytesDisplay(u64::try_from(decoded.len()).unwrap()) ); } @@ -535,7 +534,7 @@ impl NetworkService { target: "network", "Connection({}) => CallProofRequest({}, {})", target, - self.log_chain_names[chain_index], + self.log_chain_names[&chain_id], err ); } @@ -555,14 +554,14 @@ impl NetworkService { /// a transaction not being received are extremely low. This can be considered as known flaw. 
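// ------------------------------------------------------------------------------------------
// Editorial sketch, not part of the patch: every public method of `NetworkService` in the
// hunks above and below follows the same shape. The front-end packs the call into a
// `ToBackground` message carrying a `oneshot::Sender`, pushes it onto `messages_tx`, and
// awaits the reply, while the background task remains the sole owner of the
// `service2::ChainNetwork` state machine. `DemoToBackground` and `DemoResponse` are
// hypothetical stand-ins for the real message and result types.
enum DemoToBackground {
    DemoRequest {
        chain_id: service2::ChainId,
        result: oneshot::Sender<DemoResponse>,
    },
}

struct DemoResponse;

async fn demo_round_trip(
    messages_tx: &async_channel::Sender<DemoToBackground>,
    chain_id: service2::ChainId,
) -> DemoResponse {
    let (tx, rx) = oneshot::channel();
    messages_tx
        .send(DemoToBackground::DemoRequest { chain_id, result: tx })
        .await
        .unwrap(); // the background task keeps the receiving side alive for the service's lifetime
    rx.await.unwrap() // and, as in the patch, is expected to always reply before dropping `result`
}
// ------------------------------------------------------------------------------------------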
pub async fn announce_transaction( self: Arc, - chain_index: usize, + chain_id: service2::ChainId, transaction: &[u8], ) -> Vec { let (tx, rx) = oneshot::channel(); self.messages_tx .send(ToBackground::AnnounceTransaction { - chain_index, + chain_id, transaction: transaction.to_vec(), // TODO: ovheread result: tx, }) @@ -572,11 +571,11 @@ impl NetworkService { rx.await.unwrap() } - /// See [`service::ChainNetwork::send_block_announce`]. + /// See [`service2::ChainNetwork::send_block_announce`]. pub async fn send_block_announce( self: Arc, target: &PeerId, - chain_index: usize, + chain_id: service2::ChainId, scale_encoded_header: &[u8], is_best: bool, ) -> Result<(), QueueNotificationError> { @@ -585,7 +584,7 @@ impl NetworkService { self.messages_tx .send(ToBackground::SendBlockAnnounce { target: target.clone(), // TODO: overhead - chain_index, + chain_id, scale_encoded_header: scale_encoded_header.to_vec(), // TODO: overhead is_best, result: tx, @@ -596,21 +595,21 @@ impl NetworkService { rx.await.unwrap() } - /// See [`service::ChainNetwork::discover`]. + /// See [`service2::ChainNetwork::discover`]. /// /// The `important_nodes` parameter indicates whether these nodes are considered note-worthy /// and should have additional logging. pub async fn discover( &self, now: &TPlat::Instant, - chain_index: usize, + chain_id: service2::ChainId, list: impl IntoIterator)>, important_nodes: bool, ) { self.messages_tx .send(ToBackground::Discover { now: now.clone(), // TODO: overhead - chain_index, + chain_id, // TODO: overhead list: list .into_iter() @@ -633,13 +632,13 @@ impl NetworkService { /// returned by [`NetworkService::discovered_nodes`]. pub async fn discovered_nodes( &self, - chain_index: usize, + chain_id: service2::ChainId, ) -> impl Iterator)> { let (tx, rx) = oneshot::channel(); self.messages_tx .send(ToBackground::DiscoveredNodes { - chain_index, + chain_id, result: tx, }) .await @@ -674,30 +673,30 @@ impl Drop for NetworkService { pub enum Event { Connected { peer_id: PeerId, - chain_index: usize, + chain_id: service2::ChainId, role: protocol::Role, best_block_number: u64, best_block_hash: [u8; 32], }, Disconnected { peer_id: PeerId, - chain_index: usize, + chain_id: service2::ChainId, }, BlockAnnounce { peer_id: PeerId, - chain_index: usize, - announce: service::EncodedBlockAnnounce, + chain_id: service2::ChainId, + announce: service2::EncodedBlockAnnounce, }, GrandpaNeighborPacket { peer_id: PeerId, - chain_index: usize, + chain_id: service2::ChainId, finalized_block_height: u64, }, /// Received a GrandPa commit message from the network. GrandpaCommitMessage { peer_id: PeerId, - chain_index: usize, - message: service::EncodedGrandpaCommitMessage, + chain_id: service2::ChainId, + message: service2::EncodedGrandpaCommitMessage, }, } @@ -708,7 +707,7 @@ pub enum BlocksRequestError { NoConnection, /// Error during the request. #[display(fmt = "{_0}")] - Request(service::BlocksRequestError), + Request(service2::BlocksRequestError), } /// Error returned by [`NetworkService::grandpa_warp_sync_request`]. @@ -718,7 +717,7 @@ pub enum WarpSyncRequestError { NoConnection, /// Error during the request. #[display(fmt = "{_0}")] - Request(service::GrandpaWarpSyncRequestError), + Request(service2::GrandpaWarpSyncRequestError), } /// Error returned by [`NetworkService::storage_proof_request`]. @@ -730,7 +729,7 @@ pub enum StorageProofRequestError { RequestTooLarge, /// Error during the request. 
#[display(fmt = "{_0}")] - Request(service::StorageProofRequestError), + Request(service2::StorageProofRequestError), } /// Error returned by [`NetworkService::call_proof_request`]. @@ -742,7 +741,7 @@ pub enum CallProofRequestError { RequestTooLarge, /// Error during the request. #[display(fmt = "{_0}")] - Request(service::CallProofRequestError), + Request(service2::CallProofRequestError), } impl CallProofRequestError { @@ -757,44 +756,15 @@ impl CallProofRequestError { } } -/// Error returned by [`NetworkService::send_block_announce`]. -#[derive(Debug, derive_more::Display)] -pub enum QueueNotificationError { - /// No established connection with the target. - NoConnection, - /// Error during the queuing. - #[display(fmt = "{_0}")] - Queue(peers::QueueNotificationError), -} - enum ToBackground { - ConnectionAttemptOkSingleStream { - pending_id: service::PendingId, - connection: TPlat::Stream, - expected_peer_id: PeerId, - multiaddr: Multiaddr, - handshake_kind: service::SingleStreamHandshakeKind, - }, - ConnectionAttemptOkMultiStream { - pending_id: service::PendingId, - connection: TPlat::MultiStream, - expected_peer_id: PeerId, - multiaddr: Multiaddr, - handshake_kind: service::MultiStreamHandshakeKind, - }, - ConnectionAttemptErr { - pending_id: service::PendingId, - expected_peer_id: PeerId, - is_bad_addr: bool, - }, ConnectionMessage { - connection_id: service::ConnectionId, - message: service::ConnectionToCoordinator, + connection_id: service2::ConnectionId, + message: service2::ConnectionToCoordinator, }, // TODO: serialize the request before sending over channel StartBlocksRequest { target: PeerId, // TODO: takes by value because of future longevity issue - chain_index: usize, + chain_id: service2::ChainId, config: protocol::BlocksRequestConfig, timeout: Duration, result: oneshot::Sender, BlocksRequestError>>, @@ -802,57 +772,57 @@ enum ToBackground { // TODO: serialize the request before sending over channel StartWarpSyncRequest { target: PeerId, - chain_index: usize, + chain_id: service2::ChainId, begin_hash: [u8; 32], timeout: Duration, result: - oneshot::Sender>, + oneshot::Sender>, }, // TODO: serialize the request before sending over channel StartStorageProofRequest { - chain_index: usize, + chain_id: service2::ChainId, target: PeerId, config: protocol::StorageProofRequestConfig>>, timeout: Duration, - result: oneshot::Sender>, + result: oneshot::Sender>, }, // TODO: serialize the request before sending over channel StartCallProofRequest { - chain_index: usize, + chain_id: service2::ChainId, target: PeerId, // TODO: takes by value because of futures longevity issue config: protocol::CallProofRequestConfig<'static, vec::IntoIter>>, timeout: Duration, - result: oneshot::Sender>, + result: oneshot::Sender>, }, SetLocalBestBlock { - chain_index: usize, + chain_id: service2::ChainId, best_hash: [u8; 32], best_number: u64, }, SetLocalGrandpaState { - chain_index: usize, - grandpa_state: service::GrandpaState, + chain_id: service2::ChainId, + grandpa_state: service2::GrandpaState, }, AnnounceTransaction { - chain_index: usize, + chain_id: service2::ChainId, transaction: Vec, result: oneshot::Sender>, }, SendBlockAnnounce { target: PeerId, - chain_index: usize, + chain_id: service2::ChainId, scale_encoded_header: Vec, is_best: bool, result: oneshot::Sender>, }, Discover { now: TPlat::Instant, - chain_index: usize, + chain_id: service2::ChainId, list: vec::IntoIter<(PeerId, vec::IntoIter)>, important_nodes: bool, }, DiscoveredNodes { - chain_index: usize, + chain_id: 
service2::ChainId, result: oneshot::Sender)>>, }, PeersList { @@ -870,13 +840,14 @@ struct BackgroundTask { /// Names of the various chains the network service connects to. Used only for logging /// purposes. - log_chain_names: Vec, + // TODO: add a user data to the network state machine + log_chain_names: hashbrown::HashMap, /// Channel to send messages to the background task. messages_tx: async_channel::Sender>, /// Data structure holding the entire state of the networking. - network: service::ChainNetwork, + network: service2::ChainNetwork, /// List of nodes that are considered as important for logging purposes. // TODO: should also detect whenever we fail to open a block announces substream with any of these peers @@ -899,37 +870,36 @@ struct BackgroundTask { messages_rx: async_channel::Receiver>, active_connections: HashMap< - service::ConnectionId, - async_channel::Sender>, + service2::ConnectionId, + async_channel::Sender>, fnv::FnvBuildHasher, >, blocks_requests: HashMap< - service::OutRequestId, + service2::SubstreamId, oneshot::Sender, BlocksRequestError>>, fnv::FnvBuildHasher, >, grandpa_warp_sync_requests: HashMap< - service::OutRequestId, - oneshot::Sender>, + service2::SubstreamId, + oneshot::Sender>, fnv::FnvBuildHasher, >, storage_proof_requests: HashMap< - service::OutRequestId, - oneshot::Sender>, + service2::SubstreamId, + oneshot::Sender>, fnv::FnvBuildHasher, >, call_proof_requests: HashMap< - service::OutRequestId, - oneshot::Sender>, + service2::SubstreamId, + oneshot::Sender>, fnv::FnvBuildHasher, >, - kademlia_discovery_operations: - HashMap, + kademlia_discovery_operations: HashMap, } async fn background_task(mut task: BackgroundTask) { @@ -963,20 +933,20 @@ async fn background_task(mut task: BackgroundTask) { log::debug!( target: "connections", "OutSlots({}) ∋ {}", - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], peer_id ); - task.network.assign_out_slot(chain_index, peer_id); + task.network.assign_out_slot(chain_id, peer_id); } } enum WhatHappened { Message(ToBackground), - NetworkEvent(service::Event), - StartConnect(service::StartConnect), + NetworkEvent(service2::Event), + StartConnect(service2::StartConnect), MessageToConnection { - connection_id: service::ConnectionId, - message: service::CoordinatorToConnection, + connection_id: service2::ConnectionId, + message: service2::CoordinatorToConnection, }, EventSendersReady, } @@ -987,7 +957,7 @@ async fn background_task(mut task: BackgroundTask) { let can_generate_event = matches!(task.event_senders, either::Left(_)); let service_event = async { if let Some(event) = if can_generate_event { - task.network.next_event(task.platform.now()) + task.network.next_event() } else { None } { @@ -1117,7 +1087,7 @@ async fn background_task(mut task: BackgroundTask) { }) => { task.network.pending_outcome_err(pending_id, is_bad_addr); for chain_index in 0..task.network.num_chains() { - task.unassign_slot_and_ban(chain_index, expected_peer_id.clone()); + task.unassign_slot_and_ban(chain_id, expected_peer_id.clone()); } continue; } @@ -1131,7 +1101,7 @@ async fn background_task(mut task: BackgroundTask) { } WhatHappened::Message(ToBackground::StartBlocksRequest { target, - chain_index, + chain_id, config, timeout, result, @@ -1147,7 +1117,7 @@ async fn background_task(mut task: BackgroundTask) { log::debug!( target: "network", "Connection({}) <= BlocksRequest(chain={}, start={}, num={}, descending={:?}, header={:?}, body={:?}, justifications={:?})", - target, task.log_chain_names[chain_index], 
HashDisplay(hash), + target, task.log_chain_names[&chain_id], HashDisplay(hash), config.desired_count.get(), matches!(config.direction, protocol::BlocksRequestDirection::Descending), config.fields.header, config.fields.body, config.fields.justifications @@ -1157,7 +1127,7 @@ async fn background_task(mut task: BackgroundTask) { log::debug!( target: "network", "Connection({}) <= BlocksRequest(chain={}, start=#{}, num={}, descending={:?}, header={:?}, body={:?}, justifications={:?})", - target, task.log_chain_names[chain_index], number, + target, task.log_chain_names[&chain_id], number, config.desired_count.get(), matches!(config.direction, protocol::BlocksRequestDirection::Descending), config.fields.header, config.fields.body, config.fields.justifications @@ -1168,7 +1138,7 @@ async fn background_task(mut task: BackgroundTask) { let request_id = task.network.start_blocks_request( task.platform.now(), &target, - chain_index, + chain_id, config, timeout, ); @@ -1178,130 +1148,122 @@ async fn background_task(mut task: BackgroundTask) { } WhatHappened::Message(ToBackground::StartWarpSyncRequest { target, - chain_index, + chain_id, begin_hash, timeout, result, }) => { - // The call to `start_grandpa_warp_sync_request` below panics if we have no - // active connection. - if !task.network.can_start_requests(&target) { - let _ = result.send(Err(WarpSyncRequestError::NoConnection)); - continue; - } - - log::debug!( - target: "network", "Connection({}) <= WarpSyncRequest(chain={}, start={})", - target, task.log_chain_names[chain_index], HashDisplay(&begin_hash) - ); - - let request_id = task.network.start_grandpa_warp_sync_request( + match task.network.start_grandpa_warp_sync_request( task.platform.now(), &target, - chain_index, + chain_id, begin_hash, timeout, - ); + ) { + Ok(substream_id) => { + log::debug!( + target: "network", "Connection({}) <= WarpSyncRequest(chain={}, start={})", + target, task.log_chain_names[&chain_id], HashDisplay(&begin_hash) + ); + + task.grandpa_warp_sync_requests.insert(substream_id, result); + } + Err(service2::StartRequestError::NoConnection) => { + let _ = result.send(Err(WarpSyncRequestError::NoConnection)); + } + } - task.grandpa_warp_sync_requests.insert(request_id, result); continue; } WhatHappened::Message(ToBackground::StartStorageProofRequest { - chain_index, + chain_id, target, config, timeout, result, }) => { - // The call to `start_storage_proof_request` below panics if we have no active - // connection. 
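// ------------------------------------------------------------------------------------------
// Editorial sketch, not part of the patch: as the added lines of this hunk show, the
// `service2` request-starting methods return a `Result` instead of panicking when there is no
// connection, so the old `can_start_requests()` pre-check disappears and both failure cases
// are reported back through the caller's `oneshot`. Assumptions: `ChainNetwork` is written
// with a single `TNow` parameter, and `TConfig` stands for the
// `protocol::StorageProofRequestConfig` type whose generic parameters this patch does not show.
fn demo_start_storage_proof<TNow, TConfig>(
    network: &mut service2::ChainNetwork<TNow>,
    pending: &mut hashbrown::HashMap<
        service2::SubstreamId,
        oneshot::Sender<Result<service2::EncodedMerkleProof, StorageProofRequestError>>,
        fnv::FnvBuildHasher,
    >,
    now: TNow,
    target: &PeerId,
    chain_id: service2::ChainId,
    config: TConfig,
    timeout: Duration,
    result: oneshot::Sender<Result<service2::EncodedMerkleProof, StorageProofRequestError>>,
) {
    match network.start_storage_proof_request(now, target, chain_id, config, timeout) {
        // The request is now identified by the `SubstreamId` it was assigned; the response
        // arrives later as a `service2::Event::RequestResult` and is routed through `pending`.
        Ok(substream_id) => {
            pending.insert(substream_id, result);
        }
        Err(service2::StartRequestMaybeTooLargeError::NoConnection) => {
            let _ = result.send(Err(StorageProofRequestError::NoConnection));
        }
        Err(service2::StartRequestMaybeTooLargeError::RequestTooLarge) => {
            let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
        }
    }
}
// ------------------------------------------------------------------------------------------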
- if !task.network.can_start_requests(&target) { - let _ = result.send(Err(StorageProofRequestError::NoConnection)); - continue; - } - - log::debug!( - target: "network", - "Connection({}) <= StorageProofRequest(chain={}, block={})", - target, - task.log_chain_names[chain_index], - HashDisplay(&config.block_hash) - ); - - let request_id = match task.network.start_storage_proof_request( + match task.network.start_storage_proof_request( task.platform.now(), &target, - chain_index, + chain_id, config, timeout, ) { - Ok(r) => r, - Err(service::StartRequestError::RequestTooLarge) => { + Ok(substream_id) => { + log::debug!( + target: "network", + "Connection({}) <= StorageProofRequest(chain={}, block={})", + target, + task.log_chain_names[&chain_id], + HashDisplay(&config.block_hash) + ); + + task.storage_proof_requests.insert(substream_id, result); + } + Err(service2::StartRequestMaybeTooLargeError::NoConnection) => { + let _ = result.send(Err(StorageProofRequestError::NoConnection)); + } + Err(service2::StartRequestMaybeTooLargeError::RequestTooLarge) => { // TODO: consider dealing with the problem of requests too large internally by sending multiple requests let _ = result.send(Err(StorageProofRequestError::RequestTooLarge)); - continue; } }; - task.storage_proof_requests.insert(request_id, result); continue; } WhatHappened::Message(ToBackground::StartCallProofRequest { - chain_index, + chain_id, target, config, timeout, result, }) => { - // The call to `start_call_proof_request` below panics if we have no active connection. - if !task.network.can_start_requests(&target) { - let _ = result.send(Err(CallProofRequestError::NoConnection)); - continue; - } - - log::debug!( - target: "network", - "Connection({}) <= CallProofRequest({}, {}, {})", - target, - task.log_chain_names[chain_index], - HashDisplay(&config.block_hash), - config.method - ); - - let request_id = match task.network.start_call_proof_request( + match task.network.start_call_proof_request( task.platform.now(), &target, - chain_index, + chain_id, config, timeout, ) { - Ok(r) => r, - Err(service::StartRequestError::RequestTooLarge) => { + Ok(substream_id) => { + log::debug!( + target: "network", + "Connection({}) <= CallProofRequest({}, {}, {})", + target, + task.log_chain_names[&chain_id], + HashDisplay(&config.block_hash), + config.method + ); + + task.call_proof_requests.insert(substream_id, result); + } + Err(service2::StartRequestMaybeTooLargeError::NoConnection) => { + let _ = result.send(Err(CallProofRequestError::NoConnection)); + } + Err(service2::StartRequestMaybeTooLargeError::RequestTooLarge) => { let _ = result.send(Err(CallProofRequestError::RequestTooLarge)); - continue; } }; - task.call_proof_requests.insert(request_id, result); continue; } WhatHappened::Message(ToBackground::SetLocalBestBlock { - chain_index, + chain_id, best_hash, best_number, }) => { task.network - .set_local_best_block(chain_index, best_hash, best_number); + .set_chain_local_best_block(chain_id, best_hash, best_number); continue; } WhatHappened::Message(ToBackground::SetLocalGrandpaState { - chain_index, + chain_id, grandpa_state, }) => { log::debug!( target: "network", "Chain({}) <= SetLocalGrandpaState(set_id: {}, commit_finalized_height: {})", - task.log_chain_names[chain_index], + task.log_chain_names[&chain_id], grandpa_state.set_id, grandpa_state.commit_finalized_height, ); @@ -1309,11 +1271,11 @@ async fn background_task(mut task: BackgroundTask) { // TODO: log the list of peers we sent the packet to task.network - 
.set_local_grandpa_state(chain_index, grandpa_state); + .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state); continue; } WhatHappened::Message(ToBackground::AnnounceTransaction { - chain_index, + chain_id, transaction, result, }) => { @@ -1324,13 +1286,13 @@ async fn background_task(mut task: BackgroundTask) { // TODO: collecting in a Vec :-/ for peer in task .network - .opened_transactions_substream(chain_index) + .gossip_connected_peers(chain_id, service2::GossipKind::ConsensusTransactions) .cloned() .collect::>() { if task .network - .announce_transaction(&peer, chain_index, &transaction) + .gossip_send_transaction(&peer, chain_id, &transaction) .is_ok() { sent_peers.push(peer); @@ -1342,28 +1304,22 @@ async fn background_task(mut task: BackgroundTask) { } WhatHappened::Message(ToBackground::SendBlockAnnounce { target, - chain_index, + chain_id, scale_encoded_header, is_best, result, }) => { - // The call to `send_block_announce` below panics if we have no active substream. - if !task.network.can_send_block_announces(&target, chain_index) { - let _ = result.send(Err(QueueNotificationError::NoConnection)); - continue; - } - - let res = task - .network - .send_block_announce(&target, chain_index, &scale_encoded_header, is_best) - .map_err(QueueNotificationError::Queue); - - let _ = result.send(res); + let _ = result.send(task.network.gossip_send_block_announce( + &target, + chain_id, + &scale_encoded_header, + is_best, + )); continue; } WhatHappened::Message(ToBackground::Discover { now, - chain_index, + chain_id, list, important_nodes, }) => { @@ -1372,18 +1328,15 @@ async fn background_task(mut task: BackgroundTask) { task.important_nodes.insert(peer_id.clone()); } - task.network.discover(&now, chain_index, peer_id, addrs); + task.network.discover(&now, chain_id, peer_id, addrs); } continue; } - WhatHappened::Message(ToBackground::DiscoveredNodes { - chain_index, - result, - }) => { + WhatHappened::Message(ToBackground::DiscoveredNodes { chain_id, result }) => { let _ = result.send( task.network - .discovered_nodes(chain_index) + .discovered_nodes(chain_id) .map(|(peer_id, addresses)| { (peer_id.clone(), addresses.cloned().collect::>()) }) @@ -1410,11 +1363,11 @@ async fn background_task(mut task: BackgroundTask) { continue; } - WhatHappened::NetworkEvent(service::Event::Connected(peer_id)) => { + WhatHappened::NetworkEvent(service2::Event::HandshakeFinished { peer_id, .. 
}) => { log::debug!(target: "network", "Connected({})", peer_id); continue; } - WhatHappened::NetworkEvent(service::Event::Disconnected { + WhatHappened::NetworkEvent(service2::Event::Disconnected { peer_id, chain_indices, }) => { @@ -1424,7 +1377,7 @@ async fn background_task(mut task: BackgroundTask) { if chain_indices.len() == 1 { log::debug!( target: "network", - "Connection({}, {}) => ChainDisconnected", + "Connection({}, {}) => GossipDisconnected", peer_id, &task.log_chain_names[chain_indices[0]], ); @@ -1440,8 +1393,8 @@ async fn background_task(mut task: BackgroundTask) { continue; } } - WhatHappened::NetworkEvent(service::Event::BlockAnnounce { - chain_index, + WhatHappened::NetworkEvent(service2::Event::BlockAnnounce { + chain_id, peer_id, announce, }) => { @@ -1449,141 +1402,138 @@ async fn background_task(mut task: BackgroundTask) { target: "network", "Connection({}, {}) => BlockAnnounce(best_hash={}, is_best={})", peer_id, - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], HashDisplay(&header::hash_from_scale_encoded_header(announce.decode().scale_encoded_header)), announce.decode().is_best ); Event::BlockAnnounce { - chain_index, + chain_id, peer_id, announce, } } - WhatHappened::NetworkEvent(service::Event::ChainConnected { + WhatHappened::NetworkEvent(service2::Event::GossipConnected { peer_id, - chain_index, + chain_id, role, best_number, best_hash, - slot_ty: _, + kind: service2::GossipKind::ConsensusTransactions, }) => { log::debug!( target: "network", "Connection({}, {}) => ChainConnected(best_height={}, best_hash={})", peer_id, - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], best_number, HashDisplay(&best_hash) ); Event::Connected { peer_id, - chain_index, + chain_id, role, best_block_number: best_number, best_block_hash: best_hash, } } - WhatHappened::NetworkEvent(service::Event::ChainConnectAttemptFailed { + WhatHappened::NetworkEvent(service2::Event::GossipOpenFailed { peer_id, - chain_index, - unassigned_slot_ty, + chain_id, error, + kind: service2::GossipKind::ConsensusTransactions, }) => { log::debug!( target: "network", - "Connection({}, {}) => ChainConnectAttemptFailed(error={:?})", - &task.log_chain_names[chain_index], + "Connection({}, {}) => GossipOpenFailed(error={:?})", + &task.log_chain_names[&chain_id], peer_id, error, ); log::debug!( target: "connections", "{}Slots({}) ∌ {}", match unassigned_slot_ty { - service::SlotTy::Inbound => "In", - service::SlotTy::Outbound => "Out", + service2::SlotTy::Inbound => "In", + service2::SlotTy::Outbound => "Out", }, - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], peer_id ); - task.unassign_slot_and_ban(chain_index, peer_id); + task.unassign_slot_and_ban(chain_id, peer_id); continue; } - WhatHappened::NetworkEvent(service::Event::ChainDisconnected { + WhatHappened::NetworkEvent(service2::Event::GossipDisconnected { peer_id, - chain_index, - unassigned_slot_ty, + chain_id, + kind: service2::GossipKind::ConsensusTransactions, }) => { log::debug!( target: "network", - "Connection({}, {}) => ChainDisconnected", + "Connection({}, {}) => GossipDisconnected", peer_id, - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], ); log::debug!( target: "connections", "{}Slots({}) ∌ {}", match unassigned_slot_ty { - service::SlotTy::Inbound => "In", - service::SlotTy::Outbound => "Out", + service2::SlotTy::Inbound => "In", + service2::SlotTy::Outbound => "Out", }, - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], 
peer_id ); - task.unassign_slot_and_ban(chain_index, peer_id.clone()); - Event::Disconnected { - peer_id, - chain_index, - } + task.unassign_slot_and_ban(chain_id, peer_id.clone()); + Event::Disconnected { peer_id, chain_id } } - WhatHappened::NetworkEvent(service::Event::RequestResult { - request_id, - response: service::RequestResult::Blocks(response), + WhatHappened::NetworkEvent(service2::Event::RequestResult { + substream_id, + response: service2::RequestResult::Blocks(response), }) => { let _ = task .blocks_requests - .remove(&request_id) + .remove(&substream_id) .unwrap() .send(response.map_err(BlocksRequestError::Request)); continue; } - WhatHappened::NetworkEvent(service::Event::RequestResult { - request_id, - response: service::RequestResult::GrandpaWarpSync(response), + WhatHappened::NetworkEvent(service2::Event::RequestResult { + substream_id, + response: service2::RequestResult::GrandpaWarpSync(response), }) => { let _ = task .grandpa_warp_sync_requests - .remove(&request_id) + .remove(&substream_id) .unwrap() .send(response.map_err(WarpSyncRequestError::Request)); continue; } - WhatHappened::NetworkEvent(service::Event::RequestResult { - request_id, - response: service::RequestResult::StorageProof(response), + WhatHappened::NetworkEvent(service2::Event::RequestResult { + substream_id, + response: service2::RequestResult::StorageProof(response), }) => { let _ = task .storage_proof_requests - .remove(&request_id) + .remove(&substream_id) .unwrap() .send(response.map_err(StorageProofRequestError::Request)); continue; } - WhatHappened::NetworkEvent(service::Event::RequestResult { - request_id, - response: service::RequestResult::CallProof(response), + WhatHappened::NetworkEvent(service2::Event::RequestResult { + substream_id, + response: service2::RequestResult::CallProof(response), }) => { let _ = task .call_proof_requests - .remove(&request_id) + .remove(&substream_id) .unwrap() .send(response.map_err(CallProofRequestError::Request)); continue; } - WhatHappened::NetworkEvent(service::Event::RequestResult { .. }) => { + WhatHappened::NetworkEvent(service2::Event::RequestResult { .. }) => { // We never start any other kind of requests. unreachable!() } - WhatHappened::NetworkEvent(service::Event::KademliaDiscoveryResult { + WhatHappened::NetworkEvent(service2::Event::KademliaDiscoveryResult { operation_id, result, }) => { @@ -1595,7 +1545,7 @@ async fn background_task(mut task: BackgroundTask) { Ok(nodes) => { log::debug!( target: "connections", "On chain {}, discovered: {}", - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], nodes.iter().map(|(p, _)| p.to_string()).join(", ") ); @@ -1617,7 +1567,7 @@ async fn background_task(mut task: BackgroundTask) { task.network.discover( &task.platform.now(), - chain_index, + chain_id, peer_id, valid_addrs, ); @@ -1636,12 +1586,12 @@ async fn background_task(mut task: BackgroundTask) { // No error is printed either if the request fails due to a benign // networking error such as an unresponsive peer. 
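// ------------------------------------------------------------------------------------------
// Editorial sketch, not part of the patch: the `RequestResult` arms above all share the same
// routing step, shown here in isolation. Each in-flight request was recorded in a map keyed by
// the `service2::SubstreamId` obtained when it was started, and the matching `oneshot` sender
// is removed and completed when the response event arrives.
fn demo_route_response<TOutcome>(
    pending: &mut hashbrown::HashMap<
        service2::SubstreamId,
        oneshot::Sender<TOutcome>,
        fnv::FnvBuildHasher,
    >,
    substream_id: service2::SubstreamId,
    outcome: TOutcome,
) {
    // The entry necessarily exists, since every started request inserted one (hence the
    // `unwrap()` in the arms above). A failed `send` only means the front-end caller stopped
    // waiting for the answer, which is harmless.
    let _ = pending.remove(&substream_id).unwrap().send(outcome);
}
// ------------------------------------------------------------------------------------------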
match error { - service::DiscoveryError::NoPeer => {} - service::DiscoveryError::FindNode( - service::KademliaFindNodeError::RequestFailed(err), + service2::DiscoveryError::NoPeer => {} + service2::DiscoveryError::FindNode( + service2::KademliaFindNodeError::RequestFailed(err), ) if !err.is_protocol_error() => {} - service::DiscoveryError::FindNode( - service::KademliaFindNodeError::RequestFailed( + service2::DiscoveryError::FindNode( + service2::KademliaFindNodeError::RequestFailed( peers::RequestError::Substream( connection::established::RequestError::ProtocolNotAvailable, ), @@ -1654,14 +1604,14 @@ async fn background_task(mut task: BackgroundTask) { This might indicate that the version of Substrate used by \ the chain doesn't include \ .", - &task.log_chain_names[chain_index] + &task.log_chain_names[&chain_id] ); } _ => { log::warn!( target: "connections", "Problem during discovery on {}: {}", - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], error ); } @@ -1671,21 +1621,35 @@ async fn background_task(mut task: BackgroundTask) { continue; } - WhatHappened::NetworkEvent(service::Event::InboundSlotAssigned { + WhatHappened::NetworkEvent(service2::Event::GossipInDesired { peer_id, - chain_index, + chain_id, + kind: service2::GossipKind::ConsensusTransactions, }) => { + // TODO: reject if too many in slots? log::debug!( target: "connections", "InSlots({}) ∋ {}", - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], peer_id ); + task.network + .gossip_open( + task.platform.now(), + chain_id, + &peer_id, + service2::GossipKind::ConsensusTransactions, + ) + .unwrap(); continue; } - WhatHappened::NetworkEvent(service::Event::IdentifyRequestIn { + WhatHappened::NetworkEvent(service2::Event::GossipInDesiredCancel { .. }) => { + // Can't happen as we already instantaneously accept or reject gossip in requests. + unreachable!() + } + WhatHappened::NetworkEvent(service2::Event::IdentifyRequestIn { peer_id, - request_id, + substream_id, }) => { log::debug!( target: "network", @@ -1693,16 +1657,16 @@ async fn background_task(mut task: BackgroundTask) { peer_id, ); task.network - .respond_identify(request_id, &task.identify_agent_version); + .respond_identify(substream_id, &task.identify_agent_version); continue; } - WhatHappened::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(), - WhatHappened::NetworkEvent(service::Event::RequestInCancel { .. }) => { + WhatHappened::NetworkEvent(service2::Event::BlocksRequestIn { .. }) => unreachable!(), + WhatHappened::NetworkEvent(service2::Event::RequestInCancel { .. }) => { // All incoming requests are immediately answered. 
unreachable!() } - WhatHappened::NetworkEvent(service::Event::GrandpaNeighborPacket { - chain_index, + WhatHappened::NetworkEvent(service2::Event::GrandpaNeighborPacket { + chain_id, peer_id, state, }) => { @@ -1710,19 +1674,19 @@ async fn background_task(mut task: BackgroundTask) { target: "network", "Connection({}, {}) => GrandpaNeighborPacket(round_number={}, set_id={}, commit_finalized_height={})", peer_id, - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], state.round_number, state.set_id, state.commit_finalized_height, ); Event::GrandpaNeighborPacket { - chain_index, + chain_id, peer_id, finalized_block_height: state.commit_finalized_height, } } - WhatHappened::NetworkEvent(service::Event::GrandpaCommitMessage { - chain_index, + WhatHappened::NetworkEvent(service2::Event::GrandpaCommitMessage { + chain_id, peer_id, message, }) => { @@ -1730,16 +1694,16 @@ async fn background_task(mut task: BackgroundTask) { target: "network", "Connection({}, {}) => GrandpaCommitMessage(target_block_hash={})", peer_id, - &task.log_chain_names[chain_index], + &task.log_chain_names[&chain_id], HashDisplay(message.decode().message.target_hash), ); Event::GrandpaCommitMessage { - chain_index, + chain_id, peer_id, message, } } - WhatHappened::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => { + WhatHappened::NetworkEvent(service2::Event::ProtocolError { peer_id, error }) => { // TODO: handle properly? log::warn!( target: "network", @@ -1749,7 +1713,7 @@ async fn background_task(mut task: BackgroundTask) { ); for chain_index in 0..task.network.num_chains() { - task.unassign_slot_and_ban(chain_index, peer_id.clone()); + task.unassign_slot_and_ban(chain_id, peer_id.clone()); } continue; } @@ -1827,8 +1791,8 @@ async fn background_task(mut task: BackgroundTask) { } impl BackgroundTask { - fn unassign_slot_and_ban(&mut self, chain_index: usize, peer_id: PeerId) { - self.network.unassign_slot(chain_index, &peer_id); + fn unassign_slot_and_ban(&mut self, chain_id: service2::ChainId, peer_id: PeerId) { + self.network.unassign_slot(chain_id, &peer_id); let new_expiration = self.platform.now() + Duration::from_secs(20); // TODO: arbitrary constant match self.slots_assign_backoff.entry((peer_id, chain_index)) { diff --git a/light-base/src/network_service/tasks.rs b/light-base/src/network_service/tasks.rs index 0b35249c81..36aa057b5f 100644 --- a/light-base/src/network_service/tasks.rs +++ b/light-base/src/network_service/tasks.rs @@ -24,12 +24,12 @@ use alloc::{boxed::Box, string::String}; use core::{pin, time::Duration}; use futures_lite::FutureExt as _; use futures_util::{future, stream::FuturesUnordered, FutureExt as _, StreamExt as _}; -use smoldot::{libp2p::collection::SubstreamFate, network::service}; +use smoldot::{libp2p::collection::SubstreamFate, network::service2}; /// Asynchronous task managing a specific connection, including the connection process and the /// processing of the connection after it's been open. 
pub(super) async fn connection_task( - start_connect: service::StartConnect, + start_connect: service2::StartConnect, platform: TPlat, messages_tx: async_channel::Sender>, is_important: bool, @@ -143,7 +143,10 @@ pub(super) async fn connection_task( connection, expected_peer_id: start_connect.expected_peer_id, multiaddr: start_connect.multiaddr, - handshake_kind: service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux, + handshake_kind: + service2::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux { + is_initiator: true, + }, }) .await; } @@ -167,7 +170,8 @@ pub(super) async fn connection_task( connection, expected_peer_id: start_connect.expected_peer_id, multiaddr: start_connect.multiaddr, - handshake_kind: service::MultiStreamHandshakeKind::WebRtc { + handshake_kind: service2::MultiStreamHandshakeKind::WebRtc { + is_initiator: true, local_tls_certificate_multihash, remote_tls_certificate_multihash, }, @@ -182,10 +186,10 @@ pub(super) async fn single_stream_connection_task( mut socket: TPlat::Stream, address: String, platform: TPlat, - connection_id: service::ConnectionId, - mut connection_task: service::SingleStreamConnectionTask, + connection_id: service2::ConnectionId, + mut connection_task: service2::SingleStreamConnectionTask, mut coordinator_to_connection: async_channel::Receiver< - service::CoordinatorToConnection, + service2::CoordinatorToConnection, >, connection_to_coordinator: async_channel::Sender>, ) { @@ -259,7 +263,7 @@ pub(super) async fn single_stream_connection_task( // Now wait for something interesting to happen before looping again. enum WhatHappened { - CoordinatorMessage(service::CoordinatorToConnection), + CoordinatorMessage(service2::CoordinatorToConnection), CoordinatorDead, SocketEvent, MessageSent, @@ -331,10 +335,10 @@ pub(super) async fn webrtc_multi_stream_connection_task( mut connection: TPlat::MultiStream, address: String, platform: TPlat, - connection_id: service::ConnectionId, - mut connection_task: service::MultiStreamConnectionTask, + connection_id: service2::ConnectionId, + mut connection_task: service2::MultiStreamConnectionTask, mut coordinator_to_connection: async_channel::Receiver< - service::CoordinatorToConnection, + service2::CoordinatorToConnection, >, connection_to_coordinator: async_channel::Sender>, ) { @@ -365,7 +369,7 @@ pub(super) async fn webrtc_multi_stream_connection_task( // Now wait for something interesting to happen before looping again. enum WhatHappened { - CoordinatorMessage(service::CoordinatorToConnection), + CoordinatorMessage(service2::CoordinatorToConnection), CoordinatorDead, SocketEvent(pin::Pin>, usize), MessageSent,
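// ------------------------------------------------------------------------------------------
// Editorial sketch, not part of the patch: the tasks.rs hunks above show that the `service2`
// handshake kinds now record which side initiated the connection. The light client only ever
// dials out, so the patch always passes `is_initiator: true`; the parameterized helpers below
// merely spell that choice out. Assumptions: the two TLS-certificate multihash fields are
// written as `Vec<u8>` here, which this patch does not show, and the `false` case is
// illustrative only.
fn demo_single_stream_handshake(we_dialed: bool) -> service2::SingleStreamHandshakeKind {
    service2::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
        // `true` when we opened the TCP/WebSocket connection ourselves.
        is_initiator: we_dialed,
    }
}

fn demo_webrtc_handshake(
    we_dialed: bool,
    local_tls_certificate_multihash: Vec<u8>,
    remote_tls_certificate_multihash: Vec<u8>,
) -> service2::MultiStreamHandshakeKind {
    service2::MultiStreamHandshakeKind::WebRtc {
        // Same flag for WebRTC connections, alongside the certificate hashes of both sides.
        is_initiator: we_dialed,
        local_tls_certificate_multihash,
        remote_tls_certificate_multihash,
    }
}
// ------------------------------------------------------------------------------------------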