Skip to content

Commit

Permalink
feat: add kubo rpc api metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Oct 31, 2023
1 parent 3dac28d commit f25a445
Show file tree
Hide file tree
Showing 13 changed files with 329 additions and 374 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion beetle/iroh-bitswap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async-stream.workspace = true
async-trait.workspace = true
asynchronous-codec.workspace = true
bytes.workspace = true
ceramic-metrics = { workspace = true, features = ["bitswap"] }
ceramic-metrics.workspace = true
cid.workspace = true
deadqueue.workspace = true
derivative.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions kubo-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ http = [
"dep:pin-project",
"dep:serde",
"dep:serde_json",
"dep:prometheus-client",
]

[dependencies]
Expand All @@ -23,6 +24,7 @@ async-trait.workspace = true
bytes.workspace = true
ceramic-kubo-rpc-server = { workspace = true, optional = true }
ceramic-metadata.workspace = true
ceramic-metrics.workspace = true
ceramic-p2p.workspace = true
cid.workspace = true
dag-jose.workspace = true
Expand All @@ -37,6 +39,7 @@ libp2p-identity.workspace = true
libp2p.workspace = true
multiaddr.workspace = true
pin-project = { version = "1.1.3", optional = true }
prometheus-client = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
swagger.workspace = true
Expand Down
7 changes: 6 additions & 1 deletion kubo-rpc/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
//! Provides an http implementation of the Kubo RPC methods.
mod metrics;
mod stream_drop;

use std::{collections::HashSet, io::Cursor, marker::PhantomData, str::FromStr, time::Duration};
pub use metrics::{api::MetricsMiddleware, Metrics};

use std::{
collections::HashSet, io::Cursor, marker::PhantomData, str::FromStr, sync::Arc, time::Duration,
};

use anyhow::anyhow;
use async_trait::async_trait;
Expand Down
78 changes: 78 additions & 0 deletions kubo-rpc/src/http/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
pub mod api;

use std::time::Duration;

use ceramic_kubo_rpc_server::{Api, API_VERSION, BASE_PATH};
use ceramic_metrics::Recorder;
use prometheus_client::{
encoding::EncodeLabelSet,
metrics::{
counter::Counter,
family::Family,
histogram::{exponential_buckets, Histogram},
info::Info,
},
registry::Registry,
};

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct RequestLabels {
path: &'static str,
}

/// Metrics for Kubo RPC API
#[derive(Clone)]
pub struct Metrics {
requests: Family<RequestLabels, Counter>,
request_durations: Family<RequestLabels, Histogram>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct InfoLabels {
base_path: &'static str,
version: &'static str,
}

impl Metrics {
pub fn register(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("kubo_rpc");

let requests = Family::<RequestLabels, Counter>::default();
sub_registry.register("requests", "Number of HTTP requests", requests.clone());

let request_durations = Family::<RequestLabels, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(0.005, 2.0, 20))
});
sub_registry.register(
"request_durations",
"Duration of HTTP requests",
request_durations.clone(),
);

let info: Info<InfoLabels> = Info::new(InfoLabels {
base_path: BASE_PATH,
version: API_VERSION,
});
sub_registry.register("api", "Information about the Kubo RPC API", info);

Self {
requests,
request_durations,
}
}
}

pub struct Event {
pub(crate) path: &'static str,
pub(crate) duration: Duration,
}

impl Recorder<Event> for Metrics {
fn record(&self, event: &Event) {
let labels = RequestLabels { path: event.path };
self.requests.get_or_create(&labels).inc();
self.request_durations
.get_or_create(&labels)
.observe(event.duration.as_secs_f64());
}
}
214 changes: 214 additions & 0 deletions kubo-rpc/src/http/metrics/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
use std::sync::Arc;

use async_trait::async_trait;
use ceramic_kubo_rpc_server::{
models, Api, BlockGetPostResponse, BlockPutPostResponse, BlockStatPostResponse,
DagGetPostResponse, DagImportPostResponse, DagPutPostResponse, DagResolvePostResponse,
IdPostResponse, PinAddPostResponse, PinRmPostResponse, PubsubLsPostResponse,
PubsubPubPostResponse, PubsubSubPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse,
VersionPostResponse,
};
use ceramic_metrics::Recorder;
use futures_util::Future;
use swagger::{ApiError, ByteArray};
use tokio::time::Instant;

use crate::http::{metrics::Event, Metrics};

/// Implement the API and record metrics
#[derive(Clone)]
pub struct MetricsMiddleware<A: Clone> {
api: A,
metrics: Metrics,
}

impl<A: Clone> MetricsMiddleware<A> {
/// Construct a new MetricsMiddleware.
/// The metrics should have already be registered.
pub fn new(api: A, metrics: Metrics) -> Self {
Self { api, metrics }
}
// Record metrics for a given API endpoint
async fn record<T>(&self, path: &'static str, fut: impl Future<Output = T>) -> T {
let start = Instant::now();
let ret = fut.await;
let duration = start.elapsed();
let event = Event { path, duration };
self.metrics.record(&event);
ret
}
}

#[async_trait]
impl<A, C> Api<C> for MetricsMiddleware<A>
where
A: Api<C>,
A: Clone + Send + Sync,
C: Send + Sync,
{
/// Get a single IPFS block
async fn block_get_post(
&self,
arg: String,
timeout: Option<String>,
offline: Option<bool>,
context: &C,
) -> Result<BlockGetPostResponse, ApiError> {
self.record(
"/block/get",
self.api.block_get_post(arg, timeout, offline, context),
)
.await
}

/// Put a single IPFS block
async fn block_put_post(
&self,
file: ByteArray,
cid_codec: Option<models::Codecs>,
mhtype: Option<models::Multihash>,
pin: Option<bool>,
context: &C,
) -> Result<BlockPutPostResponse, ApiError> {
self.record(
"/block/put",
self.api
.block_put_post(file, cid_codec, mhtype, pin, context),
)
.await
}

/// Report statistics about a block
async fn block_stat_post(
&self,
arg: String,
context: &C,
) -> Result<BlockStatPostResponse, ApiError> {
self.record("/block/stat", self.api.block_stat_post(arg, context))
.await
}

/// Get an IPLD node from IPFS
async fn dag_get_post(
&self,
arg: String,
output_codec: Option<models::Codecs>,
context: &C,
) -> Result<DagGetPostResponse, ApiError> {
self.record(
"/dag/get",
self.api.dag_get_post(arg, output_codec, context),
)
.await
}

/// Import a CAR file of IPLD nodes into IPFS
async fn dag_import_post(
&self,
file: ByteArray,
context: &C,
) -> Result<DagImportPostResponse, ApiError> {
self.record("/dag/import", self.api.dag_import_post(file, context))
.await
}

/// Put an IPLD node into IPFS
async fn dag_put_post(
&self,
file: ByteArray,
store_codec: Option<models::Codecs>,
input_codec: Option<models::Codecs>,
context: &C,
) -> Result<DagPutPostResponse, ApiError> {
self.record(
"/dag/put",
self.api
.dag_put_post(file, store_codec, input_codec, context),
)
.await
}

/// Resolve an IPFS path to a DAG node
async fn dag_resolve_post(
&self,
arg: String,
context: &C,
) -> Result<DagResolvePostResponse, ApiError> {
self.record("/dag/resolve", self.api.dag_resolve_post(arg, context))
.await
}

/// Report identifying information about a node
async fn id_post(&self, arg: Option<String>, context: &C) -> Result<IdPostResponse, ApiError> {
self.record("/id", self.api.id_post(arg, context)).await
}

/// Add a block to the pin store
async fn pin_add_post(
&self,
arg: String,
recursive: Option<bool>,
progress: Option<bool>,
context: &C,
) -> Result<PinAddPostResponse, ApiError> {
self.record(
"/pin/add",
self.api.pin_add_post(arg, recursive, progress, context),
)
.await
}

/// Remove a block from the pin store
async fn pin_rm_post(&self, arg: String, context: &C) -> Result<PinRmPostResponse, ApiError> {
self.record("/pin/rm", self.pin_rm_post(arg, context)).await
}

/// List topic with active subscriptions
async fn pubsub_ls_post(&self, context: &C) -> Result<PubsubLsPostResponse, ApiError> {
self.record("/pubsub/ls", self.api.pubsub_ls_post(context))
.await
}

/// Publish a message to a topic
async fn pubsub_pub_post(
&self,
arg: String,
file: ByteArray,
context: &C,
) -> Result<PubsubPubPostResponse, ApiError> {
self.record("/pubsub/pub", self.api.pubsub_pub_post(arg, file, context))
.await
}

/// Subscribe to a topic, blocks until a message is received
async fn pubsub_sub_post(
&self,
arg: String,
context: &C,
) -> Result<PubsubSubPostResponse, ApiError> {
self.record("/pubsub/sub", self.api.pubsub_sub_post(arg, context))
.await
}

/// Connect to peers
async fn swarm_connect_post(
&self,
arg: &Vec<String>,
context: &C,
) -> Result<SwarmConnectPostResponse, ApiError> {
self.record("/swarm/connect", self.api.swarm_connect_post(arg, context))
.await
}

/// Report connected peers
async fn swarm_peers_post(&self, context: &C) -> Result<SwarmPeersPostResponse, ApiError> {
self.record("/swarm/peers", self.api.swarm_peers_post(context))
.await
}

/// Report server version
async fn version_post(&self, context: &C) -> Result<VersionPostResponse, ApiError> {
self.record("/version", self.api.version_post(context))
.await
}
}
5 changes: 0 additions & 5 deletions metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ features = [
"autonat",
"tokio",
]
optional = true

[features]
bitswap = []
store = []
p2p = ["libp2p"]

# requires setting RUSTFLAGS="--cfg tokio_unstable"
tokio-console = ["tokio/tracing", "console-subscriber"]
Loading

0 comments on commit f25a445

Please sign in to comment.