feat(networking): shift to use ilog2 bucket distance for close data calcs

ilog2 is about magnitude rather than specifics, as distance is already being estimated.
We can use this to easily render a buffer zone of data we replicate,
vs a close bucket which we deem to be our responsibility.
joshuef authored and maqi committed Apr 9, 2024
1 parent 6d905e9 commit 17d292b
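
To make the commit message's point concrete: the ilog2 of an XOR distance is the index of the highest differing bit, i.e. the k-bucket the other key falls into relative to ours, so comparing ilog2 values compares magnitudes rather than exact distances. Below is a minimal standalone sketch using plain u64 ids — the real code works on 256-bit keys via libp2p's distance type — so the names and values here are illustrative only.

```rust
// Minimal sketch of ilog2 bucket distance using u64 ids (the crate uses 256-bit keys).
// checked_ilog2 returns None for a distance of 0 (identical ids), mirroring the
// Option<u32> handling seen in the diff below.
fn bucket_distance(our_id: u64, other_id: u64) -> Option<u32> {
    (our_id ^ other_id).checked_ilog2()
}

fn main() {
    let our_id = 0b1010_0000u64;
    let near = 0b1010_0011u64; // differs only in low bits -> bucket 1
    let far = 0b0010_0000u64; // differs in a high bit -> bucket 7
    assert_eq!(bucket_distance(our_id, near), Some(1));
    assert_eq!(bucket_distance(our_id, far), Some(7));
    // Many distinct raw distances collapse into the same bucket, which is what
    // turns the range check into a coarse buffer zone rather than an exact cutoff.
}
```
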
Showing 8 changed files with 65 additions and 53 deletions.
9 changes: 5 additions & 4 deletions sn_networking/src/cmd.rs
@@ -10,7 +10,7 @@ use crate::{
driver::{PendingGetClosestType, SwarmDriver},
error::{NetworkError, Result},
multiaddr_pop_p2p, GetRecordCfg, GetRecordError, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
REPLICATE_RANGE,
REPLICATION_PEERS_COUNT,
};
use libp2p::{
kad::{store::RecordStore, Quorum, Record, RecordKey},
@@ -479,9 +479,10 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.get_distance_range()
.get_farthest_replication_distance_bucket()
{
self.replication_fetcher.set_distance_range(distance);
self.replication_fetcher
.set_replication_distance_range(distance);
}

if let Err(err) = result {
@@ -818,7 +819,7 @@ impl SwarmDriver {
let replicate_targets = closest_k_peers
.into_iter()
// add some leeway to allow for divergent knowledge
.take(REPLICATE_RANGE)
.take(REPLICATION_PEERS_COUNT)
.collect::<Vec<_>>();

let all_records: Vec<_> = self
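
As a rough illustration of the target selection in this hunk: the sorted closest-k list is trimmed to a few more peers than the close group, the extra two being leeway for divergent routing tables. The u64 ids and function name below are hypothetical; only the two constants mirror the crate.

```rust
// Hypothetical sketch of picking replicate targets from a closest-k list.
const CLOSE_GROUP_SIZE: usize = 5;
const REPLICATION_PEERS_COUNT: usize = CLOSE_GROUP_SIZE + 2;

fn replicate_targets(mut closest_k_peers: Vec<u64>, record_key: u64) -> Vec<u64> {
    // Sort by XOR distance to the record, nearest first, then keep the first
    // REPLICATION_PEERS_COUNT peers as replication targets.
    closest_k_peers.sort_by_key(|peer| peer ^ record_key);
    closest_k_peers
        .into_iter()
        .take(REPLICATION_PEERS_COUNT)
        .collect()
}
```
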
16 changes: 8 additions & 8 deletions sn_networking/src/driver.rs
@@ -28,7 +28,6 @@ use crate::{
transport, Network, CLOSE_GROUP_SIZE,
};
use futures::StreamExt;
use libp2p::kad::KBucketDistance as Distance;
#[cfg(feature = "local-discovery")]
use libp2p::mdns;
use libp2p::{
@@ -609,8 +608,10 @@ impl SwarmDriver {
if let Some(distance) = self.get_farthest_data_address_estimate(&closest_k_peers) {
// set any new distance to farthest record in the store
self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);

let replication_distance = self.swarm.behaviour_mut().kademlia.store_mut().get_farthest_replication_distance_bucket().unwrap_or(1);
// the distance range within the replication_fetcher shall be in sync as well
self.replication_fetcher.set_distance_range(distance);
self.replication_fetcher.set_replication_distance_range(replication_distance);
}
}
}
@@ -622,27 +623,26 @@ impl SwarmDriver {
// ---------- Crate helpers -------------------
// --------------------------------------------

/// Return a far address, close to but probably farther than our responsibilty range.
/// Returns the farthest bucket, close to but probably farther than our responsibilty range.
/// This simply uses the closest k peers to estimate the farthest address as
/// `K_VALUE`th peer's address distance.
/// `K_VALUE`th peer's bucket.
fn get_farthest_data_address_estimate(
&mut self,
// Sorted list of closest k peers to our peer id.
closest_k_peers: &[PeerId],
) -> Option<Distance> {
) -> Option<u32> {
// if we don't have enough peers we don't set the distance range yet.
let mut farthest_distance = None;

let our_address = NetworkAddress::from_peer(self.self_peer_id);

// get K_VALUE/2 peer's address distance
// get K_VALUEth peer's address distance
// This is a rough estimate of the farthest address we might be responsible for.
// We want this to be higher than actually necessary, so we retain more data
// and can be sure to pass bad node checks
if let Some(peer) = closest_k_peers.last() {
let address = NetworkAddress::from_peer(*peer);
let distance = our_address.distance(&address);
farthest_distance = Some(distance);
farthest_distance = our_address.distance(&address).ilog2();
}

farthest_distance
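
The estimate above boils down to: take the farthest entry of the nearest-first closest-k list and use the ilog2 bucket of its XOR distance as the outer edge of what we hold. A hedged sketch with u64 ids (the function name and ids are illustrative; the real code works on PeerId/NetworkAddress):

```rust
// Illustrative sketch of the farthest-bucket estimate with u64 ids.
// `sorted_closest` is assumed sorted nearest-first, so .last() is the K_VALUE-th
// (farthest) peer; the bucket of its distance becomes the responsibility boundary.
// Returns None when the list is empty or the last id equals ours (distance 0).
fn farthest_bucket_estimate(our_id: u64, sorted_closest: &[u64]) -> Option<u32> {
    let farthest_peer = sorted_closest.last()?;
    (our_id ^ *farthest_peer).checked_ilog2()
}
```
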
6 changes: 3 additions & 3 deletions sn_networking/src/event.rs
@@ -12,7 +12,7 @@ use crate::{
error::{NetworkError, Result},
multiaddr_is_global, multiaddr_strip_p2p, sort_peers_by_address,
target_arch::Instant,
CLOSE_GROUP_SIZE, REPLICATE_RANGE,
CLOSE_GROUP_SIZE, REPLICATION_PEERS_COUNT,
};
use core::fmt;
use custom_debug::Debug as CustomDebug;
@@ -1310,12 +1310,12 @@ impl SwarmDriver {
target: &NetworkAddress,
all_peers: &Vec<PeerId>,
) -> bool {
if all_peers.len() <= REPLICATE_RANGE {
if all_peers.len() <= REPLICATION_PEERS_COUNT {
return true;
}

// Margin of 2 to allow our RT being bit lagging.
match sort_peers_by_address(all_peers, target, REPLICATE_RANGE) {
match sort_peers_by_address(all_peers, target, REPLICATION_PEERS_COUNT) {
Ok(close_group) => close_group.contains(&our_peer_id),
Err(err) => {
warn!("Could not get sorted peers for {target:?} with error {err:?}");
4 changes: 2 additions & 2 deletions sn_networking/src/lib.rs
@@ -82,9 +82,9 @@ pub type PayeeQuote = (PeerId, MainPubkey, PaymentQuote);
/// The size has been set to 5 for improved performance.
pub const CLOSE_GROUP_SIZE: usize = 5;

/// The range of peers that will be considered as close to a record target,
/// The count of peers that will be considered as close to a record target,
/// that a replication of the record shall be sent/accepted to/by the peer.
pub const REPLICATE_RANGE: usize = CLOSE_GROUP_SIZE + 2;
pub const REPLICATION_PEERS_COUNT: usize = CLOSE_GROUP_SIZE + 2;

/// Majority of a given group (i.e. > 1/2).
#[inline]
52 changes: 31 additions & 21 deletions sn_networking/src/record_store.rs
@@ -19,7 +19,7 @@ use libp2p::{
identity::PeerId,
kad::{
store::{Error, RecordStore, Result},
KBucketDistance as Distance, KBucketKey, ProviderRecord, Record, RecordKey as Key,
KBucketKey, ProviderRecord, Record, RecordKey as Key,
},
};
#[cfg(feature = "open-metrics")]
@@ -58,9 +58,10 @@ pub struct NodeRecordStore {
network_event_sender: mpsc::Sender<NetworkEvent>,
/// Send cmds to the network layer. Used to interact with self in an async fashion.
swarm_cmd_sender: mpsc::Sender<SwarmCmd>,
/// Distance range specify the acceptable range of record entry.
/// ilog2 distance range of responsible records
/// AKA: how many buckets of data do we consider "close"
/// None means accept all records.
distance_range: Option<Distance>,
responsible_distance_range: Option<u32>,
#[cfg(feature = "open-metrics")]
/// Used to report the number of records held by the store to the metrics server.
record_count_metric: Option<Gauge>,
@@ -195,7 +196,7 @@ impl NodeRecordStore {
records,
network_event_sender,
swarm_cmd_sender,
distance_range: None,
responsible_distance_range: None,
#[cfg(feature = "open-metrics")]
record_count_metric: None,
received_payment_count: 0,
@@ -211,9 +212,9 @@ impl NodeRecordStore {
self
}

/// Returns the current distance range
pub fn get_distance_range(&self) -> Option<Distance> {
self.distance_range
/// Returns the current distance ilog2 (aka bucket) range of CLOSE_GROUP nodes.
pub fn get_responsible_distance_range(&self) -> Option<u32> {
self.responsible_distance_range
}

// Converts a Key into a Hex string.
@@ -438,7 +439,7 @@ impl NodeRecordStore {
live_time: self.timestamp.elapsed().as_secs(),
};

if let Some(distance_range) = self.distance_range {
if let Some(distance_range) = self.responsible_distance_range {
let relevant_records =
self.get_records_within_distance_range(record_keys_as_hashset, distance_range);

@@ -474,7 +475,7 @@ impl NodeRecordStore {
pub fn get_records_within_distance_range(
&self,
records: HashSet<&Key>,
distance_range: Distance,
distance_range: u32,
) -> usize {
debug!(
"Total record count is {:?}. Distance is: {distance_range:?}",
@@ -485,7 +486,7 @@ impl NodeRecordStore {
.iter()
.filter(|key| {
let kbucket_key = KBucketKey::new(key.to_vec());
distance_range >= self.local_key.distance(&kbucket_key)
distance_range >= self.local_key.distance(&kbucket_key).ilog2().unwrap_or(0)
})
.count();

@@ -494,8 +495,8 @@ impl NodeRecordStore {
}

/// Setup the distance range.
pub(crate) fn set_distance_range(&mut self, distance_range: Distance) {
self.distance_range = Some(distance_range);
pub(crate) fn set_responsible_distance_range(&mut self, farthest_responsible_bucket: u32) {
self.responsible_distance_range = Some(farthest_responsible_bucket);
}
}

@@ -730,7 +731,7 @@ pub fn calculate_cost_for_records(quoting_metrics: &QuotingMetrics) -> u64 {
mod tests {

use super::*;
use crate::{close_group_majority, sort_peers_by_key, REPLICATE_RANGE};
use crate::{close_group_majority, sort_peers_by_key, REPLICATION_PEERS_COUNT};
use bytes::Bytes;
use eyre::ContextCompat;
use libp2p::{core::multihash::Multihash, kad::RecordKey};
@@ -1076,7 +1077,7 @@ mod tests {

#[tokio::test]
#[allow(clippy::mutable_key_type)]
async fn get_records_within_distance_range() -> eyre::Result<()> {
async fn get_records_within_bucket_range() -> eyre::Result<()> {
let max_records = 50;

let temp_dir = std::env::temp_dir();
@@ -1140,16 +1141,21 @@ mod tests {
.wrap_err("Could not parse record store key")?,
);
// get the distance to this record from our local key
let distance = self_address.distance(&halfway_record_address);
let distance = self_address
.distance(&halfway_record_address)
.ilog2()
.unwrap_or(0);

store.set_distance_range(distance);
// must be plus one bucket from the halfway record
store.set_responsible_distance_range(distance);

let record_keys = store.records.keys().collect();

// check that the number of records returned is correct
assert_eq!(
store.get_records_within_distance_range(record_keys, distance),
stored_records.len() / 2
// check that the number of records returned is larger than half our records
// (ie, that we cover _at least_ all the records within our distance range)
assert!(
store.get_records_within_distance_range(record_keys, distance)
>= stored_records.len() / 2
);

Ok(())
Expand Down Expand Up @@ -1177,7 +1183,11 @@ mod tests {
for _ in 0..num_of_chunks_per_itr {
let name = xor_name::rand::random();
let address = NetworkAddress::from_chunk_address(ChunkAddress::new(name));
match sort_peers_by_key(&peers_vec, &address.as_kbucket_key(), REPLICATE_RANGE) {
match sort_peers_by_key(
&peers_vec,
&address.as_kbucket_key(),
REPLICATION_PEERS_COUNT,
) {
Ok(peers_in_replicate_range) => {
let peers_in_replicate_range: Vec<PeerId> = peers_in_replicate_range
.iter()
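
The relevance check introduced in this file compares bucket indices rather than raw distances: a stored record counts as our responsibility when the ilog2 of its XOR distance from our key is at most the responsible bucket, with a zero distance treated as bucket 0. A minimal sketch under the same u64-id assumption as above (names are illustrative):

```rust
use std::collections::HashSet;

// Sketch of the bucket-based relevance filter over stored record keys.
// unwrap_or(0) treats a zero distance (ilog2 undefined) as bucket 0, as the diff does.
fn records_within_bucket_range(our_key: u64, records: &HashSet<u64>, max_bucket: u32) -> usize {
    records
        .iter()
        .filter(|key| (our_key ^ **key).checked_ilog2().unwrap_or(0) <= max_bucket)
        .count()
}
```
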
10 changes: 5 additions & 5 deletions sn_networking/src/record_store_api.rs
@@ -10,7 +10,7 @@
use crate::record_store::{ClientRecordStore, NodeRecordStore};
use libp2p::kad::{
store::{RecordStore, Result},
KBucketDistance as Distance, ProviderRecord, Record, RecordKey,
ProviderRecord, Record, RecordKey,
};
use sn_protocol::{storage::RecordType, NetworkAddress};
use sn_transfers::{NanoTokens, QuotingMetrics};
@@ -131,22 +131,22 @@ impl UnifiedRecordStore {
}
}

pub(crate) fn get_distance_range(&self) -> Option<Distance> {
pub(crate) fn get_farthest_replication_distance_bucket(&self) -> Option<u32> {
match self {
Self::Client(_store) => {
warn!("Calling get_distance_range at Client. This should not happen");
None
}
Self::Node(store) => store.get_distance_range(),
Self::Node(store) => store.get_responsible_distance_range(),
}
}

pub(crate) fn set_distance_range(&mut self, distance: Distance) {
pub(crate) fn set_distance_range(&mut self, distance: u32) {
match self {
Self::Client(_store) => {
warn!("Calling set_distance_range at Client. This should not happen");
}
Self::Node(store) => store.set_distance_range(distance),
Self::Node(store) => store.set_responsible_distance_range(distance),
}
}

17 changes: 9 additions & 8 deletions sn_networking/src/replication_fetcher.rs
@@ -10,7 +10,7 @@
use crate::target_arch::spawn;
use crate::{event::NetworkEvent, target_arch::Instant};
use libp2p::{
kad::{KBucketDistance as Distance, RecordKey, K_VALUE},
kad::{RecordKey, K_VALUE},
PeerId,
};
use sn_protocol::{storage::RecordType, NetworkAddress, PrettyPrintRecordKey};
@@ -41,8 +41,8 @@ pub(crate) struct ReplicationFetcher {
// Avoid fetching same chunk from different nodes AND carry out too many parallel tasks.
on_going_fetches: HashMap<(RecordKey, RecordType), (PeerId, ReplicationTimeout)>,
event_sender: mpsc::Sender<NetworkEvent>,
// Distance range that the incoming key shall be fetched
distance_range: Option<Distance>,
/// ilog2 bucket distance range that the incoming key shall be fetched
distance_range: Option<u32>,
}

impl ReplicationFetcher {
Expand All @@ -58,7 +58,7 @@ impl ReplicationFetcher {
}

/// Set the distance range.
pub(crate) fn set_distance_range(&mut self, distance_range: Distance) {
pub(crate) fn set_replication_distance_range(&mut self, distance_range: u32) {
self.distance_range = Some(distance_range);
}

@@ -102,7 +102,8 @@ impl ReplicationFetcher {
let self_address = NetworkAddress::from_peer(self.self_peer_id);

incoming_keys.retain(|(addr, _record_type)| {
let is_in_range = self_address.distance(addr) <= *distance_range;
let is_in_range =
self_address.distance(addr).ilog2().unwrap_or(0) <= *distance_range;
if !is_in_range {
out_of_range_keys.push(addr.clone());
}
@@ -368,16 +369,16 @@ mod tests {

// Set distance range
let distance_target = NetworkAddress::from_peer(PeerId::random());
let distance_range = self_address.distance(&distance_target);
replication_fetcher.set_distance_range(distance_range);
let distance_range = self_address.distance(&distance_target).ilog2().unwrap_or(1);
replication_fetcher.set_replication_distance_range(distance_range);

let mut incoming_keys = Vec::new();
let mut in_range_keys = 0;
(0..100).for_each(|_| {
let random_data: Vec<u8> = (0..50).map(|_| rand::random::<u8>()).collect();
let key = NetworkAddress::from_record_key(&RecordKey::from(random_data));

if key.distance(&self_address) <= distance_range {
if key.distance(&self_address).ilog2().unwrap_or(0) <= distance_range {
in_range_keys += 1;
}

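
The same bucket comparison gates incoming replication offers: keys whose bucket distance exceeds the replication range are split off (and logged as out of range in the real code) instead of being fetched. A rough sketch, again with hypothetical u64 ids:

```rust
// Illustrative partition of offered keys into (in_range, out_of_range), mirroring
// the retain + out_of_range_keys bookkeeping in the hunk above.
fn split_by_replication_range(our_id: u64, offered: Vec<u64>, max_bucket: u32) -> (Vec<u64>, Vec<u64>) {
    offered
        .into_iter()
        .partition(|key| (our_id ^ *key).checked_ilog2().unwrap_or(0) <= max_bucket)
}
```
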
4 changes: 2 additions & 2 deletions sn_node/src/replication.rs
@@ -11,7 +11,7 @@ use libp2p::{
kad::{Quorum, Record, RecordKey},
PeerId,
};
use sn_networking::{sort_peers_by_address, GetRecordCfg, Network, REPLICATE_RANGE};
use sn_networking::{sort_peers_by_address, GetRecordCfg, Network, REPLICATION_PEERS_COUNT};
use sn_protocol::{
messages::{Cmd, Query, QueryResponse, Request, Response},
storage::RecordType,
@@ -150,7 +150,7 @@ impl Node {
let sorted_based_on_addr = match sort_peers_by_address(
&closest_k_peers,
&data_addr,
REPLICATE_RANGE,
REPLICATION_PEERS_COUNT,
) {
Ok(result) => result,
Err(err) => {
