Skip to content

Commit

Permalink
feat!: node support get_closest query from client for RBS
Browse files Browse the repository at this point in the history
BREAKING CHANGE
  • Loading branch information
maqi committed Dec 3, 2024
1 parent dd2c66f commit cd0e442
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 6 deletions.
28 changes: 26 additions & 2 deletions ant-networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ pub enum NodeIssue {

/// Commands to send to the Swarm
pub enum LocalSwarmCmd {
/// Get a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that
/// bucket.
/// Get a list of all peers in local RT, with correspondent Multiaddr info attached as well.
GetPeersWithMultiaddr {
sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
},
/// Get a map where each key is the ilog2 distance of that Kbucket
/// and each value is a vector of peers in that bucket.
GetKBuckets {
sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
},
Expand Down Expand Up @@ -253,6 +257,9 @@ impl Debug for LocalSwarmCmd {
LocalSwarmCmd::GetAllLocalRecordAddresses { .. } => {
write!(f, "LocalSwarmCmd::GetAllLocalRecordAddresses")
}
LocalSwarmCmd::GetPeersWithMultiaddr { .. } => {
write!(f, "LocalSwarmCmd::GetPeersWithMultiaddr")
}
LocalSwarmCmd::GetKBuckets { .. } => {
write!(f, "LocalSwarmCmd::GetKBuckets")
}
Expand Down Expand Up @@ -795,6 +802,23 @@ impl SwarmDriver {
}
let _ = sender.send(ilog2_kbuckets);
}
LocalSwarmCmd::GetPeersWithMultiaddr { sender } => {
cmd_string = "GetPeersWithMultiAddr";
let mut result: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
let peers_in_kbucket = kbucket
.iter()
.map(|peer_entry| {
(
peer_entry.node.key.into_preimage(),
peer_entry.node.value.clone().into_vec(),
)
})
.collect::<Vec<(PeerId, Vec<Multiaddr>)>>();
result.extend(peers_in_kbucket);
}
let _ = sender.send(result);
}
LocalSwarmCmd::GetCloseGroupLocalPeers { key, sender } => {
cmd_string = "GetCloseGroupLocalPeers";
let key = key.as_kbucket_key();
Expand Down
14 changes: 12 additions & 2 deletions ant-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,18 @@ impl Network {
.await
}

/// Returns a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that
/// bucket.
/// Returns a list of peers in local RT and their correspondent Multiaddr.
/// Does not include self
pub async fn get_local_peers_with_multiaddr(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}

/// Returns a map where each key is the ilog2 distance of that Kbucket
/// and each value is a vector of peers in that bucket.
/// Does not include self
pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
let (sender, receiver) = oneshot::channel();
Expand Down
188 changes: 187 additions & 1 deletion ant-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ use ant_protocol::{
};
use bytes::Bytes;
use itertools::Itertools;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
use libp2p::{
identity::Keypair,
kad::{KBucketDistance as Distance, U256},
Multiaddr, PeerId,
};
use num_traits::cast::ToPrimitive;
use rand::{
rngs::{OsRng, StdRng},
Expand Down Expand Up @@ -674,10 +678,91 @@ impl Node {
is_in_trouble,
}
}
Query::GetClosestPeers {
key,
num_of_peers,
range,
sign_result,
} => {
debug!(
"Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
);
Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
.await
}
};
Response::Query(resp)
}

async fn respond_get_closest_peers(
network: &Network,
target: NetworkAddress,
num_of_peers: Option<usize>,
range: Option<[u8; 32]>,
sign_result: bool,
) -> QueryResponse {
let local_peers = network.get_local_peers_with_multiaddr().await;
let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
} else {
vec![]
};

let signature = if sign_result {
let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
if let Ok(sig) = network.sign(&bytes) {
Some(sig)
} else {
None
}
} else {
None
};

QueryResponse::GetClosestPeers {
target,
peers,
signature,
}
}

fn calculate_get_closest_peers(
peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
target: NetworkAddress,
num_of_peers: Option<usize>,
range: Option<[u8; 32]>,
) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
match (num_of_peers, range) {
(_, Some(value)) => {
let distance = Distance(U256::from(value));
peer_addrs
.iter()
.filter_map(|(peer_id, multi_addrs)| {
let addr = NetworkAddress::from_peer(*peer_id);
if target.distance(&addr) <= distance {
Some((addr, multi_addrs.clone()))
} else {
None
}
})
.collect()
}
(Some(num_of_peers), _) => {
let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
.iter()
.map(|(peer_id, multi_addrs)| {
let addr = NetworkAddress::from_peer(*peer_id);
(addr, multi_addrs.clone())
})
.collect();
result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
result.into_iter().take(num_of_peers).collect()
}
(None, None) => vec![],
}
}

// Nodes only check ChunkProof each other, to avoid `multi-version` issue
// Client check proof against all records, as have to fetch from network anyway.
async fn respond_x_closest_record_proof(
Expand Down Expand Up @@ -971,3 +1056,104 @@ fn challenge_score_scheme(
HIGHEST_SCORE * correct_answers / expected_proofs.len(),
)
}

#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;

#[test]
fn test_no_local_peers() {
let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
let target = NetworkAddress::from_peer(PeerId::random());
let num_of_peers = Some(5);
let range = None;
let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

assert_eq!(result, vec![]);
}

#[test]
fn test_fewer_local_peers_than_num_of_peers() {
let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
),
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
),
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
),
];
let target = NetworkAddress::from_peer(PeerId::random());
let num_of_peers = Some(2);
let range = None;
let result = Node::calculate_get_closest_peers(
local_peers.clone(),
target.clone(),
num_of_peers,
range,
);

// Result shall be sorted and truncated
let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
.iter()
.map(|(peer_id, multi_addrs)| {
let addr = NetworkAddress::from_peer(*peer_id);
(addr, multi_addrs.clone())
})
.collect();
expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

assert_eq!(expected_result, result);
}

#[test]
fn test_with_range_and_num_of_peers() {
let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
),
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
),
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
),
];
let target = NetworkAddress::from_peer(PeerId::random());
let num_of_peers = Some(0);
let range_value = [128; 32];
let range = Some(range_value);
let result = Node::calculate_get_closest_peers(
local_peers.clone(),
target.clone(),
num_of_peers,
range,
);

// Range shall be preferred, i.e. the result peers shall all within the range
let distance = Distance(U256::from(range_value));
let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
.into_iter()
.filter_map(|(peer_id, multi_addrs)| {
let addr = NetworkAddress::from_peer(peer_id);
if target.distance(&addr) <= distance {
Some((addr, multi_addrs.clone()))
} else {
None
}
})
.collect();

assert_eq!(expected_result, result);
}
}
28 changes: 27 additions & 1 deletion ant-protocol/src/messages/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{messages::Nonce, NetworkAddress};
use libp2p::kad::{KBucketDistance as Distance, U256};
use serde::{Deserialize, Serialize};

/// Data queries - retrieving data and inspecting their structure.
Expand Down Expand Up @@ -65,6 +66,18 @@ pub enum Query {
},
/// Queries close_group peers whether the target peer is a bad_node
CheckNodeInProblem(NetworkAddress),
/// Query the the peers in range to the target address, from the receiver's perspective.
/// In case none of the parameters provided, returns nothing.
/// In case both of the parameters provided, `range` is preferred to be replied.
GetClosestPeers {
key: NetworkAddress,
// Shall be greater than K_VALUE, otherwise can use libp2p function directly
num_of_peers: Option<usize>,
// Defines the range that replied peers shall be within
range: Option<[u8; 32]>,
// For future econ usage,
sign_result: bool,
},
}

impl Query {
Expand All @@ -77,7 +90,8 @@ impl Query {
Query::GetStoreCost { key, .. }
| Query::GetReplicatedRecord { key, .. }
| Query::GetRegisterRecord { key, .. }
| Query::GetChunkExistenceProof { key, .. } => key.clone(),
| Query::GetChunkExistenceProof { key, .. }
| Query::GetClosestPeers { key, .. } => key.clone(),
}
}
}
Expand Down Expand Up @@ -111,6 +125,18 @@ impl std::fmt::Display for Query {
Query::CheckNodeInProblem(address) => {
write!(f, "Query::CheckNodeInProblem({address:?})")
}
Query::GetClosestPeers {
key,
num_of_peers,
range,
sign_result,
} => {
let distance = range.as_ref().map(|value| Distance(U256::from(value)));
write!(
f,
"Query::GetClosestPeers({key:?} {num_of_peers:?} {distance:?} {sign_result})"
)
}
}
}
}
22 changes: 22 additions & 0 deletions ant-protocol/src/messages/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use super::ChunkProof;
use ant_evm::{PaymentQuote, RewardsAddress};
use bytes::Bytes;
use core::fmt;
use libp2p::Multiaddr;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

Expand Down Expand Up @@ -59,6 +60,20 @@ pub enum QueryResponse {
///
/// [`GetChunkExistenceProof`]: crate::messages::Query::GetChunkExistenceProof
GetChunkExistenceProof(Vec<(NetworkAddress, Result<ChunkProof>)>),
// ===== GetClosestPeers =====
//
/// Response to [`GetClosestPeers`]
///
/// [`GetClosestPeers`]: crate::messages::Query::GetClosestPeers
GetClosestPeers {
// The target address that the original request is about.
target: NetworkAddress,
// `Multiaddr` is required to allow the requester to dial the peer
// Note: the list doesn't contain the node that being queried.
peers: Vec<(NetworkAddress, Vec<Multiaddr>)>,
// Signature of signing the above (if requested), for future economic model usage.
signature: Option<Vec<u8>>,
},
}

// Debug implementation for QueryResponse, to avoid printing Vec<u8>
Expand Down Expand Up @@ -117,6 +132,13 @@ impl Debug for QueryResponse {
let addresses: Vec<_> = proofs.iter().map(|(addr, _)| addr.clone()).collect();
write!(f, "GetChunkExistenceProof(checked chunks: {addresses:?})")
}
QueryResponse::GetClosestPeers { target, peers, .. } => {
let addresses: Vec<_> = peers.iter().map(|(addr, _)| addr.clone()).collect();
write!(
f,
"GetClosestPeers target {target:?} close peers {addresses:?}"
)
}
}
}
}
Expand Down

0 comments on commit cd0e442

Please sign in to comment.