Skip to content

Commit

Permalink
perf(telemeter): new sharded-by-threads storage
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Apr 7, 2024
1 parent c700653 commit 8d63b2d
Show file tree
Hide file tree
Showing 13 changed files with 701 additions and 256 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- proxy: add `Proxy::try_send_to()` and `Proxy::request_to()`.
- telemeter: support gzip.

### Changed
- telemeter: a new sharded-by-threads storage that increases performance and dramatically reduces contention.

### Fixed
- telemetry: now `elfo_message_handling_time_seconds` doesn't include the time of task switching if an actor is preempted due to elfo's budget system.

Expand Down
4 changes: 4 additions & 0 deletions elfo-telemeter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ unstable = []

[dependencies]
elfo-core = { version = "0.2.0-alpha.14", path = "../elfo-core", features = ["unstable"] } # TODO: do not need
elfo-utils = { version = "0.2.5", path = "../elfo-utils" }

tokio = "1"
hyper = { version = "1.0.1", features = ["server", "http1"] }
Expand All @@ -27,6 +28,8 @@ http-body-util = "0.1"
serde = { version = "1.0.120", features = ["derive"] }
metrics = "0.17"
metrics-util = "0.10"
seqlock = "0.2"
thread_local = "1.1.8"
tracing = "0.1.25"
parking_lot = "0.12"
fxhash = "0.2.1"
Expand All @@ -39,5 +42,6 @@ flate2 = "1"
elfo-configurer = { path = "../elfo-configurer" }

criterion = "0.5.1"
proptest = "1.4"
tokio = { version = "1.36.0", features = ["rt-multi-thread"] }
toml = "0.7"
8 changes: 4 additions & 4 deletions elfo-telemeter/benches/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use criterion::{
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, BenchmarkId, Criterion,
};
use metrics::{counter, gauge, histogram};
use metrics::{counter, histogram, increment_gauge};
use tokio::{
runtime::Builder,
sync::{mpsc, oneshot},
Expand Down Expand Up @@ -165,12 +165,12 @@ fn all_cases(c: &mut Criterion) {
let mut group = c.benchmark_group("telemetry");

for contention in 1..=max_parallelism() {
case(&mut group, &tx, "gauge", contention, |v| {
gauge!("prefix_some_more_realistic_name", v)
});
case(&mut group, &tx, "counter", contention, |v| {
counter!("prefix_some_more_realistic_name", v as u64)
});
case(&mut group, &tx, "gauge", contention, |v| {
increment_gauge!("prefix_some_more_realistic_name", v)
});
case(&mut group, &tx, "histogram", contention, |v| {
histogram!("prefix_some_more_realistic_name", v)
});
Expand Down
17 changes: 6 additions & 11 deletions elfo-telemeter/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{sync::Arc, time::Duration};

use metrics::gauge;
use tracing::{error, info};

use elfo_core::{
Expand Down Expand Up @@ -84,14 +83,14 @@ impl Telemeter {
// Rendering includes compaction, skip extra compaction tick.
self.interval.start(self.ctx.config().compaction_interval);

self.fill_snapshot(/* only_histograms = */ false);
self.update_snapshot(/* only_compact = */ false);
self.ctx.respond(token, self.snapshot.clone().into());
}
(Render, token) => {
// Rendering includes compaction, skip extra compaction tick.
self.interval.start(self.ctx.config().compaction_interval);

self.fill_snapshot(/* only_histograms = */ false);
self.update_snapshot(/* only_compact = */ false);
let descriptions = self.storage.descriptions();
let output = self.renderer.render(&self.snapshot, &descriptions);
drop(descriptions);
Expand All @@ -103,7 +102,7 @@ impl Telemeter {
}
}
CompactionTick => {
self.fill_snapshot(/* only_histograms = */ true);
self.update_snapshot(/* only_compact = */ true);
}
ServerFailed(err) => {
error!(error = %err, "server failed");
Expand All @@ -113,20 +112,16 @@ impl Telemeter {
}
}

fn fill_snapshot(&mut self, only_histograms: bool) {
fn update_snapshot(&mut self, only_compact: bool) {
// Reuse the latest snapshot if possible.
let snapshot = Arc::make_mut(&mut self.snapshot);
let size = self.storage.fill_snapshot(snapshot, only_histograms);

if !only_histograms {
gauge!("elfo_metrics_usage_bytes", size as f64);
}
self.storage.merge(snapshot, only_compact);
}

fn reset_distributions(&mut self) {
// Reuse the latest snapshot if possible.
let snapshot = Arc::make_mut(&mut self.snapshot);
snapshot.distributions_mut().for_each(|d| d.reset());
snapshot.histograms_mut().for_each(|d| d.reset());
}

fn start_server(&mut self) {
Expand Down
5 changes: 3 additions & 2 deletions elfo-telemeter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod protocol;
mod actor;
mod config;
mod hyper;
mod metrics;
mod recorder;
mod render;
mod storage;
Expand All @@ -38,11 +39,11 @@ pub use allocator::AllocatorStats;

/// Installs a global metric recorder and returns a group to handle metrics.
pub fn init() -> Blueprint {
let storage = Arc::new(Storage::new());
let storage = Arc::new(Storage::default());
let recorder = Recorder::new(storage.clone());
let blueprint = actor::new(storage);

if let Err(err) = metrics::set_boxed_recorder(Box::new(recorder)) {
if let Err(err) = ::metrics::set_boxed_recorder(Box::new(recorder)) {
error!(error = %err, "failed to set a metric recorder");
}

Expand Down
21 changes: 21 additions & 0 deletions elfo-telemeter/src/metrics/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use super::MetricKind;

/// One shard of a counter metric: the running `u64` total of every value
/// recorded into this shard so far.
pub(crate) struct Counter(u64);

impl MetricKind for Counter {
    type Output = u64;
    type Shared = ();
    type Value = u64;

    /// Creates a shard with a zero total. Counters carry no shared state,
    /// so the argument is ignored.
    fn new(_shared: Self::Shared) -> Self {
        Counter(0)
    }

    /// Adds `value` to the shard's running total.
    fn update(&mut self, value: Self::Value) {
        let Counter(total) = self;
        *total += value;
    }

    /// Consumes the shard and adds its total to `out`.
    fn merge(self, out: &mut Self::Output) {
        let Counter(total) = self;
        *out += total;
    }
}
189 changes: 189 additions & 0 deletions elfo-telemeter/src/metrics/gauge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::sync::Arc;

use metrics::GaugeValue;
use seqlock::SeqLock;

use super::MetricKind;
use crate::protocol::GaugeEpoch;

// Sharded gauges are tricky to implement.
// Concurrent actors can update the same gauge in parallel either by using
// deltas (increment/decrement) or by setting an absolute value.
// In case of deltas, it should behave like a counter.
//
// The main idea is to share the last absolute value with its epoch between
// all shards of the same gauge. Every time the gauge is updated by setting
// an absolute value, the epoch is incremented, so all other shards can
// reset their cumulative deltas.
pub(crate) struct Gauge {
// State shared by all shards of the same gauge: the last absolute value
// and its epoch.
origin: Arc<GaugeOrigin>,
// The epoch this shard last set or observed; `delta` is relative to the
// absolute value of this epoch.
epoch: GaugeEpoch,
// Sum of increments/decrements accumulated by this shard within `epoch`.
delta: f64,
}

impl MetricKind for Gauge {
    type Output = (f64, GaugeEpoch);
    type Shared = Arc<GaugeOrigin>;
    type Value = GaugeValue;

    /// Creates an empty shard attached to the shared `origin`.
    ///
    /// The shard starts at epoch 0 with a zero delta.
    fn new(origin: Self::Shared) -> Self {
        Self {
            // `origin` is already owned: move it instead of cloning the `Arc`
            // and immediately dropping the original.
            origin,
            epoch: 0,
            delta: 0.0,
        }
    }

    /// Applies one update to this shard.
    ///
    /// An absolute value is written to the shared origin (which starts a new
    /// epoch) and clears the local delta. An increment/decrement is added to
    /// the local delta, which is restarted first if the origin has moved to
    /// another epoch since this shard last looked at it.
    fn update(&mut self, value: Self::Value) {
        let delta = match value {
            GaugeValue::Absolute(value) => {
                // This is the only place where the contention is possible.
                // However, it's rare, because the absolute value is usually
                // set only by one thread at a time.
                self.epoch = self.origin.set(value);
                self.delta = 0.0;
                return;
            }
            GaugeValue::Increment(delta) => delta,
            GaugeValue::Decrement(delta) => -delta,
        };

        let current_epoch = self.origin.get().1;

        if self.epoch == current_epoch {
            // Still in the epoch this shard knows about, just accumulate the delta.
            self.delta += delta;
        } else {
            // A new epoch was started by another shard: everything accumulated
            // before is obsolete, so restart the delta from this update.
            self.epoch = current_epoch;
            self.delta = delta;
        }
    }

    /// Consumes the shard and folds it into `out`, the merged
    /// `(value, epoch)` pair.
    ///
    /// If the origin is in a newer epoch than `out`, `out` is first reset to
    /// the origin's absolute value. The shard's delta is then added only if
    /// it belongs to the origin's current epoch; a delta from an older epoch
    /// is discarded.
    //
    // NOTE: Shards are merged one by one without blocking the whole storage,
    // thus while executing this method for one specific shard, other shards
    // are still available for updates, the same as the shared state (origin).
    //
    // However, all shards are merged consecutively.
    fn merge(self, (out_value, out_epoch): &mut Self::Output) {
        let (last_absolute, current_epoch) = self.origin.get();

        // The epoch is always monotonically increasing.
        debug_assert!(current_epoch >= *out_epoch);
        debug_assert!(current_epoch >= self.epoch);

        if current_epoch > *out_epoch {
            *out_value = last_absolute;
            *out_epoch = current_epoch;
        }

        if current_epoch == self.epoch {
            *out_value += self.delta;
        }
    }
}

/// The last absolute value of a gauge together with its epoch, kept in a
/// `SeqLock` and shared between all shards of that gauge.
#[derive(Default)]
pub(crate) struct GaugeOrigin(SeqLock<(f64, GaugeEpoch)>);

impl GaugeOrigin {
    /// Returns the current `(absolute value, epoch)` pair.
    fn get(&self) -> (f64, GaugeEpoch) {
        let Self(lock) = self;
        lock.read()
    }

    /// Stores a new absolute `value`, advances the epoch by one and returns
    /// the new epoch.
    fn set(&self, value: f64) -> GaugeEpoch {
        let mut guard = self.0.lock_write();
        let next_epoch = guard.1 + 1;
        guard.0 = value;
        guard.1 = next_epoch;
        next_epoch
    }
}

// Property-based test of the sharded gauge: a random sequence of updates and
// partial merges is compared against a plain `f64` reference value.
#[cfg(test)]
mod tests {
use std::{collections::VecDeque, ops::Range};

use proptest::prelude::*;

use super::*;

// Range of the number of actions generated per test case.
const ACTIONS: Range<usize> = 1..1000;
// Number of shards of the gauge under test.
const SHARDS: usize = 3;

#[derive(Debug, Clone)]
enum Action {
// Apply one update to one shard.
Update(Update),
// How many shards to merge (max `SHARDS`).
// It emulates the real-world scenario when shards are merged one by one,
// see `Gauge::merge` for details about the non-blocking merge.
Merge(usize),
}

#[derive(Debug, Clone)]
struct Update {
shard: usize, // max `SHARDS - 1`
value: GaugeValue,
}

fn action_strategy() -> impl Strategy<Value = Action> {
// Updates are weighted ten times higher than merges.
prop_oneof![
1 => (1..=SHARDS).prop_map(Action::Merge),
10 => update_strategy().prop_map(Action::Update),
]
}

prop_compose! {
fn update_strategy()(shard in 0..SHARDS, value in gauge_value_strategy()) -> Update {
Update { shard, value }
}
}

fn gauge_value_strategy() -> impl Strategy<Value = GaugeValue> {
// Use integers here to avoid floating point errors.
// Deltas are weighted five times higher than absolute values.
prop_oneof![
1 => (0..10).prop_map(|v| GaugeValue::Absolute(v as f64)),
5 => (1..10).prop_map(|v| GaugeValue::Increment(v as f64)),
5 => (1..10).prop_map(|v| GaugeValue::Decrement(v as f64)),
]
}

proptest! {
#[test]
fn linearizability(actions in prop::collection::vec(action_strategy(), ACTIONS)) {
let origin = Arc::new(GaugeOrigin::default());

// `shards` is a queue: merging takes a shard from the front and puts
// a fresh one to the back.
let mut shards = (0..SHARDS).map(|_| Gauge::new(origin.clone())).collect::<VecDeque<_>>();
// Reference model: the value of a plain, non-sharded gauge.
let mut expected = 0.0;
// Merged output: `(value, epoch)`.
let mut actual = (0.0, 0);

for action in actions {
match action {
Action::Update(update) => {
expected = update.value.update_value(expected);
shards[update.shard].update(update.value);
}
Action::Merge(limit) => {
for _ in 0..limit {
let shard = shards.pop_front().unwrap();
shard.merge(&mut actual);
shards.push_back(Gauge::new(origin.clone()));
assert_eq!(shards.len(), SHARDS);
}

// Check eventual consistency: once every shard has been merged
// within this action, the merged value must match the model.
if limit == SHARDS {
prop_assert_eq!(actual.0, expected);
}
}
}
}

// Check eventual consistency after merging all remaining shards.
for shard in shards {
shard.merge(&mut actual);
}
prop_assert_eq!(actual.0, expected);
}
}
}
Loading

0 comments on commit 8d63b2d

Please sign in to comment.