From 8dc90ecebe8e2a747169937d3ddae312c9167969 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Wed, 28 Feb 2024 17:11:40 +0800
Subject: [PATCH 01/20] feat: reuse value buf

---
 .../src/memtable/merge_tree/partition.rs | 21 +++++++++++--------
 src/mito2/src/row_converter.rs           | 14 ++++++++++++++
 2 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index 424fad693944..520a3ad143fa 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -22,6 +22,7 @@ use std::time::{Duration, Instant};
 
 use api::v1::SemanticType;
 use common_recordbatch::filter::SimpleFilterEvaluator;
+use datatypes::value::Value;
 use store_api::metadata::RegionMetadataRef;
 use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
 use store_api::storage::ColumnId;
@@ -280,6 +281,7 @@ pub struct PartitionReader {
     context: ReadPartitionContext,
     source: BoxedDataBatchSource,
     last_yield_pk_id: Option<PkId>,
+    values_buffer: Vec<Value>,
 }
 
 impl PartitionReader {
@@ -288,6 +290,7 @@ impl PartitionReader {
             context,
             source,
             last_yield_pk_id: None,
+            values_buffer: Vec::new(),
         };
         // Find next valid batch.
         reader.prune_batch_by_key()?;
@@ -361,6 +364,7 @@ impl PartitionReader {
                 &self.context.row_codec,
                 key,
                 &mut self.context.metrics,
+                &mut self.values_buffer,
             ) {
                 // We need this key.
                 self.last_yield_pk_id = Some(pk_id);
@@ -379,9 +383,10 @@ fn prune_primary_key(
     codec: &McmpRowCodec,
     pk: &[u8],
     metrics: &mut PartitionReaderMetrics,
+    values_buffer: &mut Vec<Value>,
 ) -> bool {
     let start = Instant::now();
-    let res = prune_primary_key_inner(metadata, filters, codec, pk);
+    let res = prune_primary_key_inner(metadata, filters, codec, pk, values_buffer);
     metrics.prune_pk += start.elapsed();
     res
 }
@@ -394,6 +399,7 @@ fn prune_primary_key_inner(
     filters: &[SimpleFilterEvaluator],
     codec: &McmpRowCodec,
     pk: &[u8],
+    values_buffer: &mut Vec<Value>,
 ) -> bool {
     if filters.is_empty() {
         return true;
@@ -404,13 +410,10 @@ fn prune_primary_key_inner(
         return true;
     }
 
-    let pk_values = match codec.decode(pk) {
-        Ok(values) => values,
-        Err(e) => {
-            common_telemetry::error!(e; "Failed to decode primary key");
-            return true;
-        }
-    };
+    if let Err(e) = codec.decode_to_vec(pk, values_buffer) {
+        common_telemetry::error!(e; "Failed to decode primary key");
+        return true;
+    }
 
     // evaluate filters against primary key values
     let mut result = true;
@@ -429,7 +432,7 @@ fn prune_primary_key_inner(
         // index of the column in primary keys.
         // Safety: A tag column is always in primary key.
         let index = metadata.primary_key_index(column.column_id).unwrap();
         // Safety: arrow schema and datatypes are constructed from the same source.
-        let scalar_value = pk_values[index]
+        let scalar_value = values_buffer[index]
             .try_to_scalar_value(&column.column_schema.data_type)
             .unwrap();
         result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs
index a83d237457a0..b91b34edb076 100644
--- a/src/mito2/src/row_converter.rs
+++ b/src/mito2/src/row_converter.rs
@@ -46,6 +46,9 @@ pub trait RowCodec {
 
     /// Decode row values from bytes.
    fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>>;
+
+    /// Decode row values from bytes to a vec.
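+    /// Clears `values` and reuses its capacity, so a caller can decode many keys with one buffer.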
+    fn decode_to_vec(&self, bytes: &[u8], values: &mut Vec<Value>) -> Result<()>;
 }
 
 #[derive(Debug, Clone, PartialEq, Eq)]
@@ -269,6 +272,17 @@ impl RowCodec for McmpRowCodec {
         }
         Ok(values)
     }
+
+    fn decode_to_vec(&self, bytes: &[u8], values: &mut Vec<Value>) -> Result<()> {
+        values.clear();
+        values.reserve(self.fields.len());
+        let mut deserializer = Deserializer::new(bytes);
+        for f in &self.fields {
+            let value = f.deserialize(&mut deserializer)?;
+            values.push(value);
+        }
+        Ok(())
+    }
 }
 
 #[cfg(test)]

From f86106e7e7793eba8a7403aba13941c68ed0feaa Mon Sep 17 00:00:00 2001
From: evenyag
Date: Wed, 28 Feb 2024 18:48:30 +0800
Subject: [PATCH 02/20] feat: skip values to decode

---
 .../src/memtable/merge_tree/partition.rs | 21 +++++---
 src/mito2/src/row_converter.rs           | 49 +++++++++++++++++++
 2 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index 520a3ad143fa..ffa875a219fa 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -399,7 +399,7 @@ fn prune_primary_key_inner(
     filters: &[SimpleFilterEvaluator],
     codec: &McmpRowCodec,
     pk: &[u8],
-    values_buffer: &mut Vec<Value>,
+    _values_buffer: &mut Vec<Value>,
 ) -> bool {
     if filters.is_empty() {
         return true;
@@ -410,10 +410,10 @@ fn prune_primary_key_inner(
         return true;
     }
 
-    if let Err(e) = codec.decode_to_vec(pk, values_buffer) {
-        common_telemetry::error!(e; "Failed to decode primary key");
-        return true;
-    }
+    // if let Err(e) = codec.decode_to_vec(pk, values_buffer) {
+    //     common_telemetry::error!(e; "Failed to decode primary key");
+    //     return true;
+    // }
 
     // evaluate filters against primary key values
     let mut result = true;
@@ -431,8 +431,17 @@ fn prune_primary_key_inner(
         // index of the column in primary keys.
         // Safety: A tag column is always in primary key.
         let index = metadata.primary_key_index(column.column_id).unwrap();
+
+        let value = match codec.decode_value_at(pk, index) {
+            Ok(v) => v,
+            Err(e) => {
+                common_telemetry::error!(e; "Failed to decode primary key");
+                return true;
+            }
+        };
+
         // Safety: arrow schema and datatypes are constructed from the same source.
-        let scalar_value = values_buffer[index]
+        let scalar_value = value
             .try_to_scalar_value(&column.column_schema.data_type)
             .unwrap();
         result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs
index b91b34edb076..67ae3c349b62 100644
--- a/src/mito2/src/row_converter.rs
+++ b/src/mito2/src/row_converter.rs
@@ -218,6 +218,47 @@ impl SortField {
             Decimal128, Decimal128
         )
     }
+
+    /// Skip deserializing this field.
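+    /// Used by `decode_value_at` to jump over fields before the requested index without materializing them.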
+ fn skip_deserialize(&self, bytes: &[u8], deserializer: &mut Deserializer<&[u8]>) -> Result<()> { + let pos = deserializer.position(); + match bytes[pos] { + 0 => { + deserializer.advance(1); + return Ok(()); + } + _ => (), + } + + let to_skip = match &self.data_type { + ConcreteDataType::Boolean(_) => 2, + ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2, + ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3, + ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5, + ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9, + ConcreteDataType::Float32(_) => 5, + ConcreteDataType::Float64(_) => 9, + ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => { + deserializer.advance(1); + deserializer + .skip_bytes() + .context(error::DeserializeFieldSnafu)?; + return Ok(()); + } + ConcreteDataType::Date(_) => 5, + ConcreteDataType::DateTime(_) => 9, + ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option + ConcreteDataType::Time(_) => 10, // i64 and 1 byte time unit + ConcreteDataType::Duration(_) => 10, + ConcreteDataType::Interval(_) => 18, + ConcreteDataType::Decimal128(_) => 19, + ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) => 0, + }; + deserializer.advance(to_skip); + Ok(()) + } } /// A memory-comparable row [Value] encoder/decoder. @@ -239,6 +280,14 @@ impl McmpRowCodec { pub fn estimated_size(&self) -> usize { self.fields.iter().map(|f| f.estimated_size()).sum() } + + pub fn decode_value_at(&self, bytes: &[u8], pos: usize) -> Result { + let mut deserializer = Deserializer::new(bytes); + for i in 0..pos { + self.fields[i].skip_deserialize(bytes, &mut deserializer)?; + } + self.fields[pos].deserialize(&mut deserializer) + } } impl RowCodec for McmpRowCodec { From c04c53ddbcc0a33b4901279a41aec069ff5aeac0 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 28 Feb 2024 20:32:27 +0800 Subject: [PATCH 03/20] feat: prune shard chore: fix compiler errors refactor: shard prune metrics --- .../src/memtable/merge_tree/partition.rs | 272 ++++++++++-------- src/mito2/src/memtable/merge_tree/shard.rs | 85 +++++- .../src/memtable/merge_tree/shard_builder.rs | 87 +++++- 3 files changed, 301 insertions(+), 143 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index ffa875a219fa..9f1a4fbc3c63 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -22,7 +22,6 @@ use std::time::{Duration, Instant}; use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; -use datatypes::value::Value; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::storage::ColumnId; @@ -39,7 +38,7 @@ use crate::memtable::merge_tree::shard_builder::ShardBuilder; use crate::memtable::merge_tree::{MergeTreeConfig, PkId}; use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED; use crate::read::{Batch, BatchBuilder}; -use crate::row_converter::{McmpRowCodec, RowCodec}; +use crate::row_converter::McmpRowCodec; /// Key of a partition. pub type PartitionKey = u32; @@ -124,6 +123,15 @@ impl Partition { /// Scans data in the partition. 
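/// Builds an optional PrimaryKeyFilter from the read context and hands a clone to every shard reader.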
pub fn read(&self, mut context: ReadPartitionContext) -> Result { + let key_filter = if context.need_prune_key { + Some(PrimaryKeyFilter { + metadata: context.metadata.clone(), + filters: context.filters.clone(), + codec: context.row_codec.clone(), + }) + } else { + None + }; let (builder_source, shard_reader_builders) = { let inner = self.inner.read().unwrap(); let mut shard_source = Vec::with_capacity(inner.shards.len() + 1); @@ -144,12 +152,17 @@ impl Partition { let mut nodes = shard_reader_builders .into_iter() - .map(|builder| Ok(ShardNode::new(ShardSource::Shard(builder.build()?)))) + .map(|builder| { + Ok(ShardNode::new(ShardSource::Shard( + builder.build(key_filter.clone())?, + ))) + }) .collect::>>()?; if let Some(builder) = builder_source { // Move the initialization of ShardBuilderReader out of read lock. - let shard_builder_reader = builder.build(Some(&context.pk_weights))?; + let shard_builder_reader = + builder.build(Some(&context.pk_weights), key_filter.clone())?; nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader))); } @@ -267,10 +280,9 @@ pub(crate) struct PartitionStats { #[derive(Default)] struct PartitionReaderMetrics { - prune_pk: Duration, + // prune_pk: Duration, read_source: Duration, data_batch_to_batch: Duration, - keys_before_pruning: usize, keys_after_pruning: usize, } @@ -280,20 +292,11 @@ struct PartitionReaderMetrics { pub struct PartitionReader { context: ReadPartitionContext, source: BoxedDataBatchSource, - last_yield_pk_id: Option, - values_buffer: Vec, } impl PartitionReader { fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result { - let mut reader = Self { - context, - source, - last_yield_pk_id: None, - values_buffer: Vec::new(), - }; - // Find next valid batch. - reader.prune_batch_by_key()?; + let reader = Self { context, source }; Ok(reader) } @@ -308,8 +311,7 @@ impl PartitionReader { /// # Panics /// Panics if the reader is invalid. pub fn next(&mut self) -> Result<()> { - self.advance_source()?; - self.prune_batch_by_key() + self.advance_source() } /// Converts current data batch into a [Batch]. @@ -339,123 +341,141 @@ impl PartitionReader { self.context.metrics.read_source += read_source.elapsed(); Ok(()) } - - fn prune_batch_by_key(&mut self) -> Result<()> { - if self.context.metadata.primary_key.is_empty() || !self.context.need_prune_key { - // Nothing to prune. - return Ok(()); - } - - while self.source.is_valid() { - let pk_id = self.source.current_pk_id(); - if let Some(yield_pk_id) = self.last_yield_pk_id { - if pk_id == yield_pk_id { - // If this batch has the same key as last returned batch. - // We can return it without evaluating filters. - break; - } - } - let key = self.source.current_key().unwrap(); - self.context.metrics.keys_before_pruning += 1; - // Prune batch by primary key. - if prune_primary_key( - &self.context.metadata, - &self.context.filters, - &self.context.row_codec, - key, - &mut self.context.metrics, - &mut self.values_buffer, - ) { - // We need this key. 
- self.last_yield_pk_id = Some(pk_id); - self.context.metrics.keys_after_pruning += 1; - break; - } - self.advance_source()?; - } - Ok(()) - } } -fn prune_primary_key( - metadata: &RegionMetadataRef, - filters: &[SimpleFilterEvaluator], - codec: &McmpRowCodec, - pk: &[u8], - metrics: &mut PartitionReaderMetrics, - values_buffer: &mut Vec, -) -> bool { - let start = Instant::now(); - let res = prune_primary_key_inner(metadata, filters, codec, pk, values_buffer); - metrics.prune_pk += start.elapsed(); - res +#[derive(Clone)] +pub(crate) struct PrimaryKeyFilter { + metadata: RegionMetadataRef, + filters: Arc>, + codec: Arc, } -// TODO(yingwen): Improve performance of key pruning. Now we need to find index and -// then decode and convert each value. -/// Returns true if the `pk` is still needed. -fn prune_primary_key_inner( - metadata: &RegionMetadataRef, - filters: &[SimpleFilterEvaluator], - codec: &McmpRowCodec, - pk: &[u8], - _values_buffer: &mut Vec, -) -> bool { - if filters.is_empty() { - return true; - } +impl PrimaryKeyFilter { + pub(crate) fn prune_primary_key(&self, pk: &[u8]) -> bool { + if self.filters.is_empty() { + return true; + } - // no primary key, we simply return true. - if metadata.primary_key.is_empty() { - return true; - } + // no primary key, we simply return true. + if self.metadata.primary_key.is_empty() { + return true; + } - // if let Err(e) = codec.decode_to_vec(pk, values_buffer) { - // common_telemetry::error!(e; "Failed to decode primary key"); - // return true; - // } + // evaluate filters against primary key values + let mut result = true; + for filter in &*self.filters { + if Partition::is_partition_column(filter.column_name()) { + continue; + } + let Some(column) = self.metadata.column_by_name(filter.column_name()) else { + continue; + }; + // ignore filters that are not referencing primary key columns + if column.semantic_type != SemanticType::Tag { + continue; + } + // index of the column in primary keys. + // Safety: A tag column is always in primary key. + let index = self.metadata.primary_key_index(column.column_id).unwrap(); + + let value = match self.codec.decode_value_at(pk, index) { + Ok(v) => v, + Err(e) => { + common_telemetry::error!(e; "Failed to decode primary key"); + return true; + } + }; - // evaluate filters against primary key values - let mut result = true; - for filter in filters { - if Partition::is_partition_column(filter.column_name()) { - continue; - } - let Some(column) = metadata.column_by_name(filter.column_name()) else { - continue; - }; - // ignore filters that are not referencing primary key columns - if column.semantic_type != SemanticType::Tag { - continue; + // Safety: arrow schema and datatypes are constructed from the same source. + let scalar_value = value + .try_to_scalar_value(&column.column_schema.data_type) + .unwrap(); + result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true); } - // index of the column in primary keys. - // Safety: A tag column is always in primary key. - let index = metadata.primary_key_index(column.column_id).unwrap(); - - let value = match codec.decode_value_at(pk, index) { - Ok(v) => v, - Err(e) => { - common_telemetry::error!(e; "Failed to decode primary key"); - return true; - } - }; - // Safety: arrow schema and datatypes are constructed from the same source. 
- let scalar_value = value - .try_to_scalar_value(&column.column_schema.data_type) - .unwrap(); - result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true); + result } - - result } +// fn prune_primary_key( +// metadata: &RegionMetadataRef, +// filters: &[SimpleFilterEvaluator], +// codec: &McmpRowCodec, +// pk: &[u8], +// metrics: &mut PartitionReaderMetrics, +// values_buffer: &mut Vec, +// ) -> bool { +// let start = Instant::now(); +// let res = prune_primary_key_inner(metadata, filters, codec, pk, values_buffer); +// metrics.prune_pk += start.elapsed(); +// res +// } + +// // TODO(yingwen): Improve performance of key pruning. Now we need to find index and +// // then decode and convert each value. +// /// Returns true if the `pk` is still needed. +// fn prune_primary_key_inner( +// metadata: &RegionMetadataRef, +// filters: &[SimpleFilterEvaluator], +// codec: &McmpRowCodec, +// pk: &[u8], +// _values_buffer: &mut Vec, +// ) -> bool { +// if filters.is_empty() { +// return true; +// } + +// // no primary key, we simply return true. +// if metadata.primary_key.is_empty() { +// return true; +// } + +// // if let Err(e) = codec.decode_to_vec(pk, values_buffer) { +// // common_telemetry::error!(e; "Failed to decode primary key"); +// // return true; +// // } + +// // evaluate filters against primary key values +// let mut result = true; +// for filter in filters { +// if Partition::is_partition_column(filter.column_name()) { +// continue; +// } +// let Some(column) = metadata.column_by_name(filter.column_name()) else { +// continue; +// }; +// // ignore filters that are not referencing primary key columns +// if column.semantic_type != SemanticType::Tag { +// continue; +// } +// // index of the column in primary keys. +// // Safety: A tag column is always in primary key. +// let index = metadata.primary_key_index(column.column_id).unwrap(); + +// let value = match codec.decode_value_at(pk, index) { +// Ok(v) => v, +// Err(e) => { +// common_telemetry::error!(e; "Failed to decode primary key"); +// return true; +// } +// }; + +// // Safety: arrow schema and datatypes are constructed from the same source. +// let scalar_value = value +// .try_to_scalar_value(&column.column_schema.data_type) +// .unwrap(); +// result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true); +// } + +// result +// } + /// Structs to reuse across readers to avoid allocating for each reader. pub(crate) struct ReadPartitionContext { metadata: RegionMetadataRef, row_codec: Arc, projection: HashSet, - filters: Vec, + filters: Arc>, /// Buffer to store pk weights. 
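/// Reused across partition reads to avoid reallocating for every reader.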
pk_weights: Vec, need_prune_key: bool, @@ -464,10 +484,10 @@ pub(crate) struct ReadPartitionContext { impl Drop for ReadPartitionContext { fn drop(&mut self) { - let partition_prune_pk = self.metrics.prune_pk.as_secs_f64(); - MERGE_TREE_READ_STAGE_ELAPSED - .with_label_values(&["partition_prune_pk"]) - .observe(partition_prune_pk); + // let partition_prune_pk = self.metrics.prune_pk.as_secs_f64(); + // MERGE_TREE_READ_STAGE_ELAPSED + // .with_label_values(&["partition_prune_pk"]) + // .observe(partition_prune_pk); let partition_read_source = self.metrics.read_source.as_secs_f64(); MERGE_TREE_READ_STAGE_ELAPSED .with_label_values(&["partition_read_source"]) @@ -477,13 +497,11 @@ impl Drop for ReadPartitionContext { .with_label_values(&["partition_data_batch_to_batch"]) .observe(partition_data_batch_to_batch); - if self.metrics.keys_before_pruning != 0 { + if self.metrics.keys_after_pruning != 0 { common_telemetry::debug!( - "TreeIter pruning, before: {}, after: {}, partition_read_source: {}s, partition_prune_pk: {}s, partition_data_batch_to_batch: {}s", - self.metrics.keys_before_pruning, + "TreeIter partitions metrics, after: {}, partition_read_source: {}s, partition_data_batch_to_batch: {}s", self.metrics.keys_after_pruning, partition_read_source, - partition_prune_pk, partition_data_batch_to_batch, ); } @@ -502,7 +520,7 @@ impl ReadPartitionContext { metadata, row_codec, projection, - filters, + filters: Arc::new(filters), pk_weights: Vec::new(), need_prune_key, metrics: Default::default(), diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 889c05a582a0..85d142300706 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -15,6 +15,7 @@ //! Shard in a partition. use std::cmp::Ordering; +use std::time::{Duration, Instant}; use store_api::metadata::RegionMetadataRef; @@ -25,8 +26,10 @@ use crate::memtable::merge_tree::data::{ }; use crate::memtable::merge_tree::dict::KeyDictRef; use crate::memtable::merge_tree::merger::{Merger, Node}; +use crate::memtable::merge_tree::partition::PrimaryKeyFilter; use crate::memtable::merge_tree::shard_builder::ShardBuilderReader; -use crate::memtable::merge_tree::{PkId, ShardId}; +use crate::memtable::merge_tree::{PkId, PkIndex, ShardId}; +use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED; /// Shard stores data related to the same key dictionary. 
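/// A shard pairs a frozen key dictionary with the data parts whose rows reference it.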
pub struct Shard { @@ -131,18 +134,14 @@ pub struct ShardReaderBuilder { } impl ShardReaderBuilder { - pub(crate) fn build(self) -> Result { + pub(crate) fn build(self, key_filter: Option) -> Result { let ShardReaderBuilder { shard_id, key_dict, inner, } = self; let parts_reader = inner.build()?; - Ok(ShardReader { - shard_id, - key_dict, - parts_reader, - }) + ShardReader::new(shard_id, key_dict, parts_reader, key_filter) } } @@ -151,9 +150,36 @@ pub struct ShardReader { shard_id: ShardId, key_dict: Option, parts_reader: DataPartsReader, + key_filter: Option, + last_yield_pk_index: Option, + keys_before_pruning: usize, + keys_after_pruning: usize, + prune_pk_cost: Duration, } impl ShardReader { + fn new( + shard_id: ShardId, + key_dict: Option, + parts_reader: DataPartsReader, + key_filter: Option, + ) -> Result { + let has_pk = key_dict.is_some(); + let mut reader = Self { + shard_id, + key_dict, + parts_reader, + key_filter: if has_pk { key_filter } else { None }, + last_yield_pk_index: None, + keys_before_pruning: 0, + keys_after_pruning: 0, + prune_pk_cost: Duration::default(), + }; + reader.prune_batch_by_key()?; + + Ok(reader) + } + fn is_valid(&self) -> bool { self.parts_reader.is_valid() } @@ -180,6 +206,49 @@ impl ShardReader { fn current_data_batch(&self) -> DataBatch { self.parts_reader.current_data_batch() } + + fn prune_batch_by_key(&mut self) -> Result<()> { + let Some(key_filter) = &self.key_filter else { + return Ok(()); + }; + + while self.parts_reader.is_valid() { + let pk_index = self.parts_reader.current_data_batch().pk_index(); + if let Some(yield_pk_index) = self.last_yield_pk_index { + if pk_index == yield_pk_index { + break; + } + } + self.keys_before_pruning += 1; + let key = self.current_key().unwrap(); + let now = Instant::now(); + if key_filter.prune_primary_key(key) { + self.prune_pk_cost += now.elapsed(); + self.last_yield_pk_index = Some(pk_index); + self.keys_after_pruning += 1; + break; + } + self.prune_pk_cost += now.elapsed(); + self.parts_reader.next()?; + } + + Ok(()) + } +} + +impl Drop for ShardReader { + fn drop(&mut self) { + let shard_prune_pk = self.prune_pk_cost.as_secs_f64(); + MERGE_TREE_READ_STAGE_ELAPSED + .with_label_values(&["shard_prune_pk"]) + .observe(shard_prune_pk); + common_telemetry::debug!( + "ShardReader metrics, before pruning: {}, after pruning: {}, cost: {:?}s", + self.keys_before_pruning, + self.keys_after_pruning, + shard_prune_pk, + ); + } } /// A merger that merges batches from multiple shards. 
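/// Sources can be sealed shards or the active shard builder (see ShardNode and ShardSource).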
@@ -422,7 +491,7 @@ mod tests { } assert!(!shard.is_empty()); - let mut reader = shard.read().unwrap().build().unwrap(); + let mut reader = shard.read().unwrap().build(None).unwrap(); let mut timestamps = Vec::new(); while reader.is_valid() { let rb = reader.current_data_batch().slice_record_batch(); diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index 07fedc38ddba..bc5d166d8514 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use std::time::{Duration, Instant}; use store_api::metadata::RegionMetadataRef; @@ -26,8 +27,9 @@ use crate::memtable::merge_tree::data::{ }; use crate::memtable::merge_tree::dict::{DictBuilderReader, KeyDictBuilder}; use crate::memtable::merge_tree::metrics::WriteMetrics; +use crate::memtable::merge_tree::partition::PrimaryKeyFilter; use crate::memtable::merge_tree::shard::Shard; -use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId}; +use crate::memtable::merge_tree::{MergeTreeConfig, PkId, PkIndex, ShardId}; use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED; /// Builder to write keys and data to a shard that the key dictionary @@ -176,13 +178,13 @@ pub(crate) struct ShardBuilderReaderBuilder { } impl ShardBuilderReaderBuilder { - pub(crate) fn build(self, pk_weights: Option<&[u16]>) -> Result { + pub(crate) fn build( + self, + pk_weights: Option<&[u16]>, + key_filter: Option, + ) -> Result { let data_reader = self.data_reader.build(pk_weights)?; - Ok(ShardBuilderReader { - shard_id: self.shard_id, - dict_reader: self.dict_reader, - data_reader, - }) + ShardBuilderReader::new(self.shard_id, self.dict_reader, data_reader, key_filter) } } @@ -191,9 +193,35 @@ pub struct ShardBuilderReader { shard_id: ShardId, dict_reader: DictBuilderReader, data_reader: DataBufferReader, + key_filter: Option, + last_yield_pk_index: Option, + keys_before_pruning: usize, + keys_after_pruning: usize, + prune_pk_cost: Duration, } impl ShardBuilderReader { + fn new( + shard_id: ShardId, + dict_reader: DictBuilderReader, + data_reader: DataBufferReader, + key_filter: Option, + ) -> Result { + let mut reader = ShardBuilderReader { + shard_id, + dict_reader, + data_reader, + key_filter, + last_yield_pk_index: None, + keys_before_pruning: 0, + keys_after_pruning: 0, + prune_pk_cost: Duration::default(), + }; + reader.prune_batch_by_key()?; + + Ok(reader) + } + pub fn is_valid(&self) -> bool { self.data_reader.is_valid() } @@ -218,6 +246,49 @@ impl ShardBuilderReader { pub fn current_data_batch(&self) -> DataBatch { self.data_reader.current_data_batch() } + + fn prune_batch_by_key(&mut self) -> Result<()> { + let Some(key_filter) = &self.key_filter else { + return Ok(()); + }; + + while self.data_reader.is_valid() { + let pk_index = self.data_reader.current_data_batch().pk_index(); + if let Some(yield_pk_index) = self.last_yield_pk_index { + if pk_index == yield_pk_index { + break; + } + } + self.keys_before_pruning += 1; + let key = self.current_key().unwrap(); + let now = Instant::now(); + if key_filter.prune_primary_key(key) { + self.prune_pk_cost += now.elapsed(); + self.last_yield_pk_index = Some(pk_index); + self.keys_after_pruning += 1; + break; + } + self.prune_pk_cost += now.elapsed(); + self.data_reader.next()?; + } + + Ok(()) + } +} + +impl Drop for ShardBuilderReader { + fn drop(&mut self) { + let shard_builder_prune_pk = 
self.prune_pk_cost.as_secs_f64(); + MERGE_TREE_READ_STAGE_ELAPSED + .with_label_values(&["shard_builder_prune_pk"]) + .observe(shard_builder_prune_pk); + common_telemetry::debug!( + "ShardBuilderReader metrics, before pruning: {}, after pruning: {}, cost: {}s", + self.keys_before_pruning, + self.keys_after_pruning, + shard_builder_prune_pk, + ); + } } #[cfg(test)] @@ -306,7 +377,7 @@ mod tests { let mut reader = shard_builder .read(&mut pk_weights) .unwrap() - .build(Some(&pk_weights)) + .build(Some(&pk_weights), None) .unwrap(); let mut timestamps = Vec::new(); while reader.is_valid() { From 067aedd363ed40817a2e59ae9f947b9430994b7f Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 28 Feb 2024 22:07:57 +0800 Subject: [PATCH 04/20] fix: panic on DedupReader::try_new --- src/mito2/src/memtable/merge_tree/dedup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/memtable/merge_tree/dedup.rs b/src/mito2/src/memtable/merge_tree/dedup.rs index 6f98601821ff..0a68f9f56441 100644 --- a/src/mito2/src/memtable/merge_tree/dedup.rs +++ b/src/mito2/src/memtable/merge_tree/dedup.rs @@ -45,7 +45,7 @@ impl DataBatchSource for DedupReader { } fn next(&mut self) -> Result<()> { - loop { + while self.inner.is_valid() { match &mut self.prev_batch_last_row { None => { // First shot, fill prev_batch_last_row and current_batch_range with first batch. From f02ca5168811f0d82b87612b05998fc62964f399 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 28 Feb 2024 22:15:58 +0800 Subject: [PATCH 05/20] fix: prune after next --- src/mito2/src/memtable/merge_tree/shard.rs | 3 ++- src/mito2/src/memtable/merge_tree/shard_builder.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 85d142300706..5fa7a4f09895 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -185,7 +185,8 @@ impl ShardReader { } fn next(&mut self) -> Result<()> { - self.parts_reader.next() + self.parts_reader.next()?; + self.prune_batch_by_key() } fn current_key(&self) -> Option<&[u8]> { diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index bc5d166d8514..9c67684ed939 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -227,7 +227,8 @@ impl ShardBuilderReader { } pub fn next(&mut self) -> Result<()> { - self.data_reader.next() + self.data_reader.next()?; + self.prune_batch_by_key() } pub fn current_key(&self) -> Option<&[u8]> { From b6e22622c6bd04d8574efc67fb032e000940d4fc Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 28 Feb 2024 22:23:38 +0800 Subject: [PATCH 06/20] chore: num parts metrics --- src/mito2/src/memtable/merge_tree/data.rs | 7 +++++++ src/mito2/src/memtable/merge_tree/shard.rs | 3 ++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 7bafdb904b9d..e43d06a22361 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -994,9 +994,11 @@ impl DataPartsReaderBuilder { for p in self.parts { nodes.push(DataNode::new(DataSource::Part(p))); } + let num_parts = nodes.len(); let merger = Merger::try_new(nodes)?; Ok(DataPartsReader { merger, + num_parts, elapsed: Default::default(), }) } @@ -1005,6 +1007,7 @@ impl DataPartsReaderBuilder { /// Reader for all parts inside 
a `DataParts`. pub struct DataPartsReader { merger: Merger, + num_parts: usize, elapsed: Duration, } @@ -1032,6 +1035,10 @@ impl DataPartsReader { pub(crate) fn is_valid(&self) -> bool { self.merger.is_valid() } + + pub(crate) fn num_parts(&self) -> usize { + self.num_parts + } } #[cfg(test)] diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 5fa7a4f09895..c4acfd05ca88 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -244,7 +244,8 @@ impl Drop for ShardReader { .with_label_values(&["shard_prune_pk"]) .observe(shard_prune_pk); common_telemetry::debug!( - "ShardReader metrics, before pruning: {}, after pruning: {}, cost: {:?}s", + "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, cost: {:?}s", + self.parts_reader.num_parts(), self.keys_before_pruning, self.keys_after_pruning, shard_prune_pk, From 5a06d332a83134d0a00398897fd689ba91a6a871 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 28 Feb 2024 22:33:24 +0800 Subject: [PATCH 07/20] feat: metrics and logs --- .../src/memtable/merge_tree/partition.rs | 92 +------------------ src/mito2/src/memtable/merge_tree/shard.rs | 16 ++-- .../src/memtable/merge_tree/shard_builder.rs | 14 +-- 3 files changed, 22 insertions(+), 100 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 9f1a4fbc3c63..575d737b50c0 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -280,10 +280,8 @@ pub(crate) struct PartitionStats { #[derive(Default)] struct PartitionReaderMetrics { - // prune_pk: Duration, read_source: Duration, data_batch_to_batch: Duration, - keys_after_pruning: usize, } /// Reader to scan rows in a partition. @@ -397,79 +395,6 @@ impl PrimaryKeyFilter { } } -// fn prune_primary_key( -// metadata: &RegionMetadataRef, -// filters: &[SimpleFilterEvaluator], -// codec: &McmpRowCodec, -// pk: &[u8], -// metrics: &mut PartitionReaderMetrics, -// values_buffer: &mut Vec, -// ) -> bool { -// let start = Instant::now(); -// let res = prune_primary_key_inner(metadata, filters, codec, pk, values_buffer); -// metrics.prune_pk += start.elapsed(); -// res -// } - -// // TODO(yingwen): Improve performance of key pruning. Now we need to find index and -// // then decode and convert each value. -// /// Returns true if the `pk` is still needed. -// fn prune_primary_key_inner( -// metadata: &RegionMetadataRef, -// filters: &[SimpleFilterEvaluator], -// codec: &McmpRowCodec, -// pk: &[u8], -// _values_buffer: &mut Vec, -// ) -> bool { -// if filters.is_empty() { -// return true; -// } - -// // no primary key, we simply return true. -// if metadata.primary_key.is_empty() { -// return true; -// } - -// // if let Err(e) = codec.decode_to_vec(pk, values_buffer) { -// // common_telemetry::error!(e; "Failed to decode primary key"); -// // return true; -// // } - -// // evaluate filters against primary key values -// let mut result = true; -// for filter in filters { -// if Partition::is_partition_column(filter.column_name()) { -// continue; -// } -// let Some(column) = metadata.column_by_name(filter.column_name()) else { -// continue; -// }; -// // ignore filters that are not referencing primary key columns -// if column.semantic_type != SemanticType::Tag { -// continue; -// } -// // index of the column in primary keys. -// // Safety: A tag column is always in primary key. 
-// let index = metadata.primary_key_index(column.column_id).unwrap(); - -// let value = match codec.decode_value_at(pk, index) { -// Ok(v) => v, -// Err(e) => { -// common_telemetry::error!(e; "Failed to decode primary key"); -// return true; -// } -// }; - -// // Safety: arrow schema and datatypes are constructed from the same source. -// let scalar_value = value -// .try_to_scalar_value(&column.column_schema.data_type) -// .unwrap(); -// result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true); -// } - -// result -// } - /// Structs to reuse across readers to avoid allocating for each reader. pub(crate) struct ReadPartitionContext { metadata: RegionMetadataRef, @@ -484,10 +409,6 @@ pub(crate) struct ReadPartitionContext { impl Drop for ReadPartitionContext { fn drop(&mut self) { - // let partition_prune_pk = self.metrics.prune_pk.as_secs_f64(); - // MERGE_TREE_READ_STAGE_ELAPSED - // .with_label_values(&["partition_prune_pk"]) - // .observe(partition_prune_pk); let partition_read_source = self.metrics.read_source.as_secs_f64(); MERGE_TREE_READ_STAGE_ELAPSED .with_label_values(&["partition_read_source"]) @@ -497,14 +418,11 @@ impl Drop for ReadPartitionContext { .with_label_values(&["partition_data_batch_to_batch"]) .observe(partition_data_batch_to_batch); - if self.metrics.keys_after_pruning != 0 { - common_telemetry::debug!( - "TreeIter partitions metrics, after: {}, partition_read_source: {}s, partition_data_batch_to_batch: {}s", - self.metrics.keys_after_pruning, - partition_read_source, - partition_data_batch_to_batch, - ); - } + common_telemetry::debug!( + "TreeIter partitions metrics, partition_read_source: {}s, partition_data_batch_to_batch: {}s", + partition_read_source, + partition_data_batch_to_batch, + ); } } diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index c4acfd05ca88..57f3857a8b3f 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -243,13 +243,15 @@ impl Drop for ShardReader { MERGE_TREE_READ_STAGE_ELAPSED .with_label_values(&["shard_prune_pk"]) .observe(shard_prune_pk); - common_telemetry::debug!( - "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, cost: {:?}s", - self.parts_reader.num_parts(), - self.keys_before_pruning, - self.keys_after_pruning, - shard_prune_pk, - ); + if self.keys_before_pruning > 0 { + common_telemetry::debug!( + "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, cost: {:?}s", + self.parts_reader.num_parts(), + self.keys_before_pruning, + self.keys_after_pruning, + shard_prune_pk, + ); + } } } diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index 9c67684ed939..9d86dd717c50 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -283,12 +283,14 @@ impl Drop for ShardBuilderReader { MERGE_TREE_READ_STAGE_ELAPSED .with_label_values(&["shard_builder_prune_pk"]) .observe(shard_builder_prune_pk); - common_telemetry::debug!( - "ShardBuilderReader metrics, before pruning: {}, after pruning: {}, cost: {}s", - self.keys_before_pruning, - self.keys_after_pruning, - shard_builder_prune_pk, - ); + if self.keys_before_pruning > 0 { + common_telemetry::debug!( + "ShardBuilderReader metrics, before pruning: {}, after pruning: {}, cost: {}s", + self.keys_before_pruning, + self.keys_after_pruning, + shard_builder_prune_pk, + ); + } } } From 
3d4f82f99693c70963aed8fbcb202a556e8a3ade Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 29 Feb 2024 11:30:22 +0800 Subject: [PATCH 08/20] chore: data build cost --- src/mito2/src/memtable/merge_tree/shard.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 57f3857a8b3f..b3a5ac7854b4 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -140,8 +140,9 @@ impl ShardReaderBuilder { key_dict, inner, } = self; + let now = Instant::now(); let parts_reader = inner.build()?; - ShardReader::new(shard_id, key_dict, parts_reader, key_filter) + ShardReader::new(shard_id, key_dict, parts_reader, key_filter, now.elapsed()) } } @@ -155,6 +156,7 @@ pub struct ShardReader { keys_before_pruning: usize, keys_after_pruning: usize, prune_pk_cost: Duration, + data_build_cost: Duration, } impl ShardReader { @@ -163,6 +165,7 @@ impl ShardReader { key_dict: Option, parts_reader: DataPartsReader, key_filter: Option, + data_build_cost: Duration, ) -> Result { let has_pk = key_dict.is_some(); let mut reader = Self { @@ -174,6 +177,7 @@ impl ShardReader { keys_before_pruning: 0, keys_after_pruning: 0, prune_pk_cost: Duration::default(), + data_build_cost, }; reader.prune_batch_by_key()?; @@ -245,11 +249,12 @@ impl Drop for ShardReader { .observe(shard_prune_pk); if self.keys_before_pruning > 0 { common_telemetry::debug!( - "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, cost: {:?}s", + "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, prune cost: {}s, data build cost: {}s", self.parts_reader.num_parts(), self.keys_before_pruning, self.keys_after_pruning, shard_prune_pk, + self.data_build_cost.as_secs_f64(), ); } } From 903383e8c318bb64361f10d89b6ea5354b64612c Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 29 Feb 2024 11:48:17 +0800 Subject: [PATCH 09/20] chore: more logs --- src/mito2/src/memtable/merge_tree/partition.rs | 8 +++++++- src/mito2/src/memtable/merge_tree/shard.rs | 2 +- .../src/memtable/merge_tree/shard_builder.rs | 15 +++++++++++++-- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 575d737b50c0..dac8664d044b 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -150,6 +150,7 @@ impl Partition { (builder_reader, shard_source) }; + context.metrics.num_shards = shard_reader_builders.len(); let mut nodes = shard_reader_builders .into_iter() .map(|builder| { @@ -160,6 +161,7 @@ impl Partition { .collect::>>()?; if let Some(builder) = builder_source { + context.metrics.read_builder = true; // Move the initialization of ShardBuilderReader out of read lock. let shard_builder_reader = builder.build(Some(&context.pk_weights), key_filter.clone())?; @@ -282,6 +284,8 @@ pub(crate) struct PartitionStats { struct PartitionReaderMetrics { read_source: Duration, data_batch_to_batch: Duration, + read_builder: bool, + num_shards: usize, } /// Reader to scan rows in a partition. 
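/// Key pruning now happens inside the shard readers, so this reader only drains the merged source and converts batches.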
@@ -419,7 +423,9 @@ impl Drop for ReadPartitionContext { .observe(partition_data_batch_to_batch); common_telemetry::debug!( - "TreeIter partitions metrics, partition_read_source: {}s, partition_data_batch_to_batch: {}s", + "TreeIter partitions metrics, read_builder: {}, num_shards: {}, partition_read_source: {}s, partition_data_batch_to_batch: {}s", + self.metrics.read_builder, + self.metrics.num_shards, partition_read_source, partition_data_batch_to_batch, ); diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index b3a5ac7854b4..c0a6d2c270a1 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -249,7 +249,7 @@ impl Drop for ShardReader { .observe(shard_prune_pk); if self.keys_before_pruning > 0 { common_telemetry::debug!( - "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, prune cost: {}s, data build cost: {}s", + "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s", self.parts_reader.num_parts(), self.keys_before_pruning, self.keys_after_pruning, diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index 9d86dd717c50..f7c42363de17 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -183,8 +183,15 @@ impl ShardBuilderReaderBuilder { pk_weights: Option<&[u16]>, key_filter: Option, ) -> Result { + let now = Instant::now(); let data_reader = self.data_reader.build(pk_weights)?; - ShardBuilderReader::new(self.shard_id, self.dict_reader, data_reader, key_filter) + ShardBuilderReader::new( + self.shard_id, + self.dict_reader, + data_reader, + key_filter, + now.elapsed(), + ) } } @@ -198,6 +205,7 @@ pub struct ShardBuilderReader { keys_before_pruning: usize, keys_after_pruning: usize, prune_pk_cost: Duration, + data_build_cost: Duration, } impl ShardBuilderReader { @@ -206,6 +214,7 @@ impl ShardBuilderReader { dict_reader: DictBuilderReader, data_reader: DataBufferReader, key_filter: Option, + data_build_cost: Duration, ) -> Result { let mut reader = ShardBuilderReader { shard_id, @@ -216,6 +225,7 @@ impl ShardBuilderReader { keys_before_pruning: 0, keys_after_pruning: 0, prune_pk_cost: Duration::default(), + data_build_cost, }; reader.prune_batch_by_key()?; @@ -285,10 +295,11 @@ impl Drop for ShardBuilderReader { .observe(shard_builder_prune_pk); if self.keys_before_pruning > 0 { common_telemetry::debug!( - "ShardBuilderReader metrics, before pruning: {}, after pruning: {}, cost: {}s", + "ShardBuilderReader metrics, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s", self.keys_before_pruning, self.keys_after_pruning, shard_builder_prune_pk, + self.data_build_cost.as_secs_f64(), ); } } From d67ed98284a6c849fe60d269017235655966dc76 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 29 Feb 2024 15:08:20 +0800 Subject: [PATCH 10/20] feat: cache skip result --- .../src/memtable/merge_tree/partition.rs | 28 ++++++++++---- src/mito2/src/memtable/merge_tree/shard.rs | 5 ++- .../src/memtable/merge_tree/shard_builder.rs | 4 +- src/mito2/src/row_converter.rs | 37 +++++++++++++++---- 4 files changed, 55 insertions(+), 19 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index dac8664d044b..1ef67f03c30d 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ 
b/src/mito2/src/memtable/merge_tree/partition.rs @@ -124,11 +124,11 @@ impl Partition { /// Scans data in the partition. pub fn read(&self, mut context: ReadPartitionContext) -> Result { let key_filter = if context.need_prune_key { - Some(PrimaryKeyFilter { - metadata: context.metadata.clone(), - filters: context.filters.clone(), - codec: context.row_codec.clone(), - }) + Some(PrimaryKeyFilter::new( + context.metadata.clone(), + context.filters.clone(), + context.row_codec.clone(), + )) } else { None }; @@ -350,10 +350,24 @@ pub(crate) struct PrimaryKeyFilter { metadata: RegionMetadataRef, filters: Arc>, codec: Arc, + offsets_buf: Vec, } impl PrimaryKeyFilter { - pub(crate) fn prune_primary_key(&self, pk: &[u8]) -> bool { + pub(crate) fn new( + metadata: RegionMetadataRef, + filters: Arc>, + codec: Arc, + ) -> Self { + Self { + metadata, + filters, + codec, + offsets_buf: Vec::new(), + } + } + + pub(crate) fn prune_primary_key(&mut self, pk: &[u8]) -> bool { if self.filters.is_empty() { return true; } @@ -380,7 +394,7 @@ impl PrimaryKeyFilter { // Safety: A tag column is always in primary key. let index = self.metadata.primary_key_index(column.column_id).unwrap(); - let value = match self.codec.decode_value_at(pk, index) { + let value = match self.codec.decode_value_at(pk, index, &mut self.offsets_buf) { Ok(v) => v, Err(e) => { common_telemetry::error!(e; "Failed to decode primary key"); diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index c0a6d2c270a1..7f981f91623c 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -213,7 +213,7 @@ impl ShardReader { } fn prune_batch_by_key(&mut self) -> Result<()> { - let Some(key_filter) = &self.key_filter else { + let Some(key_filter) = &mut self.key_filter else { return Ok(()); }; @@ -225,7 +225,8 @@ impl ShardReader { } } self.keys_before_pruning += 1; - let key = self.current_key().unwrap(); + // Safety: `key_filter` is some so the shard has primary keys. + let key = self.key_dict.as_ref().unwrap().key_by_pk_index(pk_index); let now = Instant::now(); if key_filter.prune_primary_key(key) { self.prune_pk_cost += now.elapsed(); diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index f7c42363de17..2b007ebd87a1 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -259,7 +259,7 @@ impl ShardBuilderReader { } fn prune_batch_by_key(&mut self) -> Result<()> { - let Some(key_filter) = &self.key_filter else { + let Some(key_filter) = &mut self.key_filter else { return Ok(()); }; @@ -271,7 +271,7 @@ impl ShardBuilderReader { } } self.keys_before_pruning += 1; - let key = self.current_key().unwrap(); + let key = self.dict_reader.key_by_pk_index(pk_index); let now = Instant::now(); if key_filter.prune_primary_key(key) { self.prune_pk_cost += now.elapsed(); diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 67ae3c349b62..69d70a5cd8e8 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -219,13 +219,17 @@ impl SortField { ) } - /// Skip deserializing this field. - fn skip_deserialize(&self, bytes: &[u8], deserializer: &mut Deserializer<&[u8]>) -> Result<()> { + /// Skip deserializing this field, returns the length of it. 
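+    /// The returned length feeds the offsets cache that `decode_value_at` builds.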
+ fn skip_deserialize( + &self, + bytes: &[u8], + deserializer: &mut Deserializer<&[u8]>, + ) -> Result { let pos = deserializer.position(); match bytes[pos] { 0 => { deserializer.advance(1); - return Ok(()); + return Ok(1); } _ => (), } @@ -240,10 +244,10 @@ impl SortField { ConcreteDataType::Float64(_) => 9, ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => { deserializer.advance(1); - deserializer + let bytes_length = deserializer .skip_bytes() .context(error::DeserializeFieldSnafu)?; - return Ok(()); + return Ok(bytes_length); } ConcreteDataType::Date(_) => 5, ConcreteDataType::DateTime(_) => 9, @@ -257,7 +261,7 @@ impl SortField { | ConcreteDataType::Dictionary(_) => 0, }; deserializer.advance(to_skip); - Ok(()) + Ok(to_skip) } } @@ -281,10 +285,27 @@ impl McmpRowCodec { self.fields.iter().map(|f| f.estimated_size()).sum() } - pub fn decode_value_at(&self, bytes: &[u8], pos: usize) -> Result { + /// Decode value at `pos` in `bytes`.a + pub fn decode_value_at( + &self, + bytes: &[u8], + pos: usize, + offsets_buf: &mut Vec, + ) -> Result { let mut deserializer = Deserializer::new(bytes); + if pos <= offsets_buf.len() { + // We computed the offset before. + let to_skip = offsets_buf[pos]; + deserializer.advance(to_skip); + } + + offsets_buf.clear(); + let mut offset = 0; + offsets_buf.push(offset); for i in 0..pos { - self.fields[i].skip_deserialize(bytes, &mut deserializer)?; + let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?; + offset += skip; + offsets_buf.push(offset); } self.fields[pos].deserialize(&mut deserializer) } From 4303aaab883a97a68a0acc93f106ad502ea117ca Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 29 Feb 2024 15:16:48 +0800 Subject: [PATCH 11/20] chore: todo --- src/mito2/src/memtable/merge_tree/partition.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 1ef67f03c30d..52bcac3b93a9 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -402,6 +402,9 @@ impl PrimaryKeyFilter { } }; + // TODO(yingwen): `evaluate_scalar()` creates temporary arrays to compare scalars. We + // can compare the bytes directly without allocation and matching types as we use + // comparable encoding. // Safety: arrow schema and datatypes are constructed from the same source. let scalar_value = value .try_to_scalar_value(&column.column_schema.data_type) From e3a1acae832f9aea41d844f35073ec81c5d62f9a Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 29 Feb 2024 15:20:21 +0800 Subject: [PATCH 12/20] fix: index out of bound --- src/mito2/src/row_converter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 69d70a5cd8e8..ea2bb30c45b0 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -293,7 +293,7 @@ impl McmpRowCodec { offsets_buf: &mut Vec, ) -> Result { let mut deserializer = Deserializer::new(bytes); - if pos <= offsets_buf.len() { + if pos < offsets_buf.len() { // We computed the offset before. 
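// A single advance replaces re-skipping every field before `pos`.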
let to_skip = offsets_buf[pos]; deserializer.advance(to_skip); From f9ddc9856678bed9a7d15ebf9e69415ba64ba46c Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 29 Feb 2024 15:45:39 +0800 Subject: [PATCH 13/20] test: test codec --- src/mito2/src/row_converter.rs | 78 +++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 16 deletions(-) diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index ea2bb30c45b0..b30f3f43fa4a 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -46,9 +46,6 @@ pub trait RowCodec { /// Decode row values from bytes. fn decode(&self, bytes: &[u8]) -> Result>; - - /// Decode row values from bytes to a vec. - fn decode_to_vec(&self, bytes: &[u8], values: &mut Vec) -> Result<()>; } #[derive(Debug, Clone, PartialEq, Eq)] @@ -285,7 +282,7 @@ impl McmpRowCodec { self.fields.iter().map(|f| f.estimated_size()).sum() } - /// Decode value at `pos` in `bytes`.a + /// Decode value at `pos` in `bytes`. pub fn decode_value_at( &self, bytes: &[u8], @@ -342,23 +339,12 @@ impl RowCodec for McmpRowCodec { } Ok(values) } - - fn decode_to_vec(&self, bytes: &[u8], values: &mut Vec) -> Result<()> { - values.clear(); - values.reserve(self.fields.len()); - let mut deserializer = Deserializer::new(bytes); - for f in &self.fields { - let value = f.deserialize(&mut deserializer)?; - values.push(value); - } - Ok(()) - } } #[cfg(test)] mod tests { use common_base::bytes::StringBytes; - use common_time::Timestamp; + use common_time::{DateTime, Timestamp}; use datatypes::value::Value; use super::*; @@ -376,6 +362,18 @@ mod tests { let result = encoder.encode(value_ref.iter().cloned()).unwrap(); let decoded = encoder.decode(&result).unwrap(); assert_eq!(decoded, row); + let mut decoded = Vec::new(); + let mut offsets = Vec::new(); + // Iter two times to test offsets buffer. + for _ in 0..2 { + decoded.clear(); + for _ in 0..data_types.len() { + let value = encoder.decode_value_at(&result, 0, &mut offsets).unwrap(); + decoded.push(value); + } + assert_eq!(data_types.len(), offsets.len()); + assert_eq!(decoded, row); + } } #[test] @@ -500,5 +498,53 @@ mod tests { ], vec![Value::Null, Value::Int64(43), Value::Boolean(true)], ); + + // All types. 
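+        // Exercises skip_deserialize and decode_value_at across every supported concrete type.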
+ check_encode_and_decode( + &[ + ConcreteDataType::boolean_datatype(), + ConcreteDataType::int8_datatype(), + ConcreteDataType::uint8_datatype(), + ConcreteDataType::int16_datatype(), + ConcreteDataType::uint16_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::uint32_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::uint64_datatype(), + ConcreteDataType::float32_datatype(), + ConcreteDataType::float64_datatype(), + ConcreteDataType::binary_datatype(), + ConcreteDataType::string_datatype(), + ConcreteDataType::date_datatype(), + ConcreteDataType::datetime_datatype(), + ConcreteDataType::timestamp_millisecond_datatype(), + ConcreteDataType::time_millisecond_datatype(), + ConcreteDataType::duration_millisecond_datatype(), + ConcreteDataType::interval_month_day_nano_datatype(), + ConcreteDataType::decimal128_default_datatype(), + ], + vec![ + Value::Boolean(true), + Value::Int8(8), + Value::UInt8(8), + Value::Int16(16), + Value::UInt16(16), + Value::Int32(32), + Value::UInt32(32), + Value::Int64(64), + Value::UInt64(64), + Value::Float32(1.0.into()), + Value::Float64(1.0.into()), + Value::Binary(b"hello"[..].into()), + Value::String("world".into()), + Value::Date(Date::new(10)), + Value::DateTime(DateTime::new(11)), + Value::Timestamp(Timestamp::new_millisecond(12)), + Value::Time(Time::new_millisecond(13)), + Value::Duration(Duration::new_millisecond(14)), + Value::Interval(Interval::from_month_day_nano(1, 1, 15)), + Value::Decimal128(Decimal128::from(16)), + ], + ); } } From bfca0d55bd35da6a3a8e5aab1afb2084adc0c57a Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 29 Feb 2024 15:52:07 +0800 Subject: [PATCH 14/20] fix: invalid offsets --- src/mito2/src/row_converter.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index b30f3f43fa4a..740688a8a362 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -298,12 +298,13 @@ impl McmpRowCodec { offsets_buf.clear(); let mut offset = 0; - offsets_buf.push(offset); for i in 0..pos { + offsets_buf.push(offset); let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?; offset += skip; - offsets_buf.push(offset); } + // Push offset for the this field. + offsets_buf.push(offset); self.fields[pos].deserialize(&mut deserializer) } } From 6d59bebe374da22fff8d327704514d4974df1d77 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 29 Feb 2024 16:42:58 +0800 Subject: [PATCH 15/20] fix: skip binary --- src/mito2/src/row_converter.rs | 42 ++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 740688a8a362..806da1646ba3 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -239,12 +239,25 @@ impl SortField { ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9, ConcreteDataType::Float32(_) => 5, ConcreteDataType::Float64(_) => 9, - ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => { + ConcreteDataType::Binary(_) => { + // Now the encoder encode binary as a list of bytes so we can't use + // skip bytes. 
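+                // The encoding writes a marker byte 1 before each data byte and ends the
+                // list with a different marker, which the loop below walks two bytes at a time.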
+ let pos_before = deserializer.position(); + let mut current = pos_before + 1; + while bytes[current] == 1 { + current += 2; + } + let to_skip = current - pos_before + 1; + deserializer.advance(to_skip); + return Ok(to_skip); + } + ConcreteDataType::String(_) => { + let pos_before = deserializer.position(); deserializer.advance(1); - let bytes_length = deserializer + deserializer .skip_bytes() .context(error::DeserializeFieldSnafu)?; - return Ok(bytes_length); + return Ok(deserializer.position() - pos_before); } ConcreteDataType::Date(_) => 5, ConcreteDataType::DateTime(_) => 9, @@ -294,17 +307,18 @@ impl McmpRowCodec { // We computed the offset before. let to_skip = offsets_buf[pos]; deserializer.advance(to_skip); - } - - offsets_buf.clear(); - let mut offset = 0; - for i in 0..pos { + } else { + offsets_buf.clear(); + let mut offset = 0; + for i in 0..pos { + offsets_buf.push(offset); + let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?; + offset += skip; + } + // Push offset for the this field. offsets_buf.push(offset); - let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?; - offset += skip; } - // Push offset for the this field. - offsets_buf.push(offset); + self.fields[pos].deserialize(&mut deserializer) } } @@ -368,8 +382,8 @@ mod tests { // Iter two times to test offsets buffer. for _ in 0..2 { decoded.clear(); - for _ in 0..data_types.len() { - let value = encoder.decode_value_at(&result, 0, &mut offsets).unwrap(); + for i in 0..data_types.len() { + let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap(); decoded.push(value); } assert_eq!(data_types.len(), offsets.len()); From 3d0df51254b5d0a01163463ac807aa073ad8962b Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 29 Feb 2024 17:36:48 +0800 Subject: [PATCH 16/20] fix: offset buffer reuse --- .../src/memtable/merge_tree/partition.rs | 1 + src/mito2/src/row_converter.rs | 25 ++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 52bcac3b93a9..5c50d4c861d2 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -379,6 +379,7 @@ impl PrimaryKeyFilter { // evaluate filters against primary key values let mut result = true; + self.offsets_buf.clear(); for filter in &*self.filters { if Partition::is_partition_column(filter.column_name()) { continue; diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 806da1646ba3..dc913e2607b0 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -307,16 +307,33 @@ impl McmpRowCodec { // We computed the offset before. let to_skip = offsets_buf[pos]; deserializer.advance(to_skip); - } else { - offsets_buf.clear(); + return self.fields[pos].deserialize(&mut deserializer); + } + + if offsets_buf.is_empty() { let mut offset = 0; + // Skip values before `pos`. for i in 0..pos { + // Offset to skip before reading value i. offsets_buf.push(offset); let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?; offset += skip; } - // Push offset for the this field. + // Offset to skip before reading this value. offsets_buf.push(offset); + } else { + // Offsets are not enough. + let value_start = offsets_buf.len() - 1; + // Advances to decode value at `value_start`. + let mut offset = offsets_buf[value_start]; + deserializer.advance(offset); + for i in value_start..pos { + // Skip value i. 
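+                // The width extends the offsets cache so a later probe can resume from here.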
+                let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?;
+                // Offset for the value at i + 1.
+                offset += skip;
+                offsets_buf.push(offset);
+            }
         }
 
         self.fields[pos].deserialize(&mut deserializer)
     }
 }
@@ -386,7 +403,7 @@ mod tests {
                 let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
                 decoded.push(value);
             }
-            assert_eq!(data_types.len(), offsets.len());
+            assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
             assert_eq!(decoded, row);
         }
     }

From cfecab1c9859f6086e815b7b407311a4dde6e523 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Thu, 29 Feb 2024 17:40:46 +0800
Subject: [PATCH 17/20] chore: comment

---
 src/mito2/src/row_converter.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs
index dc913e2607b0..b4c901bbc1e7 100644
--- a/src/mito2/src/row_converter.rs
+++ b/src/mito2/src/row_converter.rs
@@ -296,6 +296,8 @@ impl McmpRowCodec {
     }
 
     /// Decode value at `pos` in `bytes`.
+    ///
+    /// The i-th element in the offsets buffer is the number of bytes to skip before reading the value at index i.
     pub fn decode_value_at(
         &self,
         bytes: &[u8],

From d7aec0f7a26d3f4d35f42ef435e806413d3a3ace Mon Sep 17 00:00:00 2001
From: evenyag
Date: Thu, 29 Feb 2024 20:23:38 +0800
Subject: [PATCH 18/20] test: test memtable filter

---
 src/mito2/src/memtable/merge_tree.rs | 53 +++++++++++++++++++++++++
 .../src/memtable/merge_tree/partition.rs |  1 -
 2 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs
index 2a7799524342..6c2f15a7fc9d 100644
--- a/src/mito2/src/memtable/merge_tree.rs
+++ b/src/mito2/src/memtable/merge_tree.rs
@@ -293,6 +293,8 @@ mod tests {
     use std::collections::BTreeSet;
 
     use common_time::Timestamp;
+    use datafusion_common::{Column, ScalarValue};
+    use datafusion_expr::{BinaryExpr, Expr, Operator};
     use datatypes::scalars::ScalarVector;
     use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
 
@@ -528,4 +530,55 @@ mod tests {
             .collect::<Vec<_>>();
         assert_eq!(expect, read);
     }
+
+    #[test]
+    fn test_memtable_filter() {
+        let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
+        // Build the memtable via the builder so we can customize the config.
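+        // Cap the index at 40 keys per shard so the 100 keys written below
+        // span multiple shards.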
+        let memtable = MergeTreeMemtableBuilder::new(
+            MergeTreeConfig {
+                index_max_keys_per_shard: 40,
+                ..Default::default()
+            },
+            None,
+        )
+        .build(1, &metadata);
+
+        for i in 0..100 {
+            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
+            let kvs =
+                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
+            memtable.write(&kvs).unwrap();
+        }
+
+        for i in 0..100 {
+            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
+            let expr = Expr::BinaryExpr(BinaryExpr {
+                left: Box::new(Expr::Column(Column {
+                    relation: None,
+                    name: "k1".to_string(),
+                })),
+                op: Operator::Eq,
+                right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
+            });
+            let iter = memtable
+                .iter(None, Some(Predicate::new(vec![expr.into()])))
+                .unwrap();
+            let read = iter
+                .flat_map(|batch| {
+                    batch
+                        .unwrap()
+                        .timestamps()
+                        .as_any()
+                        .downcast_ref::<TimestampMillisecondVector>()
+                        .unwrap()
+                        .iter_data()
+                        .collect::<Vec<_>>()
+                        .into_iter()
+                })
+                .map(|v| v.unwrap().0.value())
+                .collect::<Vec<_>>();
+            assert_eq!(timestamps, read);
+        }
+    }
 }
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index 5c50d4c861d2..06968766a166 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -394,7 +394,6 @@ impl PrimaryKeyFilter {
             // index of the column in primary keys.
             // Safety: A tag column is always in primary key.
             let index = self.metadata.primary_key_index(column.column_id).unwrap();
-
             let value = match self.codec.decode_value_at(pk, index, &mut self.offsets_buf) {
                 Ok(v) => v,
                 Err(e) => {

From 69bdbf0fcb81191e94301268a5ce719ad4aa2f74 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Thu, 29 Feb 2024 20:25:55 +0800
Subject: [PATCH 19/20] style: fix clippy

---
 src/mito2/src/row_converter.rs | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs
index b4c901bbc1e7..16c7ea8771ea 100644
--- a/src/mito2/src/row_converter.rs
+++ b/src/mito2/src/row_converter.rs
@@ -223,12 +223,9 @@ impl SortField {
         deserializer: &mut Deserializer<&[u8]>,
     ) -> Result<usize> {
         let pos = deserializer.position();
-        match bytes[pos] {
-            0 => {
-                deserializer.advance(1);
-                return Ok(1);
-            }
-            _ => (),
+        if bytes[pos] == 0 {
+            deserializer.advance(1);
+            return Ok(1);
         }
 
         let to_skip = match &self.data_type {

From aea9828b7b2770466e138ac940dbede7736df205 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Fri, 1 Mar 2024 14:51:39 +0800
Subject: [PATCH 20/20] chore: fix compiler error

---
 src/mito2/src/memtable/merge_tree/partition.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index 06968766a166..d4bd0644b582 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -38,7 +38,7 @@ use crate::memtable::merge_tree::shard_builder::ShardBuilder;
 use crate::memtable::merge_tree::{MergeTreeConfig, PkId};
 use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
 use crate::read::{Batch, BatchBuilder};
-use crate::row_converter::McmpRowCodec;
+use crate::row_converter::{McmpRowCodec, RowCodec};
 
 /// Key of a partition.
 pub type PartitionKey = u32;
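
For reference, the offsets cache that patches 14 through 17 converge on can be sketched on its own. Everything below is illustrative: `SketchCodec`, `widths`, and `offset_of` are hypothetical stand-ins for `McmpRowCodec`, the per-field lengths discovered by `SortField::skip_deserialize`, and the caching side of `decode_value_at`; none of it is code from this series. The contract is that `offsets_buf[i]` holds how many bytes to skip before reading value `i`.

struct SketchCodec {
    // Hypothetical encoded width of each field; a stand-in for what
    // skip_deserialize computes by walking the real encoding.
    widths: Vec<usize>,
}

impl SketchCodec {
    /// Returns how many bytes to skip before reading the value at `pos`,
    /// extending `offsets_buf` lazily so later calls reuse earlier work.
    fn offset_of(&self, pos: usize, offsets_buf: &mut Vec<usize>) -> usize {
        if offsets_buf.len() > pos {
            // Cache hit: a previous call already computed this offset.
            return offsets_buf[pos];
        }
        if offsets_buf.is_empty() {
            // Value 0 always starts at offset 0.
            offsets_buf.push(0);
        }
        // Resume from the last cached offset instead of re-skipping from 0.
        let mut offset = *offsets_buf.last().unwrap();
        for i in offsets_buf.len() - 1..pos {
            // Skip value i and cache where value i + 1 starts.
            offset += self.widths[i];
            offsets_buf.push(offset);
        }
        offset
    }
}

fn main() {
    let codec = SketchCodec { widths: vec![1, 9, 5, 9] };
    let mut offsets = Vec::new();
    assert_eq!(codec.offset_of(2, &mut offsets), 10); // walks fields 0 and 1
    assert_eq!(codec.offset_of(3, &mut offsets), 15); // resumes from the cache
    assert_eq!(offsets, vec![0, 1, 10, 15]);
}

The first call pays the full cost of skipping the earlier fields; every later call resumes from the cache, which is why patch 16 clears `offsets_buf` once per primary key (before the filter loop) rather than on every `decode_value_at` call.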