Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate topology after restart if finality is lagging behind current session #6913

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 51 additions & 23 deletions polkadot/node/network/bridge/src/rx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use polkadot_primitives::{AuthorityDiscoveryId, BlockNumber, Hash, ValidatorInde
use std::{
collections::{hash_map, HashMap},
iter::ExactSizeIterator,
u32,
};

use super::validator_discovery;
Expand Down Expand Up @@ -750,7 +751,7 @@ where
// This is kept sorted, descending, by block number.
let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS);
let mut finalized_number = 0;

let mut newest_session = u32::MIN;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a fancy way of writing zero :P

let mut mode = Mode::Syncing(sync_oracle);
loop {
match ctx.recv().fuse().await? {
Expand All @@ -775,15 +776,29 @@ where
flesh_out_topology_peers(&mut authority_discovery_service, canonical_shuffling)
.await;

dispatch_validation_event_to_all_unbounded(
NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
session,
topology: SessionGridTopology::new(shuffled_indices, topology_peers),
local_index,
}),
ctx.sender(),
approval_voting_parallel_enabled,
);
if session >= newest_session {
alindima marked this conversation as resolved.
Show resolved Hide resolved
dispatch_validation_event_to_all_unbounded(
NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
session,
topology: SessionGridTopology::new(shuffled_indices, topology_peers),
local_index,
}),
ctx.sender(),
approval_voting_parallel_enabled,
);
} else {
dispatch_validation_event_to_approval_unbounded(
&NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
session,
topology: SessionGridTopology::new(shuffled_indices, topology_peers),
local_index,
}),
ctx.sender(),
approval_voting_parallel_enabled,
);
}

newest_session = newest_session.max(session);
},
FromOrchestra::Communication {
msg: NetworkBridgeRxMessage::UpdatedAuthorityIds { peer_id, authority_ids },
Expand Down Expand Up @@ -1123,22 +1138,11 @@ async fn dispatch_collation_event_to_all(
dispatch_collation_events_to_all(std::iter::once(event), ctx).await
}

fn dispatch_validation_event_to_all_unbounded(
event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
fn dispatch_validation_event_to_approval_unbounded(
event: &NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
approval_voting_parallel_enabled: bool,
) {
event
.focus()
.ok()
.map(StatementDistributionMessage::from)
.and_then(|msg| Some(sender.send_unbounded_message(msg)));
event
.focus()
.ok()
.map(BitfieldDistributionMessage::from)
.and_then(|msg| Some(sender.send_unbounded_message(msg)));

if approval_voting_parallel_enabled {
event
.focus()
Expand All @@ -1152,6 +1156,30 @@ fn dispatch_validation_event_to_all_unbounded(
.map(ApprovalDistributionMessage::from)
.and_then(|msg| Some(sender.send_unbounded_message(msg)));
}
}

fn dispatch_validation_event_to_all_unbounded(
event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
approval_voting_parallel_enabled: bool,
) {
event
.focus()
.ok()
.map(StatementDistributionMessage::from)
.and_then(|msg| Some(sender.send_unbounded_message(msg)));
event
.focus()
.ok()
.map(BitfieldDistributionMessage::from)
.and_then(|msg| Some(sender.send_unbounded_message(msg)));

dispatch_validation_event_to_approval_unbounded(
&event,
sender,
approval_voting_parallel_enabled,
);

event
.focus()
.ok()
Expand Down
116 changes: 116 additions & 0 deletions polkadot/node/network/bridge/src/rx/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ use polkadot_node_subsystem::messages::NetworkBridgeEvent;
use assert_matches::assert_matches;
use async_trait::async_trait;
use parking_lot::Mutex;
use polkadot_overseer::TimeoutExt;
use std::{
collections::HashSet,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};

use sc_network::{
Expand Down Expand Up @@ -1465,6 +1467,120 @@ fn network_protocol_versioning_view_update() {
});
}

// Test rx bridge sends the newest gossip topology to all subsystems and old ones only to approval
// distribution.
#[test]
fn network_new_topology_update() {
let (oracle, handle) = make_sync_oracle(false);
test_harness(Box::new(oracle), |test_harness| async move {
let TestHarness { mut network_handle, mut virtual_overseer, shared } = test_harness;

let peer_ids: Vec<_> = (0..4).map(|_| PeerId::random()).collect();
let peers = [
(peer_ids[0], PeerSet::Validation, ValidationVersion::V2),
(peer_ids[1], PeerSet::Validation, ValidationVersion::V1),
(peer_ids[2], PeerSet::Validation, ValidationVersion::V1),
(peer_ids[3], PeerSet::Collation, ValidationVersion::V2),
];

let head = Hash::repeat_byte(1);
virtual_overseer
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(new_leaf(head, 1)),
)))
.await;

handle.await_mode_switch().await;

let mut total_validation_peers = 0;
let mut total_collation_peers = 0;

for &(peer_id, peer_set, version) in &peers {
network_handle
.connect_peer(peer_id, version, peer_set, ObservedRole::Full)
.await;

match peer_set {
PeerSet::Validation => total_validation_peers += 1,
PeerSet::Collation => total_collation_peers += 1,
}
}

await_peer_connections(&shared, total_validation_peers, total_collation_peers).await;

// Drain setup messages.
while let Some(_) = virtual_overseer.recv().timeout(Duration::from_secs(1)).await {}

// 1. Send new gossip topology and check is sent to all subsystems.
virtual_overseer
.send(polkadot_overseer::FromOrchestra::Communication {
msg: NetworkBridgeRxMessage::NewGossipTopology {
session: 2,
local_index: Some(ValidatorIndex(0)),
canonical_shuffling: Vec::new(),
shuffled_indices: Vec::new(),
},
})
.await;

assert_sends_validation_event_to_all(
NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
session: 2,
topology: SessionGridTopology::new(Vec::new(), Vec::new()),
local_index: Some(ValidatorIndex(0)),
}),
&mut virtual_overseer,
)
.await;

// 2. Send old gossip topology and check is sent only to approval distribution.
virtual_overseer
.send(polkadot_overseer::FromOrchestra::Communication {
msg: NetworkBridgeRxMessage::NewGossipTopology {
session: 1,
local_index: Some(ValidatorIndex(0)),
canonical_shuffling: Vec::new(),
shuffled_indices: Vec::new(),
},
})
.await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
session: 1,
topology: _,
local_index: _,
})
))
);

// 3. Send new gossip topology and check is sent to all subsystems.
virtual_overseer
.send(polkadot_overseer::FromOrchestra::Communication {
msg: NetworkBridgeRxMessage::NewGossipTopology {
session: 3,
local_index: Some(ValidatorIndex(0)),
canonical_shuffling: Vec::new(),
shuffled_indices: Vec::new(),
},
})
.await;

assert_sends_validation_event_to_all(
NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
session: 3,
topology: SessionGridTopology::new(Vec::new(), Vec::new()),
local_index: Some(ValidatorIndex(0)),
}),
&mut virtual_overseer,
)
.await;
virtual_overseer
});
}

#[test]
fn network_protocol_versioning_subsystem_msg() {
use polkadot_primitives::CandidateHash;
Expand Down
Loading
Loading