diff --git a/CHANGELOG.md b/CHANGELOG.md index 8719d385..173a07da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Changed +- deps: update `quanta` to v0.12. +- context: slightly improve performance of time measurements. +- utils: slightly improve performance of `RateLimiter`. ## [0.2.0-alpha.11] - 2023-11-10 ### Added diff --git a/elfo-core/Cargo.toml b/elfo-core/Cargo.toml index f5043cb1..ce3f6235 100644 --- a/elfo-core/Cargo.toml +++ b/elfo-core/Cargo.toml @@ -31,7 +31,6 @@ derive_more = "0.99.11" tracing = "0.1.25" tracing-subscriber = { version = "0.3.2", default-features = false, features = ["std", "smallvec"] } metrics = "0.17" -quanta = "0.11" futures = "0.3.12" static_assertions = "1.1.0" dashmap = "5" @@ -53,6 +52,8 @@ unicycle = "0.9.3" rmp-serde = { version = "1.1.0", optional = true } [dev-dependencies] +elfo-utils = { version = "0.2.3", path = "../elfo-utils", features = ["test-util"] } + anyhow = "1.0.40" tokio = { version = "1", features = ["full"] } proptest = "1.2.0" diff --git a/elfo-core/src/context/stats.rs b/elfo-core/src/context/stats.rs index d625984a..b50bcf28 100644 --- a/elfo-core/src/context/stats.rs +++ b/elfo-core/src/context/stats.rs @@ -1,6 +1,7 @@ use derive_more::Constructor; use metrics::{self, Key, Label}; -use quanta::Instant; + +use elfo_utils::time::Instant; use crate::{envelope::Envelope, message::Message}; @@ -11,7 +12,7 @@ pub(super) struct Stats { #[derive(Constructor)] struct InHandling { labels: &'static [Label], - start_time: quanta::Instant, + start_time: Instant, } static STARTUP_LABELS: &[Label] = &[Label::from_static_parts("message", "")]; @@ -39,8 +40,9 @@ impl Stats { let recorder = ward!(metrics::try_recorder()); let key = Key::from_static_name("elfo_message_waiting_time_seconds"); let now = Instant::now(); + // Now envelope cannot be forwarded, so use the created time as a start time. - let value = (now - envelope.created_time()).as_secs_f64(); + let value = now.secs_f64_since(envelope.created_time()); recorder.record_histogram(&key, value); self.in_handling = Some(InHandling::new(envelope.message().labels(), now)); @@ -62,7 +64,7 @@ impl Stats { let in_handling = ward!(self.in_handling.take()); let recorder = ward!(metrics::try_recorder()); let key = Key::from_static_parts("elfo_message_handling_time_seconds", in_handling.labels); - let value = (Instant::now() - in_handling.start_time).as_secs_f64(); + let value = Instant::now().secs_f64_since(in_handling.start_time); recorder.record_histogram(&key, value); } } diff --git a/elfo-core/src/envelope.rs b/elfo-core/src/envelope.rs index f0a2f4ee..a6ab5d22 100644 --- a/elfo-core/src/envelope.rs +++ b/elfo-core/src/envelope.rs @@ -1,4 +1,4 @@ -use quanta::Instant; +use elfo_utils::time::Instant; use crate::{ message::{AnyMessage, Message}, diff --git a/elfo-core/src/init.rs b/elfo-core/src/init.rs index 1dd80ae2..06d75ef6 100644 --- a/elfo-core/src/init.rs +++ b/elfo-core/src/init.rs @@ -1,13 +1,14 @@ use std::{future::Future, sync::Arc, time::Duration}; use futures::future::join_all; -use quanta::Instant; use tokio::{ pin, select, time::{sleep, timeout}, }; use tracing::{error, info, level_filters::LevelFilter, warn}; +use elfo_utils::time::Instant; + #[cfg(target_os = "linux")] use crate::{memory_tracker::MemoryTracker, time::Interval}; @@ -155,6 +156,9 @@ pub async fn do_start( is_check_only: bool, and_then: impl FnOnce(Context, Topology) -> F, ) -> Result { + // Perform the clock calibration if needed. + Instant::now(); + let group_no = GroupNo::new(SYSTEM_INIT_GROUP_NO, topology.launch_id()).unwrap(); let entry = topology.book.vacant_entry(group_no); let addr = entry.addr(); diff --git a/elfo-core/src/supervisor/backoff.rs b/elfo-core/src/supervisor/backoff.rs index 64fe7e5b..043ef33b 100644 --- a/elfo-core/src/supervisor/backoff.rs +++ b/elfo-core/src/supervisor/backoff.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use quanta::Instant; +use elfo_utils::time::Instant; const BACKOFF_STEP: Duration = Duration::from_secs(5); const MAX_BACKOFF: Duration = Duration::from_secs(30); @@ -25,10 +25,8 @@ impl Backoff { } pub(crate) fn next(&mut self) -> Duration { - let now = Instant::now(); - // If an actor is alive enough time, reset the backoff. - if self.start_time + BACKOFF_STEP <= now { + if self.start_time.elapsed() >= BACKOFF_STEP { self.next_backoff = Duration::ZERO; } @@ -40,43 +38,38 @@ impl Backoff { #[cfg(test)] mod tests { - use quanta::{Clock, Mock}; + use elfo_utils::time; use super::*; - fn with_time_mock(f: impl FnOnce(&Mock)) { - let (clock, mock) = Clock::mock(); - quanta::with_clock(&clock, || f(&mock)); - } - #[test] fn it_works() { - with_time_mock(|mock| { + time::with_instant_mock(|mock| { let mut backoff = Backoff::default(); // Immediately failed. assert_eq!(backoff.next(), BACKOFF_STEP); - mock.increment(BACKOFF_STEP); + mock.advance(BACKOFF_STEP); backoff.start(); // And again. assert_eq!(backoff.next(), 2 * BACKOFF_STEP); - mock.increment(2 * BACKOFF_STEP); + mock.advance(2 * BACKOFF_STEP); backoff.start(); // After some, not enough to reset the backoff, time. - mock.increment(BACKOFF_STEP * 2 / 3); + mock.advance(BACKOFF_STEP * 2 / 3); assert_eq!(backoff.next(), 3 * BACKOFF_STEP); - mock.increment(3 * BACKOFF_STEP); + mock.advance(3 * BACKOFF_STEP); backoff.start(); // After some, enough to reset the backoff, time. - mock.increment(BACKOFF_STEP); + mock.advance(BACKOFF_STEP); assert_eq!(backoff.next(), Duration::ZERO); // resetted backoff.start(); // After some, not enough to reset the backoff, time. - mock.increment(BACKOFF_STEP * 2 / 3); + mock.advance(BACKOFF_STEP * 2 / 3); assert_eq!(backoff.next(), BACKOFF_STEP); }); } diff --git a/elfo-core/src/supervisor/measure_poll.rs b/elfo-core/src/supervisor/measure_poll.rs index 7cfed6a0..2b2b4fa1 100644 --- a/elfo-core/src/supervisor/measure_poll.rs +++ b/elfo-core/src/supervisor/measure_poll.rs @@ -6,7 +6,8 @@ use std::{ use metrics::Key; use pin_project::pin_project; -use quanta::Instant; + +use elfo_utils::time::Instant; #[cfg(feature = "unstable-stuck-detection")] use crate::stuck_detection::StuckDetector; @@ -50,8 +51,8 @@ impl Future for MeasurePoll { let result = if let Some(recorder) = metrics::try_recorder() { let start_time = Instant::now(); let res = this.inner.poll(cx); - let elapsed = Instant::now().duration_since(start_time); - recorder.record_histogram(&BUSY_TIME_SECONDS, elapsed.as_secs_f64()); + let elapsed = Instant::now().secs_f64_since(start_time); + recorder.record_histogram(&BUSY_TIME_SECONDS, elapsed); crate::scope::with(|scope| { recorder.increment_counter(&ALLOCATED_BYTES, scope.take_allocated_bytes() as u64); recorder diff --git a/elfo-network/Cargo.toml b/elfo-network/Cargo.toml index 6616a424..d4fc8e4e 100644 --- a/elfo-network/Cargo.toml +++ b/elfo-network/Cargo.toml @@ -24,7 +24,6 @@ tokio-util = "0.7" tracing = "0.1.25" parking_lot = "0.12" derive_more = "0.99.11" -quanta = "0.11" humantime-serde = "1" kanal = "0.1.0-pre8" dashmap = "5" diff --git a/elfo-network/src/worker/mod.rs b/elfo-network/src/worker/mod.rs index 887fe7f3..449ef07b 100644 --- a/elfo-network/src/worker/mod.rs +++ b/elfo-network/src/worker/mod.rs @@ -3,7 +3,6 @@ use std::{sync::Arc, time::Duration}; use eyre::Result; use metrics::{decrement_gauge, increment_gauge}; use parking_lot::Mutex; -use quanta::Instant; use tracing::{debug, error, info, trace, warn}; use elfo_core::{ @@ -16,7 +15,7 @@ use elfo_core::{ time::Interval, Addr, Context, Envelope, ResponseToken, Topology, }; -use elfo_utils::{likely, unlikely}; +use elfo_utils::{likely, time::Instant, unlikely}; use self::{ flows_rx::RxFlows, @@ -178,7 +177,7 @@ impl Worker { } PingTick => { let envelope = make_system_envelope(internode::Ping { - payload: time_origin.elapsed().as_nanos() as u64, + payload: Instant::now().nanos_since(time_origin), }); let _ = local_tx.try_send(KanalItem::simple(NetworkAddr::NULL, envelope)); @@ -579,7 +578,7 @@ impl SocketReader { })); } msg @ internode::Pong => { - let time_ns = self.time_origin.elapsed().as_nanos() as u64 - msg.payload; + let time_ns = Instant::now().nanos_since(self.time_origin) - msg.payload; self.rtt.push(Duration::from_nanos(time_ns)); } _ => return false, diff --git a/elfo-telemeter/Cargo.toml b/elfo-telemeter/Cargo.toml index 9a7c46bf..9d3b6a9e 100644 --- a/elfo-telemeter/Cargo.toml +++ b/elfo-telemeter/Cargo.toml @@ -24,7 +24,6 @@ metrics-util = "0.10" tracing = "0.1.25" parking_lot = "0.12" fxhash = "0.2.1" -quanta = "0.11" humantime-serde = "1" cow-utils = "0.1.2" stability = "0.1.1" diff --git a/elfo-utils/Cargo.toml b/elfo-utils/Cargo.toml index 8cf331cc..a7de1d66 100644 --- a/elfo-utils/Cargo.toml +++ b/elfo-utils/Cargo.toml @@ -14,9 +14,12 @@ readme.workspace = true name = "rate_limiter" harness = false +[features] +test-util = [] + [dependencies] derive_more = "0.99.11" -quanta = "0.11" +quanta = "0.12" [dev-dependencies] criterion = "0.4" diff --git a/elfo-utils/src/lib.rs b/elfo-utils/src/lib.rs index f2d76172..55db9375 100644 --- a/elfo-utils/src/lib.rs +++ b/elfo-utils/src/lib.rs @@ -9,6 +9,7 @@ pub use self::{ mod likely; mod rate_limiter; +pub mod time; #[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq, Deref)] // Spatial prefetcher is now pulling two lines at a time, so we use `align(128)`. diff --git a/elfo-utils/src/rate_limiter.rs b/elfo-utils/src/rate_limiter.rs index 7fccc45c..8c7c5afe 100644 --- a/elfo-utils/src/rate_limiter.rs +++ b/elfo-utils/src/rate_limiter.rs @@ -3,11 +3,10 @@ use std::{ time::Duration, }; -use quanta::Instant; +use crate::time; /// A rate limiter implementing [GCRA](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm). pub struct RateLimiter { - start_time: Instant, step: AtomicU64, period: AtomicU64, vtime: AtomicU64, @@ -49,7 +48,6 @@ impl RateLimiter { let (step, period) = limit.step_and_period(); Self { - start_time: Instant::now(), step: AtomicU64::new(step), period: AtomicU64::new(period), vtime: AtomicU64::new(0), @@ -60,7 +58,6 @@ impl RateLimiter { pub fn configure(&self, limit: RateLimit) { let (step, period) = limit.step_and_period(); - // FIXME: order matters. self.step.store(step, Relaxed); self.period.store(period, Relaxed); } @@ -85,13 +82,14 @@ impl RateLimiter { } let period = self.period.load(Relaxed); - let now = (Instant::now() - self.start_time).as_nanos() as u64; + let now = time::nanos_since_unknown_epoch(); + let deadline = now + period; // GCRA logic. self.vtime // It seems to be enough to use `Relaxed` here. .fetch_update(Relaxed, Relaxed, |vtime| { - if vtime < now + period { + if vtime < deadline { Some(vtime.max(now) + step) } else { None @@ -117,13 +115,10 @@ fn calculate_step(max_rate: u64, period: u64) -> u64 { #[cfg(test)] mod tests { - use quanta::{Clock, Mock}; - use super::*; - fn with_time_mock(f: impl FnOnce(&Mock)) { - let (clock, mock) = Clock::mock(); - quanta::with_clock(&clock, || f(&mock)); + fn ns(ns: u64) -> Duration { + Duration::from_nanos(ns) } #[test] @@ -141,18 +136,18 @@ mod tests { #[test] fn forbidding() { - with_time_mock(|mock| { + time::with_instant_mock(|mock| { let limiter = RateLimiter::new(RateLimit::Rps(0)); for _ in 0..=5 { assert!(!limiter.acquire()); - mock.increment(SEC); + mock.advance(ns(SEC)); } }); } #[test] fn unlimited() { - with_time_mock(|_mock| { + time::with_instant_mock(|_mock| { let limiter = RateLimiter::new(RateLimit::Unlimited); let limiter2 = RateLimiter::new(RateLimit::Rps(1_000_000_000)); let limiter3 = RateLimiter::new(RateLimit::Custom(2_000, Duration::from_micros(2))); @@ -167,7 +162,7 @@ mod tests { #[test] fn limited() { for limit in [1, 2, 3, 4, 5, 17, 100, 1_000, 1_013] { - with_time_mock(|mock| { + time::with_instant_mock(|mock| { let limiter = RateLimiter::new(RateLimit::Rps(limit)); for _ in 0..=5 { @@ -175,7 +170,7 @@ mod tests { assert!(limiter.acquire()); } assert!(!limiter.acquire()); - mock.increment(SEC); + mock.advance(ns(SEC)); } }); } @@ -184,7 +179,7 @@ mod tests { #[test] fn keeps_rate() { for limit in [1, 5, 25, 50] { - with_time_mock(|mock| { + time::with_instant_mock(|mock| { let limiter = RateLimiter::new(RateLimit::Rps(limit)); // Skip the first second. @@ -197,7 +192,7 @@ mod tests { let mut counter = 0; for _ in 0..(10 * parts) { - mock.increment(SEC / parts); + mock.advance(ns(SEC / parts)); while limiter.acquire() { counter += 1; } @@ -210,7 +205,7 @@ mod tests { #[test] fn reset() { - with_time_mock(|mock| { + time::with_instant_mock(|mock| { let limit = 10; let limiter = RateLimiter::new(RateLimit::Rps(limit)); @@ -223,7 +218,7 @@ mod tests { assert!(limiter.acquire()); } assert!(!limiter.acquire()); - mock.increment(SEC); + mock.advance(ns(SEC)); } }); } diff --git a/elfo-utils/src/time.rs b/elfo-utils/src/time.rs new file mode 100644 index 00000000..6598d018 --- /dev/null +++ b/elfo-utils/src/time.rs @@ -0,0 +1,99 @@ +//! Provides the [`Instant`] type. +//! +//! The main purpose of this module is to abstract over quanta/minstant/etc. + +use std::time::Duration; + +use quanta::Clock; + +/// A measurement of a monotonically nondecreasing clock. +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct Instant(u64); + +impl Instant { + /// Returns the current time. + #[inline] + pub fn now() -> Self { + Self(with_clock(|c| c.raw())) + } + + /// Returns the amount of time elapsed since this instant. + /// + /// Prefer `secs_f64_since()` if used for metrics. + pub fn elapsed(&self) -> Duration { + Self::now().duration_since(*self) + } + + // Returns the amount of time elapsed from another instant to this one. + // + // This method saturates to zero. + #[inline] + pub fn duration_since(&self, earlier: Self) -> Duration { + with_clock(|c| c.delta(earlier.0, self.0)) + } + + // Returns the number of seconds elapsed from another instant to this one. + // + // This method saturates to zero. + #[inline] + pub fn secs_f64_since(&self, earlier: Self) -> f64 { + self.nanos_since(earlier) as f64 * 1e-9 + } + + // Returns the number of nanoseconds elapsed from another instant to this one. + // + // This method saturates to zero. + #[inline] + pub fn nanos_since(&self, earlier: Self) -> u64 { + with_clock(|c| c.delta_as_nanos(earlier.0, self.0)) + } +} + +pub(crate) fn nanos_since_unknown_epoch() -> u64 { + with_clock(|c| c.delta_as_nanos(0, c.raw())) +} + +fn with_clock(f: impl FnOnce(&Clock) -> R) -> R { + use std::sync::OnceLock; + + static CLOCK: OnceLock = OnceLock::new(); + + #[cfg(any(test, feature = "test-util"))] + return mock::CLOCK.with(|c| match c.borrow().as_ref() { + Some(c) => f(c), + None => f(CLOCK.get_or_init(Clock::new)), + }); + + #[cfg(not(any(test, feature = "test-util")))] + f(CLOCK.get_or_init(Clock::new)) +} + +#[cfg(any(test, feature = "test-util"))] +pub use mock::*; + +#[cfg(any(test, feature = "test-util"))] +mod mock { + use super::*; + + thread_local! { + pub(super) static CLOCK: std::cell::RefCell> = std::cell::RefCell::new(None); + } + + /// Mocks `Instant`, see [`InstantMock`]. + pub fn with_instant_mock(f: impl FnOnce(InstantMock)) { + let (clock, mock) = Clock::mock(); + let mock = InstantMock(mock); + CLOCK.with(|c| *c.borrow_mut() = Some(clock)); + f(mock); + } + + /// Controllable time source for use in tests. + pub struct InstantMock(std::sync::Arc); + + impl InstantMock { + /// Increase the time by the given duration. + pub fn advance(&self, duration: Duration) { + self.0.increment(duration); + } + } +}