From 035eff8232ac7a2628d24a8b821115efd5ccf8de Mon Sep 17 00:00:00 2001
From: Stefan Neamtu
Date: Sat, 4 Jan 2025 12:48:24 +0200
Subject: [PATCH 1/2] network: parallelize partial witness sends with rayon,
 spawning the async half of each send on the calling arbiter

---
 .../src/peer_manager/network_state/mod.rs | 92 +++++++++++++++++++
 .../src/peer_manager/peer_manager_actor.rs | 27 ++++--
 2 files changed, 109 insertions(+), 10 deletions(-)

diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs
index 90370ff2816..cda45d4d4b9 100644
--- a/chain/network/src/peer_manager/network_state/mod.rs
+++ b/chain/network/src/peer_manager/network_state/mod.rs
@@ -35,6 +35,7 @@ use crate::types::{
     ChainInfo, PeerManagerSenderForNetwork, PeerType, ReasonForBan, StatePartRequestBody,
     Tier3Request, Tier3RequestBody,
 };
+use actix::ArbiterHandle;
 use anyhow::Context;
 use arc_swap::ArcSwap;
 use near_async::messaging::{CanSend, SendAsync, Sender};
@@ -698,6 +699,97 @@ impl NetworkState {
         success
     }
 
+    /// Sends a message to the given account.
+    /// Returns whether the message was sent.
+    /// The message may be sent over a TIER1 or TIER2 connection depending on the message type.
+    pub fn send_message_to_account_with_arbiter(
+        self: &Arc<Self>,
+        clock: &time::Clock,
+        account_id: &AccountId,
+        msg: RoutedMessageBody,
+        arbiter: ArbiterHandle,
+    ) -> bool {
+        // If the message is addressed to this node's own validator account, handle it directly.
+        if self.config.validator.account_id().is_some_and(|id| &id == account_id) {
+            // For now, we don't allow some types of messages to be sent to self.
+            debug_assert!(msg.allow_sending_to_self());
+            let this = self.clone();
+            let clock = clock.clone();
+            let peer_id = self.config.node_id();
+            let msg = self.sign_message(
+                &clock,
+                RawRoutedMessage { target: PeerIdOrHash::PeerId(peer_id.clone()), body: msg },
+            );
+            arbiter.spawn(async move {
+                this.receive_routed_message(&clock, peer_id, msg.hash(), msg.msg.body).await;
+            });
+            return true;
+        }
+
+        let accounts_data = self.accounts_data.load();
+        if tcp::Tier::T1.is_allowed_routed(&msg) {
+            for key in accounts_data.keys_by_id.get(account_id).iter().flat_map(|keys| keys.iter())
+            {
+                let data = match accounts_data.data.get(key) {
+                    Some(data) => data,
+                    None => continue,
+                };
+                let conn = match self.get_tier1_proxy(data) {
+                    Some(conn) => conn,
+                    None => continue,
+                };
+                // TODO(gprusak): in case of PartialEncodedChunk, consider stripping everything
+                // but the header. This will bound the message size.
+                conn.send_message(Arc::new(PeerMessage::Routed(self.sign_message(
+                    clock,
+                    RawRoutedMessage {
+                        target: PeerIdOrHash::PeerId(data.peer_id.clone()),
+                        body: msg,
+                    },
+                ))));
+                return true;
+            }
+        }
+
+        let peer_id_from_account_data = accounts_data
+            .keys_by_id
+            .get(account_id)
+            .iter()
+            .flat_map(|keys| keys.iter())
+            .flat_map(|key| accounts_data.data.get(key))
+            .next()
+            .map(|data| data.peer_id.clone());
+        // Find the target peer_id:
+        // - first look it up in self.accounts_data
+        // - if missing, fall back to a lookup in self.graph.routing_table
+        // We want to deprecate self.graph.routing_table.account_owner in the next release.
+        let target = if let Some(peer_id) = peer_id_from_account_data {
+            metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AccountData"]).inc();
+            peer_id
+        } else if let Some(peer_id) = self.account_announcements.get_account_owner(account_id) {
+            metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AnnounceAccount"]).inc();
+            peer_id
+        } else {
+            // TODO(MarX, #1369): Message is dropped here. Define policy for this case.
+            metrics::MessageDropped::UnknownAccount.inc(&msg);
+            tracing::debug!(target: "network",
+                account_id = ?self.config.validator.account_id(),
+                to = ?account_id,
+                ?msg, "Drop message: unknown account",
+            );
+            tracing::trace!(target: "network", known_peers = ?self.account_announcements.get_accounts_keys(), "Known peers");
+            return false;
+        };
+
+        let mut success = false;
+        let msg = RawRoutedMessage { target: PeerIdOrHash::PeerId(target), body: msg };
+        let msg = self.sign_message(clock, msg);
+        for _ in 0..msg.body.message_resend_count() {
+            success |= self.send_message_to_peer(clock, tcp::Tier::T2, msg.clone());
+        }
+        success
+    }
+
     pub async fn receive_routed_message(
         self: &Arc<Self>,
         clock: &time::Clock,

diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs
index 1f4a00cf8d8..09ad567abae 100644
--- a/chain/network/src/peer_manager/peer_manager_actor.rs
+++ b/chain/network/src/peer_manager/peer_manager_actor.rs
@@ -40,6 +40,7 @@ use network_protocol::MAX_SHARDS_PER_SNAPSHOT_HOST_INFO;
 use rand::seq::{IteratorRandom, SliceRandom};
 use rand::thread_rng;
 use rand::Rng;
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use std::cmp::min;
 use std::collections::HashSet;
 use std::sync::atomic::Ordering;
@@ -1070,28 +1071,34 @@ impl PeerManagerActor {
                 NetworkResponses::NoResponse
             }
             NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
-                for (chunk_validator, partial_witness) in validator_witness_tuple {
-                    self.state.send_message_to_account(
-                        &self.clock,
-                        &chunk_validator,
-                        RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
-                    );
-                }
+                let arbiter = actix::Arbiter::current();
+                validator_witness_tuple.into_par_iter().for_each(
+                    |(chunk_validator, partial_witness)| {
+                        self.state.send_message_to_account_with_arbiter(
+                            &self.clock,
+                            &chunk_validator,
+                            RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
+                            arbiter.clone(),
+                        );
+                    },
+                );
                 NetworkResponses::NoResponse
             }
             NetworkRequests::PartialEncodedStateWitnessForward(
                 chunk_validators,
                 partial_witness,
             ) => {
-                for chunk_validator in chunk_validators {
-                    self.state.send_message_to_account(
+                let arbiter = actix::Arbiter::current();
+                chunk_validators.into_par_iter().for_each(|chunk_validator| {
+                    self.state.send_message_to_account_with_arbiter(
                         &self.clock,
                         &chunk_validator,
                         RoutedMessageBody::PartialEncodedStateWitnessForward(
                             partial_witness.clone(),
                         ),
+                        arbiter.clone(),
                     );
-                }
+                });
                 NetworkResponses::NoResponse
             }
             NetworkRequests::EpochSyncRequest { peer_id } => {

From 9444962ee8a540f02602247a4a3db354a9e77fb3 Mon Sep 17 00:00:00 2001
From: Stefan Neamtu
Date: Sat, 4 Jan 2025 12:56:17 +0200
Subject: [PATCH 2/2] parallelize creation of partial witness parts

---
 .../partial_witness/partial_witness_actor.rs | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
index 191607df2fc..f1e52f2f3fe 100644
--- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
+++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
@@ -39,6 +39,7 @@ use near_store::adapter::trie_store::TrieStoreAdapter;
 use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage};
 use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache};
 use rand::Rng;
+use rayon::{iter::ParallelIterator, prelude::*};
 
 use crate::client_actor::ClientSenderForPartialWitness;
 use crate::metrics;
@@ -267,9 +268,10 @@ impl PartialWitnessActor {
 
         let encoder = self.witness_encoders.entry(chunk_validators.len());
         let (parts, encoded_length) = encoder.encode(&witness_bytes);
+        let mut generated_parts = vec![];
         chunk_validators
-            .iter()
-            .zip_eq(parts)
+            .par_iter()
+            .zip_eq(parts.into_par_iter())
             .enumerate()
             .map(|(part_ord, (chunk_validator, part))| {
                 // It's fine to unwrap part here as we just constructed the parts above and we expect
                 // the number of parts to be equal to the number of chunk validators.
                 let partial_witness = PartialEncodedStateWitness::new(
                     epoch_id,
                     chunk_header.clone(),
                     part_ord,
-                    part.unwrap().to_vec(),
+                    part.unwrap().into_vec(),
                     encoded_length,
                     signer,
                 );
                 (chunk_validator.clone(), partial_witness)
             })
-            .collect_vec()
+            .collect_into_vec(&mut generated_parts);
+
+        generated_parts
     }
 
     fn generate_contract_deploys_parts(
@@ -596,7 +600,7 @@ impl PartialWitnessActor {
     /// Sends the contract accesses to the same chunk validators
     /// (except for the chunk producers that track the same shard),
-    /// which will receive the state witness for the new chunk. 
+    /// which will receive the state witness for the new chunk.
     fn send_contract_accesses_to_chunk_validators(
        &self,
        key: ChunkProductionKey,
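
Note on the arbiter plumbing in PATCH 1/2: actix::Arbiter::current() panics when called
from a thread that is not running an actix arbiter, and rayon's for_each closures execute
on rayon worker threads. The handle therefore has to be captured in the actor handler
before entering the parallel iterator, and the async half of each send is spawned back
onto that arbiter. A minimal, self-contained sketch of the pattern, assuming actix 0.13
and rayon; the validator names and string payload are illustrative stand-ins, not code
from the patch:

use actix::{Arbiter, System};
use rayon::iter::{IntoParallelIterator, ParallelIterator};

fn main() {
    let runner = System::new();
    runner.block_on(async {
        // Capture the handle while still on the actix thread. Calling
        // Arbiter::current() inside for_each below would panic, because
        // rayon workers are not actix threads.
        let arbiter = Arbiter::current();
        vec!["v0", "v1", "v2"].into_par_iter().for_each(|validator| {
            // CPU-bound work (message signing in the patch) runs on the rayon pool.
            let payload = format!("witness part for {validator}");
            // The async send is hopped back onto the captured arbiter.
            arbiter.spawn(async move { println!("sending {payload}") });
        });
        // Yield briefly so the spawned tasks get to run before the system stops.
        actix::clock::sleep(std::time::Duration::from_millis(50)).await;
    });
}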
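Note on PATCH 2/2: the change swaps itertools' sequential iter().zip_eq(...).collect_vec()
for rayon's indexed equivalents. rayon's zip_eq, like itertools', panics if the two sides
differ in length, and collect_into_vec writes results in input order into a reusable
buffer, so each part_ord still lines up with its chunk validator. A rough sketch of that
behavior under those assumptions, with toy data standing in for the Reed-Solomon encoder
output:

use rayon::prelude::*;

fn main() {
    let validators = vec!["v0", "v1", "v2"];
    // Stand-ins for the Option<Box<[u8]>> parts produced by encoder.encode().
    let parts: Vec<Option<Vec<u8>>> = vec![Some(vec![0]), Some(vec![1]), Some(vec![2])];

    let mut generated: Vec<(String, usize, Vec<u8>)> = Vec::new();
    validators
        .par_iter()
        .zip_eq(parts.into_par_iter()) // panics on length mismatch, like itertools::zip_eq
        .enumerate()
        .map(|(part_ord, (validator, part))| {
            // Safe to unwrap: every part was constructed above.
            (validator.to_string(), part_ord, part.unwrap())
        })
        .collect_into_vec(&mut generated); // preserves input order, reuses the buffer

    // part_ord still matches the validator's position despite parallel execution.
    assert_eq!(generated[1], ("v1".to_string(), 1, vec![1]));
}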