diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index a1e0b2ec9df9..a2015c45d327 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -28,6 +28,7 @@ use std::fmt; use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; +use common_base::readable_size::ReadableSize; use serde::{Deserialize, Serialize}; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; @@ -42,6 +43,9 @@ use crate::memtable::{ MemtableRef, MemtableStats, }; +/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size. +const DICTIONARY_SIZE_FACTOR: u64 = 16; + /// Id of a shard, only unique inside a partition. type ShardId = u32; /// Index of a primary key in a shard. @@ -64,14 +68,26 @@ pub struct MergeTreeConfig { pub data_freeze_threshold: usize, /// Whether to delete duplicates rows. pub dedup: bool, + /// Total bytes of dictionary to keep in fork. + pub fork_dictionary_bytes: ReadableSize, } impl Default for MergeTreeConfig { fn default() -> Self { + let mut fork_dictionary_bytes = ReadableSize::mb(512); + if let Some(sys_memory) = common_config::utils::get_sys_total_memory() { + let adjust_dictionary_bytes = + std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes); + if adjust_dictionary_bytes.0 > 0 { + fork_dictionary_bytes = adjust_dictionary_bytes; + } + } + Self { index_max_keys_per_shard: 8192, data_freeze_threshold: 102400, dedup: true, + fork_dictionary_bytes, } } } @@ -103,6 +119,7 @@ impl Memtable for MergeTreeMemtable { let mut metrics = WriteMetrics::default(); let mut pk_buffer = Vec::new(); + // Ensures the memtable always updates stats. 
let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics); self.update_stats(&metrics); @@ -159,8 +176,7 @@ impl Memtable for MergeTreeMemtable { fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { let tree = self.tree.fork(metadata.clone()); - let memtable = - MergeTreeMemtable::with_tree(id, tree, self.alloc_tracker.write_buffer_manager()); + let memtable = MergeTreeMemtable::with_tree(id, tree); Arc::new(memtable) } } @@ -173,23 +189,17 @@ impl MergeTreeMemtable { write_buffer_manager: Option, config: &MergeTreeConfig, ) -> Self { - Self::with_tree(id, MergeTree::new(metadata, config), write_buffer_manager) + Self::with_tree( + id, + MergeTree::new(metadata, config, write_buffer_manager.clone()), + ) } /// Creates a mutable memtable from the tree. /// /// It also adds the bytes used by shared parts (e.g. index) to the memory usage. - fn with_tree( - id: MemtableId, - tree: MergeTree, - write_buffer_manager: Option, - ) -> Self { - let alloc_tracker = AllocTracker::new(write_buffer_manager); - // Track space allocated by the tree. - let allocated = tree.shared_memory_size(); - // Here we still add the bytes of shared parts to the tracker as the old memtable - // will release its tracker soon. - alloc_tracker.on_allocation(allocated); + fn with_tree(id: MemtableId, tree: MergeTree) -> Self { + let alloc_tracker = AllocTracker::new(tree.write_buffer_manager()); Self { id, @@ -202,8 +212,8 @@ impl MergeTreeMemtable { /// Updates stats of the memtable. fn update_stats(&self, metrics: &WriteMetrics) { - self.alloc_tracker - .on_allocation(metrics.key_bytes + metrics.value_bytes); + // Only let the tracker track value bytes.
+ self.alloc_tracker.on_allocation(metrics.value_bytes); loop { let current_min = self.min_timestamp.load(Ordering::Relaxed); diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs index b3adc445e0c2..4be3bac5c3fa 100644 --- a/src/mito2/src/memtable/merge_tree/dict.rs +++ b/src/mito2/src/memtable/merge_tree/dict.rs @@ -21,6 +21,7 @@ use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder}; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::PkIndex; +use crate::metrics::MEMTABLE_DICT_BYTES; /// Maximum keys in a [DictBlock]. const MAX_KEYS_PER_BLOCK: u16 = 256; @@ -39,7 +40,7 @@ pub struct KeyDictBuilder { key_buffer: KeyBuffer, /// Dictionary blocks. dict_blocks: Vec, - /// Bytes allocated by keys in the [pk_to_index](Self::pk_to_index). + /// Bytes allocated by keys in the index. key_bytes_in_index: usize, } @@ -88,6 +89,9 @@ impl KeyDictBuilder { metrics.key_bytes += key.len() * 2; self.key_bytes_in_index += key.len(); + // Adds key size of index to the metrics. + MEMTABLE_DICT_BYTES.add(key.len() as i64); + pk_index } @@ -123,12 +127,14 @@ impl KeyDictBuilder { *pk_index = i as PkIndex; } self.num_keys = 0; + let key_bytes_in_index = self.key_bytes_in_index; + self.key_bytes_in_index = 0; Some(KeyDict { pk_to_index, dict_blocks: std::mem::take(&mut self.dict_blocks), key_positions, - key_bytes_in_index: self.key_bytes_in_index, + key_bytes_in_index, }) } @@ -144,6 +150,12 @@ impl KeyDictBuilder { } } +impl Drop for KeyDictBuilder { + fn drop(&mut self) { + MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64); + } +} + /// Reader to scan the [KeyDictBuilder]. #[derive(Default)] pub struct DictBuilderReader { @@ -208,6 +220,7 @@ pub struct KeyDict { dict_blocks: Vec, /// Maps pk index to position of the key in [Self::dict_blocks]. key_positions: Vec, + /// Bytes of keys in the index. 
key_bytes_in_index: usize, } @@ -239,6 +252,17 @@ impl KeyDict { /// Returns the shared memory size. pub(crate) fn shared_memory_size(&self) -> usize { self.key_bytes_in_index + + self + .dict_blocks + .iter() + .map(|block| block.buffer_memory_size()) + .sum::() + } +} + +impl Drop for KeyDict { + fn drop(&mut self) { + MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64); } } @@ -325,6 +349,9 @@ struct DictBlock { impl DictBlock { fn new(keys: BinaryArray) -> Self { + let buffer_size = keys.get_buffer_memory_size(); + MEMTABLE_DICT_BYTES.add(buffer_size as i64); + Self { keys } } @@ -333,12 +360,18 @@ impl DictBlock { self.keys.value(pos as usize) } - #[cfg(test)] fn buffer_memory_size(&self) -> usize { self.keys.get_buffer_memory_size() } } +impl Drop for DictBlock { + fn drop(&mut self) { + let buffer_size = self.keys.get_buffer_memory_size(); + MEMTABLE_DICT_BYTES.sub(buffer_size as i64); + } +} + #[cfg(test)] mod tests { use rand::Rng; @@ -393,7 +426,7 @@ mod tests { } #[test] - fn test_builder_memory_size() { + fn test_dict_memory_size() { let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 3).into()); let mut metrics = WriteMetrics::default(); // 513 keys @@ -404,9 +437,15 @@ mod tests { let key = format!("{i:05}"); builder.insert_key(key.as_bytes(), &mut metrics); } - // num_keys * 5 * 2 - assert_eq!(5130, metrics.key_bytes); + let key_bytes = num_keys as usize * 5; + assert_eq!(key_bytes * 2, metrics.key_bytes); + assert_eq!(key_bytes, builder.key_bytes_in_index); assert_eq!(8850, builder.memory_size()); + + let dict = builder.finish().unwrap(); + assert_eq!(0, builder.key_bytes_in_index); + assert_eq!(key_bytes, dict.key_bytes_in_index); + assert!(dict.shared_memory_size() > key_bytes); } #[test] diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 926fb360ec3c..b147045256c3 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ 
b/src/mito2/src/memtable/merge_tree/partition.rs @@ -173,14 +173,21 @@ impl Partition { inner.num_rows > 0 } - /// Returns shared memory size of the partition. - pub fn shared_memory_size(&self) -> usize { + /// Gets the stats of the partition. + pub(crate) fn stats(&self) -> PartitionStats { let inner = self.inner.read().unwrap(); - inner + let num_rows = inner.num_rows; + let shard_num = inner.shards.len(); + let shared_memory_size = inner .shards .iter() .map(|shard| shard.shared_memory_size()) - .sum() + .sum(); + PartitionStats { + num_rows, + shard_num, + shared_memory_size, + } } /// Get partition key from the key value. @@ -211,6 +218,12 @@ impl Partition { } } +pub(crate) struct PartitionStats { + pub(crate) num_rows: usize, + pub(crate) shard_num: usize, + pub(crate) shared_memory_size: usize, +} + /// Reader to scan rows in a partition. /// /// It can merge rows from multiple shards. diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 90b4ce02c483..9f975d7002f4 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -82,14 +82,6 @@ impl Shard { }) } - /// Returns the memory size of the shard part. - pub fn shared_memory_size(&self) -> usize { - self.key_dict - .as_ref() - .map(|dict| dict.shared_memory_size()) - .unwrap_or(0) - } - /// Forks a shard. pub fn fork(&self, metadata: RegionMetadataRef) -> Shard { Shard { @@ -104,6 +96,14 @@ impl Shard { pub fn is_empty(&self) -> bool { self.data_parts.is_empty() } + + /// Returns the memory size of the shard part. + pub(crate) fn shared_memory_size(&self) -> usize { + self.key_dict + .as_ref() + .map(|dict| dict.shared_memory_size()) + .unwrap_or(0) + } } /// Source that returns [DataBatch]. 
diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index 67fd604bc8e5..6472af49973a 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -27,6 +27,7 @@ use store_api::storage::ColumnId; use table::predicate::Predicate; use crate::error::{PrimaryKeyLengthMismatchSnafu, Result}; +use crate::flush::WriteBufferManagerRef; use crate::memtable::key_values::KeyValue; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::partition::{ @@ -49,11 +50,17 @@ pub struct MergeTree { partitions: RwLock>, /// Whether the tree has multiple partitions. is_partitioned: bool, + /// Manager to report size of the tree. + write_buffer_manager: Option, } impl MergeTree { /// Creates a new merge tree. - pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> MergeTree { + pub fn new( + metadata: RegionMetadataRef, + config: &MergeTreeConfig, + write_buffer_manager: Option, + ) -> MergeTree { let row_codec = McmpRowCodec::new( metadata .primary_key_columns() @@ -68,6 +75,7 @@ impl MergeTree { row_codec: Arc::new(row_codec), partitions: Default::default(), is_partitioned, + write_buffer_manager, } } @@ -183,19 +191,52 @@ impl MergeTree { || self.metadata.column_metadatas != metadata.column_metadatas { // The schema has changed, we can't reuse the tree. - return MergeTree::new(metadata, &self.config); + return MergeTree::new(metadata, &self.config, self.write_buffer_manager.clone()); } - let mut forked = BTreeMap::new(); - let partitions = self.partitions.read().unwrap(); - for (part_key, part) in partitions.iter() { - if !part.has_data() { - continue; + let mut total_shared_size = 0; + let mut part_infos = { + let partitions = self.partitions.read().unwrap(); + partitions + .iter() + .filter_map(|(part_key, part)| { + let stats = part.stats(); + if stats.num_rows > 0 { + // Only fork partitions that have data. 
+ total_shared_size += stats.shared_memory_size; + Some((*part_key, part.clone(), stats)) + } else { + None + } + }) + .collect::>() + }; + + // TODO(yingwen): Optimize eviction strategy. Now we evict the whole partition. + let fork_size = self.config.fork_dictionary_bytes.as_bytes() as usize; + if total_shared_size > fork_size { + // Sort partitions by memory size asc, so `pop()` evicts the largest partition first. + part_infos.sort_unstable_by_key(|info| info.2.shared_memory_size); + while total_shared_size > fork_size { + let Some(info) = part_infos.pop() else { + break; + }; + + common_telemetry::debug!( + "Evict partition {} with memory size {}, {} shards", + info.0, + info.2.shared_memory_size, + info.2.shard_num, + ); + + total_shared_size -= info.2.shared_memory_size; } + } - // Only fork partitions that have data. + let mut forked = BTreeMap::new(); + for (part_key, part, _) in part_infos { let forked_part = part.fork(&metadata, &self.config); - forked.insert(*part_key, Arc::new(forked_part)); + forked.insert(part_key, Arc::new(forked_part)); } MergeTree { @@ -204,16 +245,13 @@ impl MergeTree { row_codec: self.row_codec.clone(), partitions: RwLock::new(forked), is_partitioned: self.is_partitioned, + write_buffer_manager: self.write_buffer_manager.clone(), } } - /// Returns the memory size shared by forked trees. - pub fn shared_memory_size(&self) -> usize { - let partitions = self.partitions.read().unwrap(); - partitions - .values() - .map(|part| part.shared_memory_size()) - .sum() + /// Returns the write buffer manager. + pub(crate) fn write_buffer_manager(&self) -> Option { + self.write_buffer_manager.clone() } fn write_with_key( diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 374b5954a890..b25fd393bdb4 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -28,6 +28,9 @@ lazy_static! { /// Global write buffer size in bytes.
pub static ref WRITE_BUFFER_BYTES: IntGauge = register_int_gauge!("greptime_mito_write_buffer_bytes", "mito write buffer bytes").unwrap(); + /// Global memtable dictionary size in bytes. + pub static ref MEMTABLE_DICT_BYTES: IntGauge = + register_int_gauge!("greptime_mito_memtable_dict_bytes", "mito memtable dictionary size in bytes").unwrap(); /// Gauge for open regions pub static ref REGION_COUNT: IntGauge = register_int_gauge!("greptime_mito_region_count", "mito region count").unwrap();