Skip to content

Commit

Permalink
feat: update quanta and abstract in utils::time::Instant
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Dec 13, 2023
1 parent 832abe2 commit a14bb91
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 54 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->

## [Unreleased] - ReleaseDate
### Changed
- deps: update `quanta` to v0.12.
- context: slightly improve performance of time measurements.
- utils: slightly improve performance of `RateLimiter`.

## [0.2.0-alpha.11] - 2023-11-10
### Added
Expand Down
3 changes: 2 additions & 1 deletion elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ derive_more = "0.99.11"
tracing = "0.1.25"
tracing-subscriber = { version = "0.3.2", default-features = false, features = ["std", "smallvec"] }
metrics = "0.17"
quanta = "0.11"
futures = "0.3.12"
static_assertions = "1.1.0"
dashmap = "5"
Expand All @@ -53,6 +52,8 @@ unicycle = "0.9.3"
rmp-serde = { version = "1.1.0", optional = true }

[dev-dependencies]
elfo-utils = { version = "0.2.3", path = "../elfo-utils", features = ["test-util"] }

anyhow = "1.0.40"
tokio = { version = "1", features = ["full"] }
proptest = "1.2.0"
Expand Down
10 changes: 6 additions & 4 deletions elfo-core/src/context/stats.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use derive_more::Constructor;
use metrics::{self, Key, Label};
use quanta::Instant;

use elfo_utils::time::Instant;

use crate::{envelope::Envelope, message::Message};

Expand All @@ -11,7 +12,7 @@ pub(super) struct Stats {
#[derive(Constructor)]
struct InHandling {
labels: &'static [Label],
start_time: quanta::Instant,
start_time: Instant,
}

static STARTUP_LABELS: &[Label] = &[Label::from_static_parts("message", "<Startup>")];
Expand Down Expand Up @@ -39,8 +40,9 @@ impl Stats {
let recorder = ward!(metrics::try_recorder());
let key = Key::from_static_name("elfo_message_waiting_time_seconds");
let now = Instant::now();

// Now envelope cannot be forwarded, so use the created time as a start time.
let value = (now - envelope.created_time()).as_secs_f64();
let value = now.secs_f64_since(envelope.created_time());
recorder.record_histogram(&key, value);

self.in_handling = Some(InHandling::new(envelope.message().labels(), now));
Expand All @@ -62,7 +64,7 @@ impl Stats {
let in_handling = ward!(self.in_handling.take());
let recorder = ward!(metrics::try_recorder());
let key = Key::from_static_parts("elfo_message_handling_time_seconds", in_handling.labels);
let value = (Instant::now() - in_handling.start_time).as_secs_f64();
let value = Instant::now().secs_f64_since(in_handling.start_time);
recorder.record_histogram(&key, value);
}
}
Expand Down
2 changes: 1 addition & 1 deletion elfo-core/src/envelope.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use quanta::Instant;
use elfo_utils::time::Instant;

use crate::{
message::{AnyMessage, Message},
Expand Down
6 changes: 5 additions & 1 deletion elfo-core/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{future::Future, sync::Arc, time::Duration};

use futures::future::join_all;
use quanta::Instant;
use tokio::{
pin, select,
time::{sleep, timeout},
};
use tracing::{error, info, level_filters::LevelFilter, warn};

use elfo_utils::time::Instant;

#[cfg(target_os = "linux")]
use crate::{memory_tracker::MemoryTracker, time::Interval};

Expand Down Expand Up @@ -155,6 +156,9 @@ pub async fn do_start<F: Future>(
is_check_only: bool,
and_then: impl FnOnce(Context, Topology) -> F,
) -> Result<F::Output> {
// Perform the clock calibration if needed.
Instant::now();

let group_no = GroupNo::new(SYSTEM_INIT_GROUP_NO, topology.launch_id()).unwrap();
let entry = topology.book.vacant_entry(group_no);
let addr = entry.addr();
Expand Down
27 changes: 10 additions & 17 deletions elfo-core/src/supervisor/backoff.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use quanta::Instant;
use elfo_utils::time::Instant;

const BACKOFF_STEP: Duration = Duration::from_secs(5);
const MAX_BACKOFF: Duration = Duration::from_secs(30);
Expand All @@ -25,10 +25,8 @@ impl Backoff {
}

pub(crate) fn next(&mut self) -> Duration {
let now = Instant::now();

// If an actor is alive enough time, reset the backoff.
if self.start_time + BACKOFF_STEP <= now {
if self.start_time.elapsed() >= BACKOFF_STEP {
self.next_backoff = Duration::ZERO;
}

Expand All @@ -40,43 +38,38 @@ impl Backoff {

#[cfg(test)]
mod tests {
use quanta::{Clock, Mock};
use elfo_utils::time;

use super::*;

fn with_time_mock(f: impl FnOnce(&Mock)) {
let (clock, mock) = Clock::mock();
quanta::with_clock(&clock, || f(&mock));
}

#[test]
fn it_works() {
with_time_mock(|mock| {
time::with_instant_mock(|mock| {
let mut backoff = Backoff::default();

// Immediately failed.
assert_eq!(backoff.next(), BACKOFF_STEP);
mock.increment(BACKOFF_STEP);
mock.advance(BACKOFF_STEP);
backoff.start();

// And again.
assert_eq!(backoff.next(), 2 * BACKOFF_STEP);
mock.increment(2 * BACKOFF_STEP);
mock.advance(2 * BACKOFF_STEP);
backoff.start();

// After some, not enough to reset the backoff, time.
mock.increment(BACKOFF_STEP * 2 / 3);
mock.advance(BACKOFF_STEP * 2 / 3);
assert_eq!(backoff.next(), 3 * BACKOFF_STEP);
mock.increment(3 * BACKOFF_STEP);
mock.advance(3 * BACKOFF_STEP);
backoff.start();

// After some, enough to reset the backoff, time.
mock.increment(BACKOFF_STEP);
mock.advance(BACKOFF_STEP);
assert_eq!(backoff.next(), Duration::ZERO); // resetted
backoff.start();

// After some, not enough to reset the backoff, time.
mock.increment(BACKOFF_STEP * 2 / 3);
mock.advance(BACKOFF_STEP * 2 / 3);
assert_eq!(backoff.next(), BACKOFF_STEP);
});
}
Expand Down
7 changes: 4 additions & 3 deletions elfo-core/src/supervisor/measure_poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::{

use metrics::Key;
use pin_project::pin_project;
use quanta::Instant;

use elfo_utils::time::Instant;

#[cfg(feature = "unstable-stuck-detection")]
use crate::stuck_detection::StuckDetector;
Expand Down Expand Up @@ -50,8 +51,8 @@ impl<F: Future> Future for MeasurePoll<F> {
let result = if let Some(recorder) = metrics::try_recorder() {
let start_time = Instant::now();
let res = this.inner.poll(cx);
let elapsed = Instant::now().duration_since(start_time);
recorder.record_histogram(&BUSY_TIME_SECONDS, elapsed.as_secs_f64());
let elapsed = Instant::now().secs_f64_since(start_time);
recorder.record_histogram(&BUSY_TIME_SECONDS, elapsed);
crate::scope::with(|scope| {
recorder.increment_counter(&ALLOCATED_BYTES, scope.take_allocated_bytes() as u64);
recorder
Expand Down
1 change: 0 additions & 1 deletion elfo-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ tokio-util = "0.7"
tracing = "0.1.25"
parking_lot = "0.12"
derive_more = "0.99.11"
quanta = "0.11"
humantime-serde = "1"
kanal = "0.1.0-pre8"
dashmap = "5"
Expand Down
7 changes: 3 additions & 4 deletions elfo-network/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{sync::Arc, time::Duration};
use eyre::Result;
use metrics::{decrement_gauge, increment_gauge};
use parking_lot::Mutex;
use quanta::Instant;
use tracing::{debug, error, info, trace, warn};

use elfo_core::{
Expand All @@ -16,7 +15,7 @@ use elfo_core::{
time::Interval,
Addr, Context, Envelope, ResponseToken, Topology,
};
use elfo_utils::{likely, unlikely};
use elfo_utils::{likely, time::Instant, unlikely};

use self::{
flows_rx::RxFlows,
Expand Down Expand Up @@ -178,7 +177,7 @@ impl Worker {
}
PingTick => {
let envelope = make_system_envelope(internode::Ping {
payload: time_origin.elapsed().as_nanos() as u64,
payload: Instant::now().nanos_since(time_origin),
});
let _ = local_tx.try_send(KanalItem::simple(NetworkAddr::NULL, envelope));

Expand Down Expand Up @@ -579,7 +578,7 @@ impl SocketReader {
}));
}
msg @ internode::Pong => {
let time_ns = self.time_origin.elapsed().as_nanos() as u64 - msg.payload;
let time_ns = Instant::now().nanos_since(self.time_origin) - msg.payload;
self.rtt.push(Duration::from_nanos(time_ns));
}
_ => return false,
Expand Down
1 change: 0 additions & 1 deletion elfo-telemeter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ metrics-util = "0.10"
tracing = "0.1.25"
parking_lot = "0.12"
fxhash = "0.2.1"
quanta = "0.11"
humantime-serde = "1"
cow-utils = "0.1.2"
stability = "0.1.1"
5 changes: 4 additions & 1 deletion elfo-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ readme.workspace = true
name = "rate_limiter"
harness = false

[features]
test-util = []

[dependencies]
derive_more = "0.99.11"
quanta = "0.11"
quanta = "0.12"

[dev-dependencies]
criterion = "0.4"
1 change: 1 addition & 0 deletions elfo-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use self::{

mod likely;
mod rate_limiter;
pub mod time;

#[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq, Deref)]
// Spatial prefetcher is now pulling two lines at a time, so we use `align(128)`.
Expand Down
Loading

0 comments on commit a14bb91

Please sign in to comment.