Skip to content

Commit

Permalink
feat(network): support the turmoil transport
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Oct 6, 2024
1 parent 33f57ce commit 33da3b3
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 26 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- core/config: impl `FromStr` for `Secret` ([#135]).
- core/init: emit the `elfo_start_time_seconds` metric.
- core/actor: implement `Display` for `ActorMeta` ([#74]).
- network: add `idle_timeout` to detect and disconnect stuck connections.
- network: add `idle_timeout` to detect and disconnect stuck connections ([#137]).
- network: add the `turmoil` transport for testing distributed actors ([#137]).

### Changed
- **BREAKING** core/node: remove `node` module, `NodeNo` is moved to `addr`.
Expand All @@ -28,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#74]: https://github.com/elfo-rs/elfo/issues/74
[#135]: https://github.com/elfo-rs/elfo/pull/135
[#136]: https://github.com/elfo-rs/elfo/pull/136
[#137]: https://github.com/elfo-rs/elfo/pull/137

## [0.2.0-alpha.16] - 2024-07-24
### Added
Expand Down
5 changes: 4 additions & 1 deletion elfo-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ rust-version.workspace = true
[lints]
workspace = true

[features]
turmoil06 = ["dep:turmoil06"]

[dependencies]
elfo-core = { version = "0.2.0-alpha.16", path = "../elfo-core", features = ["unstable", "network"] }
elfo-utils = { version = "0.2.6", path = "../elfo-utils" }
Expand All @@ -27,14 +30,14 @@ eyre = "0.6.8"
fxhash = "0.2.1"
futures = "0.3.21"
tokio = { workspace = true, features = ["net", "io-util"] }
tokio-util = "0.7"
tracing = "0.1.25"
parking_lot = "0.12"
humantime-serde = "1"
kanal = "0.1.0-pre8"
bitflags = "2.3.2"
lz4_flex = { version = "0.11.1", default-features = false, features = ["std"] }
byteorder = "1.4.3"
turmoil06 = { package = "turmoil", version = "0.6", optional = true }

[dev-dependencies]
tracing-test = "0.2.4" # TODO: actually unused?
19 changes: 19 additions & 0 deletions elfo-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ pub enum Transport {
#[cfg(unix)]
#[display("uds://{}", "_0.display()")]
Uds(PathBuf),
/// Turmoil v0.6 transport ("turmoil06://host").
///
/// Useful for testing purposes only.
#[cfg(feature = "turmoil06")]
#[display("turmoil06://{_0}")]
Turmoil06(String),
}

impl FromStr for Transport {
Expand All @@ -134,6 +140,8 @@ impl FromStr for Transport {
);
Ok(Transport::Uds(PathBuf::from(addr)))
}
#[cfg(feature = "turmoil06")]
"turmoil06" => Ok(Transport::Turmoil06(addr.into())),
proto => bail!("unknown protocol: {proto}"),
}
}
Expand Down Expand Up @@ -184,6 +192,10 @@ mod tests {
Transport::from_str("tcp://127.0.0.1:4242").unwrap(),
Transport::Tcp("127.0.0.1:4242".into())
);
assert_eq!(
Transport::from_str("tcp://alice:4242").unwrap(),
Transport::Tcp("alice:4242".into())
);

// UDS
#[cfg(unix)]
Expand All @@ -201,5 +213,12 @@ mod tests {
"path to UDS socket cannot be directory"
);
}

// Turmoil06
#[cfg(feature = "turmoil06")]
assert_eq!(
Transport::from_str("turmoil06://alice").unwrap(),
Transport::Turmoil06("alice".into())
);
}
}
62 changes: 62 additions & 0 deletions elfo-network/src/socket/idleness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};

use tokio::time::Instant;

// === IdleTracker ===

/// Measures the time since the last activity on a socket.
/// It's used to implement health checks based on idle timeout.
pub(crate) struct IdleTracker {
track: IdleTrack,
prev_value: u32,
prev_time: Instant,
}

impl IdleTracker {
pub(super) fn new() -> (Self, IdleTrack) {
let track = IdleTrack(<_>::default());
let this = Self {
track: track.clone(),
prev_value: track.get(),
prev_time: Instant::now(),
};

(this, track)
}

/// Returns the elapsed time since the last `check()` call
/// that observed any [`IdleTrack::update()`] calls.
pub(crate) fn check(&mut self) -> Duration {
let now = Instant::now();
let new_value = self.track.get();

if self.prev_value != new_value {
self.prev_value = new_value;
self.prev_time = now;
}

now.duration_since(self.prev_time)
}
}

// === IdleTrack ===

#[derive(Clone)]
pub(super) struct IdleTrack(Arc<AtomicU32>);

impl IdleTrack {
/// Marks this socket as non-idle.
pub(super) fn update(&self) {
self.0.fetch_add(1, Ordering::Relaxed);
}

fn get(&self) -> u32 {
self.0.load(Ordering::Relaxed)
}
}
46 changes: 33 additions & 13 deletions elfo-network/src/socket/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use std::{

use derive_more::Display;
use eyre::Result;
use futures::{Stream, StreamExt};
use futures::{stream::BoxStream, StreamExt};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
#[cfg(unix)]
use tokio_util::either::Either;

use crate::config::Transport;

mod tcp;
#[cfg(feature = "turmoil06")]
mod turmoil;
mod uds;

macro_rules! delegate_call {
Expand All @@ -22,6 +22,8 @@ macro_rules! delegate_call {
Self::Tcp(v) => Pin::new(v).$method($($args),+),
#[cfg(unix)]
Self::Uds(v) => Pin::new(v).$method($($args),+),
#[cfg(feature = "turmoil06")]
Self::Turmoil06(v) => Pin::new(v).$method($($args),+),
}
}
}
Expand All @@ -31,12 +33,16 @@ pub(crate) enum SocketInfo {
Tcp(tcp::SocketInfo),
#[cfg(unix)]
Uds(uds::SocketInfo),
#[cfg(feature = "turmoil06")]
Turmoil06(turmoil::SocketInfo),
}

pub(super) enum OwnedReadHalf {
Tcp(tcp::OwnedReadHalf),
#[cfg(unix)]
Uds(uds::OwnedReadHalf),
#[cfg(feature = "turmoil06")]
Turmoil06(turmoil::OwnedReadHalf),
}

impl AsyncRead for OwnedReadHalf {
Expand All @@ -53,6 +59,8 @@ pub(super) enum OwnedWriteHalf {
Tcp(tcp::OwnedWriteHalf),
#[cfg(unix)]
Uds(uds::OwnedWriteHalf),
#[cfg(feature = "turmoil06")]
Turmoil06(turmoil::OwnedWriteHalf),
}

impl AsyncWrite for OwnedWriteHalf {
Expand Down Expand Up @@ -81,6 +89,8 @@ impl AsyncWrite for OwnedWriteHalf {
Self::Tcp(v) => v.is_write_vectored(),
#[cfg(unix)]
Self::Uds(v) => v.is_write_vectored(),
#[cfg(feature = "turmoil06")]
Self::Turmoil06(v) => v.is_write_vectored(),
}
}
}
Expand Down Expand Up @@ -112,23 +122,33 @@ impl From<uds::Socket> for Socket {
}
}

#[cfg(feature = "turmoil06")]
impl From<turmoil::Socket> for Socket {
fn from(socket: turmoil::Socket) -> Self {
Self {
read: OwnedReadHalf::Turmoil06(socket.read),
write: OwnedWriteHalf::Turmoil06(socket.write),
info: SocketInfo::Turmoil06(socket.info),
}
}
}

pub(super) async fn connect(addr: &Transport) -> Result<Socket> {
match addr {
Transport::Tcp(addr) => tcp::connect(addr).await.map(Into::into),
#[cfg(unix)]
Transport::Uds(addr) => uds::connect(addr).await.map(Into::into),
#[cfg(feature = "turmoil06")]
Transport::Turmoil06(addr) => turmoil::connect(addr).await.map(Into::into),
}
}

pub(super) async fn listen(addr: &Transport) -> Result<impl Stream<Item = Socket> + 'static> {
match addr {
Transport::Tcp(addr) => {
let result = tcp::listen(addr).await.map(|s| s.map(Into::into));
#[cfg(unix)]
let result = result.map(Either::Left);
result
}
pub(super) async fn listen(addr: &Transport) -> Result<BoxStream<'static, Socket>> {
Ok(match addr {
Transport::Tcp(addr) => Box::pin(tcp::listen(addr).await?.map(Into::into)),
#[cfg(unix)]
Transport::Uds(addr) => uds::listen(addr).map(|s| Either::Right(s.map(Into::into))),
}
Transport::Uds(addr) => Box::pin(uds::listen(addr)?.map(Into::into)),
#[cfg(feature = "turmoil06")]
Transport::Turmoil06(addr) => Box::pin(turmoil::listen(addr).await?.map(Into::into)),
})
}
73 changes: 73 additions & 0 deletions elfo-network/src/socket/raw/turmoil.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::net::SocketAddr;

use derive_more::Display;
use eyre::{Result, WrapErr};
use futures::Stream;
use tracing::warn;
use turmoil06::net::{TcpListener, TcpStream};

pub(super) use turmoil06::net::tcp::{OwnedReadHalf, OwnedWriteHalf};

const PORT: u16 = 0xE1F0;

#[derive(Clone, Display)]
#[display("turmoil06(local={local}, peer={peer})")] // TODO: use `valuable` after tracing#1570
pub(crate) struct SocketInfo {
local: String,
peer: String,
}

pub(super) struct Socket {
pub(super) read: OwnedReadHalf,
pub(super) write: OwnedWriteHalf,
pub(super) info: SocketInfo,
}

fn prepare_stream(stream: TcpStream) -> Result<Socket> {
let info = SocketInfo {
local: stringify_addr(stream.local_addr().wrap_err("cannot get local addr")?),
peer: stringify_addr(stream.peer_addr().wrap_err("cannot get peer addr")?),
};

let (read, write) = stream.into_split();
Ok(Socket { read, write, info })
}

fn stringify_addr(addr: SocketAddr) -> String {
let (ip, port) = (addr.ip(), addr.port());
let host = turmoil06::reverse_lookup(ip).unwrap_or_else(|| ip.to_string());
format!("{host}:{port}")
}

pub(super) async fn connect(host: &str) -> Result<Socket> {
prepare_stream(TcpStream::connect((host, PORT)).await?)
}

pub(super) async fn listen(host: &str) -> Result<impl Stream<Item = Socket> + 'static> {
let listener = TcpListener::bind((host, PORT)).await?;

let accept = move |listener: TcpListener| async move {
loop {
let result = listener
.accept()
.await
.map_err(Into::into)
.and_then(|(socket, _)| prepare_stream(socket));

match result {
Ok(socket) => return Some((socket, listener)),
Err(err) => {
warn!(
message = "cannot accept TCP connection",
error = %err,
// TODO: addr
);

// Continue listening.
}
}
}
};

Ok(futures::stream::unfold(listener, accept))
}
2 changes: 2 additions & 0 deletions elfo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ network = ["elfo-network"]
unstable = ["elfo-core/unstable", "elfo-telemeter/unstable", "elfo-test/unstable" ]
unstable-stuck-detection = ["elfo-core/unstable-stuck-detection"]
tracing-log = ["elfo-logger/tracing-log"]
turmoil06 = ["elfo-network/turmoil06"]

[dependencies]
elfo-core = { version = "0.2.0-alpha.16", path = "../elfo-core" }
Expand Down Expand Up @@ -49,6 +50,7 @@ static_assertions = "1.1.0"
parking_lot = "0.12"
libc = "0.2.97"
futures-intrusive = "0.5"
turmoil = "0.6"

[package.metadata.docs.rs]
all-features = true
Expand Down
10 changes: 10 additions & 0 deletions elfo/tests/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#![allow(dead_code)] // TODO: combine tests into "it/*"

// For tests without `elfo::test::proxy`.
pub(crate) fn setup_logger() {
let _ = tracing_subscriber::fmt()
.with_target(false)
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_test_writer()
.try_init();
}
Loading

0 comments on commit 33da3b3

Please sign in to comment.