Skip to content

Commit

Permalink
feat: support 2+2 and /status/buildinfo (#3604)
Browse files Browse the repository at this point in the history
* feat: implement buildinfo endpoint

Signed-off-by: Ruihang Xia <[email protected]>

* refactor prom result struct

Signed-off-by: Ruihang Xia <[email protected]>

* add more integration test

Signed-off-by: Ruihang Xia <[email protected]>

* format toml file

Signed-off-by: Ruihang Xia <[email protected]>

* Update src/servers/src/http/prometheus_resp.rs

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Mar 29, 2024
1 parent 63681f0 commit 77cc721
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 37 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ reqwest = { version = "0.11", default-features = false, features = [
] }
rskafka = "0.5"
rust_decimal = "1.33"
schemars = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"
Expand Down
5 changes: 5 additions & 0 deletions src/common/version/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,10 @@ license.workspace = true
[lints]
workspace = true

[features]
codec = ["dep:serde", "dep:schemars"]

[dependencies]
build-data = "0.1.4"
schemars = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
5 changes: 5 additions & 0 deletions src/common/version/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ use std::sync::OnceLock;

const UNKNOWN: &str = "unknown";

#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(
feature = "codec",
derive(serde::Serialize, serde::Deserialize, schemars::JsonSchema)
)]
pub struct BuildInfo {
pub branch: Cow<'static, str>,
pub commit: Cow<'static, str>,
Expand Down
3 changes: 2 additions & 1 deletion src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version = { workspace = true, features = ["codec"] }
dashmap.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
Expand Down Expand Up @@ -83,7 +84,7 @@ rust-embed = { version = "6.6", features = ["debug-embed"] }
rustls = "0.22"
rustls-pemfile = "2.0"
rustls-pki-types = "1.0"
schemars = "0.8"
schemars.workspace = true
secrecy = { version = "0.8", features = ["serde", "alloc"] }
serde.workspace = true
serde_json.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::prometheus::{
format_query, instant_query, label_values_query, labels_query, range_query, series_query,
build_info_query, format_query, instant_query, label_values_query, labels_query, range_query,
series_query,
};
use crate::metrics::http_metrics_layer;
use crate::metrics_handler::MetricsHandler;
Expand Down Expand Up @@ -682,6 +683,7 @@ impl HttpServer {
"/format_query",
routing::post(format_query).get(format_query),
)
.route("/status/buildinfo", routing::get(build_info_query))
.route("/query", routing::post(instant_query).get(instant_query))
.route("/query_range", routing::post(range_query).get(range_query))
.route("/labels", routing::post(labels_query).get(labels_query))
Expand Down
46 changes: 41 additions & 5 deletions src/servers/src/http/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use common_telemetry::tracing;
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use common_version::BuildInfo;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector};
Expand All @@ -51,21 +52,42 @@ use crate::http::header::collect_plan_metrics;
use crate::prom_store::METRIC_NAME_LABEL;
use crate::prometheus_handler::PrometheusHandlerRef;

/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromSeries {
pub struct PromSeriesVector {
pub metric: HashMap<String, String>,
/// For [ValueType::Matrix] result type
pub values: Vec<(f64, String)>,
/// For [ValueType::Vector] result type
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<(f64, String)>,
}

/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromSeriesMatrix {
pub metric: HashMap<String, String>,
pub values: Vec<(f64, String)>,
}

/// Variants corresponding to [ValueType]
#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
Matrix(Vec<PromSeriesMatrix>),
Vector(Vec<PromSeriesVector>),
Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}

impl Default for PromQueryResult {
fn default() -> Self {
PromQueryResult::Matrix(Default::default())
}
}

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromData {
#[serde(rename = "resultType")]
pub result_type: String,
pub result: Vec<PromSeries>,
pub result: PromQueryResult,
}

#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq)]
Expand All @@ -76,6 +98,7 @@ pub enum PrometheusResponse {
Series(Vec<HashMap<String, String>>),
LabelValues(Vec<String>),
FormatQuery(String),
BuildInfo(BuildInfo),
}

impl Default for PrometheusResponse {
Expand Down Expand Up @@ -113,6 +136,19 @@ pub async fn format_query(
}
}

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct BuildInfoQuery {}

#[axum_macros::debug_handler]
#[tracing::instrument(
skip_all,
fields(protocol = "prometheus", request_type = "build_info_query")
)]
pub async fn build_info_query() -> PrometheusJsonResponse {
let build_info = common_version::build_info().clone();
PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info))
}

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct InstantQuery {
query: Option<String>,
Expand Down
45 changes: 29 additions & 16 deletions src/servers/src/http/prometheus_resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use serde_json::Value;
use snafu::{OptionExt, ResultExt};

use super::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS};
use super::prometheus::{PromData, PromSeries, PrometheusResponse};
use super::prometheus::{
PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse,
};
use crate::error::{CollectRecordbatchSnafu, InternalSnafu, Result};

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
Expand Down Expand Up @@ -256,24 +258,35 @@ impl PrometheusJsonResponse {
}
}

let result = buffer
.into_iter()
.map(|(tags, mut values)| {
let metric = tags.into_iter().collect();
match result_type {
ValueType::Vector | ValueType::Scalar | ValueType::String => Ok(PromSeries {
// initialize result to return
let mut result = match result_type {
ValueType::Vector => PromQueryResult::Vector(vec![]),
ValueType::Matrix => PromQueryResult::Matrix(vec![]),
ValueType::Scalar => PromQueryResult::Scalar(None),
ValueType::String => PromQueryResult::String(None),
};

// accumulate data into result
buffer.into_iter().for_each(|(tags, mut values)| {
let metric = tags.into_iter().collect();
match result {
PromQueryResult::Vector(ref mut v) => {
v.push(PromSeriesVector {
metric,
value: values.pop(),
..Default::default()
}),
ValueType::Matrix => Ok(PromSeries {
metric,
values,
..Default::default()
}),
});
}
})
.collect::<Result<Vec<_>>>()?;
PromQueryResult::Matrix(ref mut v) => {
v.push(PromSeriesMatrix { metric, values });
}
PromQueryResult::Scalar(ref mut v) => {
*v = values.pop();
}
PromQueryResult::String(ref mut _v) => {
// TODO(ruihang): Not supported yet
}
}
});

let result_type_string = result_type.to_string();
let data = PrometheusResponse::PromData(PromData {
Expand Down
29 changes: 15 additions & 14 deletions tests-integration/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use common_catalog::consts::MITO_ENGINE;
use common_query::Output;
use common_recordbatch::RecordBatches;
use servers::grpc::GrpcServerConfig;
use servers::http::prometheus::{PromData, PromSeries, PrometheusJsonResponse, PrometheusResponse};
use servers::http::prometheus::{
PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusJsonResponse,
PrometheusResponse,
};
use servers::server::Server;
use tests_integration::test_util::{
setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, StorageType,
Expand Down Expand Up @@ -465,6 +468,8 @@ pub async fn test_health_check(store_type: StorageType) {
}

pub async fn test_prom_gateway_query(store_type: StorageType) {
common_telemetry::init_default_ut_logging();

// prepare connection
let (addr, mut guard, fe_grpc_server) = setup_grpc_server(store_type, "prom_gateway").await;
let grpc_client = Client::with_urls(vec![addr]);
Expand Down Expand Up @@ -516,28 +521,26 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
status: "success".to_string(),
data: PrometheusResponse::PromData(PromData {
result_type: "vector".to_string(),
result: vec![
PromSeries {
result: PromQueryResult::Vector(vec![
PromSeriesVector {
metric: [
("k".to_string(), "a".to_string()),
("__name__".to_string(), "test".to_string()),
]
.into_iter()
.collect(),
value: Some((5.0, "2".to_string())),
..Default::default()
},
PromSeries {
PromSeriesVector {
metric: [
("__name__".to_string(), "test".to_string()),
("k".to_string(), "b".to_string()),
]
.into_iter()
.collect(),
value: Some((5.0, "1".to_string())),
..Default::default()
},
],
]),
}),
error: None,
error_type: None,
Expand Down Expand Up @@ -568,28 +571,26 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
status: "success".to_string(),
data: PrometheusResponse::PromData(PromData {
result_type: "matrix".to_string(),
result: vec![
PromSeries {
result: PromQueryResult::Matrix(vec![
PromSeriesMatrix {
metric: [
("__name__".to_string(), "test".to_string()),
("k".to_string(), "a".to_string()),
]
.into_iter()
.collect(),
values: vec![(5.0, "2".to_string()), (10.0, "2".to_string())],
..Default::default()
},
PromSeries {
PromSeriesMatrix {
metric: [
("__name__".to_string(), "test".to_string()),
("k".to_string(), "b".to_string()),
]
.into_iter()
.collect(),
values: vec![(5.0, "1".to_string()), (10.0, "1".to_string())],
..Default::default()
},
],
]),
}),
error: None,
error_type: None,
Expand Down Expand Up @@ -620,7 +621,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
status: "success".to_string(),
data: PrometheusResponse::PromData(PromData {
result_type: "matrix".to_string(),
result: vec![],
result: PromQueryResult::Matrix(vec![]),
}),
error: None,
error_type: None,
Expand Down
25 changes: 25 additions & 0 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,22 @@ pub async fn test_prom_http_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);

// instant query 1+1
let res = client
.get("/v1/prometheus/api/v1/query?query=1%2B1&time=1")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(
json!({"resultType":"scalar","result":[1.0,"2"]})
)
.unwrap()
);

// range query
let res = client
.get("/v1/prometheus/api/v1/query_range?query=up&start=1&end=100&step=5")
Expand Down Expand Up @@ -539,6 +555,15 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert!(prom_resp.error.is_none());
assert!(prom_resp.error_type.is_none());

// buildinfo
let res = client
.get("/v1/prometheus/api/v1/status/buildinfo")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");

guard.remove_all().await;
}

Expand Down

0 comments on commit 77cc721

Please sign in to comment.