From f08a35d6b9ecb8735dd2ed2fe6e018218a54f204 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 25 Oct 2023 16:19:30 +0800 Subject: [PATCH] feat: implement `histogram_quantile` in PromQL (#2651) * add to planner Signed-off-by: Ruihang Xia * impl evaluate_array Signed-off-by: Ruihang Xia * compute quantile Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * fix required input ordering Signed-off-by: Ruihang Xia * add more tests Signed-off-by: Ruihang Xia * todo to fixme Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/promql/src/error.rs | 6 +- src/promql/src/extension_plan.rs | 1 + .../src/extension_plan/histogram_fold.rs | 328 ++++++++++++------ src/promql/src/extension_plan/planner.rs | 3 + src/promql/src/planner.rs | 88 ++++- .../common/promql/simple_histogram.result | 246 +++++++++++++ .../common/promql/simple_histogram.sql | 134 +++++++ 7 files changed, 693 insertions(+), 113 deletions(-) create mode 100644 tests/cases/standalone/common/promql/simple_histogram.result create mode 100644 tests/cases/standalone/common/promql/simple_histogram.sql diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index 01f48c7d2478..31c44e5715e8 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -109,6 +109,9 @@ pub enum Error { #[snafu(display("Expect a metric matcher, but not found"))] NoMetricMatcher { location: Location }, + + #[snafu(display("Invalid function argument for {}", fn_name))] + FunctionInvalidArgument { fn_name: String, location: Location }, } impl ErrorExt for Error { @@ -124,7 +127,8 @@ impl ErrorExt for Error { | ExpectRangeSelector { .. } | ZeroRangeSelector { .. } | ColumnNotFound { .. } - | Deserialize { .. } => StatusCode::InvalidArguments, + | Deserialize { .. } + | FunctionInvalidArgument { .. } => StatusCode::InvalidArguments, UnknownTable { .. } | DataFusionPlanning { .. } diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index 2bc1abaf3648..49a9199bf0cc 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -22,6 +22,7 @@ mod series_divide; use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType}; pub use empty_metric::{build_special_time_expr, EmptyMetric, EmptyMetricExec, EmptyMetricStream}; +pub use histogram_fold::{HistogramFold, HistogramFoldExec, HistogramFoldStream}; pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream}; pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream}; pub use planner::PromExtensionPlanner; diff --git a/src/promql/src/extension_plan/histogram_fold.rs b/src/promql/src/extension_plan/histogram_fold.rs index 9d6d5340f33f..a4fb2b315fe9 100644 --- a/src/promql/src/extension_plan/histogram_fold.rs +++ b/src/promql/src/extension_plan/histogram_fold.rs @@ -22,14 +22,14 @@ use common_recordbatch::RecordBatch as GtRecordBatch; use common_telemetry::warn; use datafusion::arrow::array::AsArray; use datafusion::arrow::compute::{self, concat_batches, SortOptions}; -use datafusion::arrow::datatypes::{DataType, Field, Float64Type, SchemaRef}; +use datafusion::arrow::datatypes::{DataType, Float64Type, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::common::{DFField, DFSchema, DFSchemaRef}; +use datafusion::common::{DFSchema, DFSchemaRef}; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::TaskContext; use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; -use datafusion::physical_plan::expressions::Column as PhyColumn; +use datafusion::physical_plan::expressions::{CastExpr as PhyCast, Column as PhyColumn}; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr, @@ -38,7 +38,7 @@ use datafusion::physical_plan::{ use datafusion::prelude::{Column, Expr}; use datatypes::prelude::{ConcreteDataType, DataType as GtDataType}; use datatypes::schema::Schema as GtSchema; -use datatypes::value::{ListValue, Value}; +use datatypes::value::{OrderedF64, ValueRef}; use datatypes::vectors::MutableVector; use futures::{ready, Stream, StreamExt}; @@ -56,7 +56,7 @@ use futures::{ready, Stream, StreamExt}; /// - The value set of `le` should be same. I.e., buckets of every series should be same. /// /// [1]: https://prometheus.io/docs/concepts/metric_types/#histogram -#[derive(Debug, PartialEq, Eq, Hash)] +#[derive(Debug, PartialEq, Hash, Eq)] pub struct HistogramFold { /// Name of the `le` column. It's a special column in prometheus /// for implementing conventional histogram. It's a string column @@ -65,6 +65,7 @@ pub struct HistogramFold { ts_column: String, input: LogicalPlan, field_column: String, + quantile: OrderedF64, output_schema: DFSchemaRef, } @@ -88,8 +89,8 @@ impl UserDefinedLogicalNodeCore for HistogramFold { fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "HistogramFold: le={}, field={}", - self.le_column, self.field_column + "HistogramFold: le={}, field={}, quantile={}", + self.le_column, self.field_column, self.quantile ) } @@ -99,6 +100,7 @@ impl UserDefinedLogicalNodeCore for HistogramFold { ts_column: self.ts_column.clone(), input: inputs[0].clone(), field_column: self.field_column.clone(), + quantile: self.quantile, // This method cannot return error. Otherwise we should re-calculate // the output schema output_schema: self.output_schema.clone(), @@ -107,21 +109,22 @@ impl UserDefinedLogicalNodeCore for HistogramFold { } impl HistogramFold { - #[allow(dead_code)] pub fn new( le_column: String, field_column: String, ts_column: String, + quantile: f64, input: LogicalPlan, ) -> DataFusionResult { let input_schema = input.schema(); Self::check_schema(input_schema, &le_column, &field_column, &ts_column)?; - let output_schema = Self::convert_schema(input_schema, &le_column, &field_column)?; + let output_schema = Self::convert_schema(input_schema, &le_column)?; Ok(Self { le_column, ts_column, input, field_column, + quantile: quantile.into(), output_schema, }) } @@ -158,7 +161,6 @@ impl HistogramFold { check_column(field_column) } - #[allow(dead_code)] pub fn to_execution_plan(&self, exec_input: Arc) -> Arc { let input_schema = self.input.schema(); // safety: those fields are checked in `check_schema()` @@ -180,6 +182,7 @@ impl HistogramFold { field_column_index, ts_column_index, input: exec_input, + quantile: self.quantile.into(), output_schema: Arc::new(self.output_schema.as_ref().into()), metric: ExecutionPlanMetricsSet::new(), }) @@ -187,46 +190,17 @@ impl HistogramFold { /// Transform the schema /// - /// - `le` will become a [ListArray] of [f64]. With each bucket bound parsed - /// - `field` will become a [ListArray] of [f64] + /// - `le` will be removed fn convert_schema( input_schema: &DFSchemaRef, le_column: &str, - field_column: &str, ) -> DataFusionResult { let mut fields = input_schema.fields().clone(); // safety: those fields are checked in `check_schema()` let le_column_idx = input_schema .index_of_column_by_name(None, le_column)? .unwrap(); - let field_column_idx = input_schema - .index_of_column_by_name(None, field_column)? - .unwrap(); - - // transform `le` - let le_field: Field = fields[le_column_idx].field().as_ref().clone(); - let le_field = le_field.with_data_type(DataType::Float64); - let folded_le_datatype = DataType::List(Arc::new(le_field)); - let folded_le = DFField::new( - fields[le_column_idx].qualifier().cloned(), - fields[le_column_idx].name(), - folded_le_datatype, - false, - ); - - // transform `field` - // to avoid ambiguity, that field will be referenced as `the_field` below. - let the_field: Field = fields[field_column_idx].field().as_ref().clone(); - let folded_field_datatype = DataType::List(Arc::new(the_field)); - let folded_field = DFField::new( - fields[field_column_idx].qualifier().cloned(), - fields[field_column_idx].name(), - folded_field_datatype, - false, - ); - - fields[le_column_idx] = folded_le; - fields[field_column_idx] = folded_field; + fields.remove(le_column_idx); Ok(Arc::new(DFSchema::new_with_metadata( fields, @@ -244,6 +218,7 @@ pub struct HistogramFoldExec { /// Index for field column in the schema of input. field_column_index: usize, ts_column_index: usize, + quantile: f64, metric: ExecutionPlanMetricsSet, } @@ -275,9 +250,13 @@ impl ExecutionPlan for HistogramFoldExec { .collect::>(); // add le ASC cols.push(PhysicalSortRequirement { - expr: Arc::new(PhyColumn::new( - self.output_schema.field(self.le_column_index).name(), - self.le_column_index, + expr: Arc::new(PhyCast::new( + Arc::new(PhyColumn::new( + self.input.schema().field(self.le_column_index).name(), + self.le_column_index, + )), + DataType::Float64, + None, )), options: Some(SortOptions { descending: false, // +INF in the last @@ -287,7 +266,7 @@ impl ExecutionPlan for HistogramFoldExec { // add ts cols.push(PhysicalSortRequirement { expr: Arc::new(PhyColumn::new( - self.output_schema.field(self.ts_column_index).name(), + self.input.schema().field(self.ts_column_index).name(), self.ts_column_index, )), options: None, @@ -320,6 +299,7 @@ impl ExecutionPlan for HistogramFoldExec { metric: self.metric.clone(), le_column_index: self.le_column_index, ts_column_index: self.ts_column_index, + quantile: self.quantile, output_schema: self.output_schema.clone(), field_column_index: self.field_column_index, })) @@ -336,12 +316,13 @@ impl ExecutionPlan for HistogramFoldExec { let input = self.input.execute(partition, context)?; let output_schema = self.output_schema.clone(); - let mut normal_indices = (0..output_schema.fields().len()).collect::>(); - normal_indices.remove(&self.le_column_index); + let mut normal_indices = (0..input.schema().fields().len()).collect::>(); normal_indices.remove(&self.field_column_index); + normal_indices.remove(&self.le_column_index); Ok(Box::pin(HistogramFoldStream { le_column_index: self.le_column_index, field_column_index: self.field_column_index, + quantile: self.quantile, normal_indices: normal_indices.into_iter().collect(), bucket_size: None, input_buffer: vec![], @@ -350,7 +331,10 @@ impl ExecutionPlan for HistogramFoldExec { metric: baseline_metric, batch_size, input_buffered_rows: 0, - output_buffer: HistogramFoldStream::empty_output_buffer(&self.output_schema)?, + output_buffer: HistogramFoldStream::empty_output_buffer( + &self.output_schema, + self.le_column_index, + )?, output_buffered_rows: 0, })) } @@ -399,8 +383,8 @@ impl DisplayAs for HistogramFoldExec { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, - "HistogramFoldExec: le=@{}, field=@{}", - self.le_column_index, self.field_column_index + "HistogramFoldExec: le=@{}, field=@{}, quantile={}", + self.le_column_index, self.field_column_index, self.quantile ) } } @@ -411,7 +395,8 @@ pub struct HistogramFoldStream { // internal states le_column_index: usize, field_column_index: usize, - /// Columns need not folding + quantile: f64, + /// Columns need not folding. This indices is based on input schema normal_indices: Vec, bucket_size: Option, /// Expected output batch size @@ -485,15 +470,25 @@ impl HistogramFoldStream { Ok(None) } + /// Generate a group of empty [MutableVector]s from the output schema. + /// + /// For simplicity, this method will insert a placeholder for `le`. So that + /// the output buffers has the same schema with input. This placeholder needs + /// to be removed before returning the output batch. pub fn empty_output_buffer( schema: &SchemaRef, + le_column_index: usize, ) -> DataFusionResult>> { - let mut builders = Vec::with_capacity(schema.fields().len()); + let mut builders = Vec::with_capacity(schema.fields().len() + 1); for field in schema.fields() { let concrete_datatype = ConcreteDataType::try_from(field.data_type()).unwrap(); let mutable_vector = concrete_datatype.create_mutable_vector(0); builders.push(mutable_vector); } + builders.insert( + le_column_index, + ConcreteDataType::float64_datatype().create_mutable_vector(0), + ); Ok(builders) } @@ -536,8 +531,8 @@ impl HistogramFoldStream { // "fold" `le` and field columns let le_array = batch.column(self.le_column_index); let field_array = batch.column(self.field_column_index); - let mut le_item = vec![]; - let mut field_item = vec![]; + let mut bucket = vec![]; + let mut counters = vec![]; for bias in 0..bucket_num { let le_str_val = le_array.get(cursor + bias); let le_str_val_ref = le_str_val.as_value_ref(); @@ -546,24 +541,18 @@ impl HistogramFoldStream { .unwrap() .expect("le column should not be nullable"); let le = le_str.parse::().unwrap(); - let le_val = Value::from(le); - le_item.push(le_val); + bucket.push(le); - let field = field_array.get(cursor + bias); - field_item.push(field); + let counter = field_array + .get(cursor + bias) + .as_value_ref() + .as_f64() + .unwrap() + .expect("field column should not be nullable"); + counters.push(counter); } - let le_list_val = Value::List(ListValue::new( - Some(Box::new(le_item)), - ConcreteDataType::float64_datatype(), - )); - let field_list_val = Value::List(ListValue::new( - Some(Box::new(field_item)), - ConcreteDataType::float64_datatype(), - )); - self.output_buffer[self.le_column_index].push_value_ref(le_list_val.as_value_ref()); - self.output_buffer[self.field_column_index] - .push_value_ref(field_list_val.as_value_ref()); - + let result = Self::evaluate_row(self.quantile, &bucket, &counters)?; + self.output_buffer[self.field_column_index].push_value_ref(ValueRef::from(result)); cursor += bucket_num; remaining_rows -= bucket_num; self.output_buffered_rows += 1; @@ -581,6 +570,7 @@ impl HistogramFoldStream { self.input_buffer.push(batch); } + /// Compute result from output buffer fn take_output_buf(&mut self) -> DataFusionResult> { if self.output_buffered_rows == 0 { if self.input_buffered_rows != 0 { @@ -592,24 +582,14 @@ impl HistogramFoldStream { return Ok(None); } - let mut output_buf = Self::empty_output_buffer(&self.output_schema)?; + let mut output_buf = Self::empty_output_buffer(&self.output_schema, self.le_column_index)?; std::mem::swap(&mut self.output_buffer, &mut output_buf); let mut columns = Vec::with_capacity(output_buf.len()); for builder in output_buf.iter_mut() { columns.push(builder.to_vector().to_arrow_array()); } - - // overwrite default list datatype to change field name - columns[self.le_column_index] = compute::cast( - &columns[self.le_column_index], - self.output_schema.field(self.le_column_index).data_type(), - )?; - columns[self.field_column_index] = compute::cast( - &columns[self.field_column_index], - self.output_schema - .field(self.field_column_index) - .data_type(), - )?; + // remove the placeholder column for `le` + columns.remove(self.le_column_index); self.output_buffered_rows = 0; RecordBatch::try_new(self.output_schema.clone(), columns) @@ -651,6 +631,58 @@ impl HistogramFoldStream { Ok(batch.num_rows()) } + + /// Evaluate the field column and return the result + fn evaluate_row(quantile: f64, bucket: &[f64], counter: &[f64]) -> DataFusionResult { + // check bucket + if bucket.len() <= 1 { + return Ok(f64::NAN); + } + if *bucket.last().unwrap() != f64::INFINITY { + return Err(DataFusionError::Execution( + "last bucket should be +Inf".to_string(), + )); + } + if bucket.len() != counter.len() { + return Err(DataFusionError::Execution( + "bucket and counter should have the same length".to_string(), + )); + } + // check quantile + if quantile < 0.0 { + return Ok(f64::NEG_INFINITY); + } else if quantile > 1.0 { + return Ok(f64::INFINITY); + } else if quantile.is_nan() { + return Ok(f64::NAN); + } + + // check input value + debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1])); + debug_assert!(counter.windows(2).all(|w| w[0] <= w[1])); + + let total = *counter.last().unwrap(); + let expected_pos = total * quantile; + let mut fit_bucket_pos = 0; + while fit_bucket_pos < bucket.len() && counter[fit_bucket_pos] < expected_pos { + fit_bucket_pos += 1; + } + if fit_bucket_pos >= bucket.len() - 1 { + Ok(bucket[bucket.len() - 2]) + } else { + let upper_bound = bucket[fit_bucket_pos]; + let upper_count = counter[fit_bucket_pos]; + let mut lower_bound = bucket[0].min(0.0); + let mut lower_count = 0.0; + if fit_bucket_pos > 0 { + lower_bound = bucket[fit_bucket_pos - 1]; + lower_count = counter[fit_bucket_pos - 1]; + } + Ok(lower_bound + + (upper_bound - lower_bound) / (upper_count - lower_count) + * (expected_pos - lower_count)) + } + } } #[cfg(test)] @@ -658,7 +690,7 @@ mod test { use std::sync::Arc; use datafusion::arrow::array::Float64Array; - use datafusion::arrow::datatypes::Schema; + use datafusion::arrow::datatypes::{Field, Schema}; use datafusion::common::ToDFSchema; use datafusion::physical_plan::memory::MemoryExec; use datafusion::prelude::SessionContext; @@ -729,7 +761,6 @@ mod test { (*HistogramFold::convert_schema( &Arc::new(memory_exec.schema().to_dfschema().unwrap()), "le", - "val", ) .unwrap() .as_ref()) @@ -739,6 +770,7 @@ mod test { let fold_exec = Arc::new(HistogramFoldExec { le_column_index: 1, field_column_index: 2, + quantile: 0.4, ts_column_index: 9999, // not exist but doesn't matter input: memory_exec, output_schema, @@ -754,15 +786,15 @@ mod test { .to_string(); let expected = String::from( - "+--------+---------------------------------+--------------------------------+ -| host | le | val | -+--------+---------------------------------+--------------------------------+ -| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 1.0, 1.0, 5.0, 5.0] | -| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 20.0, 60.0, 70.0, 100.0] | -| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [1.0, 1.0, 1.0, 1.0, 1.0] | -| host_2 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 0.0, 0.0, 0.0, 0.0] | -| host_2 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 1.0, 2.0, 3.0, 4.0] | -+--------+---------------------------------+--------------------------------+", + "+--------+-------------------+ +| host | val | ++--------+-------------------+ +| host_1 | 257.5 | +| host_1 | 5.05 | +| host_1 | 0.0004 | +| host_2 | NaN | +| host_2 | 6.040000000000001 | ++--------+-------------------+", ); assert_eq!(result_literal, expected); } @@ -778,21 +810,107 @@ mod test { .unwrap(); let expected_output_schema = Schema::new(vec![ Field::new("host", DataType::Utf8, true), - Field::new( - "le", - DataType::List(Arc::new(Field::new("le", DataType::Float64, true))), - false, - ), - Field::new( - "val", - DataType::List(Arc::new(Field::new("val", DataType::Float64, true))), - false, - ), + Field::new("val", DataType::Float64, true), ]) .to_dfschema_ref() .unwrap(); - let actual = HistogramFold::convert_schema(&input_schema, "le", "val").unwrap(); + let actual = HistogramFold::convert_schema(&input_schema, "le").unwrap(); assert_eq!(actual, expected_output_schema) } + + #[test] + fn evaluate_row_normal_case() { + let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY]; + + #[derive(Debug)] + struct Case { + quantile: f64, + counters: Vec, + expected: f64, + } + + let cases = [ + Case { + quantile: 0.9, + counters: vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0], + expected: 4.0, + }, + Case { + quantile: 0.89, + counters: vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0], + expected: 4.0, + }, + Case { + quantile: 0.78, + counters: vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0], + expected: 3.9, + }, + Case { + quantile: 0.5, + counters: vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0], + expected: 2.5, + }, + Case { + quantile: 0.5, + counters: vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + expected: f64::NAN, + }, + Case { + quantile: 1.0, + counters: vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0], + expected: 4.0, + }, + Case { + quantile: 0.0, + counters: vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0], + expected: f64::NAN, + }, + Case { + quantile: 1.1, + counters: vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0], + expected: f64::INFINITY, + }, + Case { + quantile: -1.0, + counters: vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0], + expected: f64::NEG_INFINITY, + }, + ]; + + for case in cases { + let actual = + HistogramFoldStream::evaluate_row(case.quantile, &bucket, &case.counters).unwrap(); + assert_eq!( + format!("{actual}"), + format!("{}", case.expected), + "{:?}", + case + ); + } + } + + #[test] + #[should_panic] + fn evaluate_out_of_order_input() { + let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY]; + let counters = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0]; + HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap(); + } + + #[test] + fn evaluate_wrong_bucket() { + let bucket = [0.0, 1.0, 2.0, 3.0, 4.0, f64::INFINITY, 5.0]; + let counters = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]; + let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters); + assert!(result.is_err()); + } + + #[test] + fn evaluate_small_fraction() { + let bucket = [0.0, 2.0, 4.0, 6.0, f64::INFINITY]; + let counters = [0.0, 1.0 / 300.0, 2.0 / 300.0, 0.01, 0.01]; + let result = HistogramFoldStream::evaluate_row(0.5, &bucket, &counters).unwrap(); + assert_eq!(3.0, result); + } } diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs index 1198108012c4..7798c9b32193 100644 --- a/src/promql/src/extension_plan/planner.rs +++ b/src/promql/src/extension_plan/planner.rs @@ -21,6 +21,7 @@ use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; +use super::HistogramFold; use crate::extension_plan::{ EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, }; @@ -47,6 +48,8 @@ impl ExtensionPlanner for PromExtensionPlanner { Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) } else if let Some(node) = node.as_any().downcast_ref::() { Ok(Some(node.to_execution_plan(session_state, planner)?)) + } else if let Some(node) = node.as_any().downcast_ref::() { + Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) } else { Ok(None) } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index c8d45033dc1b..dcadf8c4ebaf 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -44,14 +44,14 @@ use table::table::adapter::DfTableProviderAdapter; use crate::error::{ CatalogSnafu, ColumnNotFoundSnafu, DataFusionPlanningSnafu, ExpectExprSnafu, - ExpectRangeSelectorSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu, - NoMetricMatcherSnafu, Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, - UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu, - ValueNotFoundSnafu, ZeroRangeSelectorSnafu, + ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, MultipleMetricMatchersSnafu, + MultipleVectorSnafu, NoMetricMatcherSnafu, Result, TableNameNotFoundSnafu, + TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, + UnsupportedExprSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu, }; use crate::extension_plan::{ - build_special_time_expr, EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, - SeriesDivide, SeriesNormalize, + build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, + RangeManipulate, SeriesDivide, SeriesNormalize, }; use crate::functions::{ AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta, @@ -63,6 +63,8 @@ use crate::functions::{ const SPECIAL_TIME_FUNCTION: &str = "time"; /// `histogram_quantile` function in PromQL const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile"; +/// `le` column for conventional histogram. +const LE_COLUMN_NAME: &str = "le"; const DEFAULT_TIME_INDEX_COLUMN: &str = "time"; @@ -110,6 +112,11 @@ impl PromPlannerContext { self.field_column_matcher = None; self.range = None; } + + /// Check if `le` is present in tag columns + fn has_le_tag(&self) -> bool { + self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME)) + } } pub struct PromPlanner { @@ -443,7 +450,55 @@ impl PromPlanner { } if func.name == SPECIAL_HISTOGRAM_QUANTILE { - todo!() + if args.args.len() != 2 { + return FunctionInvalidArgumentSnafu { + fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(), + } + .fail(); + } + let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| { + FunctionInvalidArgumentSnafu { + fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(), + } + })?; + let input = args.args[1].as_ref().clone(); + let input_plan = self.prom_expr_to_plan(input).await?; + + if !self.ctx.has_le_tag() { + common_telemetry::info!("[DEBUG] valid tags: {:?}", self.ctx.tag_columns); + return ColumnNotFoundSnafu { + col: LE_COLUMN_NAME.to_string(), + } + .fail(); + } + let time_index_column = + self.ctx.time_index_column.clone().with_context(|| { + TimeIndexNotFoundSnafu { + table: self.ctx.table_name.clone().unwrap_or_default(), + } + })?; + // FIXME(ruihang): support multi fields + let field_column = self + .ctx + .field_columns + .first() + .with_context(|| FunctionInvalidArgumentSnafu { + fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(), + })? + .clone(); + + return Ok(LogicalPlan::Extension(Extension { + node: Arc::new( + HistogramFold::new( + LE_COLUMN_NAME.to_string(), + field_column, + time_index_column, + phi, + input_plan, + ) + .context(DataFusionPlanningSnafu)?, + ), + })); } let args = self.create_function_args(&args.args)?; @@ -1189,6 +1244,25 @@ impl PromPlanner { } } + /// Try to build a [f64] from [PromExpr]. + fn try_build_float_literal(expr: &PromExpr) -> Option { + match expr { + PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val), + PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr), + PromExpr::Unary(UnaryExpr { expr, .. }) => { + Self::try_build_float_literal(expr).map(|f| -f) + } + PromExpr::StringLiteral(_) + | PromExpr::Binary(_) + | PromExpr::VectorSelector(_) + | PromExpr::MatrixSelector(_) + | PromExpr::Call(_) + | PromExpr::Extension(_) + | PromExpr::Aggregate(_) + | PromExpr::Subquery(_) => None, + } + } + /// Return a lambda to build binary expression from token. /// Because some binary operator are function in DataFusion like `atan2` or `^`. #[allow(clippy::type_complexity)] diff --git a/tests/cases/standalone/common/promql/simple_histogram.result b/tests/cases/standalone/common/promql/simple_histogram.result new file mode 100644 index 000000000000..f67659b3a111 --- /dev/null +++ b/tests/cases/standalone/common/promql/simple_histogram.result @@ -0,0 +1,246 @@ +-- from prometheus/promql/testdata/histograms.test +-- cases related to metric `testhistogram_bucket` +create table histogram_bucket ( + ts timestamp time index, + le string, + s string, + val double, + primary key (s, le), +); + +Affected Rows: 0 + +insert into histogram_bucket values + (3000000, "0.1", "positive", 50), + (3000000, ".2", "positive", 70), + (3000000, "1e0", "positive", 110), + (3000000, "+Inf", "positive", 120), + (3000000, "-.2", "negative", 10), + (3000000, "-0.1", "negative", 20), + (3000000, "0.3", "negative", 20), + (3000000, "+Inf", "negative", 30); + +Affected Rows: 8 + +-- Quantile too low. +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(-0.1, histogram_bucket); + ++---------------------+----------+------+ +| ts | s | val | ++---------------------+----------+------+ +| 1970-01-01T00:50:00 | negative | -inf | +| 1970-01-01T00:50:00 | positive | -inf | ++---------------------+----------+------+ + +-- Quantile too high. +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(1.01, histogram_bucket); + ++---------------------+----------+-----+ +| ts | s | val | ++---------------------+----------+-----+ +| 1970-01-01T00:50:00 | negative | inf | +| 1970-01-01T00:50:00 | positive | inf | ++---------------------+----------+-----+ + +-- Quantile invalid. +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(NaN, histogram_bucket); + ++---------------------+----------+-----+ +| ts | s | val | ++---------------------+----------+-----+ +| 1970-01-01T00:50:00 | negative | NaN | +| 1970-01-01T00:50:00 | positive | NaN | ++---------------------+----------+-----+ + +-- Quantile value in lowest bucket, which is positive. +tql eval (3000, 3000, '1s') histogram_quantile(0, histogram_bucket{s="positive"}); + ++---------------------+----------+-----+ +| ts | s | val | ++---------------------+----------+-----+ +| 1970-01-01T00:50:00 | positive | 0.0 | ++---------------------+----------+-----+ + +-- Quantile value in lowest bucket, which is negative. +tql eval (3000, 3000, '1s') histogram_quantile(0, histogram_bucket{s="negative"}); + ++---------------------+----------+------+ +| ts | s | val | ++---------------------+----------+------+ +| 1970-01-01T00:50:00 | negative | -0.2 | ++---------------------+----------+------+ + +-- Quantile value in highest bucket. +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(1, histogram_bucket); + ++---------------------+----------+-----+ +| ts | s | val | ++---------------------+----------+-----+ +| 1970-01-01T00:50:00 | negative | 0.3 | +| 1970-01-01T00:50:00 | positive | 1.0 | ++---------------------+----------+-----+ + +-- Finally some useful quantiles. +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(0.2, histogram_bucket); + ++---------------------+----------+-------+ +| ts | s | val | ++---------------------+----------+-------+ +| 1970-01-01T00:50:00 | negative | -0.2 | +| 1970-01-01T00:50:00 | positive | 0.048 | ++---------------------+----------+-------+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(0.5, histogram_bucket); + ++---------------------+----------+----------------------+ +| ts | s | val | ++---------------------+----------+----------------------+ +| 1970-01-01T00:50:00 | negative | -0.15000000000000002 | +| 1970-01-01T00:50:00 | positive | 0.15000000000000002 | ++---------------------+----------+----------------------+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(0.8, histogram_bucket); + ++---------------------+----------+------+ +| ts | s | val | ++---------------------+----------+------+ +| 1970-01-01T00:50:00 | negative | 0.3 | +| 1970-01-01T00:50:00 | positive | 0.72 | ++---------------------+----------+------+ + +-- More realistic with rates. +-- This case doesn't contains value because other point are not inserted. +-- quantile with rate is covered in other cases +tql eval (3000, 3000, '1s') histogram_quantile(0.2, rate(histogram_bucket[5m])); + +++ +++ + +drop table histogram_bucket; + +Affected Rows: 0 + +-- cases related to `testhistogram2_bucket` +create table histogram2_bucket ( + ts timestamp time index, + le string, + val double, + primary key (le), +); + +Affected Rows: 0 + +insert into histogram2_bucket values + (0, "0", 0), + (300000, "0", 0), + (600000, "0", 0), + (900000, "0", 0), + (1200000, "0", 0), + (1500000, "0", 0), + (1800000, "0", 0), + (2100000, "0", 0), + (2400000, "0", 0), + (2700000, "0", 0), + (0, "2", 1), + (300000, "2", 2), + (600000, "2", 3), + (900000, "2", 4), + (1200000, "2", 5), + (1500000, "2", 6), + (1800000, "2", 7), + (2100000, "2", 8), + (2400000, "2", 9), + (2700000, "2", 10), + (0, "4", 2), + (300000, "4", 4), + (600000, "4", 6), + (900000, "4", 8), + (1200000, "4", 10), + (1500000, "4", 12), + (1800000, "4", 14), + (2100000, "4", 16), + (2400000, "4", 18), + (2700000, "4", 20), + (0, "6", 3), + (300000, "6", 6), + (600000, "6", 9), + (900000, "6", 12), + (1200000, "6", 15), + (1500000, "6", 18), + (1800000, "6", 21), + (2100000, "6", 24), + (2400000, "6", 27), + (2700000, "6", 30), + (0, "+Inf", 3), + (300000, "+Inf", 6), + (600000, "+Inf", 9), + (900000, "+Inf", 12), + (1200000, "+Inf", 15), + (1500000, "+Inf", 18), + (1800000, "+Inf", 21), + (2100000, "+Inf", 24), + (2400000, "+Inf", 27), + (2700000, "+Inf", 30); + +Affected Rows: 50 + +-- Want results exactly in the middle of the bucket. +tql eval (420, 420, '1s') histogram_quantile(0.166, histogram2_bucket); + ++---------------------+-------+ +| ts | val | ++---------------------+-------+ +| 1970-01-01T00:07:00 | 0.996 | ++---------------------+-------+ + +tql eval (420, 420, '1s') histogram_quantile(0.5, histogram2_bucket); + ++---------------------+-----+ +| ts | val | ++---------------------+-----+ +| 1970-01-01T00:07:00 | 3.0 | ++---------------------+-----+ + +tql eval (420, 420, '1s') histogram_quantile(0.833, histogram2_bucket); + ++---------------------+-------------------+ +| ts | val | ++---------------------+-------------------+ +| 1970-01-01T00:07:00 | 4.997999999999999 | ++---------------------+-------------------+ + +tql eval (2820, 2820, '1s') histogram_quantile(0.166, rate(histogram2_bucket[15m])); + ++---------------------+----------------------------+ +| ts | prom_rate(ts_range,val,ts) | ++---------------------+----------------------------+ +| 1970-01-01T00:47:00 | 0.996 | ++---------------------+----------------------------+ + +tql eval (2820, 2820, '1s') histogram_quantile(0.5, rate(histogram2_bucket[15m])); + ++---------------------+----------------------------+ +| ts | prom_rate(ts_range,val,ts) | ++---------------------+----------------------------+ +| 1970-01-01T00:47:00 | 3.0 | ++---------------------+----------------------------+ + +tql eval (2820, 2820, '1s') histogram_quantile(0.833, rate(histogram2_bucket[15m])); + ++---------------------+----------------------------+ +| ts | prom_rate(ts_range,val,ts) | ++---------------------+----------------------------+ +| 1970-01-01T00:47:00 | 4.998 | ++---------------------+----------------------------+ + +drop table histogram2_bucket; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/simple_histogram.sql b/tests/cases/standalone/common/promql/simple_histogram.sql new file mode 100644 index 000000000000..2eb31670e5c1 --- /dev/null +++ b/tests/cases/standalone/common/promql/simple_histogram.sql @@ -0,0 +1,134 @@ +-- from prometheus/promql/testdata/histograms.test +-- cases related to metric `testhistogram_bucket` + +create table histogram_bucket ( + ts timestamp time index, + le string, + s string, + val double, + primary key (s, le), +); + +insert into histogram_bucket values + (3000000, "0.1", "positive", 50), + (3000000, ".2", "positive", 70), + (3000000, "1e0", "positive", 110), + (3000000, "+Inf", "positive", 120), + (3000000, "-.2", "negative", 10), + (3000000, "-0.1", "negative", 20), + (3000000, "0.3", "negative", 20), + (3000000, "+Inf", "negative", 30); + +-- Quantile too low. +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(-0.1, histogram_bucket); + +-- Quantile too high. +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(1.01, histogram_bucket); + +-- Quantile invalid. +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(NaN, histogram_bucket); + +-- Quantile value in lowest bucket, which is positive. +tql eval (3000, 3000, '1s') histogram_quantile(0, histogram_bucket{s="positive"}); + +-- Quantile value in lowest bucket, which is negative. +tql eval (3000, 3000, '1s') histogram_quantile(0, histogram_bucket{s="negative"}); + +-- Quantile value in highest bucket. +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(1, histogram_bucket); + +-- Finally some useful quantiles. +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(0.2, histogram_bucket); + +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(0.5, histogram_bucket); + +-- SQLNESS SORT_RESULT 3 1 +tql eval (3000, 3000, '1s') histogram_quantile(0.8, histogram_bucket); + +-- More realistic with rates. +-- This case doesn't contains value because other point are not inserted. +-- quantile with rate is covered in other cases +tql eval (3000, 3000, '1s') histogram_quantile(0.2, rate(histogram_bucket[5m])); + +drop table histogram_bucket; + +-- cases related to `testhistogram2_bucket` +create table histogram2_bucket ( + ts timestamp time index, + le string, + val double, + primary key (le), +); + +insert into histogram2_bucket values + (0, "0", 0), + (300000, "0", 0), + (600000, "0", 0), + (900000, "0", 0), + (1200000, "0", 0), + (1500000, "0", 0), + (1800000, "0", 0), + (2100000, "0", 0), + (2400000, "0", 0), + (2700000, "0", 0), + (0, "2", 1), + (300000, "2", 2), + (600000, "2", 3), + (900000, "2", 4), + (1200000, "2", 5), + (1500000, "2", 6), + (1800000, "2", 7), + (2100000, "2", 8), + (2400000, "2", 9), + (2700000, "2", 10), + (0, "4", 2), + (300000, "4", 4), + (600000, "4", 6), + (900000, "4", 8), + (1200000, "4", 10), + (1500000, "4", 12), + (1800000, "4", 14), + (2100000, "4", 16), + (2400000, "4", 18), + (2700000, "4", 20), + (0, "6", 3), + (300000, "6", 6), + (600000, "6", 9), + (900000, "6", 12), + (1200000, "6", 15), + (1500000, "6", 18), + (1800000, "6", 21), + (2100000, "6", 24), + (2400000, "6", 27), + (2700000, "6", 30), + (0, "+Inf", 3), + (300000, "+Inf", 6), + (600000, "+Inf", 9), + (900000, "+Inf", 12), + (1200000, "+Inf", 15), + (1500000, "+Inf", 18), + (1800000, "+Inf", 21), + (2100000, "+Inf", 24), + (2400000, "+Inf", 27), + (2700000, "+Inf", 30); + +-- Want results exactly in the middle of the bucket. +tql eval (420, 420, '1s') histogram_quantile(0.166, histogram2_bucket); + +tql eval (420, 420, '1s') histogram_quantile(0.5, histogram2_bucket); + +tql eval (420, 420, '1s') histogram_quantile(0.833, histogram2_bucket); + +tql eval (2820, 2820, '1s') histogram_quantile(0.166, rate(histogram2_bucket[15m])); + +tql eval (2820, 2820, '1s') histogram_quantile(0.5, rate(histogram2_bucket[15m])); + +tql eval (2820, 2820, '1s') histogram_quantile(0.833, rate(histogram2_bucket[15m])); + +drop table histogram2_bucket;