From 3ee53360eec5c7b7681eb7ac378e1b19e9871bfc Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Fri, 8 Mar 2024 10:54:00 +0800
Subject: [PATCH] perf: Reduce decode overhead during pruning keys in the
 memtable (#3415)

* feat: reuse value buf
* feat: skip values to decode
* feat: prune shard
chore: fix compiler errors
refactor: shard prune metrics
* fix: panic on DedupReader::try_new
* fix: prune after next
* chore: num parts metrics
* feat: metrics and logs
* chore: data build cost
* chore: more logs
* feat: cache skip result
* chore: todo
* fix: index out of bound
* test: test codec
* fix: invalid offsets
* fix: skip binary
* fix: offset buffer reuse
* chore: comment
* test: test memtable filter
* style: fix clippy
* chore: fix compiler error
---
 src/mito2/src/memtable/merge_tree.rs          |  53 +++++
 src/mito2/src/memtable/merge_tree/data.rs     |   7 +
 src/mito2/src/memtable/merge_tree/dedup.rs    |   2 +-
 .../src/memtable/merge_tree/partition.rs      | 209 ++++++++----------
 src/mito2/src/memtable/merge_tree/shard.rs    |  97 +++++++-
 .../src/memtable/merge_tree/shard_builder.rs  | 101 ++++++++-
 src/mito2/src/row_converter.rs                | 163 +++++++++++++-
 7 files changed, 494 insertions(+), 138 deletions(-)

diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs
index ff56cb2010e7..5f80ba746a5f 100644
--- a/src/mito2/src/memtable/merge_tree.rs
+++ b/src/mito2/src/memtable/merge_tree.rs
@@ -293,6 +293,8 @@ mod tests {
     use std::collections::BTreeSet;
 
     use common_time::Timestamp;
+    use datafusion_common::{Column, ScalarValue};
+    use datafusion_expr::{BinaryExpr, Expr, Operator};
     use datatypes::scalars::ScalarVector;
     use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
 
@@ -528,4 +530,55 @@ mod tests {
            .collect::<Vec<_>>();
         assert_eq!(expect, read);
     }
+
+    #[test]
+    fn test_memtable_filter() {
+        let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
+        // Try to build a memtable via the builder.
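+        // index_max_keys_per_shard is lowered to 40 so that the 100 distinct
+        // keys written below span multiple shards, which exercises the
+        // per-shard key pruning added by this patch.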
+        let memtable = MergeTreeMemtableBuilder::new(
+            MergeTreeConfig {
+                index_max_keys_per_shard: 40,
+                ..Default::default()
+            },
+            None,
+        )
+        .build(1, &metadata);
+
+        for i in 0..100 {
+            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
+            let kvs =
+                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
+            memtable.write(&kvs).unwrap();
+        }
+
+        for i in 0..100 {
+            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
+            let expr = Expr::BinaryExpr(BinaryExpr {
+                left: Box::new(Expr::Column(Column {
+                    relation: None,
+                    name: "k1".to_string(),
+                })),
+                op: Operator::Eq,
+                right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
+            });
+            let iter = memtable
+                .iter(None, Some(Predicate::new(vec![expr.into()])))
+                .unwrap();
+            let read = iter
+                .flat_map(|batch| {
+                    batch
+                        .unwrap()
+                        .timestamps()
+                        .as_any()
+                        .downcast_ref::<TimestampMillisecondVector>()
+                        .unwrap()
+                        .iter_data()
+                        .collect::<Vec<_>>()
+                        .into_iter()
+                })
+                .map(|v| v.unwrap().0.value())
+                .collect::<Vec<_>>();
+            assert_eq!(timestamps, read);
+        }
+    }
 }
diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
index 7bafdb904b9d..e43d06a22361 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -994,9 +994,11 @@ impl DataPartsReaderBuilder {
         for p in self.parts {
             nodes.push(DataNode::new(DataSource::Part(p)));
         }
+        let num_parts = nodes.len();
         let merger = Merger::try_new(nodes)?;
         Ok(DataPartsReader {
             merger,
+            num_parts,
             elapsed: Default::default(),
         })
     }
@@ -1005,6 +1007,7 @@ impl DataPartsReaderBuilder {
 /// Reader for all parts inside a `DataParts`.
 pub struct DataPartsReader {
     merger: Merger<DataNode>,
+    num_parts: usize,
     elapsed: Duration,
 }
 
@@ -1032,6 +1035,10 @@ impl DataPartsReader {
     pub(crate) fn is_valid(&self) -> bool {
         self.merger.is_valid()
     }
+
+    pub(crate) fn num_parts(&self) -> usize {
+        self.num_parts
+    }
 }
 
 #[cfg(test)]
diff --git a/src/mito2/src/memtable/merge_tree/dedup.rs b/src/mito2/src/memtable/merge_tree/dedup.rs
index 6f98601821ff..0a68f9f56441 100644
--- a/src/mito2/src/memtable/merge_tree/dedup.rs
+++ b/src/mito2/src/memtable/merge_tree/dedup.rs
@@ -45,7 +45,7 @@ impl<T: DataBatchSource> DataBatchSource for DedupReader<T> {
     }
 
     fn next(&mut self) -> Result<()> {
-        loop {
+        while self.inner.is_valid() {
             match &mut self.prev_batch_last_row {
                 None => {
                     // First shot, fill prev_batch_last_row and current_batch_range with first batch.
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index 424fad693944..d4bd0644b582 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -123,6 +123,15 @@ impl Partition {
     /// Scans data in the partition.
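+    ///
+    /// When `need_prune_key` is set, this builds a [PrimaryKeyFilter] once and
+    /// clones it into every shard reader, so each reader prunes batches by
+    /// primary key on its own while scanning.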
     pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
+        let key_filter = if context.need_prune_key {
+            Some(PrimaryKeyFilter::new(
+                context.metadata.clone(),
+                context.filters.clone(),
+                context.row_codec.clone(),
+            ))
+        } else {
+            None
+        };
         let (builder_source, shard_reader_builders) = {
             let inner = self.inner.read().unwrap();
             let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
@@ -141,14 +150,21 @@ impl Partition {
             (builder_reader, shard_source)
         };
 
+        context.metrics.num_shards = shard_reader_builders.len();
         let mut nodes = shard_reader_builders
             .into_iter()
-            .map(|builder| Ok(ShardNode::new(ShardSource::Shard(builder.build()?))))
+            .map(|builder| {
+                Ok(ShardNode::new(ShardSource::Shard(
+                    builder.build(key_filter.clone())?,
+                )))
+            })
             .collect::<Result<Vec<_>>>()?;
 
         if let Some(builder) = builder_source {
+            context.metrics.read_builder = true;
             // Move the initialization of ShardBuilderReader out of read lock.
-            let shard_builder_reader = builder.build(Some(&context.pk_weights))?;
+            let shard_builder_reader =
+                builder.build(Some(&context.pk_weights), key_filter.clone())?;
             nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
         }
 
@@ -266,11 +282,10 @@ pub(crate) struct PartitionStats {
 
 #[derive(Default)]
 struct PartitionReaderMetrics {
-    prune_pk: Duration,
     read_source: Duration,
     data_batch_to_batch: Duration,
-    keys_before_pruning: usize,
-    keys_after_pruning: usize,
+    read_builder: bool,
+    num_shards: usize,
 }
 
 /// Reader to scan rows in a partition.
@@ -279,18 +294,11 @@ pub(crate) struct PartitionStats {
 pub struct PartitionReader {
     context: ReadPartitionContext,
     source: BoxedDataBatchSource,
-    last_yield_pk_id: Option<PkId>,
 }
 
 impl PartitionReader {
     fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
-        let mut reader = Self {
-            context,
-            source,
-            last_yield_pk_id: None,
-        };
-        // Find next valid batch.
-        reader.prune_batch_by_key()?;
+        let reader = Self { context, source };
 
         Ok(reader)
     }
@@ -305,8 +313,7 @@ impl PartitionReader {
     /// # Panics
     /// Panics if the reader is invalid.
     pub fn next(&mut self) -> Result<()> {
-        self.advance_source()?;
-        self.prune_batch_by_key()
+        self.advance_source()
     }
 
     /// Converts current data batch into a [Batch].
@@ -336,106 +343,77 @@ impl PartitionReader {
         self.context.metrics.read_source += read_source.elapsed();
         Ok(())
     }
-
-    fn prune_batch_by_key(&mut self) -> Result<()> {
-        if self.context.metadata.primary_key.is_empty() || !self.context.need_prune_key {
-            // Nothing to prune.
-            return Ok(());
-        }
-
-        while self.source.is_valid() {
-            let pk_id = self.source.current_pk_id();
-            if let Some(yield_pk_id) = self.last_yield_pk_id {
-                if pk_id == yield_pk_id {
-                    // If this batch has the same key as last returned batch.
-                    // We can return it without evaluating filters.
-                    break;
-                }
-            }
-            let key = self.source.current_key().unwrap();
-            self.context.metrics.keys_before_pruning += 1;
-            // Prune batch by primary key.
-            if prune_primary_key(
-                &self.context.metadata,
-                &self.context.filters,
-                &self.context.row_codec,
-                key,
-                &mut self.context.metrics,
-            ) {
-                // We need this key.
-                self.last_yield_pk_id = Some(pk_id);
-                self.context.metrics.keys_after_pruning += 1;
-                break;
-            }
-            self.advance_source()?;
-        }
-
-        Ok(())
-    }
 }
 
-fn prune_primary_key(
-    metadata: &RegionMetadataRef,
-    filters: &[SimpleFilterEvaluator],
-    codec: &McmpRowCodec,
-    pk: &[u8],
-    metrics: &mut PartitionReaderMetrics,
-) -> bool {
-    let start = Instant::now();
-    let res = prune_primary_key_inner(metadata, filters, codec, pk);
-    metrics.prune_pk += start.elapsed();
-    res
-}
-
-// TODO(yingwen): Improve performance of key pruning. Now we need to find index and
-// then decode and convert each value.
-/// Returns true if the `pk` is still needed.
-fn prune_primary_key_inner(
-    metadata: &RegionMetadataRef,
-    filters: &[SimpleFilterEvaluator],
-    codec: &McmpRowCodec,
-    pk: &[u8],
-) -> bool {
-    if filters.is_empty() {
-        return true;
-    }
-
-    // no primary key, we simply return true.
-    if metadata.primary_key.is_empty() {
-        return true;
-    }
-
-    let pk_values = match codec.decode(pk) {
-        Ok(values) => values,
-        Err(e) => {
-            common_telemetry::error!(e; "Failed to decode primary key");
-            return true;
-        }
-    };
-
-    // evaluate filters against primary key values
-    let mut result = true;
-    for filter in filters {
-        if Partition::is_partition_column(filter.column_name()) {
-            continue;
-        }
-        let Some(column) = metadata.column_by_name(filter.column_name()) else {
-            continue;
-        };
-        // ignore filters that are not referencing primary key columns
-        if column.semantic_type != SemanticType::Tag {
-            continue;
-        }
-        // index of the column in primary keys.
-        // Safety: A tag column is always in primary key.
-        let index = metadata.primary_key_index(column.column_id).unwrap();
-        // Safety: arrow schema and datatypes are constructed from the same source.
+#[derive(Clone)]
+pub(crate) struct PrimaryKeyFilter {
+    metadata: RegionMetadataRef,
+    filters: Arc<Vec<SimpleFilterEvaluator>>,
+    codec: Arc<McmpRowCodec>,
+    offsets_buf: Vec<usize>,
+}
+
+impl PrimaryKeyFilter {
+    pub(crate) fn new(
+        metadata: RegionMetadataRef,
+        filters: Arc<Vec<SimpleFilterEvaluator>>,
+        codec: Arc<McmpRowCodec>,
+    ) -> Self {
+        Self {
+            metadata,
+            filters,
+            codec,
+            offsets_buf: Vec::new(),
+        }
+    }
+
+    pub(crate) fn prune_primary_key(&mut self, pk: &[u8]) -> bool {
+        if self.filters.is_empty() {
+            return true;
+        }
+
+        // no primary key, we simply return true.
+        if self.metadata.primary_key.is_empty() {
+            return true;
+        }
+
+        // evaluate filters against primary key values
+        let mut result = true;
+        self.offsets_buf.clear();
+        for filter in &*self.filters {
+            if Partition::is_partition_column(filter.column_name()) {
+                continue;
+            }
+            let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
+                continue;
+            };
+            // ignore filters that are not referencing primary key columns
+            if column.semantic_type != SemanticType::Tag {
+                continue;
+            }
+            // index of the column in primary keys.
+            // Safety: A tag column is always in primary key.
+            let index = self.metadata.primary_key_index(column.column_id).unwrap();
+            let value = match self.codec.decode_value_at(pk, index, &mut self.offsets_buf) {
+                Ok(v) => v,
+                Err(e) => {
+                    common_telemetry::error!(e; "Failed to decode primary key");
+                    return true;
+                }
+            };
+
+            // TODO(yingwen): `evaluate_scalar()` creates temporary arrays to compare scalars. We
+            // can compare the bytes directly without allocation and matching types as we use
+            // comparable encoding.
+            // Safety: arrow schema and datatypes are constructed from the same source.
-        let scalar_value = pk_values[index]
-            .try_to_scalar_value(&column.column_schema.data_type)
-            .unwrap();
-        result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
-    }
+            let scalar_value = value
+                .try_to_scalar_value(&column.column_schema.data_type)
+                .unwrap();
+            result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
+        }
 
-    result
+        result
+    }
 }
 
 /// Structs to reuse across readers to avoid allocating for each reader.
@@ -443,7 +421,7 @@ pub(crate) struct ReadPartitionContext {
     metadata: RegionMetadataRef,
     row_codec: Arc<McmpRowCodec>,
     projection: HashSet<ColumnId>,
-    filters: Vec<SimpleFilterEvaluator>,
+    filters: Arc<Vec<SimpleFilterEvaluator>>,
     /// Buffer to store pk weights.
     pk_weights: Vec<u16>,
     need_prune_key: bool,
@@ -452,10 +430,6 @@ impl Drop for ReadPartitionContext {
     fn drop(&mut self) {
-        let partition_prune_pk = self.metrics.prune_pk.as_secs_f64();
-        MERGE_TREE_READ_STAGE_ELAPSED
-            .with_label_values(&["partition_prune_pk"])
-            .observe(partition_prune_pk);
         let partition_read_source = self.metrics.read_source.as_secs_f64();
         MERGE_TREE_READ_STAGE_ELAPSED
             .with_label_values(&["partition_read_source"])
             .observe(partition_read_source);
@@ -465,16 +439,13 @@ impl Drop for ReadPartitionContext {
             .with_label_values(&["partition_data_batch_to_batch"])
             .observe(partition_data_batch_to_batch);
 
-        if self.metrics.keys_before_pruning != 0 {
-            common_telemetry::debug!(
-                "TreeIter pruning, before: {}, after: {}, partition_read_source: {}s, partition_prune_pk: {}s, partition_data_batch_to_batch: {}s",
-                self.metrics.keys_before_pruning,
-                self.metrics.keys_after_pruning,
-                partition_read_source,
-                partition_prune_pk,
-                partition_data_batch_to_batch,
-            );
-        }
+        common_telemetry::debug!(
+            "TreeIter partitions metrics, read_builder: {}, num_shards: {}, partition_read_source: {}s, partition_data_batch_to_batch: {}s",
+            self.metrics.read_builder,
+            self.metrics.num_shards,
+            partition_read_source,
+            partition_data_batch_to_batch,
+        );
     }
 }
 
@@ -490,7 +461,7 @@ impl ReadPartitionContext {
             metadata,
             row_codec,
             projection,
-            filters,
+            filters: Arc::new(filters),
             pk_weights: Vec::new(),
             need_prune_key,
             metrics: Default::default(),
diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs
index 889c05a582a0..7f981f91623c 100644
--- a/src/mito2/src/memtable/merge_tree/shard.rs
+++ b/src/mito2/src/memtable/merge_tree/shard.rs
@@ -15,6 +15,7 @@
 //! Shard in a partition.
 
 use std::cmp::Ordering;
+use std::time::{Duration, Instant};
 
 use store_api::metadata::RegionMetadataRef;
 
@@ -25,8 +26,10 @@ use crate::memtable::merge_tree::data::{
 };
 use crate::memtable::merge_tree::dict::KeyDictRef;
 use crate::memtable::merge_tree::merger::{Merger, Node};
+use crate::memtable::merge_tree::partition::PrimaryKeyFilter;
 use crate::memtable::merge_tree::shard_builder::ShardBuilderReader;
-use crate::memtable::merge_tree::{PkId, ShardId};
+use crate::memtable::merge_tree::{PkId, PkIndex, ShardId};
+use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
 
 /// Shard stores data related to the same key dictionary.
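+/// Readers over a shard take an optional [PrimaryKeyFilter] and skip whole
+/// `pk_index` runs whose keys do not match it (see `ShardReader` below).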
 pub struct Shard {
@@ -131,18 +134,15 @@ pub struct ShardReaderBuilder {
 }
 
 impl ShardReaderBuilder {
-    pub(crate) fn build(self) -> Result<ShardReader> {
+    pub(crate) fn build(self, key_filter: Option<PrimaryKeyFilter>) -> Result<ShardReader> {
         let ShardReaderBuilder {
             shard_id,
             key_dict,
             inner,
         } = self;
+        let now = Instant::now();
         let parts_reader = inner.build()?;
-        Ok(ShardReader {
-            shard_id,
-            key_dict,
-            parts_reader,
-        })
+        ShardReader::new(shard_id, key_dict, parts_reader, key_filter, now.elapsed())
     }
 }
 
@@ -151,15 +151,46 @@ pub struct ShardReader {
     shard_id: ShardId,
     key_dict: Option<KeyDictRef>,
     parts_reader: DataPartsReader,
+    key_filter: Option<PrimaryKeyFilter>,
+    last_yield_pk_index: Option<PkIndex>,
+    keys_before_pruning: usize,
+    keys_after_pruning: usize,
+    prune_pk_cost: Duration,
+    data_build_cost: Duration,
 }
 
 impl ShardReader {
+    fn new(
+        shard_id: ShardId,
+        key_dict: Option<KeyDictRef>,
+        parts_reader: DataPartsReader,
+        key_filter: Option<PrimaryKeyFilter>,
+        data_build_cost: Duration,
+    ) -> Result<Self> {
+        let has_pk = key_dict.is_some();
+        let mut reader = Self {
+            shard_id,
+            key_dict,
+            parts_reader,
+            key_filter: if has_pk { key_filter } else { None },
+            last_yield_pk_index: None,
+            keys_before_pruning: 0,
+            keys_after_pruning: 0,
+            prune_pk_cost: Duration::default(),
+            data_build_cost,
+        };
+        reader.prune_batch_by_key()?;
+
+        Ok(reader)
+    }
+
     fn is_valid(&self) -> bool {
         self.parts_reader.is_valid()
     }
 
     fn next(&mut self) -> Result<()> {
-        self.parts_reader.next()
+        self.parts_reader.next()?;
+        self.prune_batch_by_key()
     }
 
     fn current_key(&self) -> Option<&[u8]> {
@@ -180,6 +211,54 @@ impl ShardReader {
     fn current_data_batch(&self) -> DataBatch {
         self.parts_reader.current_data_batch()
     }
+
+    fn prune_batch_by_key(&mut self) -> Result<()> {
+        let Some(key_filter) = &mut self.key_filter else {
+            return Ok(());
+        };
+
+        while self.parts_reader.is_valid() {
+            let pk_index = self.parts_reader.current_data_batch().pk_index();
+            if let Some(yield_pk_index) = self.last_yield_pk_index {
+                // Same key as the last batch we yielded, keep it without
+                // evaluating the filter again.
+                if pk_index == yield_pk_index {
+                    break;
+                }
+            }
+            self.keys_before_pruning += 1;
+            // Safety: `key_filter` is some so the shard has primary keys.
+            let key = self.key_dict.as_ref().unwrap().key_by_pk_index(pk_index);
+            let now = Instant::now();
+            if key_filter.prune_primary_key(key) {
+                self.prune_pk_cost += now.elapsed();
+                self.last_yield_pk_index = Some(pk_index);
+                self.keys_after_pruning += 1;
+                break;
+            }
+            self.prune_pk_cost += now.elapsed();
+            self.parts_reader.next()?;
+        }
+
+        Ok(())
+    }
+}
+
+impl Drop for ShardReader {
+    fn drop(&mut self) {
+        let shard_prune_pk = self.prune_pk_cost.as_secs_f64();
+        MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["shard_prune_pk"])
+            .observe(shard_prune_pk);
+        if self.keys_before_pruning > 0 {
+            common_telemetry::debug!(
+                "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
+                self.parts_reader.num_parts(),
+                self.keys_before_pruning,
+                self.keys_after_pruning,
+                shard_prune_pk,
+                self.data_build_cost.as_secs_f64(),
+            );
+        }
+    }
 }
 
 /// A merger that merges batches from multiple shards.
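+/// Shard readers prune batches by primary key before merging, so this merger
+/// only sees batches whose keys already passed the filter.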
@@ -422,7 +501,7 @@ mod tests {
         }
         assert!(!shard.is_empty());
 
-        let mut reader = shard.read().unwrap().build().unwrap();
+        let mut reader = shard.read().unwrap().build(None).unwrap();
         let mut timestamps = Vec::new();
         while reader.is_valid() {
             let rb = reader.current_data_batch().slice_record_batch();
diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs
index 07fedc38ddba..2b007ebd87a1 100644
--- a/src/mito2/src/memtable/merge_tree/shard_builder.rs
+++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs
@@ -16,6 +16,7 @@
 
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use store_api::metadata::RegionMetadataRef;
 
@@ -26,8 +27,9 @@ use crate::memtable::merge_tree::data::{
 };
 use crate::memtable::merge_tree::dict::{DictBuilderReader, KeyDictBuilder};
 use crate::memtable::merge_tree::metrics::WriteMetrics;
+use crate::memtable::merge_tree::partition::PrimaryKeyFilter;
 use crate::memtable::merge_tree::shard::Shard;
-use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId};
+use crate::memtable::merge_tree::{MergeTreeConfig, PkId, PkIndex, ShardId};
 use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
 
 /// Builder to write keys and data to a shard that the key dictionary
@@ -176,13 +178,20 @@ pub(crate) struct ShardBuilderReaderBuilder {
 }
 
 impl ShardBuilderReaderBuilder {
-    pub(crate) fn build(self, pk_weights: Option<&[u16]>) -> Result<ShardBuilderReader> {
+    pub(crate) fn build(
+        self,
+        pk_weights: Option<&[u16]>,
+        key_filter: Option<PrimaryKeyFilter>,
+    ) -> Result<ShardBuilderReader> {
+        let now = Instant::now();
         let data_reader = self.data_reader.build(pk_weights)?;
-        Ok(ShardBuilderReader {
-            shard_id: self.shard_id,
-            dict_reader: self.dict_reader,
+        ShardBuilderReader::new(
+            self.shard_id,
+            self.dict_reader,
             data_reader,
-        })
+            key_filter,
+            now.elapsed(),
+        )
     }
 }
 
@@ -191,15 +200,45 @@ pub struct ShardBuilderReader {
     shard_id: ShardId,
     dict_reader: DictBuilderReader,
     data_reader: DataBufferReader,
+    key_filter: Option<PrimaryKeyFilter>,
+    last_yield_pk_index: Option<PkIndex>,
+    keys_before_pruning: usize,
+    keys_after_pruning: usize,
+    prune_pk_cost: Duration,
+    data_build_cost: Duration,
 }
 
 impl ShardBuilderReader {
+    fn new(
+        shard_id: ShardId,
+        dict_reader: DictBuilderReader,
+        data_reader: DataBufferReader,
+        key_filter: Option<PrimaryKeyFilter>,
+        data_build_cost: Duration,
+    ) -> Result<Self> {
+        let mut reader = ShardBuilderReader {
+            shard_id,
+            dict_reader,
+            data_reader,
+            key_filter,
+            last_yield_pk_index: None,
+            keys_before_pruning: 0,
+            keys_after_pruning: 0,
+            prune_pk_cost: Duration::default(),
+            data_build_cost,
+        };
+        reader.prune_batch_by_key()?;
+
+        Ok(reader)
+    }
+
     pub fn is_valid(&self) -> bool {
         self.data_reader.is_valid()
     }
 
     pub fn next(&mut self) -> Result<()> {
-        self.data_reader.next()
+        self.data_reader.next()?;
+        self.prune_batch_by_key()
    }
 
     pub fn current_key(&self) -> Option<&[u8]> {
@@ -218,6 +257,52 @@ impl ShardBuilderReader {
     pub fn current_data_batch(&self) -> DataBatch {
         self.data_reader.current_data_batch()
     }
+
+    fn prune_batch_by_key(&mut self) -> Result<()> {
+        let Some(key_filter) = &mut self.key_filter else {
+            return Ok(());
+        };
+
+        while self.data_reader.is_valid() {
+            let pk_index = self.data_reader.current_data_batch().pk_index();
+            if let Some(yield_pk_index) = self.last_yield_pk_index {
+                // Same key as the last batch we yielded, keep it without
+                // evaluating the filter again.
+                if pk_index == yield_pk_index {
+                    break;
+                }
+            }
+            self.keys_before_pruning += 1;
+            let key = self.dict_reader.key_by_pk_index(pk_index);
+            let now = Instant::now();
+            if key_filter.prune_primary_key(key) {
+                self.prune_pk_cost += now.elapsed();
+                self.last_yield_pk_index = Some(pk_index);
+                self.keys_after_pruning += 1;
+                break;
+            }
+            self.prune_pk_cost += now.elapsed();
+            self.data_reader.next()?;
+        }
+
+        Ok(())
+    }
+}
+
+impl Drop for ShardBuilderReader {
+    fn drop(&mut self) {
+        let shard_builder_prune_pk = self.prune_pk_cost.as_secs_f64();
+        MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["shard_builder_prune_pk"])
+            .observe(shard_builder_prune_pk);
+        if self.keys_before_pruning > 0 {
+            common_telemetry::debug!(
+                "ShardBuilderReader metrics, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
+                self.keys_before_pruning,
+                self.keys_after_pruning,
+                shard_builder_prune_pk,
+                self.data_build_cost.as_secs_f64(),
+            );
+        }
+    }
 }
 
 #[cfg(test)]
@@ -306,7 +391,7 @@ mod tests {
         let mut reader = shard_builder
             .read(&mut pk_weights)
             .unwrap()
-            .build(Some(&pk_weights))
+            .build(Some(&pk_weights), None)
             .unwrap();
         let mut timestamps = Vec::new();
         while reader.is_valid() {
diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs
index a83d237457a0..16c7ea8771ea 100644
--- a/src/mito2/src/row_converter.rs
+++ b/src/mito2/src/row_converter.rs
@@ -215,6 +215,61 @@ impl SortField {
             Decimal128, Decimal128
         )
     }
+
+    /// Skips deserializing this field and returns its serialized length.
+    fn skip_deserialize(
+        &self,
+        bytes: &[u8],
+        deserializer: &mut Deserializer<&[u8]>,
+    ) -> Result<usize> {
+        let pos = deserializer.position();
+        if bytes[pos] == 0 {
+            deserializer.advance(1);
+            return Ok(1);
+        }
+
+        let to_skip = match &self.data_type {
+            ConcreteDataType::Boolean(_) => 2,
+            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
+            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
+            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
+            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
+            ConcreteDataType::Float32(_) => 5,
+            ConcreteDataType::Float64(_) => 9,
+            ConcreteDataType::Binary(_) => {
+                // Now the encoder encodes binary as a list of bytes, so we
+                // can't use `skip_bytes`.
+                let pos_before = deserializer.position();
+                let mut current = pos_before + 1;
+                while bytes[current] == 1 {
+                    current += 2;
+                }
+                let to_skip = current - pos_before + 1;
+                deserializer.advance(to_skip);
+                return Ok(to_skip);
+            }
+            ConcreteDataType::String(_) => {
+                let pos_before = deserializer.position();
+                deserializer.advance(1);
+                deserializer
+                    .skip_bytes()
+                    .context(error::DeserializeFieldSnafu)?;
+                return Ok(deserializer.position() - pos_before);
+            }
+            ConcreteDataType::Date(_) => 5,
+            ConcreteDataType::DateTime(_) => 9,
+            ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option<i64>
+            ConcreteDataType::Time(_) => 10,     // i64 and 1 byte time unit
+            ConcreteDataType::Duration(_) => 10,
+            ConcreteDataType::Interval(_) => 18,
+            ConcreteDataType::Decimal128(_) => 19,
+            ConcreteDataType::Null(_)
+            | ConcreteDataType::List(_)
+            | ConcreteDataType::Dictionary(_) => 0,
+        };
+        deserializer.advance(to_skip);
+        Ok(to_skip)
+    }
 }
 
 /// A memory-comparable row [Value] encoder/decoder.
@@ -236,6 +291,52 @@ impl McmpRowCodec {
     pub fn estimated_size(&self) -> usize {
         self.fields.iter().map(|f| f.estimated_size()).sum()
     }
+
+    /// Decode value at `pos` in `bytes`.
+    ///
+    /// The i-th element in `offsets_buf` is how many bytes to skip in order to
+    /// read the value at index `i`, so the buffer caches offsets computed by
+    /// previous calls.
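+    ///
+    /// For example (a sketch, assuming three non-null `u32` fields): each value
+    /// is serialized as 1 null-flag byte plus 4 data bytes, matching the sizes
+    /// in `skip_deserialize()`, so the buffer ends up as `[0, 5, 10]`.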
+    pub fn decode_value_at(
+        &self,
+        bytes: &[u8],
+        pos: usize,
+        offsets_buf: &mut Vec<usize>,
+    ) -> Result<Value> {
+        let mut deserializer = Deserializer::new(bytes);
+        if pos < offsets_buf.len() {
+            // We computed the offset before.
+            let to_skip = offsets_buf[pos];
+            deserializer.advance(to_skip);
+            return self.fields[pos].deserialize(&mut deserializer);
+        }
+
+        if offsets_buf.is_empty() {
+            let mut offset = 0;
+            // Skip values before `pos`.
+            for i in 0..pos {
+                // Offset to skip before reading value i.
+                offsets_buf.push(offset);
+                let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?;
+                offset += skip;
+            }
+            // Offset to skip before reading this value.
+            offsets_buf.push(offset);
+        } else {
+            // Offsets are not enough.
+            let value_start = offsets_buf.len() - 1;
+            // Advances to decode value at `value_start`.
+            let mut offset = offsets_buf[value_start];
+            deserializer.advance(offset);
+            for i in value_start..pos {
+                // Skip value i.
+                let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?;
+                // Offset for the value at i + 1.
+                offset += skip;
+                offsets_buf.push(offset);
+            }
+        }
+
+        self.fields[pos].deserialize(&mut deserializer)
+    }
 }
 
 impl RowCodec for McmpRowCodec {
@@ -274,7 +375,7 @@ impl RowCodec for McmpRowCodec {
 #[cfg(test)]
 mod tests {
     use common_base::bytes::StringBytes;
-    use common_time::Timestamp;
+    use common_time::{DateTime, Timestamp};
     use datatypes::value::Value;
 
     use super::*;
@@ -292,6 +393,18 @@ mod tests {
         let result = encoder.encode(value_ref.iter().cloned()).unwrap();
         let decoded = encoder.decode(&result).unwrap();
         assert_eq!(decoded, row);
+        let mut decoded = Vec::new();
+        let mut offsets = Vec::new();
+        // Iterate two times to test the offsets buffer.
+        for _ in 0..2 {
+            decoded.clear();
+            for i in 0..data_types.len() {
+                let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
+                decoded.push(value);
+            }
+            assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
+            assert_eq!(decoded, row);
+        }
     }
 
     #[test]
@@ -416,5 +529,53 @@ mod tests {
             ],
             vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
         );
+
+        // All types.
+ check_encode_and_decode( + &[ + ConcreteDataType::boolean_datatype(), + ConcreteDataType::int8_datatype(), + ConcreteDataType::uint8_datatype(), + ConcreteDataType::int16_datatype(), + ConcreteDataType::uint16_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::uint32_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::uint64_datatype(), + ConcreteDataType::float32_datatype(), + ConcreteDataType::float64_datatype(), + ConcreteDataType::binary_datatype(), + ConcreteDataType::string_datatype(), + ConcreteDataType::date_datatype(), + ConcreteDataType::datetime_datatype(), + ConcreteDataType::timestamp_millisecond_datatype(), + ConcreteDataType::time_millisecond_datatype(), + ConcreteDataType::duration_millisecond_datatype(), + ConcreteDataType::interval_month_day_nano_datatype(), + ConcreteDataType::decimal128_default_datatype(), + ], + vec![ + Value::Boolean(true), + Value::Int8(8), + Value::UInt8(8), + Value::Int16(16), + Value::UInt16(16), + Value::Int32(32), + Value::UInt32(32), + Value::Int64(64), + Value::UInt64(64), + Value::Float32(1.0.into()), + Value::Float64(1.0.into()), + Value::Binary(b"hello"[..].into()), + Value::String("world".into()), + Value::Date(Date::new(10)), + Value::DateTime(DateTime::new(11)), + Value::Timestamp(Timestamp::new_millisecond(12)), + Value::Time(Time::new_millisecond(13)), + Value::Duration(Duration::new_millisecond(14)), + Value::Interval(Interval::from_month_day_nano(1, 1, 15)), + Value::Decimal128(Decimal128::from(16)), + ], + ); } }