From 30a07e4d1d99f834591eaec649c1274521453ca3 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Mon, 19 Jun 2023 12:27:17 +0200
Subject: [PATCH 1/3] Do not send more than one request per peer at a time

---
 Cargo.lock                                 |   1 +
 lib/src/sync/all.rs                        |   1 +
 light-base/Cargo.toml                      |   1 +
 light-base/src/network_service.rs          | 173 ++++++++++++++++++----
 light-base/src/sync_service.rs             | 149 +++++++++++++------
 light-base/src/sync_service/standalone.rs  |  51 ++++---
 wasm-node/CHANGELOG.md                     |   4 +
 7 files changed, 290 insertions(+), 90 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c20a00118c..f43f3c057b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2418,6 +2418,7 @@ version = "0.6.0"
 dependencies = [
  "async-lock",
  "blake2-rfc",
+ "crossbeam-queue",
  "derive_more",
  "either",
  "env_logger",
diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs
index 45f0b0967d..48c23347cb 100644
--- a/lib/src/sync/all.rs
+++ b/lib/src/sync/all.rs
@@ -672,6 +672,7 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
     ///
     /// Panics if the [`SourceId`] is invalid.
     ///
+    // TODO: this function is questionable, because in practice we send requests to sources that are outside the scope of syncing
     pub fn source_num_ongoing_requests(&self, source_id: SourceId) -> usize {
         debug_assert!(self.shared.sources.contains(source_id.0));
 
diff --git a/light-base/Cargo.toml b/light-base/Cargo.toml
index a0249f4e0c..78d3d6042c 100644
--- a/light-base/Cargo.toml
+++ b/light-base/Cargo.toml
@@ -14,6 +14,7 @@ required-features = ["std"]
 [dependencies]
 async-lock = { version = "2.7.0", default-features = false }  # TODO: no-std-ize; this has been done and is just waiting for a release: https://github.com/smol-rs/event-listener/pull/34
 blake2-rfc = { version = "0.2.18", default-features = false }
+crossbeam-queue = { version = "0.3.8", default-features = false, features = ["alloc"] }
 derive_more = "0.99.17"
 either = { version = "1.8.1", default-features = false }
 event-listener = { version = "2.5.3" }
diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs
index 74036126ac..5a724d3107 100644
--- a/light-base/src/network_service.rs
+++ b/light-base/src/network_service.rs
@@ -143,6 +143,13 @@ struct Shared<TPlat: PlatformRef> {
     /// if the event is notified while the background task is already awake, the background task
     /// will do an additional loop.
     wake_up_main_background_task: event_listener::Event,
+
+    /// Whenever a request to a peer finishes or is aborted, an element is pushed to this queue.
+    /// The queue is later processed in order to update [`SharedGuarded::peer_requests_locks`].
+    peer_requests_unlocks: crossbeam_queue::SegQueue<PeerId>,
+
+    /// Event to notify when an element is pushed onto [`Shared::peer_requests_unlocks`].
+    peer_requests_unlocks_pushed: event_listener::Event,
 }
 
 struct SharedGuarded<TPlat: PlatformRef> {
     // TODO: should also detect whenever we fail to open a block announces substream with any of these peers
     important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,
 
+    /// List of peers for which a request lock has been grabbed.
+    peer_requests_locks: HashSet<PeerId, fnv::FnvBuildHasher>,
+
     /// List of peer and chain index tuples for which no outbound slot should be assigned.
     ///
     /// The values are the moment when the ban expires.
@@ -262,6 +272,7 @@
                     util::SipHasherBuild::new(rand::random()),
                 ),
                 important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
+                peer_requests_locks: HashSet::with_capacity_and_hasher(16, Default::default()),
                 active_connections: HashMap::with_capacity_and_hasher(32, Default::default()),
                 messages_from_connections_tx,
                 messages_from_connections_rx,
@@ -281,6 +292,8 @@
             identify_agent_version: config.identify_agent_version,
             log_chain_names,
             wake_up_main_background_task: event_listener::Event::new(),
+            peer_requests_unlocks: crossbeam_queue::SegQueue::new(),
+            peer_requests_unlocks_pushed: event_listener::Event::new(),
         });
 
         // Spawn main task that processes the network service.
@@ -355,11 +368,90 @@
         (final_network_service, event_receivers)
     }
 
+    /// Try to grab a so-called "request lock" associated with the given `PeerId`. While the lock
+    /// is active, no other "request lock" towards the same `PeerId` can be acquired. The lock
+    /// can then be used to actually start a request.
+    ///
+    /// This function does **not** check whether there exists a connection with the given peer.
+    /// In other words, it is possible to successfully grab a lock only for the request to fail
+    /// because we aren't connected to the given peer. The reason for this design is that it is
+    /// possible for the peer to disconnect between the lock being grabbed and the request
+    /// starting, and as such there's no benefit in performing the verification twice.
+    pub async fn try_lock_peer_for_request(
+        self: Arc<Self>,
+        target: PeerId,
+    ) -> Option<PeerRequestLock<TPlat>> {
+        let mut guarded = self.shared.guarded.lock().await;
+
+        while let Some(unlocked_peer) = self.shared.peer_requests_unlocks.pop() {
+            let _was_in = guarded.peer_requests_locks.remove(&unlocked_peer);
+            debug_assert!(_was_in);
+        }
+
+        if guarded.peer_requests_locks.contains(&target) {
+            return None;
+        }
+
+        guarded.peer_requests_locks.insert(target.clone());
+        drop(guarded);
+
+        Some(PeerRequestLock {
+            service: self,
+            peer_id: target,
+        })
+    }
+
+    /// Grabs a so-called "request lock" associated with any of the `PeerId`s provided. While the
+    /// lock is active, no other "request lock" towards the same `PeerId` can be acquired. The
+    /// lock can then be used to actually start a request.
+    ///
+    /// This function waits until one of the peers can be locked and locks it. If multiple peers
+    /// are ready, one is chosen at random.
+    ///
+    /// See also [`NetworkService::try_lock_peer_for_request`].
+    // TODO: better API for the list?
+    pub async fn lock_any_for_request(
+        self: Arc<Self>,
+        list: hashbrown::HashSet<PeerId, fnv::FnvBuildHasher>,
+    ) -> PeerRequestLock<TPlat> {
+        let mut on_pushed = None::<event_listener::EventListener>;
+
+        loop {
+            if let Some(on_pushed) = on_pushed.take() {
+                on_pushed.await;
+            }
+
+            let mut guarded = self.shared.guarded.lock().await;
+
+            on_pushed = Some(self.shared.peer_requests_unlocks_pushed.listen());
+
+            while let Some(unlocked_peer) = self.shared.peer_requests_unlocks.pop() {
+                let _was_in = guarded.peer_requests_locks.remove(&unlocked_peer);
+                debug_assert!(_was_in);
+            }
+
+            if let Some(peer_id) = rand::seq::IteratorRandom::choose(
+                list.difference(&guarded.peer_requests_locks),
+                &mut rand::thread_rng(),
+            )
+            .cloned()
+            {
+                guarded.peer_requests_locks.insert(peer_id.clone());
+                drop(guarded);
+
+                break PeerRequestLock {
+                    service: self,
+                    peer_id,
+                };
+            }
+        }
+    }
+
     /// Sends a blocks request to the given peer.
     // TODO: more docs
     pub async fn blocks_request(
         self: Arc<Self>,
-        target: PeerId, // TODO: takes by value because of future longevity issue
+        request_lock: PeerRequestLock<TPlat>,
         chain_index: usize,
         config: protocol::BlocksRequestConfig,
         timeout: Duration,
     ) -> Result<Vec<protocol::BlockData>, BlocksRequestError> {
         let mut guarded = self.shared.guarded.lock().await;
 
         // The call to `start_blocks_request` below panics if we have no active connection.
-        if !guarded.network.can_start_requests(&target) {
+        if !guarded.network.can_start_requests(&request_lock.peer_id) {
             return Err(BlocksRequestError::NoConnection);
         }
 
@@ -377,7 +469,7 @@
             log::debug!(
                 target: "network",
                 "Connection({}) <= BlocksRequest(chain={}, start={}, num={}, descending={:?}, header={:?}, body={:?}, justifications={:?})",
-                target, self.shared.log_chain_names[chain_index], HashDisplay(hash),
+                request_lock.peer_id, self.shared.log_chain_names[chain_index], HashDisplay(hash),
                 config.desired_count.get(),
                 matches!(config.direction, protocol::BlocksRequestDirection::Descending),
                 config.fields.header, config.fields.body, config.fields.justifications
@@ -387,7 +479,7 @@
             log::debug!(
                 target: "network",
                 "Connection({}) <= BlocksRequest(chain={}, start=#{}, num={}, descending={:?}, header={:?}, body={:?}, justifications={:?})",
-                target, self.shared.log_chain_names[chain_index], number,
+                request_lock.peer_id, self.shared.log_chain_names[chain_index], number,
                 config.desired_count.get(),
                 matches!(config.direction, protocol::BlocksRequestDirection::Descending),
                 config.fields.header, config.fields.body, config.fields.justifications
@@ -397,7 +489,7 @@
 
         let request_id = guarded.network.start_blocks_request(
             self.shared.platform.now(),
-            &target,
+            &request_lock.peer_id,
             chain_index,
             config,
             timeout,
@@ -417,7 +509,7 @@
                 log::debug!(
                     target: "network",
                     "Connection({}) => BlocksRequest(chain={}, num_blocks={}, block_data_total_size={})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     blocks.len(),
                     BytesDisplay(blocks.iter().fold(0, |sum, block| {
@@ -432,7 +524,7 @@
                 log::debug!(
                     target: "network",
                     "Connection({}) => BlocksRequest(chain={}, error={:?})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     err
                 );
@@ -449,13 +541,15 @@
                     log::warn!(
                         target: "network",
                         "Error in block request with {}. This might indicate an incompatibility. Error: {}",
-                        target,
+                        request_lock.peer_id,
                         err
                     );
                 }
             }
         }
 
+        drop(request_lock);
+
         result.map_err(BlocksRequestError::Request)
     }
 
@@ -463,7 +557,7 @@
     // TODO: more docs
     pub async fn grandpa_warp_sync_request(
         self: Arc<Self>,
-        target: PeerId, // TODO: takes by value because of future longevity issue
+        request_lock: PeerRequestLock<TPlat>,
         chain_index: usize,
         begin_hash: [u8; 32],
         timeout: Duration,
     ) -> Result<service::EncodedGrandpaWarpSyncResponse, GrandpaWarpSyncRequestError> {
         let mut guarded = self.shared.guarded.lock().await;
 
         // The call to `start_grandpa_warp_sync_request` below panics if we have no
         // active connection.
-        if !guarded.network.can_start_requests(&target) {
+        if !guarded.network.can_start_requests(&request_lock.peer_id) {
             return Err(GrandpaWarpSyncRequestError::NoConnection);
         }
 
         log::debug!(
             target: "network",
             "Connection({}) <= GrandpaWarpSyncRequest(chain={}, start={})",
-            target, self.shared.log_chain_names[chain_index], HashDisplay(&begin_hash)
+            request_lock.peer_id, self.shared.log_chain_names[chain_index], HashDisplay(&begin_hash)
         );
 
         let request_id = guarded.network.start_grandpa_warp_sync_request(
             self.shared.platform.now(),
-            &target,
+            &request_lock.peer_id,
             chain_index,
             begin_hash,
             timeout,
@@ -506,7 +600,7 @@
                 log::debug!(
                     target: "network",
                     "Connection({}) => GrandpaWarpSyncRequest(chain={}, num_fragments={}, finished={:?})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     decoded.fragments.len(),
                     decoded.is_finished,
@@ -516,13 +610,15 @@
                 log::debug!(
                     target: "network",
                     "Connection({}) => GrandpaWarpSyncRequest(chain={}, error={:?})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     err,
                 );
             }
         }
 
+        drop(request_lock);
+
         result.map_err(GrandpaWarpSyncRequestError::Request)
     }
 
@@ -568,7 +664,7 @@
     pub async fn storage_proof_request(
         self: Arc<Self>,
         chain_index: usize,
-        target: PeerId, // TODO: takes by value because of futures longevity issue
+        request_lock: PeerRequestLock<TPlat>,
         config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
         timeout: Duration,
     ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
         let mut guarded = self.shared.guarded.lock().await;
 
         // The call to `start_storage_proof_request` below panics if we have no active
         // connection.
-        if !guarded.network.can_start_requests(&target) {
+        if !guarded.network.can_start_requests(&request_lock.peer_id) {
             return Err(StorageProofRequestError::NoConnection);
         }
 
         log::debug!(
             target: "network",
             "Connection({}) <= StorageProofRequest(chain={}, block={})",
-            target,
+            request_lock.peer_id,
             self.shared.log_chain_names[chain_index],
             HashDisplay(&config.block_hash)
         );
 
         let request_id = match guarded.network.start_storage_proof_request(
             self.shared.platform.now(),
-            &target,
+            &request_lock.peer_id,
             chain_index,
             config,
             timeout,
@@ -618,7 +714,7 @@
                 log::debug!(
                     target: "network",
                     "Connection({}) => StorageProofRequest(chain={}, total_size={})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     BytesDisplay(u64::try_from(decoded.len()).unwrap()),
                 );
@@ -627,13 +723,15 @@
                 log::debug!(
                     target: "network",
                     "Connection({}) => StorageProofRequest(chain={}, error={:?})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     err
                 );
             }
         }
 
+        drop(request_lock);
+
         result.map_err(StorageProofRequestError::Request)
     }
 
@@ -644,7 +742,7 @@
     pub async fn call_proof_request(
         self: Arc<Self>,
         chain_index: usize,
-        target: PeerId, // TODO: takes by value because of futures longevity issue
+        request_lock: PeerRequestLock<TPlat>,
         config: protocol::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
         timeout: Duration,
    ) -> Result<service::EncodedMerkleProof, CallProofRequestError> {
         let mut guarded = self.shared.guarded.lock().await;
 
         // The call to `start_call_proof_request` below panics if we have no active connection.
-        if !guarded.network.can_start_requests(&target) {
+        if !guarded.network.can_start_requests(&request_lock.peer_id) {
             return Err(CallProofRequestError::NoConnection);
         }
 
         log::debug!(
             target: "network",
             "Connection({}) <= CallProofRequest({}, {}, {})",
-            target,
+            request_lock.peer_id,
             self.shared.log_chain_names[chain_index],
             HashDisplay(&config.block_hash),
             config.method
@@ -667,7 +765,7 @@
         );
 
         let request_id = match guarded.network.start_call_proof_request(
             self.shared.platform.now(),
-            &target,
+            &request_lock.peer_id,
             chain_index,
             config,
             timeout,
@@ -693,7 +791,7 @@
                 log::debug!(
                     target: "network",
                     "Connection({}) => CallProofRequest({}, total_size: {})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     BytesDisplay(u64::try_from(decoded.len()).unwrap())
                 );
@@ -702,13 +800,15 @@
                 log::debug!(
                     target: "network",
                     "Connection({}) => CallProofRequest({}, {})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     err
                 );
             }
         }
 
+        drop(request_lock);
+
         result.map_err(CallProofRequestError::Request)
     }
 
@@ -884,6 +984,27 @@ pub enum Event {
         },
 }
 
+/// Active lock preventing other requests towards the same peer from being started.
+///
+/// See [`NetworkService::try_lock_peer_for_request`].
+pub struct PeerRequestLock<TPlat: PlatformRef> {
+    service: Arc<NetworkService<TPlat>>,
+    peer_id: PeerId,
+}
+
+impl<TPlat: PlatformRef> Drop for PeerRequestLock<TPlat> {
+    fn drop(&mut self) {
+        self.service
+            .shared
+            .peer_requests_unlocks
+            .push(self.peer_id.clone());
+        self.service
+            .shared
+            .peer_requests_unlocks_pushed
+            .notify(usize::max_value());
+    }
+}
+
 /// Error returned by [`NetworkService::blocks_request`].
 #[derive(Debug, derive_more::Display)]
 pub enum BlocksRequestError {
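Taken together, the pieces above form the whole locking protocol: `try_lock_peer_for_request` or `lock_any_for_request` hands out a `PeerRequestLock`, the request functions consume that lock by value, and the `Drop` implementation releases it. The sketch below illustrates the intended call pattern. It is an illustration only, not code from the patch: the helper name is made up, and `PlatformRef`, `PeerId`, `protocol`, and `BlocksRequestError` are assumed to be in scope the same way they are inside `network_service.rs`.

    use core::time::Duration;
    use std::sync::Arc;

    // Hypothetical helper: issue a blocks request only if `peer` is currently free.
    async fn blocks_request_if_free<TPlat: PlatformRef>(
        network_service: Arc<NetworkService<TPlat>>,
        peer: PeerId,
        chain_index: usize,
        config: protocol::BlocksRequestConfig,
    ) -> Option<Result<Vec<protocol::BlockData>, BlocksRequestError>> {
        // `None` means that another request towards `peer` is already in flight.
        let lock = network_service
            .clone()
            .try_lock_peer_for_request(peer)
            .await?;

        // The lock is consumed by `blocks_request` and released through `Drop`
        // once the request finishes, allowing the next request to this peer.
        Some(
            network_service
                .blocks_request(lock, chain_index, config, Duration::from_secs(10))
                .await,
        )
    }

Because the release happens in `Drop`, the lock is given back even if the request future is cancelled, which is what keeps the one-request-per-peer invariant from leaking.
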
diff --git a/light-base/src/sync_service.rs b/light-base/src/sync_service.rs
index b69d5a5c21..6043f341b4 100644
--- a/light-base/src/sync_service.rs
+++ b/light-base/src/sync_service.rs
@@ -268,7 +268,7 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
         &self,
         block_number: u64,
         block_hash: &[u8; 32],
-    ) -> impl Iterator<Item = PeerId> {
+    ) -> Vec<PeerId> {
         let (send_back, rx) = oneshot::channel();
 
         self.to_background
@@ -282,7 +282,7 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
             .await
             .unwrap();
 
-        rx.await.unwrap().into_iter()
+        rx.await.unwrap()
     }
 
     // TODO: doc; explain the guarantees
@@ -291,7 +291,7 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
         block_number: u64,
         hash: [u8; 32],
         fields: protocol::BlocksRequestFields,
-        total_attempts: u32,
+        mut total_attempts: u32,
         timeout_per_request: Duration,
         _max_parallel: NonZeroU32,
     ) -> Result<BlockData, ()> {
@@ -304,17 +304,29 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
         };
 
         // TODO: handle max_parallel
-        // TODO: better peers selection ; don't just take the first 3
-        for target in self
-            .peers_assumed_know_blocks(block_number, &hash)
-            .await
-            .take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
-        {
+        // TODO: shuffle peers ; don't just take the first
+        // TODO: slowly increase number of peers dynamically if no peer is immediately available
+        loop {
+            if total_attempts == 0 {
+                break;
+            }
+
+            let peers_assumed_know_blocks =
+                self.peers_assumed_know_blocks(block_number, &hash).await;
+
+            let request_lock = self
+                .network_service
+                .clone()
+                .lock_any_for_request(peers_assumed_know_blocks.into_iter().collect())
+                .await;
+
+            total_attempts -= 1;
+
             let mut result = match self
                 .network_service
                 .clone()
                 .blocks_request(
-                    target,
+                    request_lock,
                     self.network_chain_index,
                     request_config.clone(),
                     timeout_per_request,
@@ -336,7 +348,7 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
         self: Arc<Self>,
         hash: [u8; 32],
         fields: protocol::BlocksRequestFields,
-        total_attempts: u32,
+        mut total_attempts: u32,
         timeout_per_request: Duration,
         _max_parallel: NonZeroU32,
     ) -> Result<BlockData, ()> {
@@ -349,18 +361,28 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
         };
 
         // TODO: handle max_parallel
-        // TODO: better peers selection ; don't just take the first
-        for target in self
-            .network_service
-            .peers_list()
-            .await
-            .take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
-        {
+        // TODO: shuffle peers ; don't just take the first
+        // TODO: slowly increase number of peers dynamically if no peer is immediately available
+        loop {
+            if total_attempts == 0 {
+                break;
+            }
+
+            let peers_list = self.network_service.peers_list().await;
+
+            let request_lock = self
+                .network_service
+                .clone()
+                .lock_any_for_request(peers_list.collect())
+                .await;
+
+            total_attempts -= 1;
+
             let mut result = match self
                 .network_service
                 .clone()
                 .blocks_request(
-                    target,
+                    request_lock,
                     self.network_chain_index,
                     request_config.clone(),
                     timeout_per_request,
@@ -399,26 +421,39 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
         block_hash: &[u8; 32],
         storage_trie_root: &[u8; 32],
         requested_keys: impl Iterator<Item = impl AsRef<[u8]> + Clone> + Clone,
-        total_attempts: u32,
+        mut total_attempts: u32,
         timeout_per_request: Duration,
         _max_parallel: NonZeroU32,
     ) -> Result<Vec<Option<Vec<u8>>>, StorageQueryError> {
         let mut outcome_errors =
             Vec::with_capacity(usize::try_from(total_attempts).unwrap_or(usize::max_value()));
 
-        // TODO: better peers selection ; don't just take the first
         // TODO: handle max_parallel
-        for target in self
-            .peers_assumed_know_blocks(block_number, block_hash)
-            .await
-            .take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
-        {
+        // TODO: shuffle peers ; don't just take the first
+        // TODO: slowly increase number of peers dynamically if no peer is immediately available
+        loop {
+            if total_attempts == 0 {
+                break;
+            }
+
+            let peers_assumed_know_blocks = self
+                .peers_assumed_know_blocks(block_number, block_hash)
+                .await;
+
+            let request_lock = self
+                .network_service
+                .clone()
+                .lock_any_for_request(peers_assumed_know_blocks.into_iter().collect())
+                .await;
+
+            total_attempts -= 1;
+
             let result = self
                 .network_service
                 .clone()
                 .storage_proof_request(
                     self.network_chain_index,
-                    target,
+                    request_lock,
                     protocol::StorageProofRequestConfig {
                         block_hash: *block_hash,
                         keys: requested_keys.clone(),
@@ -466,7 +501,7 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
         block_hash: &[u8; 32],
         prefix: &[u8],
         storage_trie_root: &[u8; 32],
-        total_attempts: u32,
+        mut total_attempts: u32,
         timeout_per_request: Duration,
         _max_parallel: NonZeroU32,
     ) -> Result<Vec<Vec<u8>>, StorageQueryError> {
@@ -479,20 +514,33 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
         let mut outcome_errors =
             Vec::with_capacity(usize::try_from(total_attempts).unwrap_or(usize::max_value()));
 
-        // TODO: better peers selection ; don't just take the first
         // TODO: handle max_parallel
-        // TODO: is the number of keys is large, split into multiple requests
-        for target in self
-            .peers_assumed_know_blocks(block_number, block_hash)
-            .await
-            .take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
-        {
+        // TODO: shuffle peers ; don't just take the first
+        // TODO: slowly increase number of peers dynamically if no peer is immediately available
+        // TODO: if the number of keys is large, split into multiple requests
+        loop {
+            if total_attempts == 0 {
+                break;
+            }
+
+            let peers_assumed_know_blocks = self
+                .peers_assumed_know_blocks(block_number, block_hash)
+                .await;
+
+            let request_lock = self
+                .network_service
+                .clone()
+                .lock_any_for_request(peers_assumed_know_blocks.into_iter().collect())
+                .await;
+
+            total_attempts -= 1;
+
             let result = self
                 .network_service
                 .clone()
                 .storage_proof_request(
                     self.network_chain_index,
-                    target,
+                    request_lock,
                     protocol::StorageProofRequestConfig {
                         block_hash: *block_hash,
                         keys: prefix_scan.requested_keys().map(|nibbles| {
@@ -549,26 +597,39 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
             '_,
             impl Iterator<Item = impl AsRef<[u8]>> + Clone,
         >,
-        total_attempts: u32,
+        mut total_attempts: u32,
         timeout_per_request: Duration,
         _max_parallel: NonZeroU32,
     ) -> Result<EncodedMerkleProof, CallProofQueryError> {
         let mut outcome_errors =
             Vec::with_capacity(usize::try_from(total_attempts).unwrap_or(usize::max_value()));
 
-        // TODO: better peers selection ; don't just take the first
         // TODO: handle max_parallel
-        for target in self
-            .peers_assumed_know_blocks(block_number, &config.block_hash)
-            .await
-            .take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
-        {
+        // TODO: shuffle peers ; don't just take the first
+        // TODO: slowly increase number of peers dynamically if no peer is immediately available
+        loop {
+            if total_attempts == 0 {
+                break;
+            }
+
+            let peers_assumed_know_blocks = self
+                .peers_assumed_know_blocks(block_number, &config.block_hash)
+                .await;
+
+            let request_lock = self
+                .network_service
+                .clone()
+                .lock_any_for_request(peers_assumed_know_blocks.into_iter().collect())
+                .await;
+
+            total_attempts -= 1;
+
             let result = self
                 .network_service
                 .clone()
                 .call_proof_request(
                     self.network_chain_index,
-                    target,
+                    request_lock,
                     config.clone(),
                     timeout_per_request,
                 )
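All of the query helpers in this file now share the retry shape shown below. This is a simplified stand-in model rather than smoldot code (`Service`, `Response`, `RequestError`, and the three methods are illustrative names): candidates are recomputed on every attempt, the task blocks until one candidate is free, and an attempt is consumed as soon as a lock is acquired, whether or not the request then succeeds.

    // Stand-in model of the retry loop shared by block_query, storage_query,
    // storage_prefix_keys_query, and call_proof_query after this patch.
    async fn query_with_attempts(
        svc: &Service,
        mut total_attempts: u32,
    ) -> Result<Response, Vec<RequestError>> {
        let mut outcome_errors = Vec::new();
        loop {
            if total_attempts == 0 {
                // Every attempt failed; hand the accumulated errors back.
                return Err(outcome_errors);
            }

            // Recomputed on each iteration: peers may have connected or
            // disconnected while we were blocked waiting for a lock.
            let candidates = svc.candidate_peers().await;

            // Waits until any candidate is free, then locks it. Acquiring the
            // lock is what consumes an attempt, even if the request then fails.
            let lock = svc.lock_any(candidates).await;
            total_attempts -= 1;

            match svc.fetch(lock).await {
                Ok(response) => return Ok(response),
                Err(err) => outcome_errors.push(err),
            }
        }
    }

Note the `mut total_attempts` countdown replacing the old `.take(total_attempts)` on the peer iterator: the bound is now on lock acquisitions rather than on however many peers happened to be listed up front.
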
diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs
index 7cc1b55905..553daa6f73 100644
--- a/light-base/src/sync_service/standalone.rs
+++ b/light-base/src/sync_service/standalone.rs
@@ -121,7 +121,7 @@ pub(super) async fn start_standalone_chain<TPlat: PlatformRef>(
 
         // Start a networking request (block requests, warp sync requests, etc.) that the
         // syncing state machine would like to start.
-        if task.start_next_request() {
+        if task.start_next_request().await {
            queue_empty = false;
        }
 
@@ -393,18 +393,36 @@ impl<TPlat: PlatformRef> Task<TPlat> {
     /// Starts one network request if any is necessary.
     ///
     /// Returns `true` if a request has been started.
-    fn start_next_request(&mut self) -> bool {
+    async fn start_next_request(&mut self) -> bool {
         // `desired_requests()` returns, in decreasing order of priority, the requests
         // that should be started in order for the syncing to proceed. The fact that multiple
         // requests are returned could be used to filter out undesired ones. We use this
         // filtering to enforce a maximum of one ongoing request per source.
-        let (source_id, _, mut request_detail) = match self
-            .sync
-            .desired_requests()
-            .find(|(source_id, _, _)| self.sync.source_num_ongoing_requests(*source_id) == 0)
-        {
-            Some(v) => v,
-            None => return false,
+        let (source_id, request_lock, mut request_detail) = {
+            // We need to collect the desired requests, as we can't keep the iterator alive
+            // during an await point.
+            let mut desired_requests_iter = self
+                .sync
+                .desired_requests()
+                .map(|(src_id, _, details)| (src_id, details))
+                .collect::<Vec<_>>()
+                .into_iter();
+
+            loop {
+                let Some((source_id, request_details)) = desired_requests_iter.next()
+                // TODO: if no peer can be locked because they're all busy with something else, the syncing will not wake up once they are
+                else { return false };
+
+                let peer_id = self.sync[source_id].0.clone();
+                if let Some(request_lock) = self
+                    .network_service
+                    .clone()
+                    .try_lock_peer_for_request(peer_id)
+                    .await
+                {
+                    break (source_id, request_lock, request_details);
+                }
+            }
         };
 
         // Before inserting the request back to the syncing state machine, clamp the number
@@ -424,10 +442,8 @@ impl<TPlat: PlatformRef> Task<TPlat> {
                 request_bodies,
                 request_justification,
             } => {
-                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
-
                 let block_request = self.network_service.clone().blocks_request(
-                    peer_id,
+                    request_lock,
                     self.network_chain_index,
                     network::protocol::BlocksRequestConfig {
                         start: if let Some(first_block_hash) = first_block_hash {
@@ -467,10 +483,8 @@ impl<TPlat: PlatformRef> Task<TPlat> {
             all::DesiredRequest::GrandpaWarpSync {
                 sync_start_block_hash,
             } => {
-                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
-
                 let grandpa_request = self.network_service.clone().grandpa_warp_sync_request(
-                    peer_id,
+                    request_lock,
                     self.network_chain_index,
                     sync_start_block_hash,
                     // The timeout needs to be long enough to potentially download the maximum
@@ -501,11 +515,9 @@ impl<TPlat: PlatformRef> Task<TPlat> {
                 ref keys,
                 ..
             } => {
-                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
-
                 let storage_request = self.network_service.clone().storage_proof_request(
                     self.network_chain_index,
-                    peer_id,
+                    request_lock,
                     network::protocol::StorageProofRequestConfig {
                         block_hash,
                         keys: keys.clone().into_iter(),
@@ -543,7 +555,6 @@ impl<TPlat: PlatformRef> Task<TPlat> {
                 ref function_name,
                 ref parameter_vectored,
             } => {
-                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
                 let network_service = self.network_service.clone();
                 let network_chain_index = self.network_chain_index;
                 // TODO: all this copying is done because of lifetime requirements in NetworkService::call_proof_request; maybe check if it can be avoided
@@ -553,7 +564,7 @@ impl<TPlat: PlatformRef> Task<TPlat> {
                 let call_proof_request = async move {
                     let rq = network_service.call_proof_request(
                         network_chain_index,
-                        peer_id,
+                        request_lock,
                         network::protocol::CallProofRequestConfig {
                             block_hash,
                             method: &function_name,
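The `collect::<Vec<_>>()` inside `start_next_request` is load-bearing: `desired_requests()` returns an iterator that borrows `self.sync`, and that borrow cannot stay alive across the `.await` on `try_lock_peer_for_request`, because `self.sync` is used again inside the loop. A reduced illustration of the same constraint, with made-up stand-in types:

    // Stand-in types; the shape mirrors `Task::start_next_request` above.
    struct StateMachine {
        started: Vec<u32>,
    }

    impl StateMachine {
        // Borrows `self` for as long as the returned iterator is alive.
        fn desired(&self) -> impl Iterator<Item = u32> + '_ {
            self.started.iter().map(|n| n + 1)
        }

        fn mark_started(&mut self, id: u32) {
            self.started.push(id);
        }
    }

    // Stand-in for the asynchronous lock attempt on the network service.
    async fn try_lock(_id: u32) -> bool {
        true
    }

    async fn pick(state: &mut StateMachine) -> Option<u32> {
        // Snapshot the borrowed iterator into owned data first; keeping
        // `state.desired()` alive here would conflict with the mutable
        // borrow taken by `mark_started` below.
        let desired: Vec<u32> = state.desired().collect();
        for id in desired {
            if try_lock(id).await {
                state.mark_started(id); // fine: no iterator is alive anymore
                return Some(id);
            }
        }
        None
    }

As the TODO in the diff notes, one wart remains: if every desired peer is busy, `start_next_request` returns `false` and nothing re-wakes the syncing task when one of those peers becomes free again.
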
diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md
index 12c6070968..0e57732a6f 100644
--- a/wasm-node/CHANGELOG.md
+++ b/wasm-node/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+### Changed
+
+- A limit of one simultaneous request per peer is now enforced in order to limit the load that a light client induces on each full node it is connected to. This limit is theoretically part of the Polkadot networking protocol but in practice isn't properly enforced. Consequently, requests that were previously being executed in parallel might now execute one after the other.
+
 ## 1.0.10 - 2023-06-19
 
 ### Changed

From 8dc44922d0780cd59bb64c421050200d632caf3f Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Mon, 19 Jun 2023 16:42:46 +0200
Subject: [PATCH 2/3] PR link

---
 wasm-node/CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md
index 0e57732a6f..2304358cf6 100644
--- a/wasm-node/CHANGELOG.md
+++ b/wasm-node/CHANGELOG.md
@@ -4,7 +4,7 @@
 
 ### Changed
 
-- A limit of one simultaneous request per peer is now enforced in order to limit the load that a light client induces on each full node it is connected to. This limit is theoretically part of the Polkadot networking protocol but in practice isn't properly enforced. Consequently, requests that were previously being executed in parallel might now execute one after the other.
+- A limit of one simultaneous request per peer is now enforced in order to limit the load that a light client induces on each full node it is connected to. This limit is theoretically part of the Polkadot networking protocol but in practice isn't properly enforced. Consequently, requests that were previously being executed in parallel might now execute one after the other. ([#779](https://github.com/smol-dot/smoldot/pull/779))
 
 ## 1.0.10 - 2023-06-19
 

From 048f9cfcb26b651dcbeaa150f0733a7b9ca412f0 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Mon, 10 Jul 2023 19:00:14 +0200
Subject: [PATCH 3/3] Fix warning

---
 light-base/src/sync_service.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/light-base/src/sync_service.rs b/light-base/src/sync_service.rs
index f6abb3e823..7f1e19c10e 100644
--- a/light-base/src/sync_service.rs
+++ b/light-base/src/sync_service.rs
@@ -33,7 +33,6 @@ use async_lock::Mutex;
 use core::{fmt, mem, num::NonZeroU32, time::Duration};
 use futures_channel::{mpsc, oneshot};
 use futures_util::{stream, SinkExt as _};
-use rand::seq::IteratorRandom as _;
 use smoldot::{
     chain,
     executor::host,
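A closing note on the mechanism itself. `PeerRequestLock::drop` cannot be `async`, so it never takes the `guarded` mutex: release goes through the lock-free `crossbeam_queue::SegQueue` plus an `event_listener::Event`, and whichever task next holds the mutex drains the queue. Below is a self-contained sketch of that pattern using the two crates this series relies on; the type and field names are illustrative, not smoldot's.

    use std::sync::Arc;

    // Shared state: the release side-channel written to by `Drop`.
    struct Slots {
        unlocked: crossbeam_queue::SegQueue<u64>, // ids released by `Drop`
        unlocked_pushed: event_listener::Event,   // wakes tasks waiting for a slot
    }

    struct SlotGuard {
        slots: Arc<Slots>,
        id: u64,
    }

    impl Drop for SlotGuard {
        fn drop(&mut self) {
            // `Drop` cannot await a mutex, so hand the release over through a
            // lock-free queue and wake every waiting task; the next task that
            // acquires the real mutex is responsible for draining the queue.
            self.slots.unlocked.push(self.id);
            self.slots.unlocked_pushed.notify(usize::MAX);
        }
    }

Waking every listener with `notify(usize::MAX)` (the patch's `usize::max_value()`) is deliberate: each woken task re-checks the set of locked peers under the mutex, so a spurious wake-up costs a little work but an unlock can never be missed.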