From 1a895e1880a4ea20a77cf696f5eaf6387796f515 Mon Sep 17 00:00:00 2001
From: Nathaniel Cook
Date: Fri, 15 Nov 2024 14:37:07 -0700
Subject: [PATCH] feat: backoff recon synchronization to peers with bad data (#597)

* feat: backoff recon synchronization to peers with bad data

With this change a peer backs off synchronization attempts with peers that fail to synchronize. As a result, the bootstrap nodes only rarely talk to nodes that are sending bad data. However, the backoff state is tied to the connection state and is reset if the connection resets, which lets developers restart their node and return to good standing with the bootstrap nodes.

Additionally, a small change prefers the connected address of a peer when reporting the addresses of connected peers.

* fix: change default connection timeout to 15m

Keep connections around longer so we remember which peers we have backed off; otherwise the connection closes and reconnects, resetting the backoff delay.

* fix: keep interests and model syncs separate

This change makes interest and model syncs independent. We always start with the interest sync, but after the initial syncs each can be backed off on its own. This helps keep idle connections from being reset, and the backoff state with them, since there is likely some activity on the connection for at least one of the two syncs.

* comment

* Update recon/src/libp2p.rs

Co-authored-by: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com>

---------

Co-authored-by: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com>
---
 p2p/src/behaviour/ceramic_peer_manager.rs | 14 ++-
 p2p/src/node.rs | 14 ++-
 recon/src/libp2p.rs | 132 ++++++++++++++--------
 recon/src/libp2p/handler.rs | 61 ++++++----
 recon/src/libp2p/stream_set.rs | 2 +-
 recon/src/libp2p/tests.rs | 132 +++++++++++++++++++++-
 6 files changed, 274 insertions(+), 81 deletions(-)

diff --git a/p2p/src/behaviour/ceramic_peer_manager.rs b/p2p/src/behaviour/ceramic_peer_manager.rs
index 854a58299..f9e9aacbc 100644
--- a/p2p/src/behaviour/ceramic_peer_manager.rs
+++ b/p2p/src/behaviour/ceramic_peer_manager.rs
@@ -41,6 +41,7 @@ pub struct CeramicPeerManager { pub struct Info { pub last_rtt: Option, pub last_info: Option, + pub connected_point: Option, } impl Info {
@@ -145,11 +146,16 @@ impl NetworkBehaviour for CeramicPeerManager { self.handle_connection_established(&event.peer_id) } - if let Some(info) = self.info.get_mut(&event.peer_id) { - if let Some(ref mut info) = info.last_info { - info.listen_addrs - .retain(|addr| !event.failed_addresses.contains(addr)) + let info = self.info.entry(event.peer_id).or_default(); + info.connected_point = Some(match event.endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => address.clone(), + libp2p::core::ConnectedPoint::Listener { send_back_addr, ..
} => { + send_back_addr.clone() } + }); + if let Some(ref mut info) = info.last_info { + info.listen_addrs + .retain(|addr| !event.failed_addresses.contains(addr)) } } libp2p::swarm::FromSwarm::ConnectionClosed(event) => { diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 991aab345..32d71fb62 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -984,10 +984,16 @@ where .peer_manager .info_for_peer(&pid) .map(|info| { - info.last_info - .as_ref() - .map(|last_info| last_info.listen_addrs.clone()) - .unwrap_or_default() + info.connected_point + .iter() + .chain( + info.last_info + .as_ref() + .map(|last_info| last_info.listen_addrs.iter()) + .unwrap_or_default(), + ) + .cloned() + .collect() }) .unwrap_or_default(), ) diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index 7743997b7..1dab520ee 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -18,16 +18,19 @@ mod tests; mod upgrade; use ceramic_core::{EventId, Interest}; +use futures::{future::BoxFuture, FutureExt}; use libp2p::{ core::ConnectedPoint, swarm::{ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm}, }; use libp2p_identity::PeerId; use std::{ + cmp::min, collections::{btree_map::Entry, BTreeMap}, task::Poll, - time::{Duration, Instant}, + time::Duration, }; +use tokio::time::Instant; use tracing::{debug, trace, warn}; pub use crate::protocol::Recon; @@ -49,13 +52,20 @@ pub const PROTOCOL_NAME_MODEL: &str = "/ceramic/recon/0.1.0/model"; pub struct Config { /// Start a new sync once the duration has past in the failed or synchronized state. /// Defaults to 1 second. - pub per_peer_sync_timeout: Duration, + pub per_peer_sync_delay: Duration, + /// Backoff sequential failures as multiples of delay. + pub per_peer_sync_backoff: f64, + /// Maximum delay between synchronization attempts. + /// Defaults to 10 minutes + pub per_peer_maximum_sync_delay: Duration, } impl Default for Config { fn default() -> Self { Self { - per_peer_sync_timeout: Duration::from_millis(1000), + per_peer_sync_delay: Duration::from_millis(1000), + per_peer_sync_backoff: 2.0, + per_peer_maximum_sync_delay: Duration::from_secs(60 * 10), } } } @@ -65,7 +75,6 @@ impl Default for Config { /// The Behavior tracks all peers on the network that speak the Recon protocol. /// It is responsible for starting and stopping syncs with various peers depending on the needs of /// the application. -#[derive(Debug)] pub struct Behaviour { interest: I, model: M, @@ -73,6 +82,21 @@ pub struct Behaviour { peers: BTreeMap, swarm_events_sender: tokio::sync::mpsc::Sender>, swarm_events_receiver: tokio::sync::mpsc::Receiver>, + next_sync: Option>, +} + +impl std::fmt::Debug for Behaviour { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Behaviour") + .field("interest", &self.interest) + .field("model", &self.model) + .field("config", &self.config) + .field("peers", &self.peers) + .field("swarm_events_sender", &self.swarm_events_sender) + .field("swarm_events_receiver", &self.swarm_events_receiver) + .field("next_sync", &"_") + .finish() + } } /// Information about a remote peer and its sync status. @@ -80,7 +104,8 @@ pub struct Behaviour { struct PeerInfo { status: PeerStatus, connections: Vec, - last_sync: Option, + next_sync: BTreeMap, + sync_delay: BTreeMap, } #[derive(Clone, Copy, Debug)] @@ -106,7 +131,10 @@ pub enum PeerStatus { stream_set: StreamSet, }, /// The last attempt to synchronize with the remote peer resulted in an error. - Failed, + Failed { + /// The stream_set that has failed synchronizing. 
+ stream_set: StreamSet, + }, /// Local peer has stopped synchronizing with the remote peer and will not attempt to /// synchronize again. Stopped, @@ -127,6 +155,7 @@ impl Behaviour { peers: BTreeMap::new(), swarm_events_sender: tx, swarm_events_receiver: rx, + next_sync: None, } } @@ -162,7 +191,13 @@ where .or_insert_with(|| PeerInfo { status: PeerStatus::Waiting, connections: vec![connection_info], - last_sync: None, + next_sync: BTreeMap::from_iter([ + // Schedule all stream_sets initially + (StreamSet::Interest, Instant::now()), + // Schedule models after interests + (StreamSet::Model, Instant::now() + Duration::from_millis(1)), + ]), + sync_delay: Default::default(), }); } libp2p::swarm::FromSwarm::ConnectionClosed(info) => { @@ -214,8 +249,17 @@ where // is now idle. FromHandler::Succeeded { stream_set } => { if let Entry::Occupied(mut entry) = self.peers.entry(peer_id) { + debug!(%peer_id, ?stream_set, "synchronization succeeded with peer"); let info = entry.get_mut(); - info.last_sync = Some(Instant::now()); + let sync_delay = *info + .sync_delay + .get(&stream_set) + .unwrap_or(&self.config.per_peer_sync_delay); + info.next_sync + .insert(stream_set, Instant::now() + sync_delay); + // On success reset delay + info.sync_delay + .insert(stream_set, self.config.per_peer_sync_delay); info.status = PeerStatus::Synchronized { stream_set }; Some(ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent { remote_peer_id: peer_id, @@ -229,12 +273,25 @@ where // The peer has failed to synchronized with us, mark the time and record that the peer connection // is now failed. - FromHandler::Failed(error) => { + FromHandler::Failed { stream_set, error } => { if let Entry::Occupied(mut entry) = self.peers.entry(peer_id) { let info = entry.get_mut(); - warn!(%peer_id, %error, "synchronization failed with peer"); - info.last_sync = Some(Instant::now()); - info.status = PeerStatus::Failed; + let sync_delay = *info + .sync_delay + .get(&stream_set) + .unwrap_or(&self.config.per_peer_sync_delay); + warn!(%peer_id, %error, ?sync_delay, ?stream_set, "synchronization failed with peer"); + info.next_sync + .insert(stream_set, Instant::now() + sync_delay); + // On failure increase sync delay + info.sync_delay.insert( + stream_set, + min( + sync_delay.mul_f64(self.config.per_peer_sync_backoff), + self.config.per_peer_maximum_sync_delay, + ), + ); + info.status = PeerStatus::Failed { stream_set }; Some(ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent { remote_peer_id: peer_id, status: info.status, @@ -267,52 +324,37 @@ where if connection_info.dialer { match info.status { PeerStatus::Waiting | PeerStatus::Started { .. } | PeerStatus::Stopped => {} - PeerStatus::Failed => { - // Sync if its been a while since we last synchronized - let should_sync = if let Some(last_sync) = &info.last_sync { - last_sync.elapsed() > self.config.per_peer_sync_timeout - } else { - false - }; - if should_sync { + PeerStatus::Failed { .. } | PeerStatus::Synchronized { .. } => { + // Find earliest scheduled stream set + let (next_stream_set, next_sync) = + info.next_sync.iter().min_by_key(|(_, t)| *t).expect( + "next_sync should always be initialized with stream sets", + ); + debug!(?next_stream_set,?next_sync, now=?Instant::now(), "polling"); + // Sync if enough time has passed since we synced the stream set. 
+ if *next_sync < Instant::now() { + self.next_sync = None; info.status = PeerStatus::Waiting; return Poll::Ready(ToSwarm::NotifyHandler { peer_id: *peer_id, handler: NotifyHandler::One(connection_info.id), event: FromBehaviour::StartSync { - stream_set: StreamSet::Interest, - }, - }); - } - } - PeerStatus::Synchronized { stream_set } => { - // Sync if we just finished an interest sync or its been a while since we - // last synchronized. - let should_sync = stream_set == StreamSet::Interest - || if let Some(last_sync) = &info.last_sync { - last_sync.elapsed() > self.config.per_peer_sync_timeout - } else { - false - }; - if should_sync { - info.status = PeerStatus::Waiting; - let next_stream_set = match stream_set { - StreamSet::Interest => StreamSet::Model, - StreamSet::Model => StreamSet::Interest, - }; - return Poll::Ready(ToSwarm::NotifyHandler { - peer_id: *peer_id, - handler: NotifyHandler::One(connection_info.id), - event: FromBehaviour::StartSync { - stream_set: next_stream_set, + stream_set: *next_stream_set, }, }); + } else { + self.next_sync = + Some(Box::pin(tokio::time::sleep_until(*next_sync))); } } } } } } + // Ensure we are scheduled to be polled again when the next sync is ready + if let Some(ref mut next_sync) = &mut self.next_sync { + let _ = next_sync.poll_unpin(cx); + } Poll::Pending } diff --git a/recon/src/libp2p/handler.rs b/recon/src/libp2p/handler.rs index 257fe2711..5fed86625 100644 --- a/recon/src/libp2p/handler.rs +++ b/recon/src/libp2p/handler.rs @@ -95,8 +95,8 @@ pub enum State { WaitingInbound, RequestOutbound { stream_set: StreamSet }, WaitingOutbound, - Outbound(SyncFuture), - Inbound(SyncFuture), + Outbound(SyncFuture, StreamSet), + Inbound(SyncFuture, StreamSet), } impl std::fmt::Debug for State { @@ -109,8 +109,16 @@ impl std::fmt::Debug for State { .field("stream_set", stream_set) .finish(), Self::WaitingOutbound => f.debug_struct("WaitingOutbound").finish(), - Self::Outbound(_) => f.debug_tuple("Outbound").field(&"_").finish(), - Self::Inbound(_) => f.debug_tuple("Inbound").field(&"_").finish(), + Self::Outbound(_, stream_set) => f + .debug_tuple("Outbound") + .field(&"_") + .field(&stream_set) + .finish(), + Self::Inbound(_, stream_set) => f + .debug_tuple("Inbound") + .field(&"_") + .field(&stream_set) + .finish(), } } } @@ -121,10 +129,17 @@ pub enum FromBehaviour { } #[derive(Debug)] pub enum FromHandler { - Started { stream_set: StreamSet }, - Succeeded { stream_set: StreamSet }, + Started { + stream_set: StreamSet, + }, + Succeeded { + stream_set: StreamSet, + }, Stopped, - Failed(anyhow::Error), + Failed { + stream_set: StreamSet, + error: anyhow::Error, + }, } impl ConnectionHandler for Handler @@ -172,7 +187,8 @@ where let protocol = SubstreamProtocol::new(MultiReadyUpgrade::new(vec![stream_set]), ()); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }); } - State::Outbound(stream) | State::Inbound(stream) => { + State::Outbound(stream, stream_set) | State::Inbound(stream, stream_set) => { + let stream_set = *stream_set; if let Poll::Ready(result) = stream.poll_unpin(cx) { self.transition_state(State::Idle); match result { @@ -183,7 +199,10 @@ where } Err(e) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - FromHandler::Failed(e), + FromHandler::Failed { + stream_set, + error: e, + }, )); } } @@ -201,8 +220,8 @@ where State::RequestOutbound { .. } | State::WaitingOutbound { .. 
} | State::WaitingInbound - | State::Outbound(_) - | State::Inbound(_) => {} + | State::Outbound(_, _) + | State::Inbound(_, _) => {} }, } } @@ -245,13 +264,13 @@ where ) .boxed(), }; - self.transition_state(State::Inbound(stream)); + self.transition_state(State::Inbound(stream, stream_set)); } // Ignore inbound connection when we are not expecting it State::RequestOutbound { .. } | State::WaitingOutbound { .. } - | State::Inbound(_) - | State::Outbound(_) => {} + | State::Inbound(_, _) + | State::Outbound(_, _) => {} } } libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedOutbound( @@ -282,14 +301,14 @@ where ) .boxed(), }; - self.transition_state(State::Outbound(stream)); + self.transition_state(State::Outbound(stream, stream_set)); } // Ignore outbound connection when we are not expecting it State::Idle | State::WaitingInbound | State::RequestOutbound { .. } - | State::Outbound(_) - | State::Inbound(_) => {} + | State::Outbound(_, _) + | State::Inbound(_, _) => {} } } libp2p::swarm::handler::ConnectionEvent::AddressChange(_) => {} @@ -306,8 +325,8 @@ where State::Idle | State::WaitingOutbound { .. } | State::RequestOutbound { .. } - | State::Outbound(_) - | State::Inbound(_) => {} + | State::Outbound(_, _) + | State::Inbound(_, _) => {} } } // We failed to upgrade the outbound connection. @@ -323,8 +342,8 @@ where State::Idle | State::WaitingInbound | State::RequestOutbound { .. } - | State::Outbound(_) - | State::Inbound(_) => {} + | State::Outbound(_, _) + | State::Inbound(_, _) => {} } } event => { diff --git a/recon/src/libp2p/stream_set.rs b/recon/src/libp2p/stream_set.rs index 323409e44..e9d7e32ae 100644 --- a/recon/src/libp2p/stream_set.rs +++ b/recon/src/libp2p/stream_set.rs @@ -3,7 +3,7 @@ use anyhow::anyhow; use crate::libp2p::{PROTOCOL_NAME_INTEREST, PROTOCOL_NAME_MODEL}; /// Represents a stream set key -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Copy, Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] pub enum StreamSet { /// Stream set of interest ranges Interest, diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 8dbe821f8..7f603f90e 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -134,12 +134,17 @@ macro_rules! setup_test { Metrics::register(&mut Registry::default()), ); + // Use shorter timings with large backoff for testing. 
+ let config = crate::libp2p::Config { + per_peer_sync_delay: std::time::Duration::from_millis(10), + per_peer_sync_backoff: 100.0, + per_peer_maximum_sync_delay: std::time::Duration::from_millis(1000), + }; let swarm1 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new(alice_interest, alice, crate::libp2p::Config::default()) - }); - let swarm2 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new(bob_interests, bob, crate::libp2p::Config::default()) + crate::libp2p::Behaviour::new(alice_interest, alice, config.clone()) }); + let swarm2 = + Swarm::new_ephemeral(|_| crate::libp2p::Behaviour::new(bob_interests, bob, config)); (swarm1, swarm2) }}; @@ -204,7 +209,9 @@ async fn initiator_model_error() { failed_peer, crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm2.local_peer_id().to_owned(), - status: PeerStatus::Failed + status: PeerStatus::Failed { + stream_set: StreamSet::Model + } }) ); }; @@ -243,7 +250,120 @@ async fn responder_model_error() { p2_e4, crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), - status: PeerStatus::Failed + status: PeerStatus::Failed { + stream_set: StreamSet::Model + } + }) + ); + }; + + fut.await; +} + +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn model_error_backoff() { + let mut bob_model_store = BTreeStoreErrors::default(); + bob_model_store.set_error(Error::new_transient(anyhow::anyhow!( + "transient error should be handled" + ))); + let (mut swarm1, mut swarm2) = setup_test!( + BTreeStoreErrors::default(), + BTreeStoreErrors::default(), + bob_model_store, + BTreeStoreErrors::default(), + ); + + let fut = async move { + swarm1.listen().with_memory_addr_external().await; + swarm2.connect(&mut swarm1).await; + + // Expect interests to sync twice in a row since models fail to sync + let ( + [p1_e1, p1_e2, p1_e3, p1_e4, p1_e5, p1_e6, p1_e7, p1_e8, p1_e9, p1_e10, p1_e11, p1_e12], + [p2_e1, p2_e2, p2_e3, p2_e4, p2_e5, p2_e6, p2_e7, p2_e8, p2_e9, p2_e10, p2_e11, p2_e12], + ): ([crate::libp2p::Event; 12], [crate::libp2p::Event; 12]) = + libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + + let events = [ + [ + &p1_e1, &p1_e2, &p1_e3, &p1_e4, &p1_e5, &p1_e6, &p1_e7, &p1_e8, &p1_e9, &p1_e10, + &p1_e11, &p1_e12, + ], + [ + &p2_e1, &p2_e2, &p2_e3, &p2_e4, &p2_e5, &p2_e6, &p2_e7, &p2_e8, &p2_e9, &p2_e10, + &p2_e11, &p2_e12, + ], + ]; + + for ev in events.iter().flatten() { + info!("{:?}", ev); + } + let stream_sets: Vec<_> = events + .iter() + .map(|peer_events| { + peer_events + .iter() + .map(|ev| { + let crate::libp2p::Event::PeerEvent(PeerEvent { status, .. 
}) = ev; + match status { + PeerStatus::Waiting | PeerStatus::Stopped => None, + PeerStatus::Synchronized { stream_set } + | PeerStatus::Started { stream_set } + | PeerStatus::Failed { stream_set } => Some(*stream_set), + } + }) + .collect::>>() + }) + .collect(); + let expected_stream_set_order = vec![ + // First interests sync + Some(StreamSet::Interest), + Some(StreamSet::Interest), + // First model sync + Some(StreamSet::Model), + Some(StreamSet::Model), + // Second interests sync + Some(StreamSet::Interest), + Some(StreamSet::Interest), + // Second model sync with initial short backoff + Some(StreamSet::Model), + Some(StreamSet::Model), + // Third interests sync + Some(StreamSet::Interest), + Some(StreamSet::Interest), + // Third model sync is skipped because the backoff pushed it past the interests sync + Some(StreamSet::Interest), + Some(StreamSet::Interest), + ]; + assert_eq!( + stream_sets, + vec![expected_stream_set_order.clone(), expected_stream_set_order] + ); + assert_eq!( + p2_e4, + crate::libp2p::Event::PeerEvent(PeerEvent { + remote_peer_id: swarm1.local_peer_id().to_owned(), + status: PeerStatus::Failed { + stream_set: StreamSet::Model + } + }) + ); + assert_eq!( + p1_e12, + crate::libp2p::Event::PeerEvent(PeerEvent { + remote_peer_id: swarm2.local_peer_id().to_owned(), + status: PeerStatus::Synchronized { + stream_set: StreamSet::Interest + } + }) + ); + assert_eq!( + p2_e12, + crate::libp2p::Event::PeerEvent(PeerEvent { + remote_peer_id: swarm1.local_peer_id().to_owned(), + status: PeerStatus::Synchronized { + stream_set: StreamSet::Interest + } }) ); };
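
The backoff rule that the handler events drive is small enough to sketch on its own. Below is a minimal, standalone Rust sketch assuming only the three Config fields this patch adds (per_peer_sync_delay, per_peer_sync_backoff, per_peer_maximum_sync_delay); the BackoffConfig struct and next_delay function are illustrative names, not items from the crate.

use std::cmp::min;
use std::time::Duration;

/// Illustrative stand-in for the relevant fields of the recon libp2p Config.
#[derive(Clone, Copy, Debug)]
struct BackoffConfig {
    /// Initial delay between syncs (1 second by default in the patch).
    per_peer_sync_delay: Duration,
    /// Multiplier applied to the delay after each failed sync (2.0 by default).
    per_peer_sync_backoff: f64,
    /// Upper bound on the delay (10 minutes by default).
    per_peer_maximum_sync_delay: Duration,
}

/// Success resets the delay to its initial value; failure multiplies it by the
/// backoff factor and caps it at the maximum, mirroring the updates made when
/// handling FromHandler::Succeeded and FromHandler::Failed.
fn next_delay(config: &BackoffConfig, current: Duration, succeeded: bool) -> Duration {
    if succeeded {
        config.per_peer_sync_delay
    } else {
        min(
            current.mul_f64(config.per_peer_sync_backoff),
            config.per_peer_maximum_sync_delay,
        )
    }
}

fn main() {
    let config = BackoffConfig {
        per_peer_sync_delay: Duration::from_millis(1000),
        per_peer_sync_backoff: 2.0,
        per_peer_maximum_sync_delay: Duration::from_secs(60 * 10),
    };
    // Repeated failures double the delay: 1s, 2s, 4s, ... until the cap.
    let mut delay = config.per_peer_sync_delay;
    for _ in 0..12 {
        delay = next_delay(&config, delay, false);
    }
    assert_eq!(delay, config.per_peer_maximum_sync_delay);
    // A single success returns the peer to the initial delay.
    assert_eq!(next_delay(&config, delay, true), config.per_peer_sync_delay);
}

With the defaults this climbs 1s, 2s, 4s, and so on, topping out at 10 minutes; the model_error_backoff test exercises the same behaviour with much shorter timings and a large backoff factor.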
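
The per-stream-set scheduling can be sketched the same way. In the patch each PeerInfo keeps a next_sync map keyed by StreamSet, and the behaviour always starts the stream set with the earliest deadline. The sketch below copies that shape with a local stand-in for StreamSet and a fixed reschedule delay, so it illustrates the bookkeeping rather than the behaviour's actual poll implementation.

use std::collections::BTreeMap;
use std::time::{Duration, Instant};

/// Local stand-in for the crate's StreamSet; the real enum derives Ord in the
/// patch precisely so it can key the next_sync BTreeMap.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum StreamSet {
    Interest,
    Model,
}

fn main() {
    // Mirrors PeerInfo::next_sync: every stream set gets its own schedule, with
    // interests due immediately and models nudged 1ms later so the first sync
    // after a connection is always the interest ranges.
    let now = Instant::now();
    let mut next_sync: BTreeMap<StreamSet, Instant> = BTreeMap::from([
        (StreamSet::Interest, now),
        (StreamSet::Model, now + Duration::from_millis(1)),
    ]);

    // The poll picks the earliest entry; it only starts that sync once the
    // deadline has passed, otherwise it sleeps until then. Because each stream
    // set is rescheduled independently, backing off model syncs does not delay
    // interest syncs.
    let (stream_set, due) = next_sync
        .iter()
        .min_by_key(|(_, t)| *t)
        .map(|(s, t)| (*s, *t))
        .expect("next_sync is always initialized with both stream sets");
    if due <= Instant::now() {
        println!("start {:?} sync", stream_set);
        // After the sync finishes, only this stream set is rescheduled, using
        // the (possibly backed-off) delay tracked for this peer.
        next_sync.insert(stream_set, Instant::now() + Duration::from_secs(1));
    } else {
        println!("sleep until {:?}", due);
    }
}

Keeping the two schedules separate is what lets a backed-off model sync coexist with regular interest syncs on the same connection, so the connection stays active and the backoff state is not lost to an idle timeout.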