From 6720279fb3aac12ad525785d2366cdf30da4d78c Mon Sep 17 00:00:00 2001
From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
Date: Thu, 25 Jul 2024 19:27:24 +0300
Subject: [PATCH] authority-discovery: Make changing of peer-id while active a bit more robust (#3786)

When nodes don't persist their node-key, or want to generate a new one while in the active set, things go wrong because both the old addresses and the new ones remain present in the DHT. Because of the distributed nature of the DHT, both survive in the network until the old records expire, which takes 36 hours. Nodes in the network will randomly resolve the authority-id to either the old address or the new one. More details in: https://github.com/paritytech/polkadot-sdk/issues/3673

This PR proposes we mitigate this problem by:

1. Letting the query for a DHT key retrieve more than one result (4), bounded by the replication factor, which is 20; currently we interrupt the query at the first result.
~2. Modifying the authority-discovery service to keep all the discovered addresses around for 24h since it last saw an address.~
~3. Plumbing through other subsystems where the assumption was that an authority-id will resolve to only one PeerId. Currently, authority-discovery keeps just the last record it received from the DHT and queries the DHT every 10 minutes. But nodes could always receive only the old address, only the new address, or a flip-flop between them, depending on which node wins the race to provide the record.~
2. Extending the `SignedAuthorityRecord` with a signed `creation_time`.
3. Modifying authority discovery to keep track of the nodes that sent us an old record and, once we are made aware of a newer record, updating those nodes with it (see the sketch after the TODO list).
4. Updating gossip-support to try to resolve authorities more often than once per session.

~This would give us a lot more chances for the nodes in the network to discover not only the old address of the node but also the new one, and should improve the time it takes for a node to be properly connected in the network. The behaviour won't be deterministic because there is no guarantee that all nodes will see the new record at least once, since they could query only nodes that have the old one.~

## TODO

- [x] Add unit tests for the new paths.
- [x] Make sure the implementation is backwards compatible.
- [x] Evaluate whether there are any bad consequences of letting the query continue rather than finishing it at the first record found.
- [x] Bake the new changes in Versi.
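The gist of the new logic is a freshness rule: every published record carries a signed `creation_time` (nanoseconds since `UNIX_EPOCH`, SCALE-encoded), and when several records resolve for the same key, the newest one wins while the peers known to hold a stale copy get the fresh record pushed back to them. Below is a minimal, self-contained sketch of that reconciliation rule; the types are simplified stand-ins for the worker's `RecordInfo`/`last_known_records` (so not the actual `sc-authority-discovery` API), but the decision logic mirrors `handle_new_record` in this patch:

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-ins for illustration only.
type KademliaKey = Vec<u8>;
type PeerId = u64;

const REPLICATION_FACTOR: usize = 20;

#[derive(Clone)]
struct RecordInfo {
    /// Time since UNIX_EPOCH in nanoseconds, taken from the signed record.
    creation_time: u128,
    /// Peers known to hold this exact record.
    peers_with_record: HashSet<PeerId>,
}

/// Returns the peers that should be sent the fresher record.
fn reconcile(
    known: &mut HashMap<KademliaKey, RecordInfo>,
    key: KademliaKey,
    incoming: RecordInfo,
) -> HashSet<PeerId> {
    if !known.contains_key(&key) {
        // First record seen for this key: store it, nothing to repair.
        known.insert(key, incoming);
        return HashSet::new();
    }
    let current = known.get_mut(&key).expect("checked above");
    if incoming.creation_time > current.creation_time {
        // Newer record: the peers holding the old copy need the update.
        let stale = current.peers_with_record.clone();
        *current = incoming;
        stale
    } else if incoming.creation_time == current.creation_time {
        // Same record seen via another peer: just remember who has it,
        // bounded so the set cannot outgrow the replication factor.
        if current.peers_with_record.len() + incoming.peers_with_record.len()
            <= REPLICATION_FACTOR
        {
            current.peers_with_record.extend(incoming.peers_with_record);
        }
        HashSet::new()
    } else {
        // Older record: whoever answered with it should get the current one.
        incoming.peers_with_record
    }
}

fn main() {
    let mut known = HashMap::new();
    let key = b"authority".to_vec();
    let old = RecordInfo { creation_time: 100, peers_with_record: HashSet::from([1, 2]) };
    let new = RecordInfo { creation_time: 200, peers_with_record: HashSet::from([3]) };
    assert!(reconcile(&mut known, key.clone(), old).is_empty());
    // Peers 1 and 2 held the stale record and should be pushed the new one.
    assert_eq!(reconcile(&mut known, key, new), HashSet::from([1, 2]));
}
```

A record with no `creation_time` decodes to 0, so records published by old nodes always lose against records that carry a timestamp, which is what keeps the scheme backwards compatible.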
--------- Signed-off-by: Alexandru Gheorghe Co-authored-by: Dmitry Markin Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> --- polkadot/node/network/bridge/src/network.rs | 15 + polkadot/node/network/bridge/src/rx/tests.rs | 8 + polkadot/node/network/bridge/src/tx/mod.rs | 16 + polkadot/node/network/bridge/src/tx/tests.rs | 8 + .../network/bridge/src/validator_discovery.rs | 47 ++ .../node/network/gossip-support/src/lib.rs | 127 ++++- .../node/network/gossip-support/src/tests.rs | 315 ++++++++++++ polkadot/node/subsystem-types/src/messages.rs | 10 + prdoc/pr_3786.prdoc | 22 + substrate/client/authority-discovery/build.rs | 6 +- .../client/authority-discovery/src/worker.rs | 278 ++++++++--- .../src/worker/schema/dht-v3.proto | 31 ++ .../src/worker/schema/tests.rs | 85 +++- .../authority-discovery/src/worker/tests.rs | 468 +++++++++++++++--- substrate/client/network/src/behaviour.rs | 19 +- substrate/client/network/src/discovery.rs | 67 ++- substrate/client/network/src/event.rs | 7 +- .../client/network/src/litep2p/discovery.rs | 25 +- substrate/client/network/src/litep2p/mod.rs | 69 ++- .../client/network/src/litep2p/service.rs | 31 ++ substrate/client/network/src/service.rs | 24 +- .../client/network/src/service/traits.rs | 10 + 22 files changed, 1511 insertions(+), 177 deletions(-) create mode 100644 prdoc/pr_3786.prdoc create mode 100644 substrate/client/authority-discovery/src/worker/schema/dht-v3.proto diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs index b31359f48a56..1f438df2d148 100644 --- a/polkadot/node/network/bridge/src/network.rs +++ b/polkadot/node/network/bridge/src/network.rs @@ -204,6 +204,13 @@ pub trait Network: Clone + Send + 'static { multiaddresses: HashSet, ) -> Result<(), String>; + /// Ask the network to extend the reserved set with these nodes. + async fn add_peers_to_reserved_set( + &mut self, + protocol: ProtocolName, + multiaddresses: HashSet, + ) -> Result<(), String>; + /// Removes the peers for the protocol's peer set (both reserved and non-reserved). 
async fn remove_from_peers_set( &mut self, @@ -240,6 +247,14 @@ impl Network for Arc { ::set_reserved_peers(&**self, protocol, multiaddresses) } + async fn add_peers_to_reserved_set( + &mut self, + protocol: ProtocolName, + multiaddresses: HashSet, + ) -> Result<(), String> { + ::add_peers_to_reserved_set(&**self, protocol, multiaddresses) + } + async fn remove_from_peers_set( &mut self, protocol: ProtocolName, diff --git a/polkadot/node/network/bridge/src/rx/tests.rs b/polkadot/node/network/bridge/src/rx/tests.rs index 392ff7391a1c..601dca5cb8a3 100644 --- a/polkadot/node/network/bridge/src/rx/tests.rs +++ b/polkadot/node/network/bridge/src/rx/tests.rs @@ -124,6 +124,14 @@ impl Network for TestNetwork { Ok(()) } + async fn add_peers_to_reserved_set( + &mut self, + _protocol: ProtocolName, + _: HashSet, + ) -> Result<(), String> { + Ok(()) + } + async fn remove_from_peers_set( &mut self, _protocol: ProtocolName, diff --git a/polkadot/node/network/bridge/src/tx/mod.rs b/polkadot/node/network/bridge/src/tx/mod.rs index 7b6dea748572..6c353195d41a 100644 --- a/polkadot/node/network/bridge/src/tx/mod.rs +++ b/polkadot/node/network/bridge/src/tx/mod.rs @@ -370,6 +370,22 @@ where .await; return (network_service, authority_discovery_service) }, + + NetworkBridgeTxMessage::AddToResolvedValidators { validator_addrs, peer_set } => { + gum::trace!( + target: LOG_TARGET, + action = "AddToResolvedValidators", + peer_set = ?peer_set, + ?validator_addrs, + "Received a resolved validator connection request", + ); + + let all_addrs = validator_addrs.into_iter().flatten().collect(); + let network_service = validator_discovery + .on_add_to_resolved_request(all_addrs, peer_set, network_service) + .await; + return (network_service, authority_discovery_service) + }, } (network_service, authority_discovery_service) } diff --git a/polkadot/node/network/bridge/src/tx/tests.rs b/polkadot/node/network/bridge/src/tx/tests.rs index 9265358196db..30b2c3421372 100644 --- a/polkadot/node/network/bridge/src/tx/tests.rs +++ b/polkadot/node/network/bridge/src/tx/tests.rs @@ -148,6 +148,14 @@ impl Network for TestNetwork { Ok(()) } + async fn add_peers_to_reserved_set( + &mut self, + _protocol: ProtocolName, + _: HashSet, + ) -> Result<(), String> { + Ok(()) + } + async fn remove_from_peers_set( &mut self, _protocol: ProtocolName, diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index f0ef038d5eb4..9accd56d86ae 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -92,6 +92,44 @@ impl Service { network_service } + /// Connect to already resolved addresses. + pub async fn on_add_to_resolved_request( + &mut self, + newly_requested: HashSet, + peer_set: PeerSet, + mut network_service: N, + ) -> N { + let state = &mut self.state[peer_set]; + let new_peer_ids: HashSet = extract_peer_ids(newly_requested.iter().cloned()); + let num_peers = new_peer_ids.len(); + + state.previously_requested.extend(new_peer_ids); + + gum::debug!( + target: LOG_TARGET, + ?peer_set, + ?num_peers, + "New add to resolved validators request", + ); + + // ask the network to connect to these nodes and not disconnect + // from them until they are removed from the set. + // + // for peer-set management, the main protocol name should be used regardless of + // the negotiated version. 
+ if let Err(e) = network_service + .add_peers_to_reserved_set( + self.peerset_protocol_names.get_main_name(peer_set), + newly_requested, + ) + .await + { + gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); + } + + network_service + } + /// On a new connection request, a peer set update will be issued. /// It will ask the network to connect to the validators and not disconnect /// from them at least until the next request is issued for the same peer set. @@ -222,6 +260,15 @@ mod tests { Ok(()) } + async fn add_peers_to_reserved_set( + &mut self, + _protocol: ProtocolName, + multiaddresses: HashSet, + ) -> Result<(), String> { + self.peers_set.extend(extract_peer_ids(multiaddresses.into_iter())); + Ok(()) + } + async fn remove_from_peers_set( &mut self, _protocol: ProtocolName, diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 4dfdd1f7208f..cd327c11e408 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -69,6 +69,16 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5); #[cfg(test)] const BACKOFF_DURATION: Duration = Duration::from_millis(500); +// The authority_discovery queries run every ten minutes, +// so it makes sense to run a bit more often than that to +// detect changes as often as we can, but not too often since +// it won't help. +#[cfg(not(test))] +const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(5 * 60); + +#[cfg(test)] +const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(2); + /// Duration after which we consider low connectivity a problem. /// /// Especially at startup low connectivity is expected (authority discovery cache needs to be @@ -91,6 +101,14 @@ pub struct GossipSupport { // `None` otherwise. last_failure: Option, + // Validators can restart during a session, so if they change + // their PeerID, we will in the best case connect to them only after + // a session, so we need to try more often to re-resolve peers and + // reconnect to them. The authority_discovery queries run every ten + // minutes, so we can't detect changes in the address more often + // than that. + last_connection_request: Option, + /// First time we did not reach our connectivity threshold.
/// /// This is the time of the first failed attempt to connect to >2/3 of all validators in a @@ -131,6 +149,7 @@ where keystore, last_session_index: None, last_failure: None, + last_connection_request: None, failure_start: None, resolved_authorities: HashMap::new(), connected_authorities: HashMap::new(), @@ -196,15 +215,22 @@ where for leaf in leaves { let current_index = util::request_session_index_for_child(leaf, sender).await.await??; let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default(); + let since_last_reconnect = + self.last_connection_request.map(|i| i.elapsed()).unwrap_or_default(); + let force_request = since_failure >= BACKOFF_DURATION; + let re_resolve_authorities = since_last_reconnect >= TRY_RERESOLVE_AUTHORITIES; let leaf_session = Some((current_index, leaf)); let maybe_new_session = match self.last_session_index { Some(i) if current_index <= i => None, _ => leaf_session, }; - let maybe_issue_connection = - if force_request { leaf_session } else { maybe_new_session }; + let maybe_issue_connection = if force_request || re_resolve_authorities { + leaf_session + } else { + maybe_new_session + }; if let Some((session_index, relay_parent)) = maybe_issue_connection { let session_info = @@ -248,7 +274,7 @@ where // connections to a much broader set of validators. { let mut connections = authorities_past_present_future(sender, leaf).await?; - + self.last_connection_request = Some(Instant::now()); // Remove all of our locally controlled validator indices so we don't connect to // ourself. let connections = @@ -259,7 +285,12 @@ where // to clean up all connections. Vec::new() }; - self.issue_connection_request(sender, connections).await; + + if force_request || is_new_session { + self.issue_connection_request(sender, connections).await; + } else if re_resolve_authorities { + self.issue_connection_request_to_changed(sender, connections).await; + } } if is_new_session { @@ -324,17 +355,14 @@ where authority_check_result } - async fn issue_connection_request( + async fn resolve_authorities( &mut self, - sender: &mut Sender, authorities: Vec, - ) where - Sender: overseer::GossipSupportSenderTrait, - { - let num = authorities.len(); + ) -> (Vec>, HashMap>, usize) { let mut validator_addrs = Vec::with_capacity(authorities.len()); - let mut failures = 0; let mut resolved = HashMap::with_capacity(authorities.len()); + let mut failures = 0; + for authority in authorities { if let Some(addrs) = self.authority_discovery.get_addresses_by_authority_id(authority.clone()).await @@ -350,6 +378,67 @@ where ); } } + (validator_addrs, resolved, failures) + } + + async fn issue_connection_request_to_changed( + &mut self, + sender: &mut Sender, + authorities: Vec, + ) where + Sender: overseer::GossipSupportSenderTrait, + { + let (_, resolved, _) = self.resolve_authorities(authorities).await; + + let mut changed = Vec::new(); + + for (authority, new_addresses) in &resolved { + let new_peer_ids = new_addresses + .iter() + .flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p)) + .collect::>(); + match self.resolved_authorities.get(authority) { + Some(old_addresses) => { + let old_peer_ids = old_addresses + .iter() + .flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p)) + .collect::>(); + if !old_peer_ids.is_superset(&new_peer_ids) { + changed.push(new_addresses.clone()); + } + }, + None => changed.push(new_addresses.clone()), + } + } + gum::debug!( + target: LOG_TARGET, + num_changed = ?changed.len(), + ?changed, + "Issuing a connection request to changed 
validators" + ); + if !changed.is_empty() { + self.resolved_authorities = resolved; + + sender + .send_message(NetworkBridgeTxMessage::AddToResolvedValidators { + validator_addrs: changed, + peer_set: PeerSet::Validation, + }) + .await; + } + } + + async fn issue_connection_request( + &mut self, + sender: &mut Sender, + authorities: Vec, + ) where + Sender: overseer::GossipSupportSenderTrait, + { + let num = authorities.len(); + + let (validator_addrs, resolved, failures) = self.resolve_authorities(authorities).await; + self.resolved_authorities = resolved; gum::debug!(target: LOG_TARGET, %num, "Issuing a connection request"); @@ -399,16 +488,24 @@ where { let mut authority_ids: HashMap> = HashMap::new(); for authority in authorities { - let peer_id = self + let peer_ids = self .authority_discovery .get_addresses_by_authority_id(authority.clone()) .await .into_iter() .flat_map(|list| list.into_iter()) - .find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p)); + .flat_map(|addr| parse_addr(addr).ok().map(|(p, _)| p)) + .collect::>(); + + gum::trace!( + target: LOG_TARGET, + ?peer_ids, + ?authority, + "Resolved to peer ids" + ); - if let Some(p) = peer_id { - authority_ids.entry(p).or_default().insert(authority); + for p in peer_ids { + authority_ids.entry(p).or_default().insert(authority.clone()); } } diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs index 42197d00e6f3..09622254f523 100644 --- a/polkadot/node/network/gossip-support/src/tests.rs +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -119,6 +119,14 @@ impl MockAuthorityDiscovery { } } + fn change_address_for_authority(&self, authority_id: AuthorityDiscoveryId) -> PeerId { + let new_peer_id = PeerId::random(); + let addr = Multiaddr::empty().with(Protocol::P2p(new_peer_id.into())); + self.addrs.lock().insert(authority_id.clone(), HashSet::from([addr])); + self.authorities.lock().insert(new_peer_id, HashSet::from([authority_id])); + new_peer_id + } + fn authorities(&self) -> HashMap> { self.authorities.lock().clone() } @@ -809,6 +817,313 @@ fn issues_update_authorities_after_session() { ); } +// Test we connect to authorities that changed their address `TRY_RERESOLVE_AUTHORITIES` rate +// and that is is no-op if no authority changed. +#[test] +fn test_quickly_connect_to_authorities_that_changed_address() { + let hash = Hash::repeat_byte(0xAA); + + let authorities = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + let authority_that_changes_address = authorities.get(5).unwrap().clone(); + + let mut authority_discovery_mock = MockAuthorityDiscovery::new(authorities); + + test_harness( + make_subsystem_with_authority_discovery(authority_discovery_mock.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // 1. Initialize with the first leaf in the session. 
+ overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES + .iter() + .cloned() + .filter(|p| p != &Sr25519Keyring::Ferdie.public().into()) + .collect(); + + let addrs = get_multiaddrs(all_without_ferdie, authority_discovery_mock.clone()).await; + + assert_eq!(validator_addrs, addrs); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + // Ensure neighbors are unaffected + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::CurrentBabeEpoch(tx), + )) => { + let _ = tx.send(Ok(BabeEpoch { + epoch_index: 2 as _, + start_slot: 0.into(), + duration: 200, + authorities: vec![(Sr25519Keyring::Alice.public().into(), 1)], + randomness: [0u8; 32], + config: BabeEpochConfiguration { + c: (1, 4), + allowed_slots: AllowedSlots::PrimarySlots, + }, + })).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::NewGossipTopology { + session: _, + local_index: _, + canonical_shuffling: _, + shuffled_indices: _, + }) => { + + } + ); + + // 2. Connect all authorities that are known so far. + let known_authorities = authority_discovery_mock.authorities(); + for (peer_id, _id) in known_authorities.iter() { + let msg = + GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( + *peer_id, + ObservedRole::Authority, + ValidationVersion::V3.into(), + None, + )); + overseer.send(FromOrchestra::Communication { msg }).await + } + + // 3. Send a new leaf after TRY_RERESOLVE_AUTHORITIES; we should notice that + // `UpdatedAuthorityIds` is emitted for all connected peers.
+ Delay::new(TRY_RERESOLVE_AUTHORITIES).await; + let hash = Hash::repeat_byte(0xBB); + overseer_signal_active_leaves(overseer, hash).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + for _ in 0..known_authorities.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids, + }) => { + assert_eq!(authority_discovery_mock.get_authority_ids_by_peer_id(peer_id).await.unwrap_or_default(), authority_ids); + } + ); + } + + // 4. At the next re-resolve, no authority changes its address, so it should be a no-op. + Delay::new(TRY_RERESOLVE_AUTHORITIES).await; + let hash = Hash::repeat_byte(0xCC); + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + assert!(overseer.recv().timeout(TIMEOUT).await.is_none()); + + // Change the address for one authority and check that we try to connect to it and + // that we emit `UpdatedAuthorityIds` for the old PeerId and the new one.
+ Delay::new(TRY_RERESOLVE_AUTHORITIES).await; + let changed_peerid = authority_discovery_mock + .change_address_for_authority(authority_that_changes_address.clone()); + let hash = Hash::repeat_byte(0xDD); + let msg = GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( + changed_peerid, + ObservedRole::Authority, + ValidationVersion::V3.into(), + None, + )); + overseer.send(FromOrchestra::Communication { msg }).await; + + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::AddToResolvedValidators { + validator_addrs, + peer_set, + }) => { + let expected = get_address_map(vec![authority_that_changes_address.clone()], authority_discovery_mock.clone()).await; + let expected: HashSet = expected.into_values().flat_map(|v| v.into_iter()).collect(); + assert_eq!(validator_addrs.into_iter().flat_map(|v| v.into_iter()).collect::>(), expected); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids, + }) => { + assert_eq!(authority_discovery_mock.get_authority_ids_by_peer_id(peer_id).await.unwrap(), HashSet::from([authority_that_changes_address.clone()])); + assert!(authority_ids.is_empty()); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids, + }) => { + assert_eq!(authority_ids, HashSet::from([authority_that_changes_address])); + assert_eq!(changed_peerid, peer_id); + } + ); + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none()); + + virtual_overseer + }, + ); +} + #[test] fn disconnect_when_not_in_past_present_future() { sp_tracing::try_init_simple(); diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index ee937bca05bf..4d27ac9b70e3 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -444,6 +444,16 @@ pub enum NetworkBridgeTxMessage { /// The peer set we want the connection on. peer_set: PeerSet, }, + + /// Extends the known validators set with new peers whose `Multiaddrs` we already know; this is + /// usually needed for validators that change their address mid-session. It is usually called + /// after a `ConnectToResolvedValidators` at the beginning of the session.
+ AddToResolvedValidators { + /// Each entry corresponds to the addresses of an already resolved validator. + validator_addrs: Vec>, + /// The peer set we want the connection on. + peer_set: PeerSet, + }, } /// Availability Distribution Message. diff --git a/prdoc/pr_3786.prdoc b/prdoc/pr_3786.prdoc new file mode 100644 index 000000000000..0bb9e6c23f75 --- /dev/null +++ b/prdoc/pr_3786.prdoc @@ -0,0 +1,22 @@ +title: Make changing of peer-id while active a bit more robust + +doc: + - audience: Node Dev + description: | + Implementation of https://github.com/polkadot-fellows/RFCs/pull/91: use the `creation_time` field to determine + the newest DHT record and to update nodes known to have the old record. + + Gossip-support is modified to try to re-resolve authorities' new addresses every 5 minutes instead of once per session, + so that we pick up authorities that changed their address faster and try to connect to them. + +crates: +- name: sc-authority-discovery + bump: major +- name: polkadot-gossip-support + bump: major +- name: polkadot-network-bridge + bump: major +- name: polkadot-node-subsystem-types + bump: major +- name: sc-network + bump: minor \ No newline at end of file diff --git a/substrate/client/authority-discovery/build.rs b/substrate/client/authority-discovery/build.rs index 83076ac8c893..cdabc1a74427 100644 --- a/substrate/client/authority-discovery/build.rs +++ b/substrate/client/authority-discovery/build.rs @@ -18,7 +18,11 @@ fn main() { prost_build::compile_protos( - &["src/worker/schema/dht-v1.proto", "src/worker/schema/dht-v2.proto"], + &[ + "src/worker/schema/dht-v1.proto", + "src/worker/schema/dht-v2.proto", + "src/worker/schema/dht-v3.proto", + ], &["src/worker/schema"], ) .unwrap(); diff --git a/substrate/client/authority-discovery/src/worker.rs b/substrate/client/authority-discovery/src/worker.rs index 1f1cce160786..42994bbc7ea8 100644 --- a/substrate/client/authority-discovery/src/worker.rs +++ b/substrate/client/authority-discovery/src/worker.rs @@ -26,7 +26,7 @@ use std::{ collections::{HashMap, HashSet}, marker::PhantomData, sync::Arc, - time::Duration, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt}; @@ -34,16 +34,17 @@ use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt} use addr_cache::AddrCache; use codec::{Decode, Encode}; use ip_network::IpNetwork; +use libp2p::kad::{PeerRecord, Record}; use linked_hash_set::LinkedHashSet; -use log::{debug, error, log_enabled}; +use log::{debug, error}; use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64}; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; use sc_network::{ - event::DhtEvent, multiaddr, KademliaKey, Multiaddr, NetworkDHTProvider, NetworkSigner, - NetworkStateInfo, + config::DEFAULT_KADEMLIA_REPLICATION_FACTOR, event::DhtEvent, multiaddr, KademliaKey, + Multiaddr, NetworkDHTProvider, NetworkSigner, NetworkStateInfo, }; use sc_network_types::{multihash::Code, PeerId}; use schema::PeerSignature; @@ -62,7 +63,7 @@ mod schema { #[cfg(test)] mod tests; - include!(concat!(env!("OUT_DIR"), "/authority_discovery_v2.rs")); + include!(concat!(env!("OUT_DIR"), "/authority_discovery_v3.rs")); } #[cfg(test)] pub mod tests; @@ -159,6 +160,16 @@ pub struct Worker { /// Set of in-flight lookups. in_flight_lookups: HashMap, + /// Set of lookups for which we can still receive records. + /// These are the entries in the `in_flight_lookups` for which + /// we got at least one successful result.
+ known_lookups: HashMap, + + /// Last known record by key, here we always keep the record with + /// the highest creation time and we don't accept records older than + /// that. + last_known_records: HashMap, + addr_cache: addr_cache::AddrCache, metrics: Option, @@ -168,6 +179,17 @@ pub struct Worker { phantom: PhantomData, } +#[derive(Debug, Clone)] +struct RecordInfo { + /// Time since UNIX_EPOCH in nanoseconds. + creation_time: u128, + /// Peers that we know have this record, bounded to no more than + /// DEFAULT_KADEMLIA_REPLICATION_FACTOR(20). + peers_with_record: HashSet, + /// The record itself. + record: Record, +} + /// Wrapper for [`AuthorityDiscoveryApi`](sp_authority_discovery::AuthorityDiscoveryApi). Can be /// be implemented by any struct without dependency on the runtime. #[async_trait::async_trait] @@ -283,10 +305,12 @@ where query_interval, pending_lookups: Vec::new(), in_flight_lookups: HashMap::new(), + known_lookups: HashMap::new(), addr_cache, role, metrics, phantom: PhantomData, + last_known_records: HashMap::new(), } } @@ -444,7 +468,7 @@ where .set(addresses.len().try_into().unwrap_or(std::u64::MAX)); } - let serialized_record = serialize_authority_record(addresses)?; + let serialized_record = serialize_authority_record(addresses, Some(build_creation_time()))?; let peer_signature = sign_record_with_peer_id(&serialized_record, &self.network)?; let keys_vec = keys.iter().cloned().collect::>(); @@ -495,12 +519,17 @@ where self.authorities_queried_at = Some(best_hash); self.addr_cache.retain_ids(&authorities); + let now = Instant::now(); + self.last_known_records.retain(|k, value| { + self.known_authorities.contains_key(k) && !value.record.is_expired(now) + }); authorities.shuffle(&mut thread_rng()); self.pending_lookups = authorities; // Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as // query interval ticks are far enough apart for all lookups to succeed. self.in_flight_lookups.clear(); + self.known_lookups.clear(); if let Some(metrics) = &self.metrics { metrics @@ -538,16 +567,12 @@ where metrics.dht_event_received.with_label_values(&["value_found"]).inc(); } - if log_enabled!(log::Level::Debug) { - let hashes: Vec<_> = v.iter().map(|(hash, _value)| hash.clone()).collect(); - debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", hashes); - } + debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", v.record.key); if let Err(e) = self.handle_dht_value_found_event(v) { if let Some(metrics) = &self.metrics { metrics.handle_value_found_event_failure.inc(); } - debug!(target: LOG_TARGET, "Failed to handle Dht value found event: {}", e); } }, @@ -651,6 +676,31 @@ where publisher, authority_id, )?; + + let records_creation_time: u128 = + schema::AuthorityRecord::decode(signed_record.record.as_slice()) + .map_err(Error::DecodingProto)? + .creation_time + .map(|creation_time| { + u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default() + }) + .unwrap_or_default(); // 0 is a sane default for records that do not have creation time present. + + let current_record_info = self.last_known_records.get(&record_key); + // If record creation time is older than the current record creation time, + // we don't store it since we want to give higher priority to newer records. 
+ if let Some(current_record_info) = current_record_info { + if records_creation_time < current_record_info.creation_time { + debug!( + target: LOG_TARGET, + "Skip storing because record creation time {:?} is older than the current known record {:?}", + records_creation_time, + current_record_info.creation_time + ); + return Ok(()); + } + } + self.network.store_record(record_key, record_value, Some(publisher), expires); Ok(()) } @@ -701,67 +751,88 @@ where Ok(()) } - fn handle_dht_value_found_event(&mut self, values: Vec<(KademliaKey, Vec)>) -> Result<()> { + fn handle_dht_value_found_event(&mut self, peer_record: PeerRecord) -> Result<()> { // Ensure `values` is not empty and all its keys equal. - let remote_key = single(values.iter().map(|(key, _)| key.clone())) - .map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentKeys)? - .ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?; - - let authority_id: AuthorityId = self - .in_flight_lookups - .remove(&remote_key) - .ok_or(Error::ReceivingUnexpectedRecord)?; + let remote_key = peer_record.record.key.clone(); + + let authority_id: AuthorityId = + if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) { + self.known_lookups.insert(remote_key.clone(), authority_id.clone()); + authority_id + } else if let Some(authority_id) = self.known_lookups.get(&remote_key) { + authority_id.clone() + } else { + return Err(Error::ReceivingUnexpectedRecord); + }; let local_peer_id = self.network.local_peer_id(); - let remote_addresses: Vec = values - .into_iter() - .map(|(_k, v)| { - let schema::SignedAuthorityRecord { record, peer_signature, .. } = - Self::check_record_signed_with_authority_id(&v, &authority_id)?; - - let addresses: Vec = schema::AuthorityRecord::decode(record.as_slice()) - .map(|a| a.addresses) - .map_err(Error::DecodingProto)? - .into_iter() - .map(|a| a.try_into()) - .collect::>() - .map_err(Error::ParsingMultiaddress)?; - - let get_peer_id = |a: &Multiaddr| match a.iter().last() { - Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(), - _ => None, - }; - - // Ignore [`Multiaddr`]s without [`PeerId`] or with own addresses. - let addresses: Vec = addresses - .into_iter() - .filter(|a| get_peer_id(a).filter(|p| *p != local_peer_id).is_some()) - .collect(); - - let remote_peer_id = single(addresses.iter().map(get_peer_id)) - .map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? // different peer_id in records - .flatten() - .ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; // no records with peer_id in them - - // At this point we know all the valid multiaddresses from the record, know that - // each of them belong to the same PeerId, we just need to check if the record is - // properly signed by the owner of the PeerId - self.check_record_signed_with_network_key( - &record, - peer_signature, - remote_peer_id, - &authority_id, - )?; - Ok(addresses) + let schema::SignedAuthorityRecord { record, peer_signature, .. } = + Self::check_record_signed_with_authority_id( + peer_record.record.value.as_slice(), + &authority_id, + )?; + + let authority_record = + schema::AuthorityRecord::decode(record.as_slice()).map_err(Error::DecodingProto)?; + + let records_creation_time: u128 = authority_record + .creation_time + .as_ref() + .map(|creation_time| { + u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default() }) - .collect::>>>()? + .unwrap_or_default(); // 0 is a sane default for records that do not have creation time present. 
+ + let addresses: Vec = authority_record + .addresses .into_iter() + .map(|a| a.try_into()) + .collect::>() + .map_err(Error::ParsingMultiaddress)?; + + let get_peer_id = |a: &Multiaddr| match a.iter().last() { + Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(), + _ => None, + }; + + // Ignore [`Multiaddr`]s without [`PeerId`] or with own addresses. + let addresses: Vec = addresses + .into_iter() + .filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some()) .collect(); - if !remote_addresses.is_empty() { + let remote_peer_id = single(addresses.iter().map(|a| get_peer_id(&a))) + .map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? // different peer_id in records + .flatten() + .ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; // no records with peer_id in them + + // At this point we know all the valid multiaddresses from the record, know that + // each of them belongs to the same PeerId; we just need to check if the record is + // properly signed by the owner of the PeerId + self.check_record_signed_with_network_key( + &record, + peer_signature, + remote_peer_id, + &authority_id, + )?; + + let remote_addresses: Vec = + addresses.into_iter().take(MAX_ADDRESSES_PER_AUTHORITY).collect(); + + let answering_peer_id = peer_record.peer.map(|peer| peer.into()); + + let addr_cache_needs_update = self.handle_new_record( + &authority_id, + remote_key.clone(), + RecordInfo { + creation_time: records_creation_time, + peers_with_record: answering_peer_id.into_iter().collect(), + record: peer_record.record, + }, + ); + + if !remote_addresses.is_empty() && addr_cache_needs_update { self.addr_cache.insert(authority_id, remote_addresses); if let Some(metrics) = &self.metrics { metrics @@ -772,6 +843,68 @@ where Ok(()) } + // Handles receiving a new DHT record for the authority. + // Returns true if the record was new, false if the record was older than the current one. + fn handle_new_record( + &mut self, + authority_id: &AuthorityId, + kademlia_key: KademliaKey, + new_record: RecordInfo, + ) -> bool { + let current_record_info = self + .last_known_records + .entry(kademlia_key.clone()) + .or_insert_with(|| new_record.clone()); + + if new_record.creation_time > current_record_info.creation_time { + let peers_that_need_updating = current_record_info.peers_with_record.clone(); + self.network.put_record_to( + new_record.record.clone(), + peers_that_need_updating.clone(), + // If this is empty it means we received the answer from our node's local + // storage, so we need to update that as well. + current_record_info.peers_with_record.is_empty(), + ); + debug!( + target: LOG_TARGET, + "Found a newer record for {:?} new record creation time {:?} old record creation time {:?}", + authority_id, new_record.creation_time, current_record_info.creation_time + ); + self.last_known_records.insert(kademlia_key, new_record); + return true + } + + if new_record.creation_time == current_record_info.creation_time { + // Same record; just update, in case this is a record from old nodes that don't have + // a timestamp.
+ debug!( + target: LOG_TARGET, + "Found same record for {:?} record creation time {:?}", + authority_id, new_record.creation_time + ); + if current_record_info.peers_with_record.len() + new_record.peers_with_record.len() <= + DEFAULT_KADEMLIA_REPLICATION_FACTOR + { + current_record_info.peers_with_record.extend(new_record.peers_with_record); + } + return true + } + + debug!( + target: LOG_TARGET, + "Found old record for {:?} received record creation time {:?} current record creation time {:?}", + authority_id, new_record.creation_time, current_record_info.creation_time, + ); + self.network.put_record_to( + current_record_info.record.clone(), + new_record.peers_with_record.clone(), + // If this is empty it means we received the answer from our node's local + // storage, so we need to update that as well. + new_record.peers_with_record.is_empty(), + ); + return false + } + /// Retrieve our public keys within the current and next authority set. // A node might have multiple authority discovery keys within its keystore, e.g. an old one and // one for the upcoming session. In addition it could be participating in the current and (/ or) @@ -838,9 +971,21 @@ fn serialize_addresses(addresses: impl Iterator) -> Vec>) -> Result> { +fn build_creation_time() -> schema::TimestampInfo { + let creation_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|time| time.as_nanos()) + .unwrap_or_default(); + schema::TimestampInfo { timestamp: creation_time.encode() } +} + +fn serialize_authority_record( + addresses: Vec>, + creation_time: Option, +) -> Result> { let mut serialized_record = vec![]; - schema::AuthorityRecord { addresses } + + schema::AuthorityRecord { addresses, creation_time } .encode(&mut serialized_record) .map_err(Error::EncodingProto)?; Ok(serialized_record) @@ -876,7 +1021,6 @@ fn sign_record_with_authority_ids( // Scale encode let auth_signature = auth_signature.encode(); - let signed_record = schema::SignedAuthorityRecord { record: serialized_record.clone(), auth_signature, diff --git a/substrate/client/authority-discovery/src/worker/schema/dht-v3.proto b/substrate/client/authority-discovery/src/worker/schema/dht-v3.proto new file mode 100644 index 000000000000..547237573af2 --- /dev/null +++ b/substrate/client/authority-discovery/src/worker/schema/dht-v3.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package authority_discovery_v3; + +// First we need to serialize the addresses in order to be able to sign them. +message AuthorityRecord { + // Possibly multiple `MultiAddress`es through which the node can be reached. + repeated bytes addresses = 1; + // Information about the creation time of the record + TimestampInfo creation_time = 2; +} + +message PeerSignature { + bytes signature = 1; + bytes public_key = 2; +} + +// Information regarding the creation time of the record +message TimestampInfo { + // Time since UNIX_EPOCH in nanoseconds, scale encoded + bytes timestamp = 1; +} + +// Then we need to serialize the authority record and signature to send them over the wire. +message SignedAuthorityRecord { + bytes record = 1; + bytes auth_signature = 2; + // Even if there are multiple `record.addresses`, all of them have the same peer id. + // Old versions are missing this field. It is optional in order to provide compatibility both ways.
+ PeerSignature peer_signature = 3; +} diff --git a/substrate/client/authority-discovery/src/worker/schema/tests.rs b/substrate/client/authority-discovery/src/worker/schema/tests.rs index ef06ed7d336b..557fa9641f97 100644 --- a/substrate/client/authority-discovery/src/worker/schema/tests.rs +++ b/substrate/client/authority-discovery/src/worker/schema/tests.rs @@ -20,7 +20,12 @@ mod schema_v1 { include!(concat!(env!("OUT_DIR"), "/authority_discovery_v1.rs")); } +mod schema_v2 { + include!(concat!(env!("OUT_DIR"), "/authority_discovery_v2.rs")); +} + use super::*; +use codec::Encode; use libp2p::identity::Keypair; use prost::Message; use sc_network::{Multiaddr, PeerId}; @@ -65,7 +70,7 @@ fn v1_decodes_v2() { let vec_auth_signature = b"Totally valid signature, I promise!".to_vec(); let vec_peer_signature = b"Surprisingly hard to crack crypto".to_vec(); - let record_v2 = AuthorityRecord { addresses: vec_addresses.clone() }; + let record_v2 = schema_v2::AuthorityRecord { addresses: vec_addresses.clone() }; let mut vec_record_v2 = vec![]; record_v2.encode(&mut vec_record_v2).unwrap(); let vec_peer_public = peer_public.encode_protobuf(); @@ -85,6 +90,82 @@ fn v1_decodes_v2() { assert_eq!(&signed_addresses_v1_decoded.addresses, &vec_record_v2); assert_eq!(&signed_addresses_v1_decoded.signature, &vec_auth_signature); - let addresses_v2_decoded = AuthorityRecord::decode(vec_record_v2.as_slice()).unwrap(); + let addresses_v2_decoded = + schema_v2::AuthorityRecord::decode(vec_record_v2.as_slice()).unwrap(); + assert_eq!(&addresses_v2_decoded.addresses, &vec_addresses); +} + +#[test] +fn v1_decodes_v3() { + let peer_secret = Keypair::generate_ed25519(); + let peer_public = peer_secret.public(); + let peer_id = peer_public.to_peer_id(); + let multiaddress: Multiaddr = + format!("/ip4/127.0.0.1/tcp/3003/p2p/{}", peer_id).parse().unwrap(); + let vec_addresses = vec![multiaddress.to_vec()]; + let vec_auth_signature = b"Totally valid signature, I promise!".to_vec(); + let vec_peer_signature = b"Surprisingly hard to crack crypto".to_vec(); + + let record_v3 = AuthorityRecord { + addresses: vec_addresses.clone(), + creation_time: Some(TimestampInfo { timestamp: Encode::encode(&55) }), + }; + let mut vec_record_v3 = vec![]; + record_v3.encode(&mut vec_record_v3).unwrap(); + let vec_peer_public = peer_public.encode_protobuf(); + let peer_signature_v3 = + PeerSignature { public_key: vec_peer_public, signature: vec_peer_signature }; + let signed_record_v3 = SignedAuthorityRecord { + record: vec_record_v3.clone(), + auth_signature: vec_auth_signature.clone(), + peer_signature: Some(peer_signature_v3.clone()), + }; + let mut vec_signed_record_v3 = vec![]; + signed_record_v3.encode(&mut vec_signed_record_v3).unwrap(); + + let signed_addresses_v1_decoded = + schema_v1::SignedAuthorityAddresses::decode(vec_signed_record_v3.as_slice()).unwrap(); + + assert_eq!(&signed_addresses_v1_decoded.addresses, &vec_record_v3); + assert_eq!(&signed_addresses_v1_decoded.signature, &vec_auth_signature); + + let addresses_v2_decoded = + schema_v2::AuthorityRecord::decode(vec_record_v3.as_slice()).unwrap(); assert_eq!(&addresses_v2_decoded.addresses, &vec_addresses); } + +#[test] +fn v3_decodes_v2() { + let peer_secret = Keypair::generate_ed25519(); + let peer_public = peer_secret.public(); + let peer_id = peer_public.to_peer_id(); + let multiaddress: Multiaddr = + format!("/ip4/127.0.0.1/tcp/3003/p2p/{}", peer_id).parse().unwrap(); + let vec_addresses = vec![multiaddress.to_vec()]; + let vec_auth_signature = b"Totally valid 
signature, I promise!".to_vec(); + let vec_peer_signature = b"Surprisingly hard to crack crypto".to_vec(); + + let record_v2 = schema_v2::AuthorityRecord { addresses: vec_addresses.clone() }; + let mut vec_record_v2 = vec![]; + record_v2.encode(&mut vec_record_v2).unwrap(); + let vec_peer_public = peer_public.encode_protobuf(); + let peer_signature_v2 = + schema_v2::PeerSignature { public_key: vec_peer_public, signature: vec_peer_signature }; + let signed_record_v2 = schema_v2::SignedAuthorityRecord { + record: vec_record_v2.clone(), + auth_signature: vec_auth_signature.clone(), + peer_signature: Some(peer_signature_v2.clone()), + }; + let mut vec_signed_record_v2 = vec![]; + signed_record_v2.encode(&mut vec_signed_record_v2).unwrap(); + + let signed_addresses_v3_decoded = + SignedAuthorityRecord::decode(vec_signed_record_v2.as_slice()).unwrap(); + + assert_eq!(&signed_addresses_v3_decoded.record, &vec_record_v2); + assert_eq!(&signed_addresses_v3_decoded.auth_signature, &vec_auth_signature); + + let addresses_v3_decoded = AuthorityRecord::decode(vec_record_v2.as_slice()).unwrap(); + assert_eq!(&addresses_v3_decoded.addresses, &vec_addresses); + assert_eq!(&addresses_v3_decoded.creation_time, &None); +} diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs index de7443d634fa..b49615382b8a 100644 --- a/substrate/client/authority-discovery/src/worker/tests.rs +++ b/substrate/client/authority-discovery/src/worker/tests.rs @@ -119,6 +119,7 @@ sp_api::mock_impl_runtime_apis! { pub enum TestNetworkEvent { GetCalled(KademliaKey), PutCalled(KademliaKey, Vec), + PutToCalled(Record, HashSet, bool), StoreRecordCalled(KademliaKey, Vec, Option, Option), } @@ -129,6 +130,7 @@ pub struct TestNetwork { // Whenever functions on `TestNetwork` are called, the function arguments are added to the // vectors below. 
pub put_value_call: Arc)>>>, + pub put_value_to_call: Arc, bool)>>>, pub get_value_call: Arc>>, pub store_value_call: Arc, Option, Option)>>>, @@ -153,6 +155,7 @@ impl Default for TestNetwork { external_addresses: vec!["/ip6/2001:db8::/tcp/30333".parse().unwrap()], put_value_call: Default::default(), get_value_call: Default::default(), + put_value_to_call: Default::default(), store_value_call: Default::default(), event_sender: tx, event_receiver: Some(rx), @@ -200,6 +203,23 @@ impl NetworkDHTProvider for TestNetwork { .unwrap(); } + fn put_record_to( + &self, + record: Record, + peers: HashSet, + update_local_storage: bool, + ) { + self.put_value_to_call.lock().unwrap().push(( + record.clone(), + peers.clone(), + update_local_storage, + )); + self.event_sender + .clone() + .unbounded_send(TestNetworkEvent::PutToCalled(record, peers, update_local_storage)) + .unwrap(); + } + fn store_record( &self, key: KademliaKey, @@ -262,9 +282,11 @@ fn build_dht_event( public_key: AuthorityId, key_store: &MemoryKeystore, network: Option<&Signer>, + creation_time: Option, ) -> Vec<(KademliaKey, Vec)> { let serialized_record = - serialize_authority_record(serialize_addresses(addresses.into_iter())).unwrap(); + serialize_authority_record(serialize_addresses(addresses.into_iter()), creation_time) + .unwrap(); let peer_signature = network.map(|n| sign_record_with_peer_id(&serialized_record, n).unwrap()); let kv_pairs = sign_record_with_authority_ids( @@ -372,7 +394,10 @@ fn publish_discover_cycle() { let dht_event = { let (key, value) = network.put_value_call.lock().unwrap().pop().unwrap(); - DhtEvent::ValueFound(vec![(key, value)]) + DhtEvent::ValueFound(PeerRecord { + peer: None, + record: Record { key, value, publisher: None, expires: None }, + }) }; // Node B discovering node A's address. @@ -515,21 +540,39 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { // Send an event that should generate an error dht_event_tx - .send(DhtEvent::ValueFound(Default::default())) + .send(DhtEvent::ValueFound(PeerRecord { + peer: None, + record: Record { + key: vec![0x9u8].into(), + value: Default::default(), + publisher: None, + expires: None, + }, + })) .await .expect("Channel has capacity of 1."); // Make previously triggered lookup succeed. - let dht_event = { - let kv_pairs = build_dht_event::( - vec![remote_multiaddr.clone()], - remote_public_key.clone(), - &remote_key_store, - None, - ); - DhtEvent::ValueFound(kv_pairs) - }; - dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1."); + let kv_pairs: Vec = build_dht_event::( + vec![remote_multiaddr.clone()], + remote_public_key.clone(), + &remote_key_store, + None, + Some(build_creation_time()), + ) + .into_iter() + .map(|(key, value)| PeerRecord { + peer: None, + record: Record { key, value, publisher: None, expires: None }, + }) + .collect(); + + for kv_pair in kv_pairs { + dht_event_tx + .send(DhtEvent::ValueFound(kv_pair)) + .await + .expect("Channel has capacity of 1."); + } // Expect authority discovery to function normally, now knowing the // address for the remote node. 
@@ -581,37 +624,51 @@ impl DhtValueFoundTester { &mut self, strict_record_validation: bool, values: Vec<(KademliaKey, Vec)>, - ) -> Option<&HashSet> { + ) -> (Option>, Option>) { let (_dht_event_tx, dht_event_rx) = channel(1); let local_test_api = Arc::new(TestApi { authorities: vec![self.remote_authority_public.into()] }); - let local_network: Arc = Arc::new(Default::default()); let local_key_store = MemoryKeystore::new(); let (_to_worker, from_service) = mpsc::channel(0); - let mut local_worker = Worker::new( - from_service, - local_test_api, - local_network.clone(), - Box::pin(dht_event_rx), - Role::PublishAndDiscover(Arc::new(local_key_store)), - None, - WorkerConfig { strict_record_validation, ..Default::default() }, - ); + let (local_worker, local_network) = if let Some(local_work) = self.local_worker.as_mut() { + (local_work, None) + } else { + let local_network: Arc = Arc::new(Default::default()); + + self.local_worker = Some(Worker::new( + from_service, + local_test_api, + local_network.clone(), + Box::pin(dht_event_rx), + Role::PublishAndDiscover(Arc::new(local_key_store)), + None, + WorkerConfig { strict_record_validation, ..Default::default() }, + )); + (self.local_worker.as_mut().unwrap(), Some(local_network)) + }; block_on(local_worker.refill_pending_lookups_queue()).unwrap(); local_worker.start_new_lookups(); - drop(local_worker.handle_dht_value_found_event(values)); - - self.local_worker = Some(local_worker); + for record in values.into_iter().map(|(key, value)| PeerRecord { + peer: Some(PeerId::random().into()), + record: Record { key, value, publisher: None, expires: None }, + }) { + drop(local_worker.handle_dht_value_found_event(record)) + } - self.local_worker - .as_ref() - .map(|w| { - w.addr_cache.get_addresses_by_authority_id(&self.remote_authority_public.into()) - }) - .unwrap() + ( + self.local_worker + .as_ref() + .map(|w| { + w.addr_cache + .get_addresses_by_authority_id(&self.remote_authority_public.into()) + .cloned() + }) + .unwrap(), + local_network, + ) } } @@ -625,9 +682,10 @@ fn limit_number_of_addresses_added_to_cache_per_authority() { tester.remote_authority_public.into(), &tester.remote_key_store, None, + Some(build_creation_time()), ); - let cached_remote_addresses = tester.process_value_found(false, kv_pairs); + let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0; assert_eq!(MAX_ADDRESSES_PER_AUTHORITY, cached_remote_addresses.unwrap().len()); } @@ -640,17 +698,242 @@ fn strict_accept_address_with_peer_signature() { tester.remote_authority_public.into(), &tester.remote_key_store, Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), ); - let cached_remote_addresses = tester.process_value_found(true, kv_pairs); + let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0; assert_eq!( - Some(&HashSet::from([addr])), + Some(HashSet::from([addr])), cached_remote_addresses, "Expect worker to only cache `Multiaddr`s with `PeerId`s.", ); } +#[test] +fn strict_accept_address_without_creation_time() { + let mut tester = DhtValueFoundTester::new(); + let addr = tester.multiaddr_with_peer_id(1); + let kv_pairs = build_dht_event( + vec![addr.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + None, + ); + + let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0; + + assert_eq!( + Some(HashSet::from([addr])), + cached_remote_addresses, + "Expect worker to cache address without creation 
time", + ); +} + +#[test] +fn keep_last_received_if_no_creation_time() { + let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); + let addr = tester.multiaddr_with_peer_id(1); + let kv_pairs = build_dht_event( + vec![addr.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + None, + ); + + let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs); + + assert_eq!( + Some(HashSet::from([addr])), + cached_remote_addresses, + "Expect worker to cache address without creation time", + ); + + assert!(network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().is_empty()) + .unwrap_or_default()); + + let addr2 = tester.multiaddr_with_peer_id(2); + let kv_pairs = build_dht_event( + vec![addr2.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + None, + ); + + let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0; + + assert_eq!( + Some(HashSet::from([addr2])), + cached_remote_addresses, + "Expect worker to cache last received when no creation time", + ); + assert!(network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().is_empty()) + .unwrap_or_default()); +} + +#[test] +fn records_with_incorrectly_signed_creation_time_are_ignored() { + let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); + let addr = tester.multiaddr_with_peer_id(1); + let kv_pairs = build_dht_event( + vec![addr.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), + ); + + let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs); + + assert_eq!( + Some(HashSet::from([addr.clone()])), + cached_remote_addresses, + "Expect worker to cache record with creation time", + ); + assert!(network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().is_empty()) + .unwrap_or_default()); + + let alternative_key = tester + .remote_key_store + .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None) + .unwrap(); + + let addr2 = tester.multiaddr_with_peer_id(2); + let mut kv_pairs = build_dht_event( + vec![addr2.clone()], + alternative_key.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), + ); + let kademlia_key = hash_authority_id(tester.remote_authority_public.as_slice()); + for key in kv_pairs.iter_mut() { + key.0 = kademlia_key.clone(); + } + let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0; + + assert_eq!( + Some(HashSet::from([addr])), + cached_remote_addresses, + "Expect `Multiaddr` to remain the same", + ); + assert!(network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().is_empty()) + .unwrap_or_default()); +} + +#[test] +fn newer_records_overwrite_older_ones() { + let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); + let old_record = tester.multiaddr_with_peer_id(1); + let kv_pairs = build_dht_event( + vec![old_record.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), + ); + + let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs); + + assert_eq!( + Some(HashSet::from([old_record])), + cached_remote_addresses, + "Expect 
+		"Expect worker to cache record with creation time",
+	);
+
+	let nothing_updated = network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().is_empty())
+		.unwrap();
+	assert!(nothing_updated);
+
+	let new_record = tester.multiaddr_with_peer_id(2);
+	let kv_pairs = build_dht_event(
+		vec![new_record.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
+	);
+
+	let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0;
+
+	assert_eq!(
+		Some(HashSet::from([new_record])),
+		cached_remote_addresses,
+		"Expect worker to store the newest record",
+	);
+
+	let result = network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().first().unwrap().clone())
+		.unwrap();
+	assert!(matches!(result, (_, _, false)));
+	assert_eq!(result.1.len(), 1);
+}
+
+#[test]
+fn older_records_dont_affect_newer_ones() {
+	let mut tester: DhtValueFoundTester = DhtValueFoundTester::new();
+	let old_record = tester.multiaddr_with_peer_id(1);
+	let old_kv_pairs = build_dht_event(
+		vec![old_record.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
+	);
+
+	let new_record = tester.multiaddr_with_peer_id(2);
+	let kv_pairs = build_dht_event(
+		vec![new_record.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
+	);
+
+	let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs);
+
+	assert_eq!(
+		Some(HashSet::from([new_record.clone()])),
+		cached_remote_addresses,
+		"Expect worker to store new record",
+	);
+
+	let nothing_updated = network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().is_empty())
+		.unwrap();
+	assert!(nothing_updated);
+
+	let cached_remote_addresses = tester.process_value_found(true, old_kv_pairs).0;
+
+	assert_eq!(
+		Some(HashSet::from([new_record])),
+		cached_remote_addresses,
+		"Expect worker to not update stored record",
+	);
+
+	let update_peers_info = network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().remove(0))
+		.unwrap();
+	assert!(matches!(update_peers_info, (_, _, false)));
+	assert_eq!(update_peers_info.1.len(), 1);
+}
+
 #[test]
 fn reject_address_with_rogue_peer_signature() {
 	let mut tester = DhtValueFoundTester::new();
@@ -660,9 +943,10 @@ fn reject_address_with_rogue_peer_signature() {
 		tester.remote_authority_public.into(),
 		&tester.remote_key_store,
 		Some(&TestSigner { keypair: &rogue_remote_node_key }),
+		Some(build_creation_time()),
 	);
 
-	let cached_remote_addresses = tester.process_value_found(false, kv_pairs);
+	let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0;
 
 	assert!(
 		cached_remote_addresses.is_none(),
@@ -678,13 +962,14 @@ fn reject_address_with_invalid_peer_signature() {
 		tester.remote_authority_public.into(),
 		&tester.remote_key_store,
 		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
 	);
 	// tamper with the signature
 	let mut record = schema::SignedAuthorityRecord::decode(kv_pairs[0].1.as_slice()).unwrap();
 	record.peer_signature.as_mut().map(|p| p.signature[1] = !p.signature[1]);
 	record.encode(&mut kv_pairs[0].1).unwrap();
 
-	let cached_remote_addresses = tester.process_value_found(false, kv_pairs);
+	let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0;
 
 	assert!(
 		cached_remote_addresses.is_none(),
@@ -700,9 +985,10 @@ fn reject_address_without_peer_signature() {
 		tester.remote_authority_public.into(),
 		&tester.remote_key_store,
 		None,
+		Some(build_creation_time()),
 	);
 
-	let cached_remote_addresses = tester.process_value_found(true, kv_pairs);
+	let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0;
 
 	assert!(cached_remote_addresses.is_none(), "Expected worker to ignore unsigned record.",);
 }
@@ -718,12 +1004,13 @@ fn do_not_cache_addresses_without_peer_id() {
 		tester.remote_authority_public.into(),
 		&tester.remote_key_store,
 		None,
+		Some(build_creation_time()),
 	);
 
-	let cached_remote_addresses = tester.process_value_found(false, kv_pairs);
+	let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0;
 
 	assert_eq!(
-		Some(&HashSet::from([multiaddr_with_peer_id])),
+		Some(HashSet::from([multiaddr_with_peer_id])),
 		cached_remote_addresses,
 		"Expect worker to only cache `Multiaddr`s with `PeerId`s.",
 	);
@@ -861,16 +1148,24 @@ fn lookup_throttling() {
 		// Make first lookup succeed.
 		let remote_hash = network.get_value_call.lock().unwrap().pop().unwrap();
 		let remote_key: AuthorityId = remote_hash_to_key.get(&remote_hash).unwrap().clone();
-		let dht_event = {
-			let kv_pairs = build_dht_event::(
-				vec![remote_multiaddr.clone()],
-				remote_key,
-				&remote_key_store,
-				None,
-			);
-			DhtEvent::ValueFound(kv_pairs)
-		};
-		dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1.");
+		let kv_pairs = build_dht_event::(
+			vec![remote_multiaddr.clone()],
+			remote_key,
+			&remote_key_store,
+			None,
+			Some(build_creation_time()),
+		)
+		.into_iter()
+		.map(|(key, value)| PeerRecord {
+			peer: None,
+			record: Record { key, value, publisher: None, expires: None },
+		});
+		for kv_pair in kv_pairs {
+			dht_event_tx
+				.send(DhtEvent::ValueFound(kv_pair))
+				.await
+				.expect("Channel has capacity of 1.");
+		}
 
 		// Assert worker to trigger another lookup.
 		assert!(matches!(receiver.next().await, Some(TestNetworkEvent::GetCalled(_))));
@@ -899,14 +1194,15 @@
 
 #[test]
 fn test_handle_put_record_request() {
-	let network = TestNetwork::default();
-	let peer_id = network.peer_id;
+	let local_node_network = TestNetwork::default();
+	let remote_node_network = TestNetwork::default();
+	let peer_id = remote_node_network.peer_id;
 
 	let remote_multiaddr = {
 		let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
 
-		address.with(multiaddr::Protocol::P2p(peer_id.into()))
+		address.with(multiaddr::Protocol::P2p(remote_node_network.peer_id.into()))
 	};
 	let remote_key_store = MemoryKeystore::new();
 	let remote_public_keys: Vec = (0..20)
 		.map(|_| {
@@ -928,7 +1224,7 @@
 	let (_dht_event_tx, dht_event_rx) = channel(1);
 	let (_to_worker, from_service) = mpsc::channel(0);
 
-	let network = Arc::new(network);
+	let network = Arc::new(local_node_network);
 	let mut worker = Worker::new(
 		from_service,
 		Arc::new(TestApi { authorities: remote_public_keys.clone() }),
@@ -944,10 +1240,11 @@
 	let valid_authorithy_key = remote_public_keys.first().unwrap().clone();
 
 	let kv_pairs = build_dht_event(
-		vec![remote_multiaddr],
-		valid_authorithy_key.into(),
+		vec![remote_multiaddr.clone()],
+		valid_authorithy_key.clone().into(),
 		&remote_key_store,
-		Some(&TestSigner { keypair: &network.identity }),
+		Some(&TestSigner { keypair: &remote_node_network.identity }),
+		Some(build_creation_time()),
 	);
 
 	pool.run_until(
@@ -986,7 +1283,7 @@
 			let key = hash_authority_id(another_authorithy_id.as_ref());
 
 			// Valid record signed with a different key should return error.
-			for (_, value) in kv_pairs {
+			for (_, value) in kv_pairs.clone() {
 				assert!(matches!(
 					worker
 						.handle_put_record_requested(key.clone(), value, Some(peer_id), None)
 						.await
@@ -995,6 +1292,60 @@
 				));
 			}
 			assert_eq!(network.store_value_call.lock().unwrap().len(), 1);
+			let newer_kv_pairs = build_dht_event(
+				vec![remote_multiaddr],
+				valid_authorithy_key.clone().into(),
+				&remote_key_store,
+				Some(&TestSigner { keypair: &remote_node_network.identity }),
+				Some(build_creation_time()),
+			);
+
+			// A valid but older record should not return an error, but it should not be
+			// stored either, since a newer one already exists.
+			for (new_key, new_value) in newer_kv_pairs.clone() {
+				worker.in_flight_lookups.insert(new_key.clone(), valid_authorithy_key.clone());
+
+				let found = PeerRecord {
+					peer: Some(peer_id.into()),
+					record: Record {
+						key: new_key,
+						value: new_value,
+						publisher: Some(peer_id.into()),
+						expires: None,
+					},
+				};
+				assert!(worker.handle_dht_value_found_event(found).is_ok());
+			}
+
+			for (key, value) in kv_pairs.clone() {
+				assert!(worker
+					.handle_put_record_requested(key, value, Some(peer_id), None)
+					.await
+					.is_ok());
+			}
+			assert_eq!(network.store_value_call.lock().unwrap().len(), 1);
+
+			// Newer kv pairs should always be stored.
+			for (key, value) in newer_kv_pairs.clone() {
+				assert!(worker
+					.handle_put_record_requested(key, value, Some(peer_id), None)
+					.await
+					.is_ok());
+			}
+
+			assert_eq!(network.store_value_call.lock().unwrap().len(), 2);
+
+			worker.refill_pending_lookups_queue().await.unwrap();
+			assert_eq!(worker.last_known_records.len(), 1);
+
+			// Check that known records get cleaned up when an authority drops out of the
+			// active set.
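+			// (`refill_pending_lookups_queue` rebuilds the pending lookups from the
+			// current authority set, so `last_known_records` entries whose keys no
+			// longer belong to an active authority are dropped.)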
+			worker.client = Arc::new(TestApi { authorities: Default::default() });
+			worker.refill_pending_lookups_queue().await.unwrap();
+			assert_eq!(worker.last_known_records.len(), 0);
 		}
 		.boxed_local(),
 	);
diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs
index 68816a10980d..9a6324dafd37 100644
--- a/substrate/client/network/src/behaviour.rs
+++ b/substrate/client/network/src/behaviour.rs
@@ -31,8 +31,13 @@ use crate::{
 use futures::channel::oneshot;
 use libp2p::{
-	connection_limits::ConnectionLimits, core::Multiaddr, identify::Info as IdentifyInfo,
-	identity::PublicKey, kad::RecordKey, swarm::NetworkBehaviour, PeerId, StreamProtocol,
+	connection_limits::ConnectionLimits,
+	core::Multiaddr,
+	identify::Info as IdentifyInfo,
+	identity::PublicKey,
+	kad::{Record, RecordKey},
+	swarm::NetworkBehaviour,
+	PeerId, StreamProtocol,
 };
 use parking_lot::Mutex;
@@ -289,6 +294,16 @@ impl Behaviour {
 		self.discovery.put_value(key, value);
 	}
 
+	/// Puts a record into the DHT on the provided peers.
+	pub fn put_record_to(
+		&mut self,
+		record: Record,
+		peers: HashSet,
+		update_local_storage: bool,
+	) {
+		self.discovery.put_record_to(record, peers, update_local_storage);
+	}
+
 	/// Stores value in DHT
 	pub fn store_record(
 		&mut self,
diff --git a/substrate/client/network/src/discovery.rs b/substrate/client/network/src/discovery.rs
index 3145b891a8d3..86c66c22701c 100644
--- a/substrate/client/network/src/discovery.rs
+++ b/substrate/client/network/src/discovery.rs
@@ -58,7 +58,8 @@ use libp2p::{
 	self,
 	record::store::{MemoryStore, RecordStore},
 	Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
-	GetClosestPeersError, GetRecordOk, QueryId, QueryResult, Quorum, Record, RecordKey,
+	GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
+	RecordKey,
 	},
 	mdns::{self, tokio::Behaviour as TokioMdns},
 	multiaddr::Protocol,
@@ -92,8 +93,11 @@ const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;
 /// record is replicated to.
 pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;
 
+// The minimum number of peers we expect an answer from before we terminate the request.
+const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;
+
 /// `DiscoveryBehaviour` configuration.
 ///
 /// Note: In order to discover nodes or load and store values via Kademlia one has to add
 ///       Kademlia protocol via [`DiscoveryConfig::with_kademlia`].
 pub struct DiscoveryConfig {
@@ -234,7 +238,6 @@ impl DiscoveryConfig {
 		// auto-insertion and instead add peers manually.
 		config.set_kbucket_inserts(BucketInserts::Manual);
 		config.disjoint_query_paths(kademlia_disjoint_query_paths);
-
 		let store = MemoryStore::new(local_peer_id);
 		let mut kad = Kademlia::with_config(local_peer_id, store, config);
 		kad.set_mode(Some(kad::Mode::Server));
@@ -437,6 +440,35 @@ impl DiscoveryBehaviour {
 		}
 	}
 
+	/// Puts a record into the DHT on the provided `peers`.
+	///
+	/// If `update_local_storage` is true, the local storage is updated as well.
+	pub fn put_record_to(
+		&mut self,
+		record: Record,
+		peers: HashSet,
+		update_local_storage: bool,
+	) {
+		if let Some(kad) = self.kademlia.as_mut() {
+			if update_local_storage {
+				if let Err(_e) = kad.store_mut().put(record.clone()) {
+					warn!(target: "sub-libp2p", "Failed to update local storage");
+				}
+			}
+
+			if !peers.is_empty() {
+				kad.put_record_to(
+					record,
+					peers.into_iter().map(|peer_id| peer_id.into()),
+					Quorum::All,
+				);
+			}
+		}
+	}
+
 	/// Store a record in the Kademlia record store.
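+	///
+	/// Unlike [`Self::put_record_to`] above, this does not push the record to any
+	/// remote peer; it only updates the local Kademlia record store.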
 	pub fn store_record(
 		&mut self,
@@ -527,7 +559,7 @@ pub enum DiscoveryOut {
 	/// The DHT yielded results for the record request.
 	///
 	/// Returning the result grouped in (key, value) pairs as well as the request duration.
-	ValueFound(Vec<(RecordKey, Vec)>, Duration),
+	ValueFound(PeerRecord, Duration),
 
 	/// The DHT received a put record request.
 	PutRecordRequest(
@@ -860,16 +892,24 @@
 				Ok(GetRecordOk::FoundRecord(r)) => {
 					debug!(
 						target: "sub-libp2p",
-						"Libp2p => Found record ({:?}) with value: {:?}",
+						"Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
 						r.record.key,
 						r.record.value,
+						id,
+						stats,
 					);
 
-					// Let's directly finish the query, as we are only interested in a
-					// quorum of 1.
-					if let Some(kad) = self.kademlia.as_mut() {
-						if let Some(mut query) = kad.query_mut(&id) {
-							query.finish();
+					// Let's directly finish the query once we have answers from more than
+					// `GET_RECORD_REDUNDANCY_FACTOR` peers. This number is small enough to
+					// make sure we don't unnecessarily flood the network with queries, but
+					// high enough to make sure we also touch peers which might still hold
+					// an old record, so that we can update them once we notice they are
+					// stale.
+					if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
+						if let Some(kad) = self.kademlia.as_mut() {
+							if let Some(mut query) = kad.query_mut(&id) {
+								query.finish();
+							}
 						}
 					}
@@ -877,14 +917,18 @@
 					// Will be removed below when we receive
 					// `FinishedWithNoAdditionalRecord`.
 					self.records_to_publish.insert(id, r.record.clone());
 
-					DiscoveryOut::ValueFound(
-						vec![(r.record.key, r.record.value)],
-						stats.duration().unwrap_or_default(),
-					)
+					DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
 				},
 				Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
 					cache_candidates,
 				}) => {
+					debug!(
+						target: "sub-libp2p",
+						"Libp2p => Finished with no additional record {:?} stats {:?} took {:?} ms",
+						id,
+						stats,
+						stats.duration().map(|val| val.as_millis())
+					);
 					// We always need to remove the record to not leak any data!
 					if let Some(record) = self.records_to_publish.remove(&id) {
 						if cache_candidates.is_empty() {
diff --git a/substrate/client/network/src/event.rs b/substrate/client/network/src/event.rs
index b518a2094d76..5400d11cb6ac 100644
--- a/substrate/client/network/src/event.rs
+++ b/substrate/client/network/src/event.rs
@@ -22,7 +22,10 @@ use crate::types::ProtocolName;
 
 use bytes::Bytes;
-use libp2p::{kad::record::Key, PeerId};
+use libp2p::{
+	kad::{record::Key, PeerRecord},
+	PeerId,
+};
 
 use sc_network_common::role::ObservedRole;
@@ -31,7 +34,7 @@ use sc_network_common::role::ObservedRole;
 #[must_use]
 pub enum DhtEvent {
 	/// The value was found.
-	ValueFound(Vec<(Key, Vec)>),
+	ValueFound(PeerRecord),
 
 	/// The requested record has not been found in the DHT.
 	ValueNotFound(Key),
diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs
index 6ff05e6af327..22285e7906c6 100644
--- a/substrate/client/network/src/litep2p/discovery.rs
+++ b/substrate/client/network/src/litep2p/discovery.rs
@@ -50,6 +50,7 @@ use schnellru::{ByLength, LruMap};
 use std::{
 	cmp,
 	collections::{HashMap, HashSet, VecDeque},
+	num::NonZeroUsize,
 	pin::Pin,
 	sync::Arc,
 	task::{Context, Poll},
@@ -68,6 +69,11 @@ const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);
 /// Minimum number of confirmations received before an address is verified.
 const MIN_ADDRESS_CONFIRMATIONS: usize = 5;
 
+// The minimum number of peers we expect an answer from before we terminate the request.
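+// This mirrors `GET_RECORD_REDUNDANCY_FACTOR` in `discovery.rs`, so that both network
+// backends terminate a `GET_VALUE` query after the same number of results.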
+const GET_RECORD_REDUNDANCY_FACTOR: usize = 4;
+
 /// Discovery events.
 #[derive(Debug)]
 pub enum DiscoveryEvent {
@@ -340,7 +346,10 @@ impl Discovery {
 	/// Start Kademlia `GET_VALUE` query for `key`.
 	pub async fn get_value(&mut self, key: KademliaKey) -> QueryId {
 		self.kademlia_handle
-			.get_record(RecordKey::new(&key.to_vec()), Quorum::One)
+			.get_record(
+				RecordKey::new(&key.to_vec()),
+				Quorum::N(NonZeroUsize::new(GET_RECORD_REDUNDANCY_FACTOR).unwrap()),
+			)
 			.await
 	}
 
@@ -351,6 +360,22 @@ impl Discovery {
 			.await
 	}
 
+	/// Put record to given peers.
+	pub async fn put_value_to_peers(
+		&mut self,
+		record: Record,
+		peers: Vec,
+		update_local_storage: bool,
+	) -> QueryId {
+		self.kademlia_handle
+			.put_record_to_peers(
+				record,
+				peers.into_iter().map(|peer| peer.into()).collect(),
+				update_local_storage,
+			)
+			.await
+	}
+
 	/// Store record in the local DHT store.
 	pub async fn store_record(
 		&mut self,
diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs
index 4f3cc5789431..475046cb9dba 100644
--- a/substrate/client/network/src/litep2p/mod.rs
+++ b/substrate/client/network/src/litep2p/mod.rs
@@ -50,7 +50,7 @@ use crate::{
 
 use codec::Encode;
 use futures::StreamExt;
-use libp2p::kad::RecordKey;
+use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey};
 use litep2p::{
 	config::ConfigBuilder,
 	crypto::ed25519::Keypair,
@@ -703,6 +703,12 @@ impl NetworkBackend for Litep2pNetworkBac
 					let query_id = self.discovery.put_value(key.clone(), value).await;
 					self.pending_put_values.insert(query_id, (key, Instant::now()));
 				}
+				NetworkServiceCommand::PutValueTo { record, peers, update_local_storage } => {
+					let kademlia_key = record.key.to_vec().into();
+					let query_id = self.discovery.put_value_to_peers(record, peers, update_local_storage).await;
+					self.pending_put_values.insert(query_id, (kademlia_key, Instant::now()));
+				}
+
 				NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
 					self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
 				}
@@ -819,19 +825,9 @@ impl NetworkBackend for Litep2pNetworkBac
 						"`GET_VALUE` for {:?} ({query_id:?}) succeeded",
 						key,
 					);
-
-					let value_found = match records {
-						RecordsType::LocalStore(record) => vec![
-							(libp2p::kad::RecordKey::new(&record.key), record.value)
-						],
-						RecordsType::Network(records) => records.into_iter().map(|peer_record| {
-							(libp2p::kad::RecordKey::new(&peer_record.record.key), peer_record.record.value)
-						}).collect(),
-					};
-
-					self.event_streams.send(Event::Dht(
-						DhtEvent::ValueFound(value_found)
-					));
+					for record in litep2p_to_libp2p_peer_record(records) {
+						self.event_streams.send(Event::Dht(DhtEvent::ValueFound(record)));
+					}
 
 					if let Some(ref metrics) = self.metrics {
 						metrics
@@ -1026,3 +1022,44 @@ impl NetworkBackend for Litep2pNetworkBac
 		}
 	}
 }
+
+// Glue code to convert from the litep2p records type to a libp2p `PeerRecord`.
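+// The authority-discovery worker consumes libp2p `PeerRecord`s regardless of the
+// enabled network backend, so results from litep2p are mapped into that shape here.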
+fn litep2p_to_libp2p_peer_record(records: RecordsType) -> Vec<PeerRecord> {
+	match records {
+		litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => {
+			vec![PeerRecord {
+				record: P2PRecord {
+					key: record.key.to_vec().into(),
+					value: record.value,
+					publisher: record.publisher.map(|peer_id| {
+						let peer_id: sc_network_types::PeerId = peer_id.into();
+						peer_id.into()
+					}),
+					expires: record.expires,
+				},
+				peer: None,
+			}]
+		},
+		litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records
+			.into_iter()
+			.map(|record| {
+				let peer_id: sc_network_types::PeerId = record.peer.into();
+
+				PeerRecord {
+					record: P2PRecord {
+						key: record.record.key.to_vec().into(),
+						value: record.record.value,
+						publisher: record.record.publisher.map(|peer_id| {
+							let peer_id: sc_network_types::PeerId = peer_id.into();
+							peer_id.into()
+						}),
+						expires: record.record.expires,
+					},
+					peer: Some(peer_id.into()),
+				}
+			})
+			.collect::<Vec<_>>(),
+	}
+}
diff --git a/substrate/client/network/src/litep2p/service.rs b/substrate/client/network/src/litep2p/service.rs
index 7d972bbeee5c..67fc44e6bfe0 100644
--- a/substrate/client/network/src/litep2p/service.rs
+++ b/substrate/client/network/src/litep2p/service.rs
@@ -32,6 +32,7 @@ use crate::{
 	RequestFailure, Signature,
 };
 
+use crate::litep2p::Record;
 use codec::DecodeAll;
 use futures::{channel::oneshot, stream::BoxStream};
 use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
@@ -76,6 +77,15 @@ pub enum NetworkServiceCommand {
 		value: Vec,
 	},
 
+	/// Put record to the given peers.
+	PutValueTo {
+		/// Record.
+		record: Record,
+		/// Peers we want to put the record to.
+		peers: Vec,
+		/// Whether we should update the local storage or not.
+		update_local_storage: bool,
+	},
 	/// Store record in the local DHT store.
 	StoreRecord {
 		/// Record key.
@@ -253,6 +263,27 @@ impl NetworkDHTProvider for Litep2pNetworkService {
 		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValue { key, value });
 	}
 
+	fn put_record_to(
+		&self,
+		record: libp2p::kad::Record,
+		peers: HashSet,
+		update_local_storage: bool,
+	) {
+		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValueTo {
+			record: Record {
+				key: record.key.to_vec().into(),
+				value: record.value,
+				publisher: record.publisher.map(|peer_id| {
+					let peer_id: sc_network_types::PeerId = peer_id.into();
+					peer_id.into()
+				}),
+				expires: record.expires,
+			},
+			peers: peers.into_iter().collect(),
+			update_local_storage,
+		});
+	}
+
 	fn store_record(
 		&self,
 		key: KademliaKey,
diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs
index 550e00a6cf2f..71d0b45aa06d 100644
--- a/substrate/client/network/src/service.rs
+++ b/substrate/client/network/src/service.rs
@@ -68,7 +68,7 @@ use libp2p::{
 	core::{upgrade, ConnectedPoint, Endpoint},
 	identify::Info as IdentifyInfo,
 	identity::ed25519,
-	kad::record::Key as KademliaKey,
+	kad::{record::Key as KademliaKey, Record},
 	multiaddr::{self, Multiaddr},
 	swarm::{
 		Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
@@ -946,6 +946,19 @@ where
 		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
 	}
 
+	fn put_record_to(
+		&self,
+		record: Record,
+		peers: HashSet,
+		update_local_storage: bool,
+	) {
+		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
+			record,
+			peers,
+			update_local_storage,
+		});
+	}
+
 	fn store_record(
 		&self,
 		key: KademliaKey,
@@ -1314,6 +1327,11 @@ impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
 enum ServiceToWorkerMsg {
 	GetValue(KademliaKey),
 	PutValue(KademliaKey, Vec),
+	PutRecordTo {
+		record: Record,
+		peers: HashSet,
+		update_local_storage: bool,
+	},
 	StoreRecord(KademliaKey, Vec, Option, Option),
 	AddKnownAddress(PeerId, Multiaddr),
 	EventStream(out_events::Sender),
@@ -1440,6 +1458,10 @@ where
 				self.network_service.behaviour_mut().get_value(key),
 			ServiceToWorkerMsg::PutValue(key, value) =>
 				self.network_service.behaviour_mut().put_value(key, value),
+			ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
+				.network_service
+				.behaviour_mut()
+				.put_record_to(record, peers, update_local_storage),
 			ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
 				.network_service
 				.behaviour_mut()
diff --git a/substrate/client/network/src/service/traits.rs b/substrate/client/network/src/service/traits.rs
index 69c9fbc3a618..bd4f83c7fd44 100644
--- a/substrate/client/network/src/service/traits.rs
+++ b/substrate/client/network/src/service/traits.rs
@@ -32,6 +32,7 @@ use crate::{
 };
 
 use futures::{channel::oneshot, Stream};
+use libp2p::kad::Record;
 use prometheus_endpoint::Registry;
 
 use sc_client_api::BlockBackend;
@@ -217,6 +218,19 @@ pub trait NetworkDHTProvider {
 	/// Start putting a value in the DHT.
 	fn put_value(&self, key: KademliaKey, value: Vec);
 
+	/// Start putting the record to `peers`.
+	///
+	/// If `update_local_storage` is true, the local storage is updated as well.
+	fn put_record_to(&self, record: Record, peers: HashSet, update_local_storage: bool);
+
 	/// Store a record in the DHT memory store.
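+	///
+	/// A minimal usage sketch (the `provider`, `key` and `value` bindings are
+	/// hypothetical; `provider` is any implementation of this trait):
+	///
+	/// ```ignore
+	/// // Cache an already-validated record locally without publishing it.
+	/// provider.store_record(key, value, None, None);
+	/// ```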
 	fn store_record(
 		&self,
@@ -240,6 +254,10 @@ where
 		T::put_value(self, key, value)
 	}
 
+	fn put_record_to(&self, record: Record, peers: HashSet, update_local_storage: bool) {
+		T::put_record_to(self, record, peers, update_local_storage)
+	}
+
 	fn store_record(
 		&self,
 		key: KademliaKey,