From 33f57ce071cd901edc95663d42007f1a84873275 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 6 Oct 2024 18:21:34 +0400 Subject: [PATCH] feat(network): use the idle timeout as a health check --- CHANGELOG.md | 1 + elfo-network/src/config.rs | 28 ++++++++++++++++++++++++---- elfo-network/src/socket/mod.rs | 31 +++++++++++++++++++++++++------ elfo-network/src/worker/mod.rs | 18 ++++++++++++++---- 4 files changed, 64 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c518002c..ad6d22a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - core/config: impl `FromStr` for `Secret` ([#135]). - core/init: emit the `elfo_start_time_seconds` metric. - core/actor: implement `Display` for `ActorMeta` ([#74]). +- network: add `idle_timeout` to detect and disconnect stuck connections. ### Changed - **BREAKING** core/node: remove `node` module, `NodeNo` is moved to `addr`. diff --git a/elfo-network/src/config.rs b/elfo-network/src/config.rs index a8cfa43d..8c78ae5f 100644 --- a/elfo-network/src/config.rs +++ b/elfo-network/src/config.rs @@ -29,17 +29,33 @@ use serde::{ #[derive(Debug, Deserialize)] pub struct Config { /// A list of addresses to listen on. + #[serde(default)] pub listen: Vec, - /// How often nodes should ping each other. - /// `5s` by default - #[serde(with = "humantime_serde", default = "default_ping_interval")] - pub ping_interval: Duration, /// How to discover other nodes. #[serde(default)] pub discovery: DiscoveryConfig, // TODO: optional? /// Compression settings. #[serde(default)] pub compression: CompressionConfig, + /// How often nodes should ping each other. + /// + /// Pings are used to measure RTT and detect dead connections. + /// For the latter purpose, see `idle_timeout`. + /// + /// `5s` by default. 
+ #[serde(with = "humantime_serde", default = "default_ping_interval")] + pub ping_interval: Duration, + /// The maximum inactivity time of every connection. + /// + /// If no data is received on a connection for longer than `idle_timeout`, + /// the connection is considered dead and will be automatically closed. + /// + /// This timeout is checked once per `ping_interval`, so the actual time + /// lies in the range of `idle_timeout` to `idle_timeout + ping_interval`. + /// + /// `30s` by default. + #[serde(with = "humantime_serde", default = "default_idle_timeout")] + pub idle_timeout: Duration, } /// Compression settings. @@ -64,6 +80,10 @@ fn default_ping_interval() -> Duration { Duration::from_secs(5) } +fn default_idle_timeout() -> Duration { + Duration::from_secs(30) +} + /// How to discover other nodes. #[derive(Debug, Deserialize, Default)] pub struct DiscoveryConfig { diff --git a/elfo-network/src/socket/mod.rs b/elfo-network/src/socket/mod.rs index a431eedd..51e4e41f 100644 --- a/elfo-network/src/socket/mod.rs +++ b/elfo-network/src/socket/mod.rs @@ -2,7 +2,7 @@ use std::{future::Future, time::Duration}; use derive_more::{Constructor, Display}; use eyre::{eyre, Result, WrapErr}; -use futures::StreamExt; +use futures::{stream::BoxStream, StreamExt}; use metrics::counter; use tokio::io; use tracing::{trace, warn}; @@ -10,6 +10,7 @@ use tracing::{trace, warn}; use elfo_core::addr::{NodeLaunchId, NodeNo}; use elfo_utils::likely; +use self::idleness::{IdleTrack, IdleTracker}; use crate::{ codec::{decode::EnvelopeDetails, encode::EncodeError, format::NetworkEnvelope}, config::Transport, @@ -20,6 +21,7 @@ use crate::{ }; mod handshake; +mod idleness; mod raw; bitflags::bitflags! 
{ @@ -34,6 +36,7 @@ pub(crate) struct Socket { pub(crate) peer: Peer, pub(crate) read: ReadHalf, pub(crate) write: WriteHalf, + pub(crate) idle: IdleTracker, } #[derive(Display, Clone, Constructor)] @@ -53,11 +56,14 @@ impl Socket { (FramedRead::none(), FramedWrite::none(None)) }; + let (idle_tracker, idle_track) = IdleTracker::new(); + Self { info: raw.info, peer: Peer::new(handshake.node_no, handshake.launch_id), - read: ReadHalf::new(framed_read, raw.read), + read: ReadHalf::new(framed_read, raw.read, idle_track), write: WriteHalf::new(framed_write, raw.write), + idle: idle_tracker, } } } @@ -65,6 +71,7 @@ impl Socket { pub(crate) struct ReadHalf { framing: FramedRead, read: raw::OwnedReadHalf, + idle: IdleTrack, } #[derive(Debug)] @@ -83,8 +90,12 @@ where } impl ReadHalf { - fn new(framing: FramedRead, read: raw::OwnedReadHalf) -> Self { - Self { framing, read } + fn new(framing: FramedRead, read: raw::OwnedReadHalf, idle: IdleTrack) -> Self { + Self { + framing, + read, + idle, + } } fn report_framing_metrics(&mut self) { @@ -104,13 +115,15 @@ impl ReadHalf { let envelope = loop { let buffer = match self.framing.read()? { FramedReadState::NeedMoreData { buffer } => { - trace!(message = "framed read strategy requested more data"); + trace!("framed read strategy requested more data"); buffer } FramedReadState::EnvelopeSkipped(details) => { + self.idle.update(); return Err(ReadError::EnvelopeSkipped(details)); } FramedReadState::Done { decoded } => { + self.idle.update(); let (protocol, name) = decoded.payload.protocol_and_name(); trace!( message = "framed read strategy decoded single envelope", @@ -123,6 +136,12 @@ impl ReadHalf { }; let bytes_read = io::AsyncReadExt::read(&mut self.read, buffer).await?; + + // Large messages cannot be read in a single `read()` call, so we should + // additionally update the idle tracker even without waiting until the + // message is fully decoded to prevent false positive disconnects. 
+ self.idle.update(); + if bytes_read == 0 { // EOF. return Ok(None); @@ -239,7 +258,7 @@ pub(crate) async fn listen( node_no: NodeNo, launch_id: NodeLaunchId, capabilities: Capabilities, -) -> Result> { +) -> Result> { let stream = timeout(LISTEN_TIMEOUT, raw::listen(addr)).await?; let stream = stream .map(move |mut raw_socket| async move { diff --git a/elfo-network/src/worker/mod.rs b/elfo-network/src/worker/mod.rs index bbba5274..55986fdb 100644 --- a/elfo-network/src/worker/mod.rs +++ b/elfo-network/src/worker/mod.rs @@ -164,28 +164,38 @@ impl Worker { }; self.ctx.attach(Stream::once(sr.exec())); + let mut idle = socket.idle; + // Start ping ticks. let ping_interval = self.ctx.attach(Interval::new(PingTick)); ping_interval.start_after(Duration::ZERO, self.ctx.config().ping_interval); while let Some(envelope) = self.ctx.recv().await { // TODO: graceful termination - // TODO: handle another `HandleConnection` msg!(match envelope { ConfigUpdated => { ping_interval.set_period(self.ctx.config().ping_interval); } PingTick => { + let idle_time = idle.check(); + + if idle_time >= self.ctx.config().idle_timeout { + error!( + message = "no data is received for a long time, closing", + idle_time = ?idle_time, + timeout = ?self.ctx.config().idle_timeout, + ); + break; + } + let envelope = make_system_envelope(internode::Ping { payload: Instant::now().nanos_since(time_origin), }); let _ = local_tx.try_send(KanalItem::simple(NetworkAddr::NULL, envelope)); - - // TODO: perform health check } msg @ HandleConnection => { - info!("duplicate connection, skipping"); // TODO + info!("duplicate connection, skipping"); // TODO: replace? if self.transport.is_none() { self.transport = msg.transport; }