Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
v1.18: reverts back in SocketAddr dedup in retransmit stage (backport…
Browse files Browse the repository at this point in the history
… of #1106) (#1225)

reverts back in SocketAddr dedup in retransmit stage (#1106)

This was erronously deemed as unnecessary and removed in:
anza-xyz#864

The commit partially reverts #864 and adds back socket-addr dedup.

(cherry picked from commit fbe1dbc)

Co-authored-by: behzad nouri <[email protected]>
  • Loading branch information
2 people authored and yihau committed May 14, 2024
1 parent ddaf56d commit 12fe7d3
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ pub struct ClusterNodesCache<T> {
pub struct RetransmitPeers<'a> {
root_distance: usize, // distance from the root node
children: Vec<&'a Node>,
// Maps tvu addresses to the first node
// in the shuffle with the same address.
addrs: HashMap<SocketAddr, Pubkey>, // tvu addresses
}

impl Node {
Expand Down Expand Up @@ -173,13 +176,16 @@ impl ClusterNodes<RetransmitStage> {
let RetransmitPeers {
root_distance,
children,
addrs,
} = self.get_retransmit_peers(slot_leader, shred, fanout)?;
let protocol = get_broadcast_protocol(shred);
let peers = children
.into_iter()
.filter_map(|node| node.contact_info()?.tvu(protocol).ok())
.collect();
Ok((root_distance, peers))
let peers = children.into_iter().filter_map(|node| {
node.contact_info()?
.tvu(protocol)
.ok()
.filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
});
Ok((root_distance, peers.collect()))
}

pub fn get_retransmit_peers(
Expand All @@ -199,10 +205,19 @@ impl ClusterNodes<RetransmitStage> {
if let Some(index) = self.index.get(slot_leader) {
weighted_shuffle.remove_index(*index);
}
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut rng = get_seeded_rng(slot_leader, shred);
let protocol = get_broadcast_protocol(shred);
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
.map(|index| &self.nodes[index])
.inspect(|node| {
if let Some(node) = node.contact_info() {
if let Ok(addr) = node.tvu(protocol) {
addrs.entry(addr).or_insert(*node.pubkey());
}
}
})
.collect();
let self_index = nodes
.iter()
Expand All @@ -221,6 +236,7 @@ impl ClusterNodes<RetransmitStage> {
Ok(RetransmitPeers {
root_distance,
children: peers.collect(),
addrs,
})
}

Expand Down

0 comments on commit 12fe7d3

Please sign in to comment.