Skip to content

Commit

Permalink
feat(network): use the idle timeout as a health check
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Oct 6, 2024
1 parent 1a86514 commit 33f57ce
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- core/config: impl `FromStr` for `Secret` ([#135]).
- core/init: emit the `elfo_start_time_seconds` metric.
- core/actor: implement `Display` for `ActorMeta` ([#74]).
- network: add `idle_timeout` to detect and disconnect stuck connections.

### Changed
- **BREAKING** core/node: remove `node` module, `NodeNo` is moved to `addr`.
Expand Down
28 changes: 24 additions & 4 deletions elfo-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,33 @@ use serde::{
#[derive(Debug, Deserialize)]
pub struct Config {
/// A list of addresses to listen on.
#[serde(default)]
pub listen: Vec<Transport>,
/// How often nodes should ping each other.
/// `5s` by default
#[serde(with = "humantime_serde", default = "default_ping_interval")]
pub ping_interval: Duration,
/// How to discover other nodes.
#[serde(default)]
pub discovery: DiscoveryConfig, // TODO: optional?
/// Compression settings.
#[serde(default)]
pub compression: CompressionConfig,
/// How often nodes should ping each other.
///
/// Pings are used to measure RTT and detect dead connections.
/// For the latest purpose, see `idle_timeout`.
///
/// `5s` by default.
#[serde(with = "humantime_serde", default = "default_ping_interval")]
pub ping_interval: Duration,
/// The maximum inactivity time of every connection.
///
/// If no data is received on a connection for over `idle_timeout` time,
/// the connection is considered dead and will be automatically closed.
///
/// This timeout is checked every `ping_interval` time, so the actual time
/// lies in the range of `idle_timeout` to `idle_timeout + ping_interval`.
///
/// `30s` by default.
#[serde(with = "humantime_serde", default = "default_idle_timeout")]
pub idle_timeout: Duration,
}

/// Compression settings.
Expand All @@ -64,6 +80,10 @@ fn default_ping_interval() -> Duration {
Duration::from_secs(5)
}

fn default_idle_timeout() -> Duration {
Duration::from_secs(30)
}

/// How to discover other nodes.
#[derive(Debug, Deserialize, Default)]
pub struct DiscoveryConfig {
Expand Down
31 changes: 25 additions & 6 deletions elfo-network/src/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use std::{future::Future, time::Duration};

use derive_more::{Constructor, Display};
use eyre::{eyre, Result, WrapErr};
use futures::StreamExt;
use futures::{stream::BoxStream, StreamExt};
use metrics::counter;
use tokio::io;
use tracing::{trace, warn};

use elfo_core::addr::{NodeLaunchId, NodeNo};
use elfo_utils::likely;

use self::idleness::{IdleTrack, IdleTracker};
use crate::{
codec::{decode::EnvelopeDetails, encode::EncodeError, format::NetworkEnvelope},
config::Transport,
Expand All @@ -20,6 +21,7 @@ use crate::{
};

mod handshake;
mod idleness;
mod raw;

bitflags::bitflags! {
Expand All @@ -34,6 +36,7 @@ pub(crate) struct Socket {
pub(crate) peer: Peer,
pub(crate) read: ReadHalf,
pub(crate) write: WriteHalf,
pub(crate) idle: IdleTracker,
}

#[derive(Display, Clone, Constructor)]
Expand All @@ -53,18 +56,22 @@ impl Socket {
(FramedRead::none(), FramedWrite::none(None))
};

let (idle_tracker, idle_track) = IdleTracker::new();

Self {
info: raw.info,
peer: Peer::new(handshake.node_no, handshake.launch_id),
read: ReadHalf::new(framed_read, raw.read),
read: ReadHalf::new(framed_read, raw.read, idle_track),
write: WriteHalf::new(framed_write, raw.write),
idle: idle_tracker,
}
}
}

pub(crate) struct ReadHalf {
framing: FramedRead,
read: raw::OwnedReadHalf,
idle: IdleTrack,
}

#[derive(Debug)]
Expand All @@ -83,8 +90,12 @@ where
}

impl ReadHalf {
fn new(framing: FramedRead, read: raw::OwnedReadHalf) -> Self {
Self { framing, read }
fn new(framing: FramedRead, read: raw::OwnedReadHalf, idle: IdleTrack) -> Self {
Self {
framing,
read,
idle,
}
}

fn report_framing_metrics(&mut self) {
Expand All @@ -104,13 +115,15 @@ impl ReadHalf {
let envelope = loop {
let buffer = match self.framing.read()? {
FramedReadState::NeedMoreData { buffer } => {
trace!(message = "framed read strategy requested more data");
trace!("framed read strategy requested more data");
buffer
}
FramedReadState::EnvelopeSkipped(details) => {
self.idle.update();
return Err(ReadError::EnvelopeSkipped(details));
}
FramedReadState::Done { decoded } => {
self.idle.update();
let (protocol, name) = decoded.payload.protocol_and_name();
trace!(
message = "framed read strategy decoded single envelope",
Expand All @@ -123,6 +136,12 @@ impl ReadHalf {
};

let bytes_read = io::AsyncReadExt::read(&mut self.read, buffer).await?;

// Large messages cannot be read in a single `read()` call, so we should
// additionally update the idle tracker even without waiting until the
// message is fully decoded to prevent false positive disconnects.
self.idle.update();

if bytes_read == 0 {
// EOF.
return Ok(None);
Expand Down Expand Up @@ -239,7 +258,7 @@ pub(crate) async fn listen(
node_no: NodeNo,
launch_id: NodeLaunchId,
capabilities: Capabilities,
) -> Result<futures::stream::BoxStream<'static, Socket>> {
) -> Result<BoxStream<'static, Socket>> {
let stream = timeout(LISTEN_TIMEOUT, raw::listen(addr)).await?;
let stream = stream
.map(move |mut raw_socket| async move {
Expand Down
18 changes: 14 additions & 4 deletions elfo-network/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,28 +164,38 @@ impl Worker {
};
self.ctx.attach(Stream::once(sr.exec()));

let mut idle = socket.idle;

// Start ping ticks.
let ping_interval = self.ctx.attach(Interval::new(PingTick));
ping_interval.start_after(Duration::ZERO, self.ctx.config().ping_interval);

while let Some(envelope) = self.ctx.recv().await {
// TODO: graceful termination
// TODO: handle another `HandleConnection`

msg!(match envelope {
ConfigUpdated => {
ping_interval.set_period(self.ctx.config().ping_interval);
}
PingTick => {
let idle_time = idle.check();

if idle_time >= self.ctx.config().idle_timeout {
error!(
message = "no data is received for a long time, closing",
idle_time = ?idle_time,
timeout = ?self.ctx.config().idle_timeout,
);
break;
}

let envelope = make_system_envelope(internode::Ping {
payload: Instant::now().nanos_since(time_origin),
});
let _ = local_tx.try_send(KanalItem::simple(NetworkAddr::NULL, envelope));

// TODO: perform health check
}
msg @ HandleConnection => {
info!("duplicate connection, skipping"); // TODO
info!("duplicate connection, skipping"); // TODO: replace?
if self.transport.is_none() {
self.transport = msg.transport;
}
Expand Down

0 comments on commit 33f57ce

Please sign in to comment.