Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate topology after restart if finality is lagging behind current session #6913

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 116 additions & 4 deletions polkadot/node/network/gossip-support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::{
collections::{HashMap, HashSet},
fmt,
time::{Duration, Instant},
u32,
alexggh marked this conversation as resolved.
Show resolved Hide resolved
};

use futures::{channel::oneshot, select, FutureExt as _};
Expand All @@ -45,8 +46,8 @@ use polkadot_node_network_protocol::{
};
use polkadot_node_subsystem::{
messages::{
GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage, NetworkBridgeTxMessage,
RuntimeApiMessage, RuntimeApiRequest,
ChainApiMessage, GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage,
NetworkBridgeTxMessage, RuntimeApiMessage, RuntimeApiRequest,
},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
};
Expand Down Expand Up @@ -96,6 +97,8 @@ pub struct GossipSupport<AD> {
keystore: KeystorePtr,

last_session_index: Option<SessionIndex>,
/// The minimum known session we build the topology for.
min_known_session: SessionIndex,
// Some(timestamp) if we failed to resolve
// at least a third of authorities the last time.
// `None` otherwise.
Expand Down Expand Up @@ -130,6 +133,9 @@ pub struct GossipSupport<AD> {
/// Authority discovery service.
authority_discovery: AD,

/// The oldest session we need to build a topology for because
/// the finalized blocks are from a session we haven't built a topology for.
finalized_needed_session: Option<u32>,
/// Subsystem metrics.
metrics: Metrics,
}
Expand All @@ -154,7 +160,9 @@ where
resolved_authorities: HashMap::new(),
connected_authorities: HashMap::new(),
connected_peers: HashMap::new(),
min_known_session: u32::MAX,
authority_discovery,
finalized_needed_session: None,
metrics,
}
}
Expand Down Expand Up @@ -199,7 +207,22 @@ where
gum::debug!(target: LOG_TARGET, error = ?e);
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, _number)) => {},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, _number)) =>
if let Some(session_index) = self.last_session_index {
if let Err(e) = self
.build_topology_for_last_finalized_if_needed(
ctx.sender(),
session_index,
)
.await
{
gum::warn!(
target: LOG_TARGET,
"Failed to build topology for last finalized session: {:?}",
e
);
}
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return self,
}
}
Expand Down Expand Up @@ -294,9 +317,19 @@ where
}

if is_new_session {
if let Err(err) = self
.build_topology_for_last_finalized_if_needed(sender, session_index)
.await
{
gum::warn!(
target: LOG_TARGET,
"Failed to build topology for last finalized session: {:?}",
err
);
}

// Gossip topology is only relevant for authorities in the current session.
let our_index = self.get_key_index_and_update_metrics(&session_info)?;

update_gossip_topology(
sender,
our_index,
Expand All @@ -314,6 +347,85 @@ where
Ok(())
}

/// Makes sure a gossip topology exists for the session of the last finalized block.
///
/// If finality is lagging across a session boundary and the node restarts after the new
/// session has started, the finalized block may still belong to a session for which no
/// topology has been built. In that case this requests the session info for that older
/// session and issues a topology update for it.
///
/// The chain is only queried while `finalized_needed_session` is unset or older than
/// `min_known_session`; once the finalized session has caught up, this is a no-op.
///
/// Returns `Ok(())` without building anything when the finalized block number, the
/// finalized block hash or the session info cannot be obtained.
async fn build_topology_for_last_finalized_if_needed(
    &mut self,
    sender: &mut impl overseer::GossipSupportSenderTrait,
    current_session_index: u32,
) -> Result<(), util::Error> {
    // Remember the lowest session index this method has been called with.
    self.min_known_session = self.min_known_session.min(current_session_index);

    // The recorded finalized session is no longer behind: nothing left to do.
    if matches!(
        self.finalized_needed_session,
        Some(needed) if needed >= self.min_known_session
    ) {
        return Ok(());
    }

    // Look up the number of the last finalized block ...
    let (number_tx, number_rx) = oneshot::channel();
    sender.send_message(ChainApiMessage::FinalizedBlockNumber(number_tx)).await;
    let finalized_block_number = match number_rx.await? {
        Ok(number) => number,
        Err(_) => return Ok(()),
    };

    // ... and then its hash.
    let (hash_tx, hash_rx) = oneshot::channel();
    sender
        .send_message(ChainApiMessage::FinalizedBlockHash(finalized_block_number, hash_tx))
        .await;
    let finalized_block_hash = match hash_rx.await? {
        Ok(Some(hash)) => hash,
        Ok(None) | Err(_) => return Ok(()),
    };

    let pending_session_index =
        util::request_session_index_for_child(finalized_block_hash, sender).await;
    let finalized_session_index = pending_session_index.await??;

    // A topology is only missing when the finalized session is older than every session
    // seen so far and is not the one already recorded.
    let already_covered = finalized_session_index >= self.min_known_session ||
        self.finalized_needed_session == Some(finalized_session_index);

    if !already_covered {
        gum::debug!(
            target: LOG_TARGET,
            ?finalized_block_hash,
            ?finalized_block_number,
            ?finalized_session_index,
            "Building topology for finalized block session",
        );

        let pending_session_info =
            util::request_session_info(finalized_block_hash, finalized_session_index, sender)
                .await;
        let finalized_session_info = match pending_session_info.await?? {
            Some(info) => info,
            None => return Ok(()),
        };

        let our_index = self.get_key_index_and_update_metrics(&finalized_session_info)?;
        let discovery_keys = finalized_session_info.discovery_keys.clone();
        update_gossip_topology(
            sender,
            our_index,
            discovery_keys,
            finalized_block_hash,
            finalized_session_index,
        )
        .await?;
    }

    // Record the finalized session so later calls can skip the chain queries.
    self.finalized_needed_session = Some(finalized_session_index);
    Ok(())
}

// Checks if the node is an authority and also updates `polkadot_node_is_authority` and
// `polkadot_node_is_parachain_validator` metrics accordingly.
// On success, returns the index of our keys in `session_info.discovery_keys`.
Expand Down
Loading
Loading