Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement iter for the new memtable #3373

Merged
merged 15 commits into from
Feb 25, 2024
106 changes: 98 additions & 8 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ impl Memtable for MergeTreeMemtable {

fn iter(
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Result<BoxedBatchIterator> {
todo!()
self.tree.read(projection, predicate)
}

fn is_empty(&self) -> bool {
Expand Down Expand Up @@ -275,18 +275,22 @@ impl MemtableBuilder for MergeTreeMemtableBuilder {

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use common_time::Timestamp;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};

use super::*;
use crate::test_util::memtable_util;

#[test]
fn test_memtable_sorted_input() {
write_sorted_input(true);
write_sorted_input(false);
write_iter_sorted_input(true);
write_iter_sorted_input(false);
}

fn write_sorted_input(has_pk: bool) {
fn write_iter_sorted_input(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_with_primary_key(vec![1, 0], true)
} else {
Expand All @@ -298,7 +302,27 @@ mod tests {
let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default());
memtable.write(&kvs).unwrap();

// TODO(yingwen): Test iter.
let expected_ts = kvs
.iter()
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<BTreeSet<_>>();

let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<BTreeSet<_>>();
assert_eq!(expected_ts, read);

let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
Expand Down Expand Up @@ -344,7 +368,36 @@ mod tests {
);
memtable.write(&kvs).unwrap();

// TODO(yingwen): Test iter.
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.sequences()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap())
.collect::<Vec<_>>();
assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
Expand All @@ -353,4 +406,41 @@ mod tests {
stats.time_range()
);
}

#[test]
fn test_memtable_projection() {
write_iter_projection(true);
write_iter_projection(false);
}

fn write_iter_projection(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_with_primary_key(vec![1, 0], true)
} else {
memtable_util::metadata_with_primary_key(vec![], false)
};
// Try to build a memtable via the builder.
let memtable = MergeTreeMemtableBuilder::new(None).build(1, &metadata);

let expect = (0..100).collect::<Vec<_>>();
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
memtable.write(&kvs).unwrap();
let iter = memtable.iter(Some(&[3]), None).unwrap();

let mut v0_all = vec![];
for res in iter {
let batch = res.unwrap();
assert_eq!(1, batch.fields().len());
let v0 = batch
.fields()
.first()
.unwrap()
.data
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
}
assert_eq!(expect, v0_all);
}
}
13 changes: 7 additions & 6 deletions src/mito2/src/memtable/merge_tree/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ impl DictBuilderReader {
}

/// Returns pk weights to sort a data part and replaces pk indices.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
compute_pk_weights(&self.sorted_pk_indices)
pub(crate) fn pk_weights_to_sort_data(&self, pk_weights: &mut Vec<u16>) {
compute_pk_weights(&self.sorted_pk_indices, pk_weights)
}

/// Returns pk indices sorted by keys.
Expand All @@ -199,12 +199,11 @@ impl DictBuilderReader {
}

/// Returns pk weights to sort a data part and replaces pk indices.
fn compute_pk_weights(sorted_pk_indices: &[PkIndex]) -> Vec<u16> {
let mut pk_weights = vec![0; sorted_pk_indices.len()];
fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec<u16>) {
pk_weights.resize(sorted_pk_indices.len(), 0);
for (weight, pk_index) in sorted_pk_indices.iter().enumerate() {
pk_weights[*pk_index as usize] = weight as u16;
}
pk_weights
}

/// A key dictionary.
Expand Down Expand Up @@ -240,7 +239,9 @@ impl KeyDict {

/// Returns pk weights to sort a data part and replaces pk indices.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
compute_pk_weights(&self.key_positions)
let mut pk_weights = Vec::with_capacity(self.key_positions.len());
compute_pk_weights(&self.key_positions, &mut pk_weights);
pk_weights
}

/// Returns the shared memory size.
Expand Down
Loading