wip parallel psw sending #12686

Draft · wants to merge 2 commits into base: master
@@ -39,6 +39,7 @@ use near_store::adapter::trie_store::TrieStoreAdapter;
use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage};
use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache};
use rand::Rng;
use rayon::{iter::ParallelIterator, prelude::*};

use crate::client_actor::ClientSenderForPartialWitness;
use crate::metrics;
@@ -267,9 +268,10 @@ impl PartialWitnessActor {
let encoder = self.witness_encoders.entry(chunk_validators.len());
let (parts, encoded_length) = encoder.encode(&witness_bytes);

let mut generated_parts = vec![];
chunk_validators
.iter()
.zip_eq(parts)
.par_iter()
.zip_eq(parts.into_par_iter())
.enumerate()
.map(|(part_ord, (chunk_validator, part))| {
// It's fine to unwrap part here as we just constructed the parts above and we expect
@@ -278,13 +280,15 @@
epoch_id,
chunk_header.clone(),
part_ord,
part.unwrap().to_vec(),
part.unwrap().into_vec(),
encoded_length,
signer,
);
(chunk_validator.clone(), partial_witness)
})
.collect_vec()
.collect_into_vec(&mut generated_parts);

generated_parts
}

fn generate_contract_deploys_parts(
@@ -596,7 +600,7 @@ impl PartialWitnessActor {

/// Sends the contract accesses to the same chunk validators
/// (except for the chunk producers that track the same shard),
/// which will receive the state witness for the new chunk.
fn send_contract_accesses_to_chunk_validators(
&self,
key: ChunkProductionKey,
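The hunk above swaps the sequential itertools chain (.iter().zip_eq(parts) ... .collect_vec()) for rayon's indexed parallel iterators, so each validator's partial witness part is built on a rayon worker thread and collect_into_vec writes the results back into generated_parts in their original order. A minimal, self-contained sketch of that rayon pattern, with placeholder strings and byte vectors standing in for the PR's validator and witness types:

use rayon::prelude::*;

fn main() {
    let validators: Vec<String> = vec!["v0".into(), "v1".into(), "v2".into()];
    let parts: Vec<Vec<u8>> = vec![vec![0u8; 8], vec![1u8; 8], vec![2u8; 8]];

    // Pre-declared output buffer; collect_into_vec fills it in input order even
    // though the map closures run concurrently on the rayon thread pool.
    let mut generated: Vec<(String, (usize, Vec<u8>))> = vec![];
    validators
        .par_iter()
        // Like itertools::zip_eq, rayon's zip_eq requires both sides to have the same length.
        .zip_eq(parts.into_par_iter())
        .enumerate()
        .map(|(part_ord, (validator, part))| (validator.clone(), (part_ord, part)))
        .collect_into_vec(&mut generated);

    assert_eq!(generated.len(), 3);
}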
92 changes: 92 additions & 0 deletions chain/network/src/peer_manager/network_state/mod.rs
@@ -35,6 +35,7 @@ use crate::types::{
ChainInfo, PeerManagerSenderForNetwork, PeerType, ReasonForBan, StatePartRequestBody,
Tier3Request, Tier3RequestBody,
};
use actix::ArbiterHandle;
use anyhow::Context;
use arc_swap::ArcSwap;
use near_async::messaging::{CanSend, SendAsync, Sender};
@@ -698,6 +699,97 @@ impl NetworkState {
success
}

/// Send message to specific account.
/// Return whether the message is sent or not.
/// The message might be sent over TIER1 or TIER2 connection depending on the message type.
pub fn send_message_to_account_with_arbiter(
self: &Arc<Self>,
clock: &time::Clock,
account_id: &AccountId,
msg: RoutedMessageBody,
arbiter: ArbiterHandle,
) -> bool {
// If the message is allowed to be sent to self, we handle it directly.
if self.config.validator.account_id().is_some_and(|id| &id == account_id) {
// For now, we don't allow some types of messages to be sent to self.
debug_assert!(msg.allow_sending_to_self());
let this = self.clone();
let clock = clock.clone();
let peer_id = self.config.node_id();
let msg = self.sign_message(
&clock,
RawRoutedMessage { target: PeerIdOrHash::PeerId(peer_id.clone()), body: msg },
);
arbiter.spawn(async move {
this.receive_routed_message(&clock, peer_id, msg.hash(), msg.msg.body).await;
});
return true;
}

let accounts_data = self.accounts_data.load();
if tcp::Tier::T1.is_allowed_routed(&msg) {
for key in accounts_data.keys_by_id.get(account_id).iter().flat_map(|keys| keys.iter())
{
let data = match accounts_data.data.get(key) {
Some(data) => data,
None => continue,
};
let conn = match self.get_tier1_proxy(data) {
Some(conn) => conn,
None => continue,
};
// TODO(gprusak): in case of PartialEncodedChunk, consider stripping everything
// but the header. This will bound the message size
conn.send_message(Arc::new(PeerMessage::Routed(self.sign_message(
clock,
RawRoutedMessage {
target: PeerIdOrHash::PeerId(data.peer_id.clone()),
body: msg,
},
))));
return true;
}
}

let peer_id_from_account_data = accounts_data
.keys_by_id
.get(account_id)
.iter()
.flat_map(|keys| keys.iter())
.flat_map(|key| accounts_data.data.get(key))
.next()
.map(|data| data.peer_id.clone());
// Find the target peer_id:
// - first look it up in self.accounts_data
// - if missing, fall back to lookup in self.graph.routing_table
// We want to deprecate self.graph.routing_table.account_owner in the next release.
let target = if let Some(peer_id) = peer_id_from_account_data {
metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AccountData"]).inc();
peer_id
} else if let Some(peer_id) = self.account_announcements.get_account_owner(account_id) {
metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AnnounceAccount"]).inc();
peer_id
} else {
// TODO(MarX, #1369): Message is dropped here. Define policy for this case.
metrics::MessageDropped::UnknownAccount.inc(&msg);
tracing::debug!(target: "network",
account_id = ?self.config.validator.account_id(),
to = ?account_id,
?msg,"Drop message: unknown account",
);
tracing::trace!(target: "network", known_peers = ?self.account_announcements.get_accounts_keys(), "Known peers");
return false;
};

let mut success = false;
let msg = RawRoutedMessage { target: PeerIdOrHash::PeerId(target), body: msg };
let msg = self.sign_message(clock, msg);
for _ in 0..msg.body.message_resend_count() {
success |= self.send_message_to_peer(clock, tcp::Tier::T2, msg.clone());
}
success
}

pub async fn receive_routed_message(
self: &Arc<Self>,
clock: &time::Clock,
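The new send_message_to_account_with_arbiter above appears to mirror the existing send_message_to_account, except that when the message targets the node's own account the async handling is spawned onto an explicitly supplied ArbiterHandle rather than onto whatever actix context the caller happens to have. That matters for this PR because the caller can now be a rayon worker thread, where actix::Arbiter::current() would panic since no arbiter runs there. A minimal sketch of handing an ArbiterHandle into non-actix threads (assumed setup with a dedicated Arbiter and a plain std channel, not nearcore code):

use actix::Arbiter;
use std::sync::mpsc;

fn main() {
    let system = actix::System::new();
    system.block_on(async {
        // A dedicated arbiter running its own event loop on a separate thread.
        let arbiter = Arbiter::new();
        let handle = arbiter.handle();
        let (tx, rx) = mpsc::channel();

        // Rayon worker threads have no actix context, so the handle is captured
        // up front and moved into the closure, mirroring how the PR passes an
        // ArbiterHandle into the parallel send loop.
        rayon::scope(|s| {
            s.spawn(move |_| {
                handle.spawn(async move {
                    tx.send("handled on the arbiter thread").unwrap();
                });
            });
        });

        println!("{}", rx.recv().unwrap());
        arbiter.stop();
        arbiter.join().unwrap();
    });
}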
27 changes: 17 additions & 10 deletions chain/network/src/peer_manager/peer_manager_actor.rs
@@ -40,6 +40,7 @@ use network_protocol::MAX_SHARDS_PER_SNAPSHOT_HOST_INFO;
use rand::seq::{IteratorRandom, SliceRandom};
use rand::thread_rng;
use rand::Rng;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::cmp::min;
use std::collections::HashSet;
use std::sync::atomic::Ordering;
@@ -1070,28 +1071,34 @@ impl PeerManagerActor {
NetworkResponses::NoResponse
}
NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
for (chunk_validator, partial_witness) in validator_witness_tuple {
self.state.send_message_to_account(
&self.clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
);
}
let arbiter = actix::Arbiter::current();
validator_witness_tuple.into_par_iter().for_each(
|(chunk_validator, partial_witness)| {
self.state.send_message_to_account_with_arbiter(
&self.clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
arbiter.clone(),
);
},
);
NetworkResponses::NoResponse
}
NetworkRequests::PartialEncodedStateWitnessForward(
chunk_validators,
partial_witness,
) => {
for chunk_validator in chunk_validators {
self.state.send_message_to_account(
let arbiter = actix::Arbiter::current();
chunk_validators.into_par_iter().for_each(|chunk_validator| {
self.state.send_message_to_account_with_arbiter(
&self.clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitnessForward(
partial_witness.clone(),
),
arbiter.clone(),
);
}
});
NetworkResponses::NoResponse
}
NetworkRequests::EpochSyncRequest { peer_id } => {
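One note on the two handlers above: ParallelIterator::for_each blocks the calling thread until every closure has completed, so the handler still finishes issuing all sends before returning NetworkResponses::NoResponse, just like the sequential loops it replaces; the per-validator work is simply distributed across the rayon pool, with the ArbiterHandle captured once via actix::Arbiter::current() and cloned into each call. A tiny sketch of that blocking behavior, with an atomic counter standing in for the real sends:

use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let sent = AtomicUsize::new(0);
    let validators: Vec<u32> = (0..16).collect();

    validators.into_par_iter().for_each(|_validator| {
        // Stand-in for send_message_to_account_with_arbiter(...).
        sent.fetch_add(1, Ordering::Relaxed);
    });

    // for_each has joined all parallel work by the time it returns.
    assert_eq!(sent.load(Ordering::Relaxed), 16);
}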