Skip to content

Commit

Permalink
feat: Defines structs in the merge tree memtable (#3326)
Browse files Browse the repository at this point in the history
* chore: define mods

* feat: memtable struct

* feat: define structs inside the tree
  • Loading branch information
evenyag authored Feb 19, 2024
1 parent 40f43de commit 43fd87e
Show file tree
Hide file tree
Showing 9 changed files with 549 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

//! Memtables are write buffers for regions.
pub mod time_series;

pub mod key_values;
#[allow(dead_code)]
pub mod merge_tree;
pub mod time_series;
pub(crate) mod version;

use std::fmt;
Expand Down
261 changes: 261 additions & 0 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation based on a merge tree.
mod data;
mod dict;
mod metrics;
mod partition;
mod shard;
mod shard_builder;
mod tree;

use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::Arc;

use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::tree::MergeTree;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRef, MemtableStats,
};

/// Identifier of a shard; it is only unique within a single partition.
type ShardId = u32;
/// Position of a primary key within its shard.
type PkIndex = u16;
/// Identifier of a primary key inside a tree: the shard that holds the key
/// paired with the key's index in that shard.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct PkId {
    /// Shard that holds the primary key.
    shard_id: ShardId,
    /// Index of the primary key inside that shard.
    pk_index: PkIndex,
}

/// Tunables of the merge tree memtable.
#[derive(Clone, Debug)]
pub struct MergeTreeConfig {
    /// Maximum number of keys an index shard may hold.
    pub index_max_keys_per_shard: usize,
    /// Row count at which a data part gets frozen.
    pub data_freeze_threshold: usize,
}

impl Default for MergeTreeConfig {
    /// Returns the default config: 8192 keys per index shard and a freeze
    /// threshold of 102400 rows.
    fn default() -> Self {
        let index_max_keys_per_shard = 8_192;
        let data_freeze_threshold = 102_400;

        MergeTreeConfig {
            index_max_keys_per_shard,
            data_freeze_threshold,
        }
    }
}

/// Memtable based on a merge tree.
///
/// Rows are written into the inner [MergeTree]; the memtable itself keeps the
/// bookkeeping needed for stats: the bytes allocated and the range of
/// timestamps seen by writes.
pub struct MergeTreeMemtable {
/// Id of this memtable.
id: MemtableId,
/// Tree that receives the written rows.
tree: MergeTree,
/// Tracks the bytes allocated by this memtable.
alloc_tracker: AllocTracker,
/// Largest timestamp recorded by writes; starts at `i64::MIN`.
max_timestamp: AtomicI64,
/// Smallest timestamp recorded by writes; starts at `i64::MAX`.
min_timestamp: AtomicI64,
}

impl fmt::Debug for MergeTreeMemtable {
    /// Formats the memtable for debugging; only the `id` field is printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("MergeTreeMemtable");
        out.field("id", &self.id);
        out.finish()
    }
}

impl Memtable for MergeTreeMemtable {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes `kvs` into the tree and folds the collected write metrics into
    /// the memtable's stats.
    ///
    /// The stats are updated before the tree's result is returned, i.e.
    /// regardless of whether the write succeeded.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        // TODO(yingwen): Validate schema while inserting rows.

        let mut metrics = WriteMetrics::default();
        let res = self.tree.write(kvs, &mut metrics);

        self.update_stats(&metrics);

        res
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<Predicate>,
    ) -> BoxedBatchIterator {
        // FIXME(yingwen): Change return value to `Result<BoxedBatchIterator>`.
        todo!()
    }

    /// Returns whether the underlying tree is empty.
    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    /// Marks the memtable as immutable by telling the allocation tracker that
    /// allocation is done.
    fn mark_immutable(&self) {
        self.alloc_tracker.done_allocating();
    }

    /// Returns the estimated memory usage and the time range of this memtable.
    ///
    /// `time_range` is `None` when no timestamp has been recorded yet.
    ///
    /// # Panics
    ///
    /// Panics if the time index column does not have a timestamp type.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();
        let min_ts = self.min_timestamp.load(Ordering::Relaxed);
        let max_ts = self.max_timestamp.load(Ordering::Relaxed);

        // `estimated_bytes` alone cannot tell whether rows were written: the
        // constructor already adds the tree's shared memory size to the
        // tracker. The timestamps start at `i64::MAX` / `i64::MIN`, so
        // `min_ts > max_ts` means no timestamp has been recorded; reporting
        // that pair as a range would yield a bogus `(MAX, MIN)` time range.
        if estimated_bytes == 0 || min_ts > max_ts {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(max_ts);
        let min_timestamp = ts_type.create_timestamp(min_ts);
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
        }
    }
}

impl MergeTreeMemtable {
    /// Returns a new memtable.
    ///
    /// Builds a new [MergeTree] from `metadata` and `config` and wraps it in a
    /// memtable with the given `id`. `write_buffer_manager` is handed to the
    /// allocation tracker of the memtable.
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: &MergeTreeConfig,
    ) -> Self {
        Self::with_tree(id, MergeTree::new(metadata, config), write_buffer_manager)
    }

    /// Creates a mutable memtable from the tree.
    ///
    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
    /// The timestamp range starts out empty (`min = i64::MAX`, `max = i64::MIN`).
    fn with_tree(
        id: MemtableId,
        tree: MergeTree,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        let alloc_tracker = AllocTracker::new(write_buffer_manager);
        // Track space allocated by the tree.
        let allocated = tree.shared_memory_size();
        // Here we still add the bytes of shared parts to the tracker as the old memtable
        // will release its tracker soon.
        alloc_tracker.on_allocation(allocated);

        Self {
            id,
            tree,
            alloc_tracker,
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
        }
    }

    /// Updates stats of the memtable.
    ///
    /// Adds the key and value bytes of `metrics` to the allocation tracker and
    /// widens the recorded timestamp range so that it covers
    /// `metrics.min_ts` and `metrics.max_ts`.
    fn update_stats(&self, metrics: &WriteMetrics) {
        self.alloc_tracker
            .on_allocation(metrics.key_bytes + metrics.value_bytes);

        // `fetch_min` / `fetch_max` atomically store the smaller / larger of
        // the current and the given value, which is what a compare-exchange
        // retry loop would compute by hand.
        self.min_timestamp
            .fetch_min(metrics.min_ts, Ordering::Relaxed);
        self.max_timestamp
            .fetch_max(metrics.max_ts, Ordering::Relaxed);
    }
}

/// Builder to build a [MergeTreeMemtable].
///
/// Every built memtable gets its id from an internal counter.
#[derive(Debug, Default)]
pub struct MergeTreeMemtableBuilder {
/// Counter that supplies the id of the next memtable to build.
id: AtomicU32,
/// Optional write buffer manager handed to every built memtable.
write_buffer_manager: Option<WriteBufferManagerRef>,
/// Config passed to every built memtable.
config: MergeTreeConfig,
}

impl MergeTreeMemtableBuilder {
/// Creates a new builder with specific `write_buffer_manager`.
pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
Self {
id: AtomicU32::new(0),
write_buffer_manager,
config: MergeTreeConfig::default(),
}
}
}

impl MemtableBuilder for MergeTreeMemtableBuilder {
    /// Builds a new [MergeTreeMemtable] for the region described by `metadata`.
    ///
    /// The memtable takes the next id from the builder's counter and uses the
    /// builder's write buffer manager and config.
    fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef {
        let next_id = self.id.fetch_add(1, Ordering::Relaxed);
        let region_metadata = metadata.clone();
        let manager = self.write_buffer_manager.clone();

        let memtable = MergeTreeMemtable::new(next_id, region_metadata, manager, &self.config);
        Arc::new(memtable)
    }
}
21 changes: 21 additions & 0 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Data part of a shard.
/// Buffer to store columns not in the primary key.
///
/// Currently an empty placeholder: the struct declares no fields yet.
pub struct DataBuffer {}

/// Data parts under a shard.
///
/// Currently an empty placeholder: the struct declares no fields yet.
pub struct DataParts {}
28 changes: 28 additions & 0 deletions src/mito2/src/memtable/merge_tree/dict.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Key dictionary of a shard.
use std::sync::Arc;

/// Builder to build a key dictionary.
///
/// Currently an empty placeholder: the struct declares no fields yet.
pub struct KeyDictBuilder {}

/// Buffer to store unsorted primary keys.
///
/// Currently an empty placeholder: the struct declares no fields yet.
pub struct KeyBuffer {}

/// A key dictionary.
///
/// Currently an empty placeholder: the struct declares no fields yet.
pub struct KeyDict {}

/// Reference-counted (`Arc`) handle to a [KeyDict].
pub type KeyDictRef = Arc<KeyDict>;
38 changes: 38 additions & 0 deletions src/mito2/src/memtable/merge_tree/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Internal metrics of the memtable.
/// Metrics collected while writing to the merge tree.
pub struct WriteMetrics {
    /// Bytes allocated by keys.
    pub key_bytes: usize,
    /// Bytes allocated by values.
    pub value_bytes: usize,
    /// Minimum timestamp.
    pub min_ts: i64,
    /// Maximum timestamp.
    pub max_ts: i64,
}

impl Default for WriteMetrics {
    /// Returns metrics with no bytes allocated and an empty timestamp range.
    ///
    /// `min_ts` starts at `i64::MAX` and `max_ts` at `i64::MIN`, the identity
    /// values for taking a minimum and a maximum respectively.
    fn default() -> Self {
        let (min_ts, max_ts) = (i64::MAX, i64::MIN);

        WriteMetrics {
            key_bytes: 0,
            value_bytes: 0,
            min_ts,
            max_ts,
        }
    }
}
Loading

0 comments on commit 43fd87e

Please sign in to comment.