Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
Initial work towards observability (#174)
Browse files Browse the repository at this point in the history
This is the first step towards improving observability of Gevulot node.

This change adds some initial basic Prometheus metrics and a simple
optional HTTP server that serves the metrics.
  • Loading branch information
tuommaki authored Apr 4, 2024
1 parent ee2104a commit 458d0ed
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 8 deletions.
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ futures-util = { version = "0.3", features = [ "io" ], optional = true }
gevulot-shim = { path = "../shim", default-features = false, optional = true }
home = { version = "0.5", optional = true}
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1", features = ["full"], optional = true }
hyper = { version = "1.2", features = ["full"], optional = true }
hyper-util = { version = "0.1", features = ["full"], optional = true }
num_cpus = { version = "1.4.0", optional = true }
lazy_static = { version = "1.4", optional = true }
lru = "0.12.3"
num-traits = { version = "0.2", optional = true }
parking_lot = { version = "0.12", optional = true }
pea2pea = { version = "0.48", optional = true }
prometheus = { version = "0.13", optional = true }
prometheus-hyper = { version = "0.1", optional = true }
prost = { version = "0.11", optional = true }
qapi = { version = "0.14", features = [ "qmp", "async-tokio-net" ], optional = true }
snow = { version = "0.9", optional = true }
Expand All @@ -69,10 +72,13 @@ node-binary = [
"http-body-util",
"hyper",
"hyper-util",
"lazy_static",
"num_cpus",
"num-traits",
"parking_lot",
"pea2pea",
"prometheus",
"prometheus-hyper",
"prost",
"qapi",
"snow",
Expand Down
7 changes: 7 additions & 0 deletions crates/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ pub struct Config {
default_value = "127.0.0.1:8888"
)]
pub http_healthcheck_listen_addr: SocketAddr,

#[arg(
long,
long_help = "Metrics server listen address",
env = "GEVULOT_METRICS_LISTEN_ADDR"
)]
pub http_metrics_listen_addr: Option<SocketAddr>,
}

#[derive(Debug, Args)]
Expand Down
10 changes: 10 additions & 0 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use types::{transaction::Validated, Hash, Transaction};

mod cli;
mod mempool;
mod metrics;
mod networking;
mod rpc_server;
mod scheduler;
Expand Down Expand Up @@ -182,6 +183,15 @@ impl workflow::TransactionStore for storage::Database {
async fn run(config: Arc<Config>) -> Result<()> {
let node_key = read_node_key(&config.node_key_file)?;

// Register metrics counters.
metrics::register_metrics();

if let Some(http_metrics_bind_addr) = config.http_metrics_listen_addr {
// Start HTTP metrics server.
metrics::serve_metrics(http_metrics_bind_addr).await?;
tracing::info!("listening for metrics at {http_metrics_bind_addr}");
}

let database = Arc::new(Database::new(&config.db_url).await?);

// Launch the ACL whitelist syncing early in the startup.
Expand Down
115 changes: 115 additions & 0 deletions crates/node/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use eyre::Result;
use prometheus_hyper::Server;
use std::{net::SocketAddr, sync::Arc};

use lazy_static::lazy_static;
use prometheus::{HistogramOpts, HistogramVec, IntCounter, IntGauge, Registry};

lazy_static! {
pub static ref REGISTRY: Arc<Registry> = Arc::new(Registry::new());

// RPC metrics.
pub static ref RPC_INCOMING_REQUESTS: IntCounter =
IntCounter::new("rpc_incoming_requests", "Incoming RPC Requests")
.expect("metric can be created");
pub static ref RPC_RESPONSE_TIME_COLLECTOR: HistogramVec = HistogramVec::new(
HistogramOpts::new("rpc_response_time", "RPC Response Times"),
&["method"]
)
.expect("metric can be created");

// P2P metrics.
pub static ref P2P_PROTOCOL_VERSION: IntGauge =
IntGauge::new("p2p_protocol_version", "P2P Protocol Version").expect("metric can be created");
pub static ref P2P_CONNECTED_PEERS: IntGauge =
IntGauge::new("p2p_connected_peers", "Connected P2P Peers").expect("metric can be created");
pub static ref P2P_INCOMING_MESSAGES: IntCounter =
IntCounter::new("p2p_incoming_messages", "Incoming P2P Messages")
.expect("metric can be created");


// Transaction metrics.
pub static ref TX_EXECUTION_TIME_COLLECTOR: HistogramVec = HistogramVec::new(
HistogramOpts::new("tx_execution_time", "Transaction Execution Times (ms)"),
&["kind","status"]
)
.expect("metric can be created");
pub static ref TX_SCHEDULING_REQUEUED: IntCounter =
IntCounter::new("tx_scheduling_requeued", "Transaction Requeued in Scheduling")
.expect("metric can be created");

// Resources metrics.
pub static ref CPUS_AVAILABLE: IntGauge =
IntGauge::new("gevulot_cpus_available", "Available CPUs in Gevulot")
.expect("metric can be created");
pub static ref MEM_AVAILABLE: IntGauge =
IntGauge::new("gevulot_mem_available", "Available MEM in Gevulot")
.expect("metric can be created");
pub static ref GPUS_AVAILABLE: IntGauge =
IntGauge::new("gevulot_gpus_available", "Available GPUs in Gevulot")
.expect("metric can be created");
pub static ref CPUS_TOTAL: IntGauge =
IntGauge::new("gevulot_cpus_total", "Total number of CPUs in Gevulot")
.expect("metric can be created");
pub static ref MEM_TOTAL: IntGauge =
IntGauge::new("gevulot_mem_total", "Total amount of MEM in Gevulot")
.expect("metric can be created");
pub static ref GPUS_TOTAL: IntGauge =
IntGauge::new("gevulot_gpus_total", "Total number of GPUs in Gevulot")
.expect("metric can be created");
}

pub(crate) fn register_metrics() {
REGISTRY
.register(Box::new(RPC_INCOMING_REQUESTS.clone()))
.expect("collector can be registered");

REGISTRY
.register(Box::new(RPC_RESPONSE_TIME_COLLECTOR.clone()))
.expect("collector can be registered");

REGISTRY
.register(Box::new(P2P_PROTOCOL_VERSION.clone()))
.expect("collector can be registered");
REGISTRY
.register(Box::new(P2P_CONNECTED_PEERS.clone()))
.expect("collector can be registered");
REGISTRY
.register(Box::new(P2P_INCOMING_MESSAGES.clone()))
.expect("collector can be registered");

REGISTRY
.register(Box::new(TX_EXECUTION_TIME_COLLECTOR.clone()))
.expect("collector can be registered");
REGISTRY
.register(Box::new(TX_SCHEDULING_REQUEUED.clone()))
.expect("collector can be registered");

REGISTRY
.register(Box::new(CPUS_AVAILABLE.clone()))
.expect("collector can be registered");
REGISTRY
.register(Box::new(MEM_AVAILABLE.clone()))
.expect("collector can be registered");
REGISTRY
.register(Box::new(GPUS_AVAILABLE.clone()))
.expect("collector can be registered");
REGISTRY
.register(Box::new(CPUS_TOTAL.clone()))
.expect("collector can be registered");
REGISTRY
.register(Box::new(MEM_TOTAL.clone()))
.expect("collector can be registered");
REGISTRY
.register(Box::new(GPUS_TOTAL.clone()))
.expect("collector can be registered");
}

pub(crate) async fn serve_metrics(bind_addr: SocketAddr) -> Result<()> {
// Start Server endlessly.
tokio::spawn(async move {
Server::run(REGISTRY.clone(), bind_addr, futures_util::future::pending()).await
});

Ok(())
}
18 changes: 14 additions & 4 deletions crates/node/src/networking/p2p/pea2pea.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::metrics;
use crate::txvalidation::P2pSender;
use crate::txvalidation::TxEventSender;
use futures_util::Stream;
Expand Down Expand Up @@ -49,6 +50,8 @@ pub struct P2P {

// Send Tx to the process loop.
tx_sender: TxEventSender<P2pSender>,

protocol_version: u64,
}

impl Pea2Pea for P2P {
Expand Down Expand Up @@ -97,6 +100,7 @@ impl P2P {
http_port,
nat_listen_addr,
tx_sender,
protocol_version: 0,
};

// Enable node functionalities.
Expand All @@ -105,6 +109,8 @@ impl P2P {
instance.enable_writing().await;
instance.enable_disconnect().await;

metrics::P2P_PROTOCOL_VERSION.set(instance.protocol_version as i64);

// Start a new Tx stream loop.
tokio::spawn({
let p2p = instance.clone();
Expand Down Expand Up @@ -255,8 +261,8 @@ impl Handshake for P2P {

let peer_handshake_msg: protocol::Handshake = match node_conn_side {
ConnectionSide::Initiator => {
// Send protocol version. Set to 0 .
stream.write_u64(0).await?;
// Send protocol version.
stream.write_u64(self.protocol_version).await?;
// Get Responder protocol version.
let _protocol_version = stream.read_u64().await?;

Expand Down Expand Up @@ -288,8 +294,8 @@ impl Handshake for P2P {
ConnectionSide::Responder => {
// Get Initiator protocol version.
let _protocol_version = stream.read_u64().await?;
// Send protocol version. Set to 0 .
stream.write_u64(0).await?;
// Send protocol version.
stream.write_u64(self.protocol_version).await?;

// Receive the handshake message from the connecting peer.
let buffer_len = stream.read_u32().await? as usize;
Expand Down Expand Up @@ -404,6 +410,8 @@ impl Handshake for P2P {
.await
.insert(handshake_msg.my_p2p_listen_addr, handshake_msg.http_port);

metrics::P2P_CONNECTED_PEERS.inc();

Ok(conn)
}
}
Expand All @@ -421,6 +429,8 @@ impl Reading for P2P {
async fn process_message(&self, source: SocketAddr, message: Self::Message) -> io::Result<()> {
tracing::debug!(parent: self.node().span(), "decrypted a message from {}", source);

metrics::P2P_INCOMING_MESSAGES.inc();

match bincode::deserialize(message.as_ref()) {
Ok(protocol::Message::V0(msg)) => match msg {
protocol::MessageV0::Transaction(tx) => {
Expand Down
Loading

0 comments on commit 458d0ed

Please sign in to comment.