Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to change network discovery configuration on the fly #140

Merged
merged 4 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions elfo-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use serde::{
/// "uds:///tmp/sock"
/// ]
/// ```
#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct Config {
/// A list of addresses to listen on.
#[serde(default)]
Expand Down Expand Up @@ -59,15 +59,15 @@ pub struct Config {
}

/// Compression settings.
#[derive(Debug, Default, Deserialize)]
#[derive(Debug, Default, Deserialize, Clone)]
pub struct CompressionConfig {
/// Compression algorithm.
#[serde(default)]
pub algorithm: CompressionAlgorithm,
}

/// Compression algorithms.
#[derive(Debug, Default, PartialEq, Eq, Deserialize)]
#[derive(Debug, Default, PartialEq, Eq, Deserialize, Clone)]
pub enum CompressionAlgorithm {
/// LZ4 with default compression level.
Lz4,
Expand All @@ -85,7 +85,7 @@ fn default_idle_timeout() -> Duration {
}

/// How to discover other nodes.
#[derive(Debug, Deserialize, Default)]
#[derive(Debug, Deserialize, Clone, Default)]
pub struct DiscoveryConfig {
/// Predefined list of transports to connect to.
pub predefined: Vec<Transport>,
Expand Down
100 changes: 100 additions & 0 deletions elfo-network/src/discovery/diff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#[cfg_attr(test, derive(Debug))]
pub(crate) struct Diff<T> {
pub(crate) new: Vec<T>,
pub(crate) removed: fxhash::FxHashSet<T>,
}

impl<T> Diff<T> {
pub(crate) fn make<'a, I>(old: fxhash::FxHashSet<T>, new: I) -> Self
where
T: std::hash::Hash + Eq + Clone + 'a,
I: IntoIterator<Item = &'a T>,
{
let mut new_items = Vec::new();
let mut removed = old;
// Iterate over `new` version, removing each item from the
// `removed` set. Thus, the `removed` set in the end will
// contain items which are present in `old`, but not present
// in `new` - those elements are removed.
for item in new {
// If `removed` set previously didn't contain the item, then
// it's new.
let had = removed.remove(item);
if !had {
new_items.push(item.clone());
}
}

Self {
new: new_items,
removed,
}
}
}

#[cfg(test)]
mod tests {
use super::*;

impl<T: Eq + Ord + Clone> PartialEq for Diff<T> {
fn eq(&self, other: &Self) -> bool {
let mut lhs_new = self.new.clone();
let mut lhs_removed = self.removed.iter().cloned().collect::<Vec<_>>();

lhs_new.sort();
lhs_removed.sort();

let mut rhs_new = other.new.clone();
let mut rhs_removed = other.removed.iter().cloned().collect::<Vec<_>>();

rhs_new.sort();
rhs_removed.sort();

(lhs_new == rhs_new) && (lhs_removed == rhs_removed)
}
}
impl<T: Eq + Ord + Clone> Eq for Diff<T> {}

#[test]
fn diff_works() {
struct Input {
old: Vec<u64>,
new: Vec<u64>,
expected: Diff<u64>,
}

impl Input {
fn new(
old: impl AsRef<[u64]>,
new: impl AsRef<[u64]>,
expected: (impl AsRef<[u64]>, impl AsRef<[u64]>),
) -> Self {
let old = old.as_ref();
let new = new.as_ref();
let (diff_new, diff_removed) = (expected.0.as_ref(), expected.1.as_ref());

Self {
old: old.to_vec(),
new: new.to_vec(),
expected: Diff {
new: diff_new.to_vec(),
removed: diff_removed.iter().copied().collect(),
},
}
}
}

let inputs = [
Input::new([], [], ([], [])),
Input::new([1, 2, 3], [1, 2, 3], ([], [])),
Input::new([1, 2, 3], [1, 3], ([], [2])),
Input::new([1, 2, 3], [1, 2, 3, 4], ([4], [])),
Input::new([1, 2, 3], [1, 3, 4], ([4], [2])),
];
for Input { new, old, expected } in inputs {
let actual = Diff::make(old.into_iter().collect(), new.iter());

assert_eq!(expected, actual);
}
}
}
70 changes: 54 additions & 16 deletions elfo-network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, sync::Arc, time::Duration};
use std::{future::Future, mem, sync::Arc, time::Duration};

use eyre::{bail, eyre, Result, WrapErr};
use futures::StreamExt;
Expand All @@ -12,13 +12,15 @@ use elfo_core::{

use crate::{
codec::format::{NetworkAddr, NetworkEnvelope, NetworkEnvelopePayload},
config::{CompressionAlgorithm, Transport},
config::{self, CompressionAlgorithm, Transport},
node_map::{NodeInfo, NodeMap},
protocol::{internode, DataConnectionFailed, GroupInfo, HandleConnection},
socket::{self, ReadError, Socket},
NetworkContext,
};

use diff::Diff;

/// Initial window size of every flow.
/// TODO: should be different for groups and actors.
const INITIAL_WINDOW_SIZE: i32 = 100_000;
Expand Down Expand Up @@ -69,6 +71,7 @@ struct ControlConnectionFailed {
}

pub(super) struct Discovery {
cfg: config::Config,
ctx: NetworkContext,
node_map: Arc<NodeMap>,
}
Expand All @@ -82,7 +85,9 @@ pub(super) struct Discovery {

impl Discovery {
pub(super) fn new(ctx: NetworkContext, topology: Topology) -> Self {
let cfg = ctx.config().clone();
Self {
cfg,
ctx,
node_map: Arc::new(NodeMap::new(&topology)),
}
Expand All @@ -97,14 +102,12 @@ impl Discovery {
)));

self.listen().await?;
self.discover();
self.discover_all();

while let Some(envelope) = self.ctx.recv().await {
msg!(match envelope {
ConfigUpdated => {
// TODO: update listeners.
// TODO: stop discovering for removed transports.
// TODO: self.discover();
self.on_update_config();
}
msg @ ConnectionEstablished => self.on_connection_established(msg),
msg @ ConnectionAccepted => self.on_connection_accepted(msg),
Expand All @@ -121,7 +124,7 @@ impl Discovery {
msg @ ControlConnectionFailed => {
if let Some(transport) = msg.transport {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
self.discover_one(transport);
self.discover(transport);
}
}
});
Expand All @@ -132,19 +135,52 @@ impl Discovery {

fn get_capabilities(&self) -> socket::Capabilities {
let mut capabilities = socket::Capabilities::empty();
if self.ctx.config().compression.algorithm == CompressionAlgorithm::Lz4 {
if self.cfg.compression.algorithm == CompressionAlgorithm::Lz4 {
capabilities |= socket::Capabilities::LZ4;
}
capabilities
}

fn on_update_config(&mut self) {
// TODO: Update listeners.
let cfg = self.ctx.config().clone();
let old = mem::replace(&mut self.cfg, cfg);

self.update_discovery(old.discovery);
}

fn update_discovery(&mut self, old: config::DiscoveryConfig) {
let config::DiscoveryConfig {
predefined,
attempt_interval: _,
} = old;

{
loyd marked this conversation as resolved.
Show resolved Hide resolved
let Diff { new, removed } = Diff::make(
predefined.into_iter().collect(),
self.ctx.config().discovery.predefined.iter(),
);
for transport in new {
self.discover(transport);
}

// FIXME: handle removal.
if !removed.is_empty() {
error!(
nerodono marked this conversation as resolved.
Show resolved Hide resolved
?removed,
"got removal of several discovery.predefined entries, this is not supported as of now"
);
}
}
}

async fn listen(&mut self) -> Result<()> {
let node_no = self.node_map.this.node_no;
let launch_id = self.node_map.this.launch_id;
let capabilities = self.get_capabilities();

for transport in self.ctx.config().listen.clone() {
let stream = socket::listen(&transport, node_no, launch_id, capabilities)
for transport in &self.cfg.listen {
let stream = socket::listen(transport, node_no, launch_id, capabilities)
.await
.wrap_err_with(|| eyre!("cannot listen {}", transport))?
.filter_map(move |socket| async move {
Expand Down Expand Up @@ -176,13 +212,13 @@ impl Discovery {
Ok(())
}

fn discover(&mut self) {
for transport in self.ctx.config().discovery.predefined.clone() {
self.discover_one(transport);
fn discover_all(&mut self) {
for transport in self.cfg.discovery.predefined.clone() {
self.discover(transport);
}
}

fn discover_one(&mut self, transport: Transport) {
fn discover(&mut self, transport: Transport) {
let msg = internode::SwitchToControl {
groups: self.node_map.this.groups.clone(),
};
Expand All @@ -194,7 +230,7 @@ impl Discovery {
transport: &Transport,
role: ConnectionRole,
) -> Stream<ConnectionEstablished> {
let interval = self.ctx.config().discovery.attempt_interval;
let interval = self.cfg.discovery.attempt_interval;
let transport = transport.clone();
let node_no = self.node_map.this.node_no;
let launch_id = self.node_map.this.launch_id;
Expand Down Expand Up @@ -254,7 +290,7 @@ impl Discovery {
);

let node_map = self.node_map.clone();
let idle_timeout = self.ctx.config().idle_timeout;
let idle_timeout = self.cfg.idle_timeout;
self.ctx.attach(Stream::once(async move {
let info = socket.info.clone();
let peer = socket.peer.clone();
Expand Down Expand Up @@ -561,3 +597,5 @@ fn unexpected_message_error(envelope: Envelope, expected: &[&str]) -> eyre::Repo
async fn timeout<T>(duration: Duration, fut: impl Future<Output = Result<T>>) -> Result<T> {
tokio::time::timeout(duration, fut).await?
}

mod diff;
nerodono marked this conversation as resolved.
Show resolved Hide resolved