diff --git a/CHANGELOG.md b/CHANGELOG.md
index a769d49d..35285c3b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - proxy: add `Proxy::try_send_to()` and `Proxy::request_to()`.
 - telemeter: support gzip.
 
+### Changed
+- telemeter: a new sharded-by-threads storage that improves performance and dramatically reduces contention.
+
 ### Fixed
 - telemetry: now `elfo_message_handling_time_seconds` doesn't include the time of task switching if an actor is preempted due to elfo's budget system.
diff --git a/elfo-telemeter/Cargo.toml b/elfo-telemeter/Cargo.toml
index 51e70207..79ef122b 100644
--- a/elfo-telemeter/Cargo.toml
+++ b/elfo-telemeter/Cargo.toml
@@ -19,6 +19,7 @@ unstable = []
 [dependencies]
 elfo-core = { version = "0.2.0-alpha.14", path = "../elfo-core", features = ["unstable"] } # TODO: do not need
+elfo-utils = { version = "0.2.5", path = "../elfo-utils" }
 tokio = "1"
 hyper = { version = "1.0.1", features = ["server", "http1"] }
@@ -27,6 +28,8 @@ http-body-util = "0.1"
 serde = { version = "1.0.120", features = ["derive"] }
 metrics = "0.17"
 metrics-util = "0.10"
+seqlock = "0.2"
+thread_local = "1.1.8"
 tracing = "0.1.25"
 parking_lot = "0.12"
 fxhash = "0.2.1"
@@ -39,5 +42,6 @@ flate2 = "1"
 elfo-configurer = { path = "../elfo-configurer" }
 criterion = "0.5.1"
+proptest = "1.4"
 tokio = { version = "1.36.0", features = ["rt-multi-thread"] }
 toml = "0.7"
diff --git a/elfo-telemeter/benches/telemetry.rs b/elfo-telemeter/benches/telemetry.rs
index 04f392dd..bcb0e95c 100644
--- a/elfo-telemeter/benches/telemetry.rs
+++ b/elfo-telemeter/benches/telemetry.rs
@@ -9,7 +9,7 @@ use std::{
 use criterion::{
     criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, BenchmarkId, Criterion,
 };
-use metrics::{counter, gauge, histogram};
+use metrics::{counter, histogram, increment_gauge};
 use tokio::{
     runtime::Builder,
     sync::{mpsc, oneshot},
@@ -165,12 +165,12 @@ fn all_cases(c: &mut Criterion) {
     let mut group = c.benchmark_group("telemetry");
 
     for contention in 1..=max_parallelism() {
-        case(&mut group, &tx, "gauge", contention, |v| {
-            gauge!("prefix_some_more_realistic_name", v)
-        });
         case(&mut group, &tx, "counter", contention, |v| {
             counter!("prefix_some_more_realistic_name", v as u64)
         });
+        case(&mut group, &tx, "gauge", contention, |v| {
+            increment_gauge!("prefix_some_more_realistic_name", v)
+        });
         case(&mut group, &tx, "histogram", contention, |v| {
             histogram!("prefix_some_more_realistic_name", v)
         });
diff --git a/elfo-telemeter/src/actor.rs b/elfo-telemeter/src/actor.rs
index 315d2851..c6d84d18 100644
--- a/elfo-telemeter/src/actor.rs
+++ b/elfo-telemeter/src/actor.rs
@@ -1,6 +1,5 @@
 use std::{sync::Arc, time::Duration};
 
-use metrics::gauge;
 use tracing::{error, info};
 
 use elfo_core::{
@@ -84,14 +83,14 @@ impl Telemeter {
                 // Rendering includes compaction, skip extra compaction tick.
                 self.interval.start(self.ctx.config().compaction_interval);
-                self.fill_snapshot(/* only_histograms = */ false);
+                self.update_snapshot(/* only_compact = */ false);
                 self.ctx.respond(token, self.snapshot.clone().into());
             }
             (Render, token) => {
                 // Rendering includes compaction, skip extra compaction tick.
                self.interval.start(self.ctx.config().compaction_interval);
-                self.fill_snapshot(/* only_histograms = */ false);
+                self.update_snapshot(/* only_compact = */ false);
                 let descriptions = self.storage.descriptions();
                 let output = self.renderer.render(&self.snapshot, &descriptions);
                 drop(descriptions);
@@ -103,7 +102,7 @@ impl Telemeter {
                 }
             }
             CompactionTick => {
-                self.fill_snapshot(/* only_histograms = */ true);
+                self.update_snapshot(/* only_compact = */ true);
             }
             ServerFailed(err) => {
                 error!(error = %err, "server failed");
@@ -113,20 +112,16 @@
         }
     }
 
-    fn fill_snapshot(&mut self, only_histograms: bool) {
+    fn update_snapshot(&mut self, only_compact: bool) {
         // Reuse the latest snapshot if possible.
         let snapshot = Arc::make_mut(&mut self.snapshot);
-        let size = self.storage.fill_snapshot(snapshot, only_histograms);
-
-        if !only_histograms {
-            gauge!("elfo_metrics_usage_bytes", size as f64);
-        }
+        self.storage.merge(snapshot, only_compact);
     }
 
     fn reset_distributions(&mut self) {
         // Reuse the latest snapshot if possible.
         let snapshot = Arc::make_mut(&mut self.snapshot);
-        snapshot.distributions_mut().for_each(|d| d.reset());
+        snapshot.histograms_mut().for_each(|d| d.reset());
     }
 
     fn start_server(&mut self) {
diff --git a/elfo-telemeter/src/lib.rs b/elfo-telemeter/src/lib.rs
index aaeabcb7..802dafa5 100644
--- a/elfo-telemeter/src/lib.rs
+++ b/elfo-telemeter/src/lib.rs
@@ -26,6 +26,7 @@ pub mod protocol;
 mod actor;
 mod config;
 mod hyper;
+mod metrics;
 mod recorder;
 mod render;
 mod storage;
@@ -38,11 +39,11 @@ pub use allocator::AllocatorStats;
 /// Installs a global metric recorder and returns a group to handle metrics.
 pub fn init() -> Blueprint {
-    let storage = Arc::new(Storage::new());
+    let storage = Arc::new(Storage::default());
     let recorder = Recorder::new(storage.clone());
     let blueprint = actor::new(storage);
 
-    if let Err(err) = metrics::set_boxed_recorder(Box::new(recorder)) {
+    if let Err(err) = ::metrics::set_boxed_recorder(Box::new(recorder)) {
         error!(error = %err, "failed to set a metric recorder");
     }
diff --git a/elfo-telemeter/src/metrics/counter.rs b/elfo-telemeter/src/metrics/counter.rs
new file mode 100644
index 00000000..ff80b95e
--- /dev/null
+++ b/elfo-telemeter/src/metrics/counter.rs
@@ -0,0 +1,21 @@
+use super::MetricKind;
+
+pub(crate) struct Counter(u64);
+
+impl MetricKind for Counter {
+    type Output = u64;
+    type Shared = ();
+    type Value = u64;
+
+    fn new(_: Self::Shared) -> Self {
+        Self(0)
+    }
+
+    fn update(&mut self, value: Self::Value) {
+        self.0 += value;
+    }
+
+    fn merge(self, out: &mut Self::Output) {
+        *out += self.0;
+    }
+}
diff --git a/elfo-telemeter/src/metrics/gauge.rs b/elfo-telemeter/src/metrics/gauge.rs
new file mode 100644
index 00000000..84a77810
--- /dev/null
+++ b/elfo-telemeter/src/metrics/gauge.rs
@@ -0,0 +1,189 @@
+use std::sync::Arc;
+
+use metrics::GaugeValue;
+use seqlock::SeqLock;
+
+use super::MetricKind;
+use crate::protocol::GaugeEpoch;
+
+// Sharded gauges are tricky to implement.
+// Concurrent actors can update the same gauge in parallel either by using
+// deltas (increment/decrement) or by setting an absolute value.
+// In case of deltas, it should behave like a counter.
+//
+// The main idea is to share the last absolute value with its epoch between
+// all shards of the same gauge. Every time the gauge is updated by setting
+// an absolute value, the epoch is incremented, so all other shards can
+// reset their cumulative deltas.
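+//
+// A hypothetical sequence with two shards A and B (the shared epoch starts at 0):
+// A gets Increment(2) and B gets Increment(3), so merging yields 2 + 3 = 5.
+// If A then sets Absolute(10), the shared epoch becomes 1, so B's stale delta
+// is discarded on its next update; after B gets Increment(1), merging yields
+// the last absolute value plus the deltas of the new epoch: 10 + 1 = 11.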
+pub(crate) struct Gauge { + origin: Arc, + epoch: GaugeEpoch, + delta: f64, +} + +impl MetricKind for Gauge { + type Output = (f64, GaugeEpoch); + type Shared = Arc; + type Value = GaugeValue; + + fn new(origin: Self::Shared) -> Self { + Self { + origin: origin.clone(), + epoch: 0, + delta: 0.0, + } + } + + fn update(&mut self, value: Self::Value) { + let delta = match value { + GaugeValue::Absolute(value) => { + // This is the only place where the contention is possible. + // However, it's rare, because the absolute value is usually + // set only by one thread at a time. + self.epoch = self.origin.set(value); + self.delta = 0.0; + return; + } + GaugeValue::Increment(delta) => delta, + GaugeValue::Decrement(delta) => -delta, + }; + + let current_epoch = self.origin.get().1; + + if self.epoch == current_epoch { + // The new epoch is the same as the current one, just accumulate the delta. + self.delta += delta; + } else { + // The new epoch is already set by another shard, reset the delta. + self.epoch = current_epoch; + self.delta = delta; + } + } + + // NOTE: Shards are merged one by one without blocking the whole storage, + // thus while executing this method for one specific shard, other shards + // are still available for updates, the same as the shared state (origin). + // + // However, all shards are merged consecutively. + fn merge(self, (out_value, out_epoch): &mut Self::Output) { + let (last_absolute, current_epoch) = self.origin.get(); + + // The epoch is always monotonically increasing. + debug_assert!(current_epoch >= *out_epoch); + debug_assert!(current_epoch >= self.epoch); + + if current_epoch > *out_epoch { + *out_value = last_absolute; + *out_epoch = current_epoch; + } + + if current_epoch == self.epoch { + *out_value += self.delta; + } + } +} + +#[derive(Default)] +pub(crate) struct GaugeOrigin(SeqLock<(f64, GaugeEpoch)>); + +impl GaugeOrigin { + fn get(&self) -> (f64, GaugeEpoch) { + self.0.read() + } + + fn set(&self, value: f64) -> GaugeEpoch { + let mut pair = self.0.lock_write(); + let new_epoch = pair.1 + 1; + *pair = (value, new_epoch); + new_epoch + } +} + +#[cfg(test)] +mod tests { + use std::{collections::VecDeque, ops::Range}; + + use proptest::prelude::*; + + use super::*; + + const ACTIONS: Range = 1..1000; + const SHARDS: usize = 3; + + #[derive(Debug, Clone)] + enum Action { + Update(Update), + // How many shards to merge (max `SHARDS`). + // It emulates the real-world scenario when shards are merged one by one, + // see `Gauge::merge` for details about the non-blocking merge. + Merge(usize), + } + + #[derive(Debug, Clone)] + struct Update { + shard: usize, // max `SHARDS - 1` + value: GaugeValue, + } + + fn action_strategy() -> impl Strategy { + // Use integers here to avoid floating point errors. + prop_oneof![ + 1 => (1..=SHARDS).prop_map(Action::Merge), + 10 => update_strategy().prop_map(Action::Update), + ] + } + + prop_compose! { + fn update_strategy()(shard in 0..SHARDS, value in gauge_value_strategy()) -> Update { + Update { shard, value } + } + } + + fn gauge_value_strategy() -> impl Strategy { + // Use integers here to avoid floating point errors. + prop_oneof![ + 1 => (0..10).prop_map(|v| GaugeValue::Absolute(v as f64)), + 5 => (1..10).prop_map(|v| GaugeValue::Increment(v as f64)), + 5 => (1..10).prop_map(|v| GaugeValue::Decrement(v as f64)), + ] + } + + proptest! 
{
+        #[test]
+        fn linearizability(actions in prop::collection::vec(action_strategy(), ACTIONS)) {
+            let origin = Arc::new(GaugeOrigin::default());
+
+            let mut shards = (0..SHARDS).map(|_| Gauge::new(origin.clone())).collect::<VecDeque<_>>();
+            let mut expected = 0.0;
+            let mut actual = (0.0, 0);
+
+            for action in actions {
+                match action {
+                    Action::Update(update) => {
+                        expected = update.value.update_value(expected);
+                        shards[update.shard].update(update.value);
+                    }
+                    Action::Merge(limit) => {
+                        for _ in 0..limit {
+                            let shard = shards.pop_front().unwrap();
+                            shard.merge(&mut actual);
+                            shards.push_back(Gauge::new(origin.clone()));
+                            assert_eq!(shards.len(), SHARDS);
+                        }
+
+                        // Check eventual consistency.
+                        if limit == SHARDS {
+                            prop_assert_eq!(actual.0, expected);
+                        }
+                    }
+                }
+            }
+
+            // Check eventual consistency.
+            for shard in shards {
+                shard.merge(&mut actual);
+            }
+            prop_assert_eq!(actual.0, expected);
+        }
+    }
+}
diff --git a/elfo-telemeter/src/metrics/histogram.rs b/elfo-telemeter/src/metrics/histogram.rs
new file mode 100644
index 00000000..b24adbbb
--- /dev/null
+++ b/elfo-telemeter/src/metrics/histogram.rs
@@ -0,0 +1,123 @@
+use std::mem;
+
+use super::MetricKind;
+use crate::protocol::Distribution;
+
+// TODO: *lazy* reservoir sampling to improve memory usage?
+pub(crate) struct Histogram(SegVec<f64>);
+
+impl MetricKind for Histogram {
+    type Output = Distribution;
+    type Shared = ();
+    type Value = f64;
+
+    fn new(_: Self::Shared) -> Self {
+        Self(SegVec::default())
+    }
+
+    fn update(&mut self, value: Self::Value) {
+        self.0.push(value);
+    }
+
+    fn merge(self, out: &mut Self::Output) {
+        for segment in self.0.into_segments() {
+            out.record_samples(&segment);
+        }
+    }
+}
+
+// A vector of segments to prevent reallocations.
+// It improves tail latency, which is important for near-RT actors.
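+//
+// For instance, with `INITIAL_LIMIT = 8` and `GROWTH_FACTOR = 4` (see below),
+// segment capacities grow as 8, 32, 128, 512, 2048, ...: once the active
+// segment is full, it is parked in `bag` untouched and a larger one is
+// allocated, so already recorded samples are never moved or reallocated.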
+struct SegVec { + active: Vec, + bag: Vec>, +} + +const INITIAL_LIMIT: usize = 8; +const GROWTH_FACTOR: usize = 4; + +impl Default for SegVec { + fn default() -> Self { + Self { + active: vec_with_exact_capacity(INITIAL_LIMIT), + bag: Vec::new(), + } + } +} + +impl SegVec { + fn push(&mut self, value: T) { + if self.active.len() == self.active.capacity() { + self.new_segment(); + } + + #[cfg(test)] + let capacity = self.active.capacity(); + self.active.push(value); + #[cfg(test)] + assert_eq!(self.active.capacity(), capacity); + } + + #[cold] + fn new_segment(&mut self) { + let new = vec_with_exact_capacity(self.active.capacity() * GROWTH_FACTOR); + let old = mem::replace(&mut self.active, new); + self.bag.push(old); + } + + fn into_segments(mut self) -> Vec> { + if !self.active.is_empty() { + self.bag.push(self.active); + } + + self.bag + } +} + +fn vec_with_exact_capacity(capacity: usize) -> Vec { + let mut vec = Vec::new(); + vec.reserve_exact(capacity); + debug_assert_eq!(vec.capacity(), capacity); + vec +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn seg_vec() { + let mut vec = SegVec::default(); + let iters = 2024; + + for i in 0..iters { + vec.push(i); + } + + let segments = vec.into_segments(); + + for (no, segment) in segments.iter().enumerate() { + let expected_capacity = INITIAL_LIMIT * GROWTH_FACTOR.pow(no as u32); + assert_eq!(segment.capacity(), expected_capacity); + + if no + 1 == segments.len() { + assert_ne!(segment.len(), 0); + } else { + assert_eq!(segment.len(), segment.capacity()); + } + } + + assert_eq!( + segments.iter().map(|segment| segment.len()).sum::(), + iters + ); + + segments + .into_iter() + .flatten() + .enumerate() + .for_each(|(i, v)| { + assert_eq!(i, v); + }); + } +} diff --git a/elfo-telemeter/src/metrics/mod.rs b/elfo-telemeter/src/metrics/mod.rs new file mode 100644 index 00000000..c706a281 --- /dev/null +++ b/elfo-telemeter/src/metrics/mod.rs @@ -0,0 +1,19 @@ +mod counter; +mod gauge; +mod histogram; + +pub(crate) use self::{ + counter::Counter, + gauge::{Gauge, GaugeOrigin}, + histogram::Histogram, +}; + +pub(crate) trait MetricKind: Sized { + type Output; + type Shared; + type Value; + + fn new(shared: Self::Shared) -> Self; + fn update(&mut self, value: Self::Value); + fn merge(self, out: &mut Self::Output); +} diff --git a/elfo-telemeter/src/protocol.rs b/elfo-telemeter/src/protocol.rs index 06b745d5..8f3adeeb 100644 --- a/elfo-telemeter/src/protocol.rs +++ b/elfo-telemeter/src/protocol.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use fxhash::FxHashMap; use metrics::Key; -use metrics_util::Summary; +use metrics_util::Summary; // TODO: avoid this dependency use elfo_core::{message, ActorMeta, Local}; @@ -21,7 +21,9 @@ pub(crate) struct ServerFailed(pub(crate) String); /// The response is restricted to be local only for now. #[message(ret = Local>)] #[non_exhaustive] -pub struct GetSnapshot; +pub(crate) struct GetSnapshot; + +pub(crate) type GaugeEpoch = u64; /// Actual values of all metrics. #[derive(Default, Clone)] @@ -29,24 +31,24 @@ pub struct Snapshot { /// Metrics ouside the actor system. pub global: Metrics, /// Metrics aggregated per group. - pub per_group: FxHashMap, + pub groupwise: FxHashMap, /// Metrics aggregated per actor. 
- pub per_actor: FxHashMap, Metrics>, + pub actorwise: FxHashMap, Metrics>, } impl Snapshot { - pub(crate) fn distributions_mut(&mut self) -> impl Iterator { - let global = self.global.distributions.values_mut(); + pub(crate) fn histograms_mut(&mut self) -> impl Iterator { + let global = self.global.histograms.values_mut(); let per_group = self - .per_group + .groupwise .values_mut() - .flat_map(|m| m.distributions.values_mut()); + .flat_map(|m| m.histograms.values_mut()); let per_actor = self - .per_actor + .actorwise .values_mut() - .flat_map(|m| m.distributions.values_mut()); + .flat_map(|m| m.histograms.values_mut()); global.chain(per_group).chain(per_actor) } @@ -58,9 +60,9 @@ pub struct Metrics { /// Monotonically increasing counters. pub counters: FxHashMap, /// Numerical values that can arbitrarily go up and down. - pub gauges: FxHashMap, + pub gauges: FxHashMap, /// Summaries of samples, used to calculate of quantiles. - pub distributions: FxHashMap, + pub histograms: FxHashMap, } /// Summaries of samples, used to calculate of quantiles. diff --git a/elfo-telemeter/src/recorder.rs b/elfo-telemeter/src/recorder.rs index 834603f1..b6826f62 100644 --- a/elfo-telemeter/src/recorder.rs +++ b/elfo-telemeter/src/recorder.rs @@ -2,9 +2,12 @@ use std::sync::Arc; use metrics::{GaugeValue, Key, Unit}; -use elfo_core::scope::{self, Scope}; +use elfo_core::scope; -use crate::storage::Storage; +use crate::{ + metrics::{Counter, Gauge, Histogram}, + storage::{ActorScope, GlobalScope, GroupScope, Storable, Storage}, +}; pub(crate) struct Recorder { storage: Arc, @@ -15,66 +18,55 @@ impl Recorder { Self { storage } } - fn with_params(&self, f: impl Fn(&Storage, Option<&Scope>, bool)) { - let is_global = scope::try_with(|scope| { + fn record(&self, key: &Key, value: M::Value) + where + M: Storable, + M::Value: Clone, + { + scope::try_with(|scope| { let perm = scope.permissions(); + if perm.is_telemetry_per_actor_group_enabled() { - f(&self.storage, Some(scope), false); + self.storage + .upsert::(scope, key, value.clone()) } - if perm.is_telemetry_per_actor_key_enabled() && !scope.telemetry_meta().key.is_empty() { - f(&self.storage, Some(scope), true); + + if perm.is_telemetry_per_actor_key_enabled() { + // TODO: get rid of this check. 
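+                // An empty telemetry key means there is nothing to group
+                // per-actor metrics by, so they are skipped for such actors.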
+ if scope.telemetry_meta().key.is_empty() { + return; + } + + self.storage + .upsert::(scope, key, value.clone()) } }) - .is_none(); - - if is_global { - f(&self.storage, None, false); - } + .unwrap_or_else(|| self.storage.upsert::(&(), key, value)) } } impl metrics::Recorder for Recorder { - fn register_counter(&self, key: &Key, _unit: Option, description: Option<&'static str>) { - self.storage.add_description_if_missing(key, description); - self.with_params(|storage, scope, with_actor_key| { - storage.touch_counter(scope, key, with_actor_key) - }); + fn register_counter(&self, key: &Key, unit: Option, description: Option<&'static str>) { + self.storage.describe(key, unit, description); } - fn register_gauge(&self, key: &Key, _unit: Option, description: Option<&'static str>) { - self.storage.add_description_if_missing(key, description); - self.with_params(|storage, scope, with_actor_key| { - storage.touch_gauge(scope, key, with_actor_key) - }); + fn register_gauge(&self, key: &Key, unit: Option, description: Option<&'static str>) { + self.storage.describe(key, unit, description); } - fn register_histogram( - &self, - key: &Key, - _unit: Option, - description: Option<&'static str>, - ) { - self.storage.add_description_if_missing(key, description); - self.with_params(|storage, scope, with_actor_key| { - storage.touch_histogram(scope, key, with_actor_key) - }); + fn register_histogram(&self, key: &Key, unit: Option, description: Option<&'static str>) { + self.storage.describe(key, unit, description); } fn increment_counter(&self, key: &Key, value: u64) { - self.with_params(|storage, scope, with_actor_key| { - storage.increment_counter(scope, key, value, with_actor_key) - }); + self.record::(key, value) } fn update_gauge(&self, key: &Key, value: GaugeValue) { - self.with_params(|storage, scope, with_actor_key| { - storage.update_gauge(scope, key, value.clone(), with_actor_key) - }); + self.record::(key, value) } fn record_histogram(&self, key: &Key, value: f64) { - self.with_params(|storage, scope, with_actor_key| { - storage.record_histogram(scope, key, value, with_actor_key) - }); + self.record::(key, value) } } diff --git a/elfo-telemeter/src/render/prometheus.rs b/elfo-telemeter/src/render/prometheus.rs index c5dddf8c..ada7c384 100644 --- a/elfo-telemeter/src/render/prometheus.rs +++ b/elfo-telemeter/src/render/prometheus.rs @@ -136,7 +136,7 @@ fn group_by_name(snapshot: &Snapshot) -> GroupedData<'_> { ); } - for (group, per_group) in &snapshot.per_group { + for (group, per_group) in &snapshot.groupwise { for (key, value, kind) in iter_metrics(per_group) { data.entry((kind, key.name())).or_default().insert( MetricMeta { @@ -149,7 +149,7 @@ fn group_by_name(snapshot: &Snapshot) -> GroupedData<'_> { } } - for (actor_meta, per_actor) in &snapshot.per_actor { + for (actor_meta, per_actor) in &snapshot.actorwise { for (key, value, kind) in iter_metrics(per_actor) { data.entry((kind, key.name())).or_default().insert( MetricMeta { @@ -173,9 +173,9 @@ fn iter_metrics(metrics: &Metrics) -> impl Iterator>, - descriptions: RwLock>, +// === Scopes === + +// TODO: use real `Key` (with pointer comparison). 
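+// A precomputed hash of the `metrics::Key` (combined with the actor's
+// telemetry key for per-actor metrics), used as the per-shard map key.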
+type KeyHash = u64; + +pub(crate) trait ScopeKind: Sized { + type Scope; + type Key: Copy + Hash + Eq; + type Meta: Clone; + + fn get_meta(scope: &Self::Scope) -> &Self::Meta; + fn make_key(scope: &Self::Scope, key: &Key) -> Self::Key; + fn registries(shard: &Shard) -> &Registries; + fn gauge_shared(storage: &Storage) -> &Mutex>; + fn snapshot<'s>(snapshot: &'s mut Snapshot, meta: &Self::Meta) -> &'s mut Metrics; } -#[derive(Clone, PartialEq, Eq, Hash)] -struct ExtKey { - group: Addr, // `Addr::NULL` if global. - // XXX: we are forced to use hash here, because API of `Registry` - // isn't composable with composite keys for now. - key_hash: u64, +pub(crate) struct GlobalScope; + +impl ScopeKind for GlobalScope { + type Key = KeyHash; + type Meta = (); + type Scope = (); + + fn get_meta(_scope: &Self::Scope) -> &Self::Meta { + &() + } + + fn make_key(_scope: &Self::Scope, key: &Key) -> Self::Key { + key.get_hash() + } + + fn registries(shard: &Shard) -> &Registries { + &shard.global + } + + fn gauge_shared(storage: &Storage) -> &Mutex> { + &storage.gauge_shared.global + } + + fn snapshot<'s>(snapshot: &'s mut Snapshot, _meta: &Self::Meta) -> &'s mut Metrics { + &mut snapshot.global + } } -fn make_ext_key(scope: Option<&Scope>, key: &Key, with_actor_key: bool) -> ExtKey { - let mut key_hash = key.get_hash(); +pub(crate) struct GroupScope; - if let Some(scope) = scope.filter(|_| with_actor_key) { - debug_assert!(!scope.telemetry_meta().key.is_empty()); - let mut hasher = KeyHasher::default(); - scope.telemetry_meta().key.hash(&mut hasher); - key_hash ^= hasher.finish(); +impl ScopeKind for GroupScope { + type Key = (Addr, KeyHash); + // TODO: replace with EBR? + type Meta = Arc; + type Scope = Scope; + + fn get_meta(scope: &Self::Scope) -> &Self::Meta { + scope.telemetry_meta() } - let group = if let Some(scope) = scope { + fn make_key(scope: &Self::Scope, key: &Key) -> Self::Key { debug_assert_ne!(scope.group(), Addr::NULL); - scope.group() - } else { - Addr::NULL - }; + (scope.group(), key.get_hash()) + } + + fn registries(shard: &Shard) -> &Registries { + &shard.groupwise + } + + fn gauge_shared(storage: &Storage) -> &Mutex> { + &storage.gauge_shared.groupwise + } - ExtKey { group, key_hash } + fn snapshot<'s>(snapshot: &'s mut Snapshot, meta: &Self::Meta) -> &'s mut Metrics { + snapshot.groupwise.entry(meta.group.clone()).or_default() + } } -impl Hashable for ExtKey { - #[inline] - fn hashable(&self) -> u64 { - // TODO: get rid of double hashing. - let mut hasher = KeyHasher::default(); - self.hash(&mut hasher); - hasher.finish() +pub(crate) struct ActorScope; + +impl ScopeKind for ActorScope { + type Key = (/* group */ Addr, KeyHash); + // TODO: replace with EBR? + type Meta = Arc; + type Scope = Scope; + + fn get_meta(scope: &Self::Scope) -> &Self::Meta { + scope.telemetry_meta() + } + + fn make_key(scope: &Self::Scope, key: &Key) -> Self::Key { + debug_assert_ne!(scope.group(), Addr::NULL); + + let telemetry_key = &scope.telemetry_meta().key; + debug_assert!(!telemetry_key.is_empty()); + + // TODO: cache a hash of the telemetry key. 
+ let key_hash = fxhash::hash64(&(telemetry_key, key.get_hash())); + (scope.group(), key_hash) + } + + fn registries(shard: &Shard) -> &Registries { + &shard.actorwise + } + + fn gauge_shared(storage: &Storage) -> &Mutex> { + &storage.gauge_shared.actorwise } + + fn snapshot<'s>(snapshot: &'s mut Snapshot, meta: &Self::Meta) -> &'s mut Metrics { + snapshot.actorwise.entry(meta.clone()).or_default() + } +} + +// === Storage === + +/// The storage for metrics. +/// +/// The main idea here is to have a separate shard for each thread and merge +/// them periodically by the telemeter actor. It *dramatically* reduces +/// contention, especially if multiple actors use the same telemetry key, +/// what's common for per-group telemetry or per-actor grouped telemetry. +pub(crate) struct Storage { + shards: ThreadLocal, + // Shared gauge origins between shards. See `Gauge` for more details. + gauge_shared: GaugeShared, + descriptions: Mutex>, } -#[derive(Clone)] -struct ExtHandle { - meta: Option>, // `None` if global. - with_actor_key: bool, +#[derive(Default)] +struct Shard { + global: Registries, + groupwise: Registries, + actorwise: Registries, +} + +// Most of the time, the mutexes are uncontended, because they are accessed only +// by one thread. Periodically, they are accessed by the telemeter actor for the +// short period of time in order to replace these registries with empty ones. +struct Registries { + counters: Mutex>, + gauges: Mutex>, + histograms: Mutex>, +} + +impl Default for Registries { + fn default() -> Self { + Self { + counters: Default::default(), + gauges: Default::default(), + histograms: Default::default(), + } + } +} + +type Registry = FxHashMap<::Key, RegEntry>; + +struct RegEntry { key: Key, - handle: Handle, + data: M, + meta: S::Meta, } -fn make_ext_handle( - scope: Option<&Scope>, - key: &Key, - handle: Handle, - with_actor_key: bool, -) -> ExtHandle { - ExtHandle { - meta: scope.map(|scope| scope.telemetry_meta().clone()), - with_actor_key, - key: key.clone(), - handle, +impl RegEntry { + #[cold] + fn new(scope: &S::Scope, key: &Key, shared: M::Shared) -> Self { + Self { + key: key.clone(), + data: M::new(shared), + meta: S::get_meta(scope).clone(), + } } } -impl Storage { - pub(crate) fn new() -> Self { +#[derive(Default)] +struct GaugeShared { + global: Mutex>, + groupwise: Mutex>, + actorwise: Mutex>, +} + +type GaugeOrigins = FxHashMap<::Key, Arc>; + +impl Default for Storage { + fn default() -> Self { Self { - registry: Registry::>::untracked(), + shards: ThreadLocal::new(), + gauge_shared: Default::default(), descriptions: Default::default(), } } +} - pub(crate) fn descriptions(&self) -> RwLockReadGuard<'_, FxHashMap> { - self.descriptions.read() +impl Storage { + pub(crate) fn descriptions(&self) -> MutexGuard<'_, FxHashMap> { + self.descriptions.lock() } - pub(crate) fn add_description_if_missing(&self, key: &Key, description: Option<&'static str>) { + // TODO: use `unit` + pub(crate) fn describe( + &self, + key: &Key, + _unit: Option, + description: Option<&'static str>, + ) { if let Some(description) = description { - let mut descriptions = self.descriptions.write(); + let mut descriptions = self.descriptions.lock(); if !descriptions.contains_key(key.name().to_string().as_str()) { descriptions.insert(key.name().to_string(), description); } } } - pub(crate) fn touch_counter(&self, scope: Option<&Scope>, key: &Key, with_actor_key: bool) { - let ext_key = make_ext_key(scope, key, with_actor_key); - self.registry.op( - MetricKind::Counter, - &ext_key, - |_| {}, 
- || make_ext_handle(scope, key, Handle::counter(), with_actor_key), - ); - } + pub(crate) fn upsert(&self, scope: &S::Scope, key: &Key, value: M::Value) + where + S: ScopeKind, + M: Storable, + { + let shard = self.shards.get_or_default(); + let registries = S::registries(shard); + let reg_key = S::make_key(scope, key); + let mut registry = M::registry(registries).lock(); - pub(crate) fn touch_gauge(&self, scope: Option<&Scope>, key: &Key, with_actor_key: bool) { - let ext_key = make_ext_key(scope, key, with_actor_key); - self.registry.op( - MetricKind::Gauge, - &ext_key, - |_| {}, - || make_ext_handle(scope, key, Handle::gauge(), with_actor_key), - ); - } + let entry = registry.entry(reg_key).or_insert_with(|| { + let shared = M::shared::(self, reg_key); + RegEntry::::new(scope, key, shared) + }); - pub(crate) fn touch_histogram(&self, scope: Option<&Scope>, key: &Key, with_actor_key: bool) { - let ext_key = make_ext_key(scope, key, with_actor_key); - self.registry.op( - MetricKind::Histogram, - &ext_key, - |_| {}, - || make_ext_handle(scope, key, Handle::histogram(), with_actor_key), - ); + entry.data.update(value); } - pub(crate) fn increment_counter( - &self, - scope: Option<&Scope>, - key: &Key, - value: u64, - with_actor_key: bool, - ) { - let ext_key = make_ext_key(scope, key, with_actor_key); - self.registry.op( - MetricKind::Counter, - &ext_key, - |h| h.handle.increment_counter(value), - || make_ext_handle(scope, key, Handle::counter(), with_actor_key), - ); + pub(crate) fn merge(&self, snapshot: &mut Snapshot, only_compact: bool) { + for shard in self.shards.iter() { + self.merge_registries::(shard, snapshot, only_compact); + self.merge_registries::(shard, snapshot, only_compact); + self.merge_registries::(shard, snapshot, only_compact); + } } - pub(crate) fn update_gauge( + fn merge_registries( &self, - scope: Option<&Scope>, - key: &Key, - value: GaugeValue, - with_actor_key: bool, + shard: &Shard, + snapshot: &mut Snapshot, + only_compact: bool, ) { - let ext_key = make_ext_key(scope, key, with_actor_key); - self.registry.op( - MetricKind::Gauge, - &ext_key, - |h| h.handle.update_gauge(value), - || make_ext_handle(scope, key, Handle::gauge(), with_actor_key), - ); + let registries = S::registries(shard); + + if !only_compact { + self.merge_registry::(registries, snapshot); + self.merge_registry::(registries, snapshot); + } + self.merge_registry::(registries, snapshot); } - pub(crate) fn record_histogram( + fn merge_registry( &self, - scope: Option<&Scope>, - key: &Key, - value: f64, - with_actor_key: bool, + registries: &Registries, + snapshot: &mut Snapshot, ) { - let ext_key = make_ext_key(scope, key, with_actor_key); - self.registry.op( - MetricKind::Histogram, - &ext_key, - |h| h.handle.record_histogram(value), - || make_ext_handle(scope, key, Handle::histogram(), with_actor_key), - ); - } - - pub(crate) fn fill_snapshot(&self, snapshot: &mut Snapshot, only_histograms: bool) -> usize { - let mut histograms = Vec::new(); - let mut estimated_size = 0; - - self.registry.visit(|kind, (_, h)| { - if kind == MetricKind::Histogram { - // Defer processing to unlock the registry faster. - histograms.push(h.get_inner().clone()); - return; - } + let registry = M::registry(registries); - if only_histograms { - return; - } + let registry = { + // Allocate a new empty registry with enough capacity. + // It improves tail latency, what's important for near RT actors. 
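+            // The filled registry is swapped out for the pre-sized empty one
+            // under a short lock and drained into the snapshot below without
+            // holding it, so concurrent `upsert()` calls are barely blocked.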
+ let len = registry.lock().len(); + let empty = Registry::with_capacity_and_hasher(len, <_>::default()); - estimated_size += fill_metric(snapshot, h.get_inner()); - }); + let mut registry = registry.lock(); + mem::replace(&mut *registry, empty) + }; + + // TODO: stats + // elfo_metrics_usage_bytes{object="Storage|Snapshot"} + // elfo_metrics_storage_shards{status="Inactive|Active"} GAUGE + // elfo_metrics{kind="Counter|Gauge|Histogram"} GAUGE - // Process deferred histograms. - for handle in histograms { - estimated_size += fill_metric(snapshot, &handle); + for (_, entry) in registry.into_iter() { + let metrics = S::snapshot(snapshot, &entry.meta); + let out = M::snapshot(metrics, &entry.key); + entry.data.merge(out); } + } +} + +pub(crate) trait Storable: MetricKind { + fn registry(registries: &Registries) -> &Mutex>; + fn shared(storage: &Storage, key: S::Key) -> Self::Shared; + fn snapshot<'s>(metrics: &'s mut Metrics, key: &Key) -> &'s mut Self::Output; +} + +impl Storable for Counter { + fn registry(registries: &Registries) -> &Mutex> { + ®istries.counters + } - estimated_size + fn shared(_: &Storage, _: S::Key) -> Self::Shared {} + + fn snapshot<'s>(metrics: &'s mut Metrics, key: &Key) -> &'s mut Self::Output { + // TODO: hashbrown `entry_ref` (extra crate) or `contains_key` (double lookup). + metrics.counters.entry(key.clone()).or_default() } } -fn fill_metric(snapshot: &mut Snapshot, handle: &ExtHandle) -> usize { - let m = get_metrics(snapshot, handle); - let h = &handle.handle; +impl Storable for Gauge { + fn registry(registries: &Registries) -> &Mutex> { + ®istries.gauges + } - let estimated_size = match h { - Handle::Counter(_) => { - m.counters.insert(handle.key.clone(), h.read_counter()); - 8 - } - Handle::Gauge(_) => { - m.gauges.insert(handle.key.clone(), h.read_gauge()); - 8 - } - Handle::Histogram(_) => { - let mut bucket_len = 0; - let d = m.distributions.entry(handle.key.clone()).or_default(); - h.read_histogram_with_clear(|samples| { - bucket_len += samples.len(); - d.record_samples(samples); - }); - d.estimated_size() + 8 * bucket_len - } - }; + fn shared(storage: &Storage, key: S::Key) -> Self::Shared { + let mut shared = S::gauge_shared(storage).lock(); + shared.entry(key).or_default().clone() + } - mem::size_of::() + mem::size_of::() + estimated_size + fn snapshot<'s>(metrics: &'s mut Metrics, key: &Key) -> &'s mut Self::Output { + // TODO: hashbrown `entry_ref` (extra crate) or `contains_key` (double lookup). + metrics.gauges.entry(key.clone()).or_default() + } } -fn get_metrics<'a>(snapshot: &'a mut Snapshot, handle: &ExtHandle) -> &'a mut Metrics { - // If meta is known, it's a per-actor or per-group metric. - if let Some(meta) = &handle.meta { - if handle.with_actor_key { - snapshot.per_actor.entry(meta.clone()).or_default() - } else if snapshot.per_group.contains_key(&meta.group) { - snapshot.per_group.get_mut(&meta.group).unwrap() - } else { - snapshot.per_group.entry(meta.group.clone()).or_default() - } - } else { - // Otherwise, it's a global metric. - &mut snapshot.global +impl Storable for Histogram { + fn registry(registries: &Registries) -> &Mutex> { + ®istries.histograms + } + + fn shared(_: &Storage, _: S::Key) -> Self::Shared {} + + fn snapshot<'s>(metrics: &'s mut Metrics, key: &Key) -> &'s mut Self::Output { + // TODO: hashbrown `entry_ref` (extra crate) or `contains_key` (double lookup). + metrics.histograms.entry(key.clone()).or_default() } }