Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement dedup for the new memtable and expose the config #3377

Merged
merged 6 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};

use crate::error::Result;
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

/// Default max running background job.
Expand Down Expand Up @@ -102,6 +103,9 @@ pub struct MitoConfig {

/// Inverted index configs.
pub inverted_index: InvertedIndexConfig,

/// Experimental memtable.
pub experimental_memtable: Option<MergeTreeConfig>,
}

impl Default for MitoConfig {
Expand All @@ -127,6 +131,7 @@ impl Default for MitoConfig {
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
inverted_index: InvertedIndexConfig::default(),
experimental_memtable: None,
};

// Adjust buffer and cache size according to system memory if we can.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,11 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;

let region_stat = region.region_usage().await;
assert_eq!(region_stat.wal_usage, 0);
assert_eq!(region_stat.sst_usage, 2962);

// region total usage
assert_eq!(region_stat.disk_usage(), 4028);
// Some memtables may share items.
assert!(region_stat.disk_usage() >= 4028);
}

#[tokio::test]
Expand Down
48 changes: 41 additions & 7 deletions src/mito2/src/memtable/key_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl<'a> KeyValue<'a> {

/// Get number of field columns.
pub fn num_fields(&self) -> usize {
self.row.values.len() - self.helper.num_primary_key_column - 1
self.helper.indices.len() - self.helper.num_primary_key_column - 1
}

/// Get sequence.
Expand Down Expand Up @@ -261,7 +261,13 @@ mod tests {
}
}

fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) {
fn check_key_values(
kvs: &KeyValues,
num_rows: usize,
keys: &[Option<i64>],
ts: i64,
values: &[Option<i64>],
) {
assert_eq!(num_rows, kvs.num_rows());
let mut expect_seq = START_SEQ;
let expect_ts = ValueRef::Int64(ts);
Expand All @@ -273,10 +279,10 @@ mod tests {
assert_eq!(values.len(), kv.num_fields());

assert_eq!(expect_ts, kv.timestamp());
let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::Int64(*k)).collect();
let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::from(*k)).collect();
let actual_keys: Vec<_> = kv.primary_keys().collect();
assert_eq!(expect_keys, actual_keys);
let expect_values: Vec<_> = values.iter().map(|v| ValueRef::Int64(*v)).collect();
let expect_values: Vec<_> = values.iter().map(|v| ValueRef::from(*v)).collect();
let actual_values: Vec<_> = kv.fields().collect();
assert_eq!(expect_values, actual_values);
}
Expand Down Expand Up @@ -312,7 +318,7 @@ mod tests {
// KeyValues
// keys: [k0=2, k1=0]
// ts: 1,
check_key_values(&kvs, 3, &[2, 0], 1, &[]);
check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
}

#[test]
Expand All @@ -325,7 +331,7 @@ mod tests {
// KeyValues (note that v0 is in front of v1 in region schema)
// ts: 2,
// fields: [v0=1, v1=0]
check_key_values(&kvs, 3, &[], 2, &[1, 0]);
check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
}

#[test]
Expand All @@ -339,6 +345,34 @@ mod tests {
// keys: [k0=0, k1=3]
// ts: 2,
// fields: [v0=1, v1=4]
check_key_values(&kvs, 3, &[0, 3], 2, &[1, 4]);
check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
}

#[test]
fn test_sparse_field() {
    // Region schema: 2 primary key columns (k0, k1) and 2 fields (v0, v1).
    let metadata = new_region_metadata(2, 2);
    // Each row supplies k0=0, v0=1, ts=2, k1=3 but omits v1 entirely,
    // so the missing field must decode as null.
    let mutation = new_mutation(&["k0", "v0", "ts", "k1"], 3);
    let key_values = KeyValues::new(&metadata, mutation).unwrap();
    // Expected per row:
    //   keys:   [k0=0, k1=3]
    //   ts:     2
    //   fields: [v0=1, v1=null]
    check_key_values(
        &key_values,
        3,
        &[Some(0), Some(3)],
        2,
        &[Some(1), None],
    );
}

#[test]
fn test_sparse_tag_field() {
    // Region schema: 2 primary key columns (k0, k1) and 2 fields (v0, v1).
    let metadata = new_region_metadata(2, 2);
    // Each row supplies only k0=0, v0=1, ts=2; both the tag k1 and the
    // field v1 are absent and must decode as null.
    let mutation = new_mutation(&["k0", "v0", "ts"], 3);
    let key_values = KeyValues::new(&metadata, mutation).unwrap();
    // Expected per row:
    //   keys:   [k0=0, k1=null]
    //   ts:     2
    //   fields: [v0=1, v1=null]
    check_key_values(
        &key_values,
        3,
        &[Some(0), None],
        2,
        &[Some(1), None],
    );
}
}
16 changes: 11 additions & 5 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::fmt;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
Expand All @@ -54,7 +55,8 @@ struct PkId {
}

/// Config for the merge tree memtable.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MergeTreeConfig {
/// Max keys in an index shard.
pub index_max_keys_per_shard: usize,
Expand Down Expand Up @@ -248,16 +250,19 @@ impl MergeTreeMemtable {
/// Builder to build a [MergeTreeMemtable].
#[derive(Debug, Default)]
pub struct MergeTreeMemtableBuilder {
write_buffer_manager: Option<WriteBufferManagerRef>,
config: MergeTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
}

impl MergeTreeMemtableBuilder {
/// Creates a new builder with specific `write_buffer_manager`.
pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
pub fn new(
config: MergeTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
Self {
config,
write_buffer_manager,
config: MergeTreeConfig::default(),
}
}
}
Expand Down Expand Up @@ -420,7 +425,8 @@ mod tests {
memtable_util::metadata_with_primary_key(vec![], false)
};
// Try to build a memtable via the builder.
let memtable = MergeTreeMemtableBuilder::new(None).build(1, &metadata);
let memtable =
MergeTreeMemtableBuilder::new(MergeTreeConfig::default(), None).build(1, &metadata);

let expect = (0..100).collect::<Vec<_>>();
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
Expand Down
27 changes: 3 additions & 24 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,29 +883,6 @@ impl DataPartsReader {
}
}

#[cfg(test)]
pub(crate) fn write_rows_to_buffer(
buffer: &mut DataBuffer,
schema: &RegionMetadataRef,
pk_index: u16,
ts: Vec<i64>,
v0: Vec<Option<f64>>,
sequence: u64,
) {
let kvs = crate::test_util::memtable_util::build_key_values_with_ts_seq_values(
schema,
"whatever".to_string(),
1,
ts.into_iter(),
v0.into_iter(),
sequence,
);

for kv in kvs.iter() {
buffer.write_row(pk_index, kv);
}
}

#[cfg(test)]
mod tests {
use datafusion::arrow::array::Float64Array;
Expand All @@ -914,7 +891,9 @@ mod tests {
use parquet::data_type::AsBytes;

use super::*;
use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};
use crate::test_util::memtable_util::{
extract_data_batch, metadata_for_test, write_rows_to_buffer,
};

#[test]
fn test_lazy_mutable_vector_builder() {
Expand Down
85 changes: 35 additions & 50 deletions src/mito2/src/memtable/merge_tree/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,19 @@ use std::ops::Range;

use crate::error::Result;
use crate::memtable::merge_tree::data::DataBatch;
use crate::memtable::merge_tree::shard::DataBatchSource;
use crate::memtable::merge_tree::PkId;

pub trait DedupSource {
/// Returns whether current source is still valid.
fn is_valid(&self) -> bool;

/// Advances source to next data batch.
fn next(&mut self) -> Result<()>;

/// Returns current pk id.
/// # Panics
/// If source is not valid.
fn current_pk_id(&self) -> PkId;

/// Returns the current primary key bytes.
/// # Panics
/// If source is not valid.
fn current_key(&self) -> &[u8];

/// Returns the data part.
/// # Panics
/// If source is not valid.
fn current_data_batch(&self) -> DataBatch;
}

struct DedupReader<T> {
/// A reader that dedup sorted batches from a merger.
pub struct DedupReader<T> {
prev_batch_last_row: Option<(PkId, i64)>,
current_batch_range: Option<Range<usize>>,
inner: T,
}

impl<T: DedupSource> DedupReader<T> {
fn try_new(inner: T) -> Result<Self> {
impl<T: DataBatchSource> DedupReader<T> {
/// Creates a new dedup reader.
pub fn try_new(inner: T) -> Result<Self> {
let mut res = Self {
prev_batch_last_row: None,
current_batch_range: None,
Expand All @@ -57,24 +37,13 @@ impl<T: DedupSource> DedupReader<T> {
res.next()?;
Ok(res)
}
}

impl<T: DataBatchSource> DataBatchSource for DedupReader<T> {
fn is_valid(&self) -> bool {
self.current_batch_range.is_some()
}

/// Returns current encoded primary key.
/// # Panics
/// If inner reader is exhausted.
fn current_key(&self) -> &[u8] {
self.inner.current_key()
}

fn current_data_batch(&self) -> DataBatch {
let range = self.current_batch_range.as_ref().unwrap();
let data_batch = self.inner.current_data_batch();
data_batch.slice(range.start, range.len())
}

fn next(&mut self) -> Result<()> {
loop {
match &mut self.prev_batch_last_row {
Expand Down Expand Up @@ -122,40 +91,56 @@ impl<T: DedupSource> DedupReader<T> {
}
Ok(())
}

fn current_pk_id(&self) -> PkId {
self.inner.current_pk_id()
}

fn current_key(&self) -> Option<&[u8]> {
self.inner.current_key()
}

fn current_data_batch(&self) -> DataBatch {
let range = self.current_batch_range.as_ref().unwrap();
let data_batch = self.inner.current_data_batch();
data_batch.slice(range.start, range.len())
}
}

#[cfg(test)]
mod tests {
use store_api::metadata::RegionMetadataRef;

use super::*;
use crate::memtable::merge_tree::data::{
write_rows_to_buffer, DataBuffer, DataParts, DataPartsReader,
use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DataPartsReader};
use crate::test_util::memtable_util::{
extract_data_batch, metadata_for_test, write_rows_to_buffer,
};
use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};

impl DedupSource for DataPartsReader {
struct MockSource(DataPartsReader);

impl DataBatchSource for MockSource {
fn is_valid(&self) -> bool {
self.is_valid()
self.0.is_valid()
}

fn next(&mut self) -> Result<()> {
self.next()
self.0.next()
}

fn current_pk_id(&self) -> PkId {
PkId {
shard_id: 0,
pk_index: self.current_data_batch().pk_index(),
pk_index: self.0.current_data_batch().pk_index(),
}
}

fn current_key(&self) -> &[u8] {
b"abcf"
fn current_key(&self) -> Option<&[u8]> {
None
}

fn current_data_batch(&self) -> DataBatch {
self.current_data_batch()
self.0.current_data_batch()
}
}

Expand Down Expand Up @@ -194,7 +179,7 @@ mod tests {
let mut parts = DataParts::new(meta, 10, true).with_frozen(frozens);

let mut res = Vec::with_capacity(expected.len());
let mut reader = DedupReader::try_new(parts.read().unwrap()).unwrap();
let mut reader = DedupReader::try_new(MockSource(parts.read().unwrap())).unwrap();
while reader.is_valid() {
let batch = reader.current_data_batch();
res.push(extract_data_batch(&batch));
Expand Down
Loading