From f25a445cd09adf7a4e0aa054b5c64cd856a25af9 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 31 Oct 2023 11:50:56 -0600 Subject: [PATCH] feat: add kubo rpc api metrics --- Cargo.lock | 2 + beetle/iroh-bitswap/Cargo.toml | 2 +- kubo-rpc/Cargo.toml | 3 + kubo-rpc/src/http.rs | 7 +- kubo-rpc/src/http/metrics.rs | 78 +++++++++ kubo-rpc/src/http/metrics/api.rs | 214 ++++++++++++++++++++++++ metrics/Cargo.toml | 5 - metrics/src/core.rs | 50 +----- metrics/src/lib.rs | 46 +---- metrics/src/store.rs | 278 ------------------------------- one/src/lib.rs | 8 +- one/src/metrics.rs | 8 +- p2p/Cargo.toml | 2 +- 13 files changed, 329 insertions(+), 374 deletions(-) create mode 100644 kubo-rpc/src/http/metrics.rs create mode 100644 kubo-rpc/src/http/metrics/api.rs delete mode 100644 metrics/src/store.rs diff --git a/Cargo.lock b/Cargo.lock index 078c0b440..0a4224fd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1106,6 +1106,7 @@ dependencies = [ "bytes 1.5.0", "ceramic-kubo-rpc-server", "ceramic-metadata", + "ceramic-metrics", "ceramic-p2p", "cid 0.10.1", "dag-jose", @@ -1123,6 +1124,7 @@ dependencies = [ "mockall", "multiaddr", "pin-project", + "prometheus-client", "serde", "serde_json", "swagger", diff --git a/beetle/iroh-bitswap/Cargo.toml b/beetle/iroh-bitswap/Cargo.toml index 3ac1118ab..ae705c116 100644 --- a/beetle/iroh-bitswap/Cargo.toml +++ b/beetle/iroh-bitswap/Cargo.toml @@ -20,7 +20,7 @@ async-stream.workspace = true async-trait.workspace = true asynchronous-codec.workspace = true bytes.workspace = true -ceramic-metrics = { workspace = true, features = ["bitswap"] } +ceramic-metrics.workspace = true cid.workspace = true deadqueue.workspace = true derivative.workspace = true diff --git a/kubo-rpc/Cargo.toml b/kubo-rpc/Cargo.toml index fd1a47704..943368a3d 100644 --- a/kubo-rpc/Cargo.toml +++ b/kubo-rpc/Cargo.toml @@ -14,6 +14,7 @@ http = [ "dep:pin-project", "dep:serde", "dep:serde_json", + "dep:prometheus-client", ] [dependencies] @@ -23,6 +24,7 @@ async-trait.workspace = true bytes.workspace = true ceramic-kubo-rpc-server = { workspace = true, optional = true } ceramic-metadata.workspace = true +ceramic-metrics.workspace = true ceramic-p2p.workspace = true cid.workspace = true dag-jose.workspace = true @@ -37,6 +39,7 @@ libp2p-identity.workspace = true libp2p.workspace = true multiaddr.workspace = true pin-project = { version = "1.1.3", optional = true } +prometheus-client = { workspace = true, optional = true } serde = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } swagger.workspace = true diff --git a/kubo-rpc/src/http.rs b/kubo-rpc/src/http.rs index 87adb5631..e1dc5720e 100644 --- a/kubo-rpc/src/http.rs +++ b/kubo-rpc/src/http.rs @@ -1,8 +1,13 @@ //! Provides an http implementation of the Kubo RPC methods. +mod metrics; mod stream_drop; -use std::{collections::HashSet, io::Cursor, marker::PhantomData, str::FromStr, time::Duration}; +pub use metrics::{api::MetricsMiddleware, Metrics}; + +use std::{ + collections::HashSet, io::Cursor, marker::PhantomData, str::FromStr, sync::Arc, time::Duration, +}; use anyhow::anyhow; use async_trait::async_trait; diff --git a/kubo-rpc/src/http/metrics.rs b/kubo-rpc/src/http/metrics.rs new file mode 100644 index 000000000..69d97f099 --- /dev/null +++ b/kubo-rpc/src/http/metrics.rs @@ -0,0 +1,78 @@ +pub mod api; + +use std::time::Duration; + +use ceramic_kubo_rpc_server::{Api, API_VERSION, BASE_PATH}; +use ceramic_metrics::Recorder; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::Family, + histogram::{exponential_buckets, Histogram}, + info::Info, + }, + registry::Registry, +}; + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +struct RequestLabels { + path: &'static str, +} + +/// Metrics for Kubo RPC API +#[derive(Clone)] +pub struct Metrics { + requests: Family, + request_durations: Family, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +struct InfoLabels { + base_path: &'static str, + version: &'static str, +} + +impl Metrics { + pub fn register(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("kubo_rpc"); + + let requests = Family::::default(); + sub_registry.register("requests", "Number of HTTP requests", requests.clone()); + + let request_durations = Family::::new_with_constructor(|| { + Histogram::new(exponential_buckets(0.005, 2.0, 20)) + }); + sub_registry.register( + "request_durations", + "Duration of HTTP requests", + request_durations.clone(), + ); + + let info: Info = Info::new(InfoLabels { + base_path: BASE_PATH, + version: API_VERSION, + }); + sub_registry.register("api", "Information about the Kubo RPC API", info); + + Self { + requests, + request_durations, + } + } +} + +pub struct Event { + pub(crate) path: &'static str, + pub(crate) duration: Duration, +} + +impl Recorder for Metrics { + fn record(&self, event: &Event) { + let labels = RequestLabels { path: event.path }; + self.requests.get_or_create(&labels).inc(); + self.request_durations + .get_or_create(&labels) + .observe(event.duration.as_secs_f64()); + } +} diff --git a/kubo-rpc/src/http/metrics/api.rs b/kubo-rpc/src/http/metrics/api.rs new file mode 100644 index 000000000..b529677d6 --- /dev/null +++ b/kubo-rpc/src/http/metrics/api.rs @@ -0,0 +1,214 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use ceramic_kubo_rpc_server::{ + models, Api, BlockGetPostResponse, BlockPutPostResponse, BlockStatPostResponse, + DagGetPostResponse, DagImportPostResponse, DagPutPostResponse, DagResolvePostResponse, + IdPostResponse, PinAddPostResponse, PinRmPostResponse, PubsubLsPostResponse, + PubsubPubPostResponse, PubsubSubPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse, + VersionPostResponse, +}; +use ceramic_metrics::Recorder; +use futures_util::Future; +use swagger::{ApiError, ByteArray}; +use tokio::time::Instant; + +use crate::http::{metrics::Event, Metrics}; + +/// Implement the API and record metrics +#[derive(Clone)] +pub struct MetricsMiddleware { + api: A, + metrics: Metrics, +} + +impl MetricsMiddleware { + /// Construct a new MetricsMiddleware. + /// The metrics should have already be registered. + pub fn new(api: A, metrics: Metrics) -> Self { + Self { api, metrics } + } + // Record metrics for a given API endpoint + async fn record(&self, path: &'static str, fut: impl Future) -> T { + let start = Instant::now(); + let ret = fut.await; + let duration = start.elapsed(); + let event = Event { path, duration }; + self.metrics.record(&event); + ret + } +} + +#[async_trait] +impl Api for MetricsMiddleware +where + A: Api, + A: Clone + Send + Sync, + C: Send + Sync, +{ + /// Get a single IPFS block + async fn block_get_post( + &self, + arg: String, + timeout: Option, + offline: Option, + context: &C, + ) -> Result { + self.record( + "/block/get", + self.api.block_get_post(arg, timeout, offline, context), + ) + .await + } + + /// Put a single IPFS block + async fn block_put_post( + &self, + file: ByteArray, + cid_codec: Option, + mhtype: Option, + pin: Option, + context: &C, + ) -> Result { + self.record( + "/block/put", + self.api + .block_put_post(file, cid_codec, mhtype, pin, context), + ) + .await + } + + /// Report statistics about a block + async fn block_stat_post( + &self, + arg: String, + context: &C, + ) -> Result { + self.record("/block/stat", self.api.block_stat_post(arg, context)) + .await + } + + /// Get an IPLD node from IPFS + async fn dag_get_post( + &self, + arg: String, + output_codec: Option, + context: &C, + ) -> Result { + self.record( + "/dag/get", + self.api.dag_get_post(arg, output_codec, context), + ) + .await + } + + /// Import a CAR file of IPLD nodes into IPFS + async fn dag_import_post( + &self, + file: ByteArray, + context: &C, + ) -> Result { + self.record("/dag/import", self.api.dag_import_post(file, context)) + .await + } + + /// Put an IPLD node into IPFS + async fn dag_put_post( + &self, + file: ByteArray, + store_codec: Option, + input_codec: Option, + context: &C, + ) -> Result { + self.record( + "/dag/put", + self.api + .dag_put_post(file, store_codec, input_codec, context), + ) + .await + } + + /// Resolve an IPFS path to a DAG node + async fn dag_resolve_post( + &self, + arg: String, + context: &C, + ) -> Result { + self.record("/dag/resolve", self.api.dag_resolve_post(arg, context)) + .await + } + + /// Report identifying information about a node + async fn id_post(&self, arg: Option, context: &C) -> Result { + self.record("/id", self.api.id_post(arg, context)).await + } + + /// Add a block to the pin store + async fn pin_add_post( + &self, + arg: String, + recursive: Option, + progress: Option, + context: &C, + ) -> Result { + self.record( + "/pin/add", + self.api.pin_add_post(arg, recursive, progress, context), + ) + .await + } + + /// Remove a block from the pin store + async fn pin_rm_post(&self, arg: String, context: &C) -> Result { + self.record("/pin/rm", self.pin_rm_post(arg, context)).await + } + + /// List topic with active subscriptions + async fn pubsub_ls_post(&self, context: &C) -> Result { + self.record("/pubsub/ls", self.api.pubsub_ls_post(context)) + .await + } + + /// Publish a message to a topic + async fn pubsub_pub_post( + &self, + arg: String, + file: ByteArray, + context: &C, + ) -> Result { + self.record("/pubsub/pub", self.api.pubsub_pub_post(arg, file, context)) + .await + } + + /// Subscribe to a topic, blocks until a message is received + async fn pubsub_sub_post( + &self, + arg: String, + context: &C, + ) -> Result { + self.record("/pubsub/sub", self.api.pubsub_sub_post(arg, context)) + .await + } + + /// Connect to peers + async fn swarm_connect_post( + &self, + arg: &Vec, + context: &C, + ) -> Result { + self.record("/swarm/connect", self.api.swarm_connect_post(arg, context)) + .await + } + + /// Report connected peers + async fn swarm_peers_post(&self, context: &C) -> Result { + self.record("/swarm/peers", self.api.swarm_peers_post(context)) + .await + } + + /// Report server version + async fn version_post(&self, context: &C) -> Result { + self.record("/version", self.api.version_post(context)) + .await + } +} diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 3ac884f32..0acc6a966 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -48,12 +48,7 @@ features = [ "autonat", "tokio", ] -optional = true [features] -bitswap = [] -store = [] -p2p = ["libp2p"] - # requires setting RUSTFLAGS="--cfg tokio_unstable" tokio-console = ["tokio/tracing", "console-subscriber"] diff --git a/metrics/src/core.rs b/metrics/src/core.rs index ae7842b44..6efc43f72 100644 --- a/metrics/src/core.rs +++ b/metrics/src/core.rs @@ -8,16 +8,8 @@ use std::{ use prometheus_client::{encoding::text::encode, registry::Registry}; -#[cfg(feature = "bitswap")] use crate::bitswap; -#[cfg(feature = "gateway")] -use crate::gateway; -#[cfg(feature = "p2p")] use crate::p2p; -#[cfg(feature = "resolver")] -use crate::resolver; -#[cfg(feature = "store")] -use crate::store; lazy_static! { pub(crate) static ref CORE: Core = Core::default(); @@ -26,17 +18,8 @@ lazy_static! { pub(crate) struct Core { enabled: AtomicBool, registry: Mutex, - #[cfg(feature = "gateway")] - gateway_metrics: gateway::Metrics, - #[cfg(feature = "resolver")] - resolver_metrics: resolver::Metrics, - #[cfg(feature = "bitswap")] bitswap_metrics: bitswap::Metrics, - #[cfg(feature = "store")] - store_metrics: store::Metrics, - #[cfg(feature = "p2p")] libp2p_metrics: p2p::Libp2pMetrics, - #[cfg(feature = "p2p")] p2p_metrics: p2p::Metrics, } @@ -45,17 +28,8 @@ impl Default for Core { let mut reg = Registry::default(); Core { enabled: AtomicBool::new(false), - #[cfg(feature = "gateway")] - gateway_metrics: gateway::Metrics::new(&mut reg), - #[cfg(feature = "resolver")] - resolver_metrics: resolver::Metrics::new(&mut reg), - #[cfg(feature = "bitswap")] bitswap_metrics: bitswap::Metrics::new(&mut reg), - #[cfg(feature = "store")] - store_metrics: store::Metrics::new(&mut reg), - #[cfg(feature = "p2p")] libp2p_metrics: p2p::Libp2pMetrics::new(&mut reg), - #[cfg(feature = "p2p")] p2p_metrics: p2p::Metrics::new(&mut reg), registry: Mutex::new(reg), } @@ -78,32 +52,14 @@ impl Core { .expect("should be able to acquire lock") } - #[cfg(feature = "gateway")] - pub(crate) fn gateway_metrics(&self) -> &gateway::Metrics { - &self.gateway_metrics - } - - #[cfg(feature = "resolver")] - pub(crate) fn resolver_metrics(&self) -> &resolver::Metrics { - &self.resolver_metrics - } - - #[cfg(feature = "bitswap")] pub(crate) fn bitswap_metrics(&self) -> &bitswap::Metrics { &self.bitswap_metrics } - #[cfg(feature = "store")] - pub(crate) fn store_metrics(&self) -> &store::Metrics { - &self.store_metrics - } - - #[cfg(feature = "p2p")] pub(crate) fn libp2p_metrics(&self) -> &p2p::Libp2pMetrics { &self.libp2p_metrics } - #[cfg(feature = "p2p")] pub(crate) fn p2p_metrics(&self) -> &p2p::Metrics { &self.p2p_metrics } @@ -123,14 +79,18 @@ impl Core { } } +#[deprecated = "use ceramic_metrics::Recorder instead"] pub trait MetricType { fn name(&self) -> &'static str; } +#[deprecated = "use ceramic_metrics::Recorder instead"] pub trait HistogramType { fn name(&self) -> &'static str; } +#[deprecated = "use ceramic_metrics::Recorder instead"] +#[allow(deprecated)] pub trait MetricsRecorder { fn record(&self, m: M, value: u64) where @@ -140,10 +100,12 @@ pub trait MetricsRecorder { M: HistogramType + std::fmt::Display; } +#[deprecated = "use ceramic_metrics::Recorder instead"] pub trait MRecorder { fn record(&self, value: u64); } +#[deprecated = "use ceramic_metrics::Recorder instead"] pub trait MObserver { fn observe(&self, value: f64); } diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index 2e06356e4..84c2ad518 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -1,30 +1,17 @@ #[macro_use] mod macros; -#[cfg(feature = "bitswap")] pub mod bitswap; pub mod config; pub mod core; -#[cfg(feature = "p2p")] pub mod p2p; -#[cfg(feature = "store")] -pub mod store; #[macro_use] extern crate lazy_static; -use crate::config::Config; use crate::core::HistogramType; use crate::core::MetricType; -#[cfg(any( - feature = "bitswap", - feature = "gateway", - feature = "resolver", - feature = "store", - feature = "p2p" -))] -#[allow(unused_imports)] -use crate::core::MetricsRecorder; use crate::core::CORE; +use crate::{config::Config, core::MetricsRecorder}; use opentelemetry::{ global, sdk::{propagation::TraceContextPropagator, trace, Resource}, @@ -49,6 +36,12 @@ use tracing_subscriber::{ EnvFilter, Layer, }; +/// Recorder that can record metrics about an event. +pub trait Recorder { + /// Record the given event. + fn record(&self, event: &Event); +} + #[derive(Debug)] pub struct MetricsHandle { metrics_task: Option>, @@ -218,15 +211,7 @@ pub fn get_current_trace_id() -> TraceId { #[derive(Debug, PartialEq, Eq)] pub enum Collector { - #[cfg(feature = "gateway")] - Gateway, - #[cfg(feature = "resolver")] - Resolver, - #[cfg(feature = "bitswap")] Bitswap, - #[cfg(feature = "store")] - Store, - #[cfg(feature = "p2p")] P2P, } @@ -237,15 +222,7 @@ where { if CORE.enabled() { match c { - #[cfg(feature = "gateway")] - Collector::Gateway => CORE.gateway_metrics().record(m, v), - #[cfg(feature = "resolver")] - Collector::Resolver => CORE.resolver_metrics().record(m, v), - #[cfg(feature = "bitswap")] Collector::Bitswap => CORE.bitswap_metrics().record(m, v), - #[cfg(feature = "store")] - Collector::Store => CORE.store_metrics().record(m, v), - #[cfg(feature = "p2p")] Collector::P2P => CORE.p2p_metrics().record(m, v), _ => panic!("not enabled/implemented"), }; @@ -259,22 +236,13 @@ where { if CORE.enabled() { match c { - #[cfg(feature = "gateway")] - Collector::Gateway => CORE.gateway_metrics().observe(m, v), - #[cfg(feature = "resolver")] - Collector::Resolver => CORE.resolver_metrics().observe(m, v), - #[cfg(feature = "bitswap")] Collector::Bitswap => CORE.bitswap_metrics().observe(m, v), - #[cfg(feature = "store")] - Collector::Store => CORE.store_metrics().observe(m, v), - #[cfg(feature = "p2p")] Collector::P2P => CORE.p2p_metrics().observe(m, v), _ => panic!("not enabled/implemented"), }; } } -#[cfg(feature = "p2p")] pub fn libp2p_metrics() -> &'static p2p::Libp2pMetrics { CORE.libp2p_metrics() } diff --git a/metrics/src/store.rs b/metrics/src/store.rs deleted file mode 100644 index d3d76c054..000000000 --- a/metrics/src/store.rs +++ /dev/null @@ -1,278 +0,0 @@ -use std::fmt; - -use prometheus_client::{ - metrics::{ - counter::Counter, - histogram::{linear_buckets, Histogram}, - }, - registry::Registry, -}; -use tracing::error; - -use crate::{ - core::{HistogramType, MObserver, MRecorder, MetricType, MetricsRecorder}, - Collector, -}; - -#[derive(Clone)] -pub(crate) struct Metrics { - get_requests_total: Counter, - get_store_hit: Counter, - get_store_miss: Counter, - get_bytes: Counter, - get_request_time: Histogram, - put_requests_total: Counter, - put_bytes: Counter, - put_request_time: Histogram, - get_links_requests_total: Counter, - get_links_hit: Counter, - get_links_miss: Counter, - get_links_request_time: Histogram, -} - -impl fmt::Debug for Metrics { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Store Metrics").finish() - } -} - -impl Default for Metrics { - fn default() -> Self { - Self { - get_requests_total: Counter::default(), - get_store_hit: Counter::default(), - get_store_miss: Counter::default(), - get_bytes: Counter::default(), - get_request_time: Histogram::new(linear_buckets(0.0, 1.0, 1)), - put_requests_total: Counter::default(), - put_bytes: Counter::default(), - put_request_time: Histogram::new(linear_buckets(0.0, 1.0, 1)), - get_links_requests_total: Counter::default(), - get_links_hit: Counter::default(), - get_links_miss: Counter::default(), - get_links_request_time: Histogram::new(linear_buckets(0.0, 1.0, 1)), - } - } -} - -impl Metrics { - pub fn new(registry: &mut Registry) -> Self { - let sub_registry = registry.sub_registry_with_prefix("store"); - let get_requests_total = Counter::default(); - sub_registry.register( - METRICS_CNT_GET_REQUESTS_TOTAL, - "Total number of get requests", - get_requests_total.clone(), - ); - let get_store_hit = Counter::default(); - sub_registry.register( - METRICS_CNT_GET_STORE_HIT, - "Count store hits", - get_store_hit.clone(), - ); - let get_store_miss = Counter::default(); - sub_registry.register( - METRICS_CNT_GET_STORE_MISS, - "Count store miss", - get_store_miss.clone(), - ); - let get_bytes = Counter::default(); - sub_registry.register( - METRICS_CNT_GET_BYTES_TOTAL, - "Bytes served", - get_bytes.clone(), - ); - let get_request_time = Histogram::new(linear_buckets(0.0, 500.0, 240)); - sub_registry.register( - METRICS_HIST_GET_REQUEST_TIME, - "Histogram of get request times", - get_request_time.clone(), - ); - - let put_requests_total = Counter::default(); - sub_registry.register( - METRICS_CNT_PUT_REQUESTS_TOTAL, - "Total number of put requests", - put_requests_total.clone(), - ); - let put_bytes = Counter::default(); - sub_registry.register( - METRICS_CNT_PUT_BYTES_TOTAL, - "Bytes ingested", - put_bytes.clone(), - ); - let put_request_time = Histogram::new(linear_buckets(0.0, 500.0, 240)); - sub_registry.register( - METRICS_HIST_PUT_REQUEST_TIME, - "Histogram of put request times", - put_request_time.clone(), - ); - - let get_links_requests_total = Counter::default(); - sub_registry.register( - METRICS_CNT_GET_LINKS_REQUESTS_TOTAL, - "Total number of get links requests", - get_links_requests_total.clone(), - ); - let get_links_hit = Counter::default(); - sub_registry.register( - METRICS_CNT_GET_LINKS_HIT, - "Count links hits", - get_links_hit.clone(), - ); - let get_links_miss = Counter::default(); - sub_registry.register( - METRICS_CNT_GET_LINKS_MISS, - "Count links miss", - get_links_miss.clone(), - ); - let get_links_request_time = Histogram::new(linear_buckets(0.0, 500.0, 240)); - sub_registry.register( - METRICS_HIST_GET_LINKS_REQUEST_TIME, - "Histogram of get link request times", - get_links_request_time.clone(), - ); - - Self { - get_requests_total, - get_store_hit, - get_store_miss, - get_bytes, - get_request_time, - put_requests_total, - put_bytes, - put_request_time, - get_links_requests_total, - get_links_hit, - get_links_miss, - get_links_request_time, - } - } -} - -impl MetricsRecorder for Metrics { - fn record(&self, m: M, value: u64) - where - M: MetricType + std::fmt::Display, - { - if m.name() == StoreMetrics::GetRequests.name() { - self.get_requests_total.inc_by(value); - } else if m.name() == StoreMetrics::StoreHit.name() { - self.get_store_hit.inc_by(value); - } else if m.name() == StoreMetrics::StoreMiss.name() { - self.get_store_miss.inc_by(value); - } else if m.name() == StoreMetrics::GetBytes.name() { - self.get_bytes.inc_by(value); - } else if m.name() == StoreMetrics::PutRequests.name() { - self.put_requests_total.inc_by(value); - } else if m.name() == StoreMetrics::PutBytes.name() { - self.put_bytes.inc_by(value); - } else if m.name() == StoreMetrics::GetLinksRequests.name() { - self.get_links_requests_total.inc_by(value); - } else if m.name() == StoreMetrics::GetLinksHit.name() { - self.get_links_hit.inc_by(value); - } else if m.name() == StoreMetrics::GetLinksHit.name() { - self.get_links_miss.inc_by(value); - } else { - error!("record (store): unknown metric {}", m.name()); - } - } - - fn observe(&self, m: M, value: f64) - where - M: HistogramType + std::fmt::Display, - { - if m.name() == StoreHistograms::GetRequests.name() { - self.get_request_time.observe(value); - } else if m.name() == StoreHistograms::PutRequests.name() { - self.put_request_time.observe(value); - } else if m.name() == StoreHistograms::GetLinksRequests.name() { - self.get_links_request_time.observe(value); - } else { - error!("observe (store): unknown metric {}", m.name()); - } - } -} - -#[derive(Clone, Debug)] -pub enum StoreMetrics { - GetRequests, - StoreHit, - StoreMiss, - GetBytes, - PutRequests, - PutBytes, - GetLinksRequests, - GetLinksHit, - GetLinksMiss, -} - -impl MetricType for StoreMetrics { - fn name(&self) -> &'static str { - match self { - StoreMetrics::GetRequests => METRICS_CNT_GET_REQUESTS_TOTAL, - StoreMetrics::StoreHit => METRICS_CNT_GET_STORE_HIT, - StoreMetrics::StoreMiss => METRICS_CNT_GET_STORE_MISS, - StoreMetrics::GetBytes => METRICS_CNT_GET_BYTES_TOTAL, - StoreMetrics::PutRequests => METRICS_CNT_PUT_REQUESTS_TOTAL, - StoreMetrics::PutBytes => METRICS_CNT_PUT_BYTES_TOTAL, - StoreMetrics::GetLinksRequests => METRICS_CNT_GET_LINKS_REQUESTS_TOTAL, - StoreMetrics::GetLinksHit => METRICS_CNT_GET_LINKS_HIT, - StoreMetrics::GetLinksMiss => METRICS_CNT_GET_LINKS_MISS, - } - } -} - -impl MRecorder for StoreMetrics { - fn record(&self, value: u64) { - crate::record(Collector::Store, self.clone(), value); - } -} - -impl std::fmt::Display for StoreMetrics { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.name()) - } -} - -#[derive(Clone, Debug)] -pub enum StoreHistograms { - GetRequests, - PutRequests, - GetLinksRequests, -} - -impl HistogramType for StoreHistograms { - fn name(&self) -> &'static str { - match self { - StoreHistograms::GetRequests => METRICS_HIST_GET_REQUEST_TIME, - StoreHistograms::PutRequests => METRICS_HIST_PUT_REQUEST_TIME, - StoreHistograms::GetLinksRequests => METRICS_HIST_GET_LINKS_REQUEST_TIME, - } - } -} - -impl MObserver for StoreHistograms { - fn observe(&self, value: f64) { - crate::observe(Collector::Store, self.clone(), value); - } -} - -impl std::fmt::Display for StoreHistograms { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.name()) - } -} - -const METRICS_CNT_GET_REQUESTS_TOTAL: &str = "get_requests"; -const METRICS_CNT_GET_STORE_HIT: &str = "get_hit"; -const METRICS_CNT_GET_STORE_MISS: &str = "get_miss"; -const METRICS_CNT_GET_BYTES_TOTAL: &str = "bytes_out"; -const METRICS_HIST_GET_REQUEST_TIME: &str = "get_request_time"; -const METRICS_CNT_PUT_REQUESTS_TOTAL: &str = "put_requests"; -const METRICS_CNT_PUT_BYTES_TOTAL: &str = "bytes_in"; -const METRICS_HIST_PUT_REQUEST_TIME: &str = "put_request_time"; -const METRICS_CNT_GET_LINKS_REQUESTS_TOTAL: &str = "get_links_requests"; -const METRICS_CNT_GET_LINKS_HIT: &str = "get_links_hit"; -const METRICS_CNT_GET_LINKS_MISS: &str = "get_links_miss"; -const METRICS_HIST_GET_LINKS_REQUEST_TIME: &str = "get_links_request_time"; diff --git a/one/src/lib.rs b/one/src/lib.rs index 03ab5df48..1572c3bbf 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -13,13 +13,12 @@ use anyhow::{anyhow, Result}; use ceramic_core::{EventId, Interest, PeerId}; use ceramic_kubo_rpc::{dag, IpfsDep, IpfsPath, Multiaddr}; -use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle}; +use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle, Recorder}; use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig}; use clap::{Args, Parser, Subcommand, ValueEnum}; use futures::StreamExt; use futures_util::future; use libipld::json::DagJsonCodec; -use libp2p::metrics::Recorder; use recon::{FullInterests, Recon, ReconInterestProvider, SQLiteStore, Server, Sha256a}; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; @@ -339,6 +338,11 @@ impl Daemon { ceramic_api_server::context::MakeAddContext::<_, EmptyContext>::new(ceramic_service); let kubo_rpc_server = ceramic_kubo_rpc::http::Server::new(self.ipfs.api()); + let kubo_rpc_metrics = + ceramic_metrics::MetricsHandle::register(ceramic_kubo_rpc::http::Metrics::register); + // Wrap server in metrics middleware + let kubo_rpc_server = + ceramic_kubo_rpc::http::MetricsMiddleware::new(kubo_rpc_server, kubo_rpc_metrics); let kubo_rpc_service = ceramic_kubo_rpc_server::server::MakeService::new(kubo_rpc_server); let kubo_rpc_service = MakeAllowAllAuthenticator::new(kubo_rpc_service, ""); let kubo_rpc_service = diff --git a/one/src/metrics.rs b/one/src/metrics.rs index b11d97dac..f5b81a434 100644 --- a/one/src/metrics.rs +++ b/one/src/metrics.rs @@ -2,12 +2,12 @@ use std::{convert::Infallible, net::SocketAddr}; use anyhow::Result; use ceramic_kubo_rpc::PeerId; +use ceramic_metrics::Recorder; use hyper::{ http::HeaderValue, service::{make_service_fn, service_fn}, Body, Request, Response, }; -use libp2p::metrics::Recorder; use prometheus_client::{encoding::EncodeLabelSet, metrics::counter::Counter}; use prometheus_client::{encoding::EncodeLabelValue, metrics::family::Family}; use prometheus_client::{metrics::info::Info, registry::Registry}; @@ -163,8 +163,10 @@ impl Recorder for Metrics { async fn handle(_req: Request) -> Result, Infallible> { let data = ceramic_metrics::MetricsHandle::encode(); let mut resp = Response::new(Body::from(data)); - resp.headers_mut() - .insert("Content-Type", HeaderValue::from_static("text/plain")); + resp.headers_mut().insert( + "Content-Type", + HeaderValue::from_static("application/openmetrics-text"), + ); Ok(resp) } diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index fb66d798c..3179c520c 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -22,7 +22,7 @@ futures-util.workspace = true futures.workspace = true git-version.workspace = true iroh-bitswap.workspace = true -ceramic-metrics = { workspace = true, features = ["bitswap", "p2p"] } +ceramic-metrics.workspace = true iroh-rpc-client.workspace = true iroh-rpc-types.workspace = true iroh-util.workspace = true