diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 1ce3509220e6..99fcc7591976 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -14,9 +14,10 @@
 
 //! Memtables are write buffers for regions.
 
-pub mod time_series;
-
 pub mod key_values;
+#[allow(dead_code)]
+pub mod merge_tree;
+pub mod time_series;
 pub(crate) mod version;
 
 use std::fmt;
diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs
new file mode 100644
index 000000000000..6e7c0329b418
--- /dev/null
+++ b/src/mito2/src/memtable/merge_tree.rs
@@ -0,0 +1,247 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Memtable implementation based on a merge tree.
+
+mod data;
+mod dict;
+mod metrics;
+mod partition;
+mod shard;
+mod shard_builder;
+mod tree;
+
+use std::fmt;
+use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
+use std::sync::Arc;
+
+use store_api::metadata::RegionMetadataRef;
+use store_api::storage::ColumnId;
+use table::predicate::Predicate;
+
+use crate::error::Result;
+use crate::flush::WriteBufferManagerRef;
+use crate::memtable::merge_tree::metrics::WriteMetrics;
+use crate::memtable::merge_tree::tree::MergeTree;
+use crate::memtable::{
+    AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
+    MemtableRef, MemtableStats,
+};
+
+/// Id of a shard, only unique inside a partition.
+type ShardId = u32;
+/// Index of a primary key in a shard.
+type PkIndex = u16;
+/// Id of a primary key inside a tree.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+struct PkId {
+    /// Id of the shard.
+    shard_id: ShardId,
+    /// Index of the primary key in the shard.
+    pk_index: PkIndex,
+}
+
+/// Config for the merge tree memtable.
+#[derive(Debug, Clone)]
+pub struct MergeTreeConfig {
+    /// Max keys in an index shard.
+    pub index_max_keys_per_shard: usize,
+    /// Number of rows to freeze a data part.
+    pub data_freeze_threshold: usize,
+}
+
+impl Default for MergeTreeConfig {
+    fn default() -> Self {
+        Self {
+            index_max_keys_per_shard: 8192,
+            data_freeze_threshold: 102400,
+        }
+    }
+}
+
+/// Memtable based on a merge tree.
+pub struct MergeTreeMemtable {
+    /// Id of the memtable.
+    id: MemtableId,
+    /// Tree that writes are delegated to.
+    tree: MergeTree,
+    /// Tracks the bytes allocated by the memtable.
+    alloc_tracker: AllocTracker,
+    /// Max timestamp reported by writes, `i64::MIN` if no write reported one.
+    max_timestamp: AtomicI64,
+    /// Min timestamp reported by writes, `i64::MAX` if no write reported one.
+    min_timestamp: AtomicI64,
+}
+
+impl fmt::Debug for MergeTreeMemtable {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MergeTreeMemtable")
+            .field("id", &self.id)
+            .finish()
+    }
+}
+
+impl Memtable for MergeTreeMemtable {
+    fn id(&self) -> MemtableId {
+        self.id
+    }
+
+    fn write(&self, kvs: &KeyValues) -> Result<()> {
+        // TODO(yingwen): Validate schema while inserting rows.
+
+        let mut metrics = WriteMetrics::default();
+        let res = self.tree.write(kvs, &mut metrics);
+
+        // Stats are updated even if the write fails, using whatever `metrics` holds.
+        self.update_stats(&metrics);
+
+        res
+    }
+
+    fn iter(
+        &self,
+        _projection: Option<&[ColumnId]>,
+        _predicate: Option<Predicate>,
+    ) -> BoxedBatchIterator {
+        // FIXME(yingwen): Change return value to `Result`.
+        todo!()
+    }
+
+    fn is_empty(&self) -> bool {
+        self.tree.is_empty()
+    }
+
+    fn mark_immutable(&self) {
+        self.alloc_tracker.done_allocating();
+    }
+
+    fn stats(&self) -> MemtableStats {
+        let estimated_bytes = self.alloc_tracker.bytes_allocated();
+        let min_ts = self.min_timestamp.load(Ordering::Relaxed);
+        let max_ts = self.max_timestamp.load(Ordering::Relaxed);
+
+        // `with_tree()` also charges the shared parts of the tree to the tracker, so the
+        // tracker can be non-zero before any row is written. The timestamps still hold
+        // their initial values (min > max) then and don't form a valid time range.
+        if estimated_bytes == 0 || min_ts > max_ts {
+            // no rows ever written
+            return MemtableStats {
+                estimated_bytes,
+                time_range: None,
+            };
+        }
+
+        let ts_type = self
+            .tree
+            .metadata
+            .time_index_column()
+            .column_schema
+            .data_type
+            .clone()
+            .as_timestamp()
+            .expect("Timestamp column must have timestamp type");
+        let max_timestamp = ts_type.create_timestamp(max_ts);
+        let min_timestamp = ts_type.create_timestamp(min_ts);
+        MemtableStats {
+            estimated_bytes,
+            time_range: Some((min_timestamp, max_timestamp)),
+        }
+    }
+}
+
+impl MergeTreeMemtable {
+    /// Returns a new memtable.
+    pub fn new(
+        id: MemtableId,
+        metadata: RegionMetadataRef,
+        write_buffer_manager: Option<WriteBufferManagerRef>,
+        config: &MergeTreeConfig,
+    ) -> Self {
+        Self::with_tree(id, MergeTree::new(metadata, config), write_buffer_manager)
+    }
+
+    /// Creates a mutable memtable from the tree.
+    ///
+    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
+    fn with_tree(
+        id: MemtableId,
+        tree: MergeTree,
+        write_buffer_manager: Option<WriteBufferManagerRef>,
+    ) -> Self {
+        let alloc_tracker = AllocTracker::new(write_buffer_manager);
+        // Track space allocated by the tree.
+        let allocated = tree.shared_memory_size();
+        // Here we still add the bytes of shared parts to the tracker as the old memtable
+        // will release its tracker soon.
+        alloc_tracker.on_allocation(allocated);
+
+        Self {
+            id,
+            tree,
+            alloc_tracker,
+            max_timestamp: AtomicI64::new(i64::MIN),
+            min_timestamp: AtomicI64::new(i64::MAX),
+        }
+    }
+
+    /// Updates stats of the memtable.
+    ///
+    /// Adds the key and value bytes of `metrics` to the allocation tracker and widens
+    /// the tracked time range to cover `[metrics.min_ts, metrics.max_ts]`.
+    fn update_stats(&self, metrics: &WriteMetrics) {
+        self.alloc_tracker
+            .on_allocation(metrics.key_bytes + metrics.value_bytes);
+
+        // `fetch_min()`/`fetch_max()` only replace the stored value with a smaller/larger
+        // one, so metrics that still hold the defaults (`i64::MAX`/`i64::MIN`) change nothing.
+        self.min_timestamp
+            .fetch_min(metrics.min_ts, Ordering::Relaxed);
+        self.max_timestamp
+            .fetch_max(metrics.max_ts, Ordering::Relaxed);
+    }
+}
+
+/// Builder to build a [MergeTreeMemtable].
+#[derive(Debug, Default)]
+pub struct MergeTreeMemtableBuilder {
+    /// Id to assign to the next memtable.
+    id: AtomicU32,
+    /// Write buffer manager passed to each memtable built.
+    write_buffer_manager: Option<WriteBufferManagerRef>,
+    /// Config passed to each memtable built.
+    config: MergeTreeConfig,
+}
+
+impl MergeTreeMemtableBuilder {
+    /// Creates a new builder with specific `write_buffer_manager`.
+    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
+        Self {
+            id: AtomicU32::new(0),
+            write_buffer_manager,
+            config: MergeTreeConfig::default(),
+        }
+    }
+}
+
+impl MemtableBuilder for MergeTreeMemtableBuilder {
+    fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef {
+        let id = self.id.fetch_add(1, Ordering::Relaxed);
+        Arc::new(MergeTreeMemtable::new(
+            id,
+            metadata.clone(),
+            self.write_buffer_manager.clone(),
+            &self.config,
+        ))
+    }
+}
diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
new file mode 100644
index 000000000000..3f2627e9d46c
--- /dev/null
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -0,0 +1,21 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Data part of a shard.
+
+/// Buffer to store columns not in the primary key.
+pub struct DataBuffer {}
+
+/// Data parts under a shard.
+pub struct DataParts {}
diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs
new file mode 100644
index 000000000000..d8e2ba8712ac
--- /dev/null
+++ b/src/mito2/src/memtable/merge_tree/dict.rs
@@ -0,0 +1,29 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Key dictionary of a shard.
+
+use std::sync::Arc;
+
+/// Builder to build a key dictionary.
+pub struct KeyDictBuilder {}
+
+/// Buffer to store unsorted primary keys.
+pub struct KeyBuffer {}
+
+/// A key dictionary.
+pub struct KeyDict {}
+
+/// Shared reference to a [KeyDict].
+pub type KeyDictRef = Arc<KeyDict>;
diff --git a/src/mito2/src/memtable/merge_tree/metrics.rs b/src/mito2/src/memtable/merge_tree/metrics.rs
new file mode 100644
index 000000000000..7a2e37359a5b
--- /dev/null
+++ b/src/mito2/src/memtable/merge_tree/metrics.rs
@@ -0,0 +1,39 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Internal metrics of the memtable.
+
+/// Metrics of writing the merge tree.
+pub struct WriteMetrics {
+    /// Size allocated by keys.
+    pub key_bytes: usize,
+    /// Size allocated by values.
+    pub value_bytes: usize,
+    /// Minimum timestamp.
+    pub min_ts: i64,
+    /// Maximum timestamp.
+    pub max_ts: i64,
+}
+
+impl Default for WriteMetrics {
+    /// Returns metrics with zero sizes and an empty time range (`min_ts > max_ts`).
+    fn default() -> Self {
+        Self {
+            key_bytes: 0,
+            value_bytes: 0,
+            min_ts: i64::MAX,
+            max_ts: i64::MIN,
+        }
+    }
+}
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
new file mode 100644
index 000000000000..0a5921c0ca5c
--- /dev/null
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -0,0 +1,42 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Partition of a merge tree.
+//!
+//! We only support partitioning the tree by pre-defined internal columns.
+
+use std::sync::{Arc, RwLock};
+
+use crate::memtable::merge_tree::shard::Shard;
+use crate::memtable::merge_tree::shard_builder::ShardBuilder;
+use crate::memtable::merge_tree::ShardId;
+
+/// Key of a partition.
+pub type PartitionKey = u32;
+
+/// A tree partition.
+pub struct Partition {
+    inner: RwLock<Inner>,
+}
+
+pub type PartitionRef = Arc<Partition>;
+
+/// Inner struct of the partition.
+struct Inner {
+    /// Shard whose dictionary is active.
+    shard_builder: ShardBuilder,
+    next_shard_id: ShardId,
+    /// Shards with frozen dictionary.
+    shards: Vec<Shard>,
+}
diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs
new file mode 100644
index 000000000000..d7fb74b6bafb
--- /dev/null
+++ b/src/mito2/src/memtable/merge_tree/shard.rs
@@ -0,0 +1,28 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Shard in a partition.
+
+use crate::memtable::merge_tree::data::DataParts;
+use crate::memtable::merge_tree::dict::KeyDictRef;
+use crate::memtable::merge_tree::ShardId;
+
+/// Shard stores data related to the same key dictionary.
+pub struct Shard {
+    shard_id: ShardId,
+    /// Key dictionary of the shard. `None` if the schema of the tree doesn't have a primary key.
+    key_dict: Option<KeyDictRef>,
+    /// Data in the shard.
+    data_parts: DataParts,
+}
diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs
new file mode 100644
index 000000000000..a66366204989
--- /dev/null
+++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs
@@ -0,0 +1,27 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Builder of a shard.
+
+use crate::memtable::merge_tree::data::DataBuffer;
+use crate::memtable::merge_tree::dict::KeyDictBuilder;
+
+/// Builder to write keys and data to a shard whose key dictionary
+/// is still active.
+pub struct ShardBuilder {
+    /// Builder for the key dictionary.
+    dict_builder: KeyDictBuilder,
+    /// Buffer to store data.
+    data_buffer: DataBuffer,
+}
diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs
new file mode 100644
index 000000000000..39b6fbea9887
--- /dev/null
+++ b/src/mito2/src/memtable/merge_tree/tree.rs
@@ -0,0 +1,101 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Implementation of the merge tree.
+
+use std::collections::BTreeMap;
+use std::sync::{Arc, RwLock};
+
+use store_api::metadata::RegionMetadataRef;
+use store_api::storage::ColumnId;
+use table::predicate::Predicate;
+
+use crate::error::Result;
+use crate::memtable::merge_tree::metrics::WriteMetrics;
+use crate::memtable::merge_tree::partition::{PartitionKey, PartitionRef};
+use crate::memtable::merge_tree::MergeTreeConfig;
+use crate::memtable::{BoxedBatchIterator, KeyValues};
+use crate::row_converter::{McmpRowCodec, SortField};
+
+/// The merge tree.
+pub struct MergeTree {
+    /// Config of the tree.
+    config: MergeTreeConfig,
+    /// Metadata of the region.
+    pub(crate) metadata: RegionMetadataRef,
+    /// Primary key codec.
+    row_codec: Arc<McmpRowCodec>,
+    /// Partitions in the tree.
+    partitions: RwLock<BTreeMap<PartitionKey, PartitionRef>>,
+}
+
+impl MergeTree {
+    /// Creates a new merge tree.
+    pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> MergeTree {
+        let row_codec = McmpRowCodec::new(
+            metadata
+                .primary_key_columns()
+                .map(|c| SortField::new(c.column_schema.data_type.clone()))
+                .collect(),
+        );
+
+        MergeTree {
+            config: config.clone(),
+            metadata,
+            row_codec: Arc::new(row_codec),
+            partitions: Default::default(),
+        }
+    }
+
+    // TODO(yingwen): The size computed from values is inaccurate.
+    /// Write key-values into the tree.
+    ///
+    /// # Panics
+    /// Panics if the tree is immutable (frozen).
+    pub fn write(&self, _kvs: &KeyValues, _metrics: &mut WriteMetrics) -> Result<()> {
+        todo!()
+    }
+
+    /// Scans the tree.
+    pub fn scan(
+        &self,
+        _projection: Option<&[ColumnId]>,
+        _predicate: Option<Predicate>,
+    ) -> Result<BoxedBatchIterator> {
+        todo!()
+    }
+
+    /// Returns true if the tree is empty.
+    pub fn is_empty(&self) -> bool {
+        todo!()
+    }
+
+    /// Marks the tree as immutable.
+    ///
+    /// Once the tree becomes immutable, callers should not write to it again.
+    pub fn freeze(&self) -> Result<()> {
+        todo!()
+    }
+
+    /// Forks an immutable tree. Returns a mutable tree that inherits the index
+    /// of this tree.
+    pub fn fork(&self, _metadata: RegionMetadataRef) -> MergeTree {
+        todo!()
+    }
+
+    /// Returns the memory size shared by forked trees.
+    pub fn shared_memory_size(&self) -> usize {
+        todo!()
+    }
+}