diff --git a/elfo-network/src/config.rs b/elfo-network/src/config.rs index 2c94f67..6bc50e3 100644 --- a/elfo-network/src/config.rs +++ b/elfo-network/src/config.rs @@ -26,7 +26,7 @@ use serde::{ /// "uds:///tmp/sock" /// ] /// ``` -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Deserialize)] pub struct Config { /// A list of addresses to listen on. #[serde(default)] @@ -59,7 +59,7 @@ pub struct Config { } /// Compression settings. -#[derive(Debug, Default, Deserialize)] +#[derive(Debug, Default, Deserialize, Clone)] pub struct CompressionConfig { /// Compression algorithm. #[serde(default)] @@ -67,7 +67,7 @@ pub struct CompressionConfig { } /// Compression algorithms. -#[derive(Debug, Default, PartialEq, Eq, Deserialize)] +#[derive(Debug, Default, PartialEq, Eq, Deserialize, Clone)] pub enum CompressionAlgorithm { /// LZ4 with default compression level. Lz4, @@ -85,7 +85,7 @@ fn default_idle_timeout() -> Duration { } /// How to discover other nodes. -#[derive(Debug, Deserialize, Default)] +#[derive(Debug, Deserialize, Clone, Default)] pub struct DiscoveryConfig { /// Predefined list of transports to connect to. pub predefined: Vec, diff --git a/elfo-network/src/discovery/diff.rs b/elfo-network/src/discovery/diff.rs new file mode 100644 index 0000000..cdb8f59 --- /dev/null +++ b/elfo-network/src/discovery/diff.rs @@ -0,0 +1,100 @@ +#[cfg_attr(test, derive(Debug))] +pub(crate) struct Diff { + pub(crate) new: Vec, + pub(crate) removed: fxhash::FxHashSet, +} + +impl Diff { + pub(crate) fn make<'a, I>(old: fxhash::FxHashSet, new: I) -> Self + where + T: std::hash::Hash + Eq + Clone + 'a, + I: IntoIterator, + { + let mut new_items = Vec::new(); + let mut removed = old; + // Iterate over `new` version, removing each item from the + // `removed` set. Thus, the `removed` set in the end will + // contain items which are present in `old`, but not present + // in `new` - those elements are removed. + for item in new { + // If `removed` set previously didn't contain the item, then + // it's new. + let had = removed.remove(item); + if !had { + new_items.push(item.clone()); + } + } + + Self { + new: new_items, + removed, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + impl PartialEq for Diff { + fn eq(&self, other: &Self) -> bool { + let mut lhs_new = self.new.clone(); + let mut lhs_removed = self.removed.iter().cloned().collect::>(); + + lhs_new.sort(); + lhs_removed.sort(); + + let mut rhs_new = other.new.clone(); + let mut rhs_removed = other.removed.iter().cloned().collect::>(); + + rhs_new.sort(); + rhs_removed.sort(); + + (lhs_new == rhs_new) && (lhs_removed == rhs_removed) + } + } + impl Eq for Diff {} + + #[test] + fn diff_works() { + struct Input { + old: Vec, + new: Vec, + expected: Diff, + } + + impl Input { + fn new( + old: impl AsRef<[u64]>, + new: impl AsRef<[u64]>, + expected: (impl AsRef<[u64]>, impl AsRef<[u64]>), + ) -> Self { + let old = old.as_ref(); + let new = new.as_ref(); + let (diff_new, diff_removed) = (expected.0.as_ref(), expected.1.as_ref()); + + Self { + old: old.to_vec(), + new: new.to_vec(), + expected: Diff { + new: diff_new.to_vec(), + removed: diff_removed.iter().copied().collect(), + }, + } + } + } + + let inputs = [ + Input::new([], [], ([], [])), + Input::new([1, 2, 3], [1, 2, 3], ([], [])), + Input::new([1, 2, 3], [1, 3], ([], [2])), + Input::new([1, 2, 3], [1, 2, 3, 4], ([4], [])), + Input::new([1, 2, 3], [1, 3, 4], ([4], [2])), + ]; + for Input { new, old, expected } in inputs { + let actual = Diff::make(old.into_iter().collect(), new.iter()); + + assert_eq!(expected, actual); + } + } +} diff --git a/elfo-network/src/discovery/mod.rs b/elfo-network/src/discovery/mod.rs index 6a0dee2..05ba34e 100644 --- a/elfo-network/src/discovery/mod.rs +++ b/elfo-network/src/discovery/mod.rs @@ -1,4 +1,4 @@ -use std::{future::Future, sync::Arc, time::Duration}; +use std::{future::Future, mem, sync::Arc, time::Duration}; use eyre::{bail, eyre, Result, WrapErr}; use futures::StreamExt; @@ -12,13 +12,17 @@ use elfo_core::{ use crate::{ codec::format::{NetworkAddr, NetworkEnvelope, NetworkEnvelopePayload}, - config::{CompressionAlgorithm, Transport}, + config::{self, CompressionAlgorithm, Transport}, node_map::{NodeInfo, NodeMap}, protocol::{internode, DataConnectionFailed, GroupInfo, HandleConnection}, socket::{self, ReadError, Socket}, NetworkContext, }; +use self::diff::Diff; + +mod diff; + /// Initial window size of every flow. /// TODO: should be different for groups and actors. const INITIAL_WINDOW_SIZE: i32 = 100_000; @@ -69,6 +73,7 @@ struct ControlConnectionFailed { } pub(super) struct Discovery { + cfg: config::Config, ctx: NetworkContext, node_map: Arc, } @@ -82,7 +87,9 @@ pub(super) struct Discovery { impl Discovery { pub(super) fn new(ctx: NetworkContext, topology: Topology) -> Self { + let cfg = ctx.config().clone(); Self { + cfg, ctx, node_map: Arc::new(NodeMap::new(&topology)), } @@ -97,14 +104,12 @@ impl Discovery { ))); self.listen().await?; - self.discover(); + self.discover_all(); while let Some(envelope) = self.ctx.recv().await { msg!(match envelope { ConfigUpdated => { - // TODO: update listeners. - // TODO: stop discovering for removed transports. - // TODO: self.discover(); + self.on_update_config(); } msg @ ConnectionEstablished => self.on_connection_established(msg), msg @ ConnectionAccepted => self.on_connection_accepted(msg), @@ -121,7 +126,7 @@ impl Discovery { msg @ ControlConnectionFailed => { if let Some(transport) = msg.transport { tokio::time::sleep(std::time::Duration::from_secs(5)).await; - self.discover_one(transport); + self.discover(transport); } } }); @@ -132,19 +137,52 @@ impl Discovery { fn get_capabilities(&self) -> socket::Capabilities { let mut capabilities = socket::Capabilities::empty(); - if self.ctx.config().compression.algorithm == CompressionAlgorithm::Lz4 { + if self.cfg.compression.algorithm == CompressionAlgorithm::Lz4 { capabilities |= socket::Capabilities::LZ4; } capabilities } + fn on_update_config(&mut self) { + // TODO: Update listeners. + let cfg = self.ctx.config().clone(); + let old = mem::replace(&mut self.cfg, cfg); + + self.update_discovery(old.discovery); + } + + fn update_discovery(&mut self, old: config::DiscoveryConfig) { + let config::DiscoveryConfig { + predefined, + attempt_interval: _, + } = old; + + { + let Diff { new, removed } = Diff::make( + predefined.into_iter().collect(), + self.ctx.config().discovery.predefined.iter(), + ); + for transport in new { + self.discover(transport); + } + + // FIXME: handle removal. + if !removed.is_empty() { + warn!( + ?removed, + "got removal of several discovery.predefined entries, this is not supported as of now" + ); + } + } + } + async fn listen(&mut self) -> Result<()> { let node_no = self.node_map.this.node_no; let launch_id = self.node_map.this.launch_id; let capabilities = self.get_capabilities(); - for transport in self.ctx.config().listen.clone() { - let stream = socket::listen(&transport, node_no, launch_id, capabilities) + for transport in &self.cfg.listen { + let stream = socket::listen(transport, node_no, launch_id, capabilities) .await .wrap_err_with(|| eyre!("cannot listen {}", transport))? .filter_map(move |socket| async move { @@ -176,13 +214,13 @@ impl Discovery { Ok(()) } - fn discover(&mut self) { - for transport in self.ctx.config().discovery.predefined.clone() { - self.discover_one(transport); + fn discover_all(&mut self) { + for transport in self.cfg.discovery.predefined.clone() { + self.discover(transport); } } - fn discover_one(&mut self, transport: Transport) { + fn discover(&mut self, transport: Transport) { let msg = internode::SwitchToControl { groups: self.node_map.this.groups.clone(), }; @@ -194,7 +232,7 @@ impl Discovery { transport: &Transport, role: ConnectionRole, ) -> Stream { - let interval = self.ctx.config().discovery.attempt_interval; + let interval = self.cfg.discovery.attempt_interval; let transport = transport.clone(); let node_no = self.node_map.this.node_no; let launch_id = self.node_map.this.launch_id; @@ -254,7 +292,7 @@ impl Discovery { ); let node_map = self.node_map.clone(); - let idle_timeout = self.ctx.config().idle_timeout; + let idle_timeout = self.cfg.idle_timeout; self.ctx.attach(Stream::once(async move { let info = socket.info.clone(); let peer = socket.peer.clone();