feat: Implement partition eviction and only add value size to write buffer size (#3393)

* feat: track key bytes in dict

* chore: done allocating on finish

* feat: evict keys

* chore: do not add to write buffer

* chore: only count value bytes

* fix: reset key bytes

* feat: remove write buffer manager from shards

* feat: change dict size compute method

* chore: adjust dictionary size by os memory
evenyag authored Feb 27, 2024
1 parent 7453d97 commit 206666b
Showing 6 changed files with 153 additions and 50 deletions.
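The gist of the change, before the per-file diffs: write-buffer accounting now only charges value bytes, while key (dictionary) bytes are tracked separately and bounded by fork_dictionary_bytes. A minimal standalone sketch of that split follows; SimpleTracker, DICT_BYTES, and this WriteMetrics are stand-ins for illustration, not the crate's real AllocTracker, Prometheus gauge, or metrics type.

// Illustrative sketch (not the crate's real types): the write-buffer tracker is
// charged only for value bytes, while key bytes go to a separate dictionary gauge.
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};

/// Stand-in for the write-buffer allocation tracker.
#[derive(Default)]
struct SimpleTracker {
    bytes: AtomicUsize,
}

impl SimpleTracker {
    fn on_allocation(&self, bytes: usize) {
        self.bytes.fetch_add(bytes, Ordering::Relaxed);
    }
}

/// Stand-in for a process-wide dictionary-bytes gauge.
static DICT_BYTES: AtomicI64 = AtomicI64::new(0);

/// Per-write metrics, mirroring the key/value split used by the memtable.
struct WriteMetrics {
    key_bytes: usize,
    value_bytes: usize,
}

fn update_stats(tracker: &SimpleTracker, metrics: &WriteMetrics) {
    // Only value bytes count toward the write buffer size.
    tracker.on_allocation(metrics.value_bytes);
    // Key bytes are reported to the dictionary gauge instead.
    DICT_BYTES.fetch_add(metrics.key_bytes as i64, Ordering::Relaxed);
}

fn main() {
    let tracker = SimpleTracker::default();
    let metrics = WriteMetrics { key_bytes: 64, value_bytes: 256 };
    update_stats(&tracker, &metrics);
    assert_eq!(tracker.bytes.load(Ordering::Relaxed), 256);
    assert_eq!(DICT_BYTES.load(Ordering::Relaxed), 64);
}

With this split, the reported write buffer size no longer grows with primary-key bytes; those are bounded by the dictionary budget instead.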
42 changes: 26 additions & 16 deletions src/mito2/src/memtable/merge_tree.rs
@@ -28,6 +28,7 @@ use std::fmt;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
@@ -42,6 +43,9 @@ use crate::memtable::{
MemtableRef, MemtableStats,
};

/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
const DICTIONARY_SIZE_FACTOR: u64 = 16;

/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key in a shard.
@@ -64,14 +68,26 @@ pub struct MergeTreeConfig {
pub data_freeze_threshold: usize,
/// Whether to delete duplicate rows.
pub dedup: bool,
/// Total bytes of dictionary to keep in fork.
pub fork_dictionary_bytes: ReadableSize,
}

impl Default for MergeTreeConfig {
fn default() -> Self {
let mut fork_dictionary_bytes = ReadableSize::mb(512);
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
let adjust_dictionary_bytes =
std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
if adjust_dictionary_bytes.0 > 0 {
fork_dictionary_bytes = adjust_dictionary_bytes;
}
}

Self {
index_max_keys_per_shard: 8192,
data_freeze_threshold: 102400,
dedup: true,
fork_dictionary_bytes,
}
}
}
@@ -103,6 +119,7 @@ impl Memtable for MergeTreeMemtable {

let mut metrics = WriteMetrics::default();
let mut pk_buffer = Vec::new();
// Ensures the memtable always updates stats.
let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

self.update_stats(&metrics);
@@ -159,8 +176,7 @@ impl Memtable for MergeTreeMemtable {
fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
let tree = self.tree.fork(metadata.clone());

- let memtable =
- MergeTreeMemtable::with_tree(id, tree, self.alloc_tracker.write_buffer_manager());
+ let memtable = MergeTreeMemtable::with_tree(id, tree);
Arc::new(memtable)
}
}
@@ -173,23 +189,17 @@ impl MergeTreeMemtable {
write_buffer_manager: Option<WriteBufferManagerRef>,
config: &MergeTreeConfig,
) -> Self {
- Self::with_tree(id, MergeTree::new(metadata, config), write_buffer_manager)
+ Self::with_tree(
+ id,
+ MergeTree::new(metadata, config, write_buffer_manager.clone()),
+ )
}

/// Creates a mutable memtable from the tree.
///
/// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
- fn with_tree(
- id: MemtableId,
- tree: MergeTree,
- write_buffer_manager: Option<WriteBufferManagerRef>,
- ) -> Self {
- let alloc_tracker = AllocTracker::new(write_buffer_manager);
- // Track space allocated by the tree.
- let allocated = tree.shared_memory_size();
- // Here we still add the bytes of shared parts to the tracker as the old memtable
- // will release its tracker soon.
- alloc_tracker.on_allocation(allocated);
+ fn with_tree(id: MemtableId, tree: MergeTree) -> Self {
+ let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());

Self {
id,
@@ -202,8 +212,8 @@ impl MergeTreeMemtable {

/// Updates stats of the memtable.
fn update_stats(&self, metrics: &WriteMetrics) {
- self.alloc_tracker
- .on_allocation(metrics.key_bytes + metrics.value_bytes);
+ // Only let the tracker track value bytes.
+ self.alloc_tracker.on_allocation(metrics.value_bytes);

loop {
let current_min = self.min_timestamp.load(Ordering::Relaxed);
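The MergeTreeConfig::default() above derives fork_dictionary_bytes from the host: 1/DICTIONARY_SIZE_FACTOR (1/16) of total memory, capped at 512MB, keeping the 512MB default when detection fails or the quotient rounds down to zero. Below is a standalone sketch of the same arithmetic with plain u64 byte counts; ReadableSize and common_config::utils::get_sys_total_memory are GreptimeDB helpers not reproduced here, and the function name is hypothetical.

// Standalone illustration of the fork_dictionary_bytes default above: 1/16 of
// system memory, capped at 512 MB, falling back to 512 MB when total memory is
// unknown or the quotient rounds down to zero. Plain u64 byte counts stand in
// for ReadableSize; the function name is hypothetical.
const DICTIONARY_SIZE_FACTOR: u64 = 16;
const MB: u64 = 1024 * 1024; // mirrors ReadableSize::mb

fn default_fork_dictionary_bytes(sys_total_memory: Option<u64>) -> u64 {
    let mut fork_dictionary_bytes = 512 * MB;
    if let Some(sys_memory) = sys_total_memory {
        let adjusted = std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
        if adjusted > 0 {
            fork_dictionary_bytes = adjusted;
        }
    }
    fork_dictionary_bytes
}

fn main() {
    // 4 GB of memory -> a 256 MB dictionary budget.
    assert_eq!(default_fork_dictionary_bytes(Some(4096 * MB)), 256 * MB);
    // 32 GB of memory -> capped at the 512 MB default.
    assert_eq!(default_fork_dictionary_bytes(Some(32 * 1024 * MB)), 512 * MB);
    // Unknown memory -> keep the 512 MB default.
    assert_eq!(default_fork_dictionary_bytes(None), 512 * MB);
}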
51 changes: 45 additions & 6 deletions src/mito2/src/memtable/merge_tree/dict.rs
@@ -21,6 +21,7 @@ use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};

use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::PkIndex;
use crate::metrics::MEMTABLE_DICT_BYTES;

/// Maximum keys in a [DictBlock].
const MAX_KEYS_PER_BLOCK: u16 = 256;
@@ -39,7 +40,7 @@ pub struct KeyDictBuilder {
key_buffer: KeyBuffer,
/// Dictionary blocks.
dict_blocks: Vec<DictBlock>,
- /// Bytes allocated by keys in the [pk_to_index](Self::pk_to_index).
+ /// Bytes allocated by keys in the index.
key_bytes_in_index: usize,
}

@@ -88,6 +89,9 @@ impl KeyDictBuilder {
metrics.key_bytes += key.len() * 2;
self.key_bytes_in_index += key.len();

// Adds key size of index to the metrics.
MEMTABLE_DICT_BYTES.add(key.len() as i64);

pk_index
}

@@ -123,12 +127,14 @@ impl KeyDictBuilder {
*pk_index = i as PkIndex;
}
self.num_keys = 0;
let key_bytes_in_index = self.key_bytes_in_index;
self.key_bytes_in_index = 0;

Some(KeyDict {
pk_to_index,
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
- key_bytes_in_index: self.key_bytes_in_index,
+ key_bytes_in_index,
})
}

@@ -144,6 +150,12 @@ impl KeyDictBuilder {
}
}

impl Drop for KeyDictBuilder {
fn drop(&mut self) {
MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64);
}
}

/// Reader to scan the [KeyDictBuilder].
#[derive(Default)]
pub struct DictBuilderReader {
@@ -208,6 +220,7 @@ pub struct KeyDict {
dict_blocks: Vec<DictBlock>,
/// Maps pk index to position of the key in [Self::dict_blocks].
key_positions: Vec<PkIndex>,
/// Bytes of keys in the index.
key_bytes_in_index: usize,
}

@@ -239,6 +252,17 @@ impl KeyDict {
/// Returns the shared memory size.
pub(crate) fn shared_memory_size(&self) -> usize {
self.key_bytes_in_index
+ self
.dict_blocks
.iter()
.map(|block| block.buffer_memory_size())
.sum::<usize>()
}
}

impl Drop for KeyDict {
fn drop(&mut self) {
MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64);
}
}

@@ -325,6 +349,9 @@ struct DictBlock {

impl DictBlock {
fn new(keys: BinaryArray) -> Self {
let buffer_size = keys.get_buffer_memory_size();
MEMTABLE_DICT_BYTES.add(buffer_size as i64);

Self { keys }
}

@@ -333,12 +360,18 @@ impl DictBlock {
self.keys.value(pos as usize)
}

- #[cfg(test)]
fn buffer_memory_size(&self) -> usize {
self.keys.get_buffer_memory_size()
}
}

impl Drop for DictBlock {
fn drop(&mut self) {
let buffer_size = self.keys.get_buffer_memory_size();
MEMTABLE_DICT_BYTES.sub(buffer_size as i64);
}
}

#[cfg(test)]
mod tests {
use rand::Rng;
@@ -393,7 +426,7 @@ mod tests {
}

#[test]
- fn test_builder_memory_size() {
+ fn test_dict_memory_size() {
let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 3).into());
let mut metrics = WriteMetrics::default();
// 513 keys
@@ -404,9 +437,15 @@ mod tests {
let key = format!("{i:05}");
builder.insert_key(key.as_bytes(), &mut metrics);
}
- // num_keys * 5 * 2
- assert_eq!(5130, metrics.key_bytes);
+ let key_bytes = num_keys as usize * 5;
+ assert_eq!(key_bytes * 2, metrics.key_bytes);
+ assert_eq!(key_bytes, builder.key_bytes_in_index);
assert_eq!(8850, builder.memory_size());

let dict = builder.finish().unwrap();
assert_eq!(0, builder.key_bytes_in_index);
assert_eq!(key_bytes, dict.key_bytes_in_index);
assert!(dict.shared_memory_size() > key_bytes);
}

#[test]
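A pattern worth noting in dict.rs above: every MEMTABLE_DICT_BYTES.add() is paired with a sub() in a Drop impl (KeyDictBuilder, KeyDict, DictBlock), so the gauge reflects live dictionary memory even as builders finish and forked dictionaries are dropped. Below is a toy version of that RAII-style accounting, with a plain atomic standing in for the real Prometheus gauge and TrackedBlock as a made-up type.

use std::sync::atomic::{AtomicI64, Ordering};

// Stand-in for the MEMTABLE_DICT_BYTES gauge.
static DICT_BYTES_GAUGE: AtomicI64 = AtomicI64::new(0);

struct TrackedBlock {
    keys: Vec<Vec<u8>>,
    tracked_bytes: usize,
}

impl TrackedBlock {
    fn new(keys: Vec<Vec<u8>>) -> Self {
        // Charge the gauge for the bytes this block now owns.
        let tracked_bytes: usize = keys.iter().map(|k| k.len()).sum();
        DICT_BYTES_GAUGE.fetch_add(tracked_bytes as i64, Ordering::Relaxed);
        Self { keys, tracked_bytes }
    }
}

impl Drop for TrackedBlock {
    fn drop(&mut self) {
        // Release exactly what was charged, keeping the gauge balanced.
        DICT_BYTES_GAUGE.fetch_sub(self.tracked_bytes as i64, Ordering::Relaxed);
    }
}

fn main() {
    {
        let block = TrackedBlock::new(vec![b"k0".to_vec(), b"k1".to_vec()]);
        assert_eq!(block.keys.len(), 2);
        assert_eq!(DICT_BYTES_GAUGE.load(Ordering::Relaxed), 4);
    }
    // Dropping the block returns the gauge to zero.
    assert_eq!(DICT_BYTES_GAUGE.load(Ordering::Relaxed), 0);
}

Note how finish() above resets the builder's key_bytes_in_index before moving the count into KeyDict, so the builder's Drop does not subtract bytes that the dictionary now owns.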
21 changes: 17 additions & 4 deletions src/mito2/src/memtable/merge_tree/partition.rs
@@ -173,14 +173,21 @@ impl Partition {
inner.num_rows > 0
}

- /// Returns shared memory size of the partition.
- pub fn shared_memory_size(&self) -> usize {
+ /// Gets the stats of the partition.
+ pub(crate) fn stats(&self) -> PartitionStats {
let inner = self.inner.read().unwrap();
- inner
+ let num_rows = inner.num_rows;
+ let shard_num = inner.shards.len();
+ let shared_memory_size = inner
.shards
.iter()
.map(|shard| shard.shared_memory_size())
- .sum()
+ .sum();
+ PartitionStats {
+ num_rows,
+ shard_num,
+ shared_memory_size,
+ }
}

/// Get partition key from the key value.
@@ -211,6 +218,12 @@ impl Partition {
}
}

pub(crate) struct PartitionStats {
pub(crate) num_rows: usize,
pub(crate) shard_num: usize,
pub(crate) shared_memory_size: usize,
}

/// Reader to scan rows in a partition.
///
/// It can merge rows from multiple shards.
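PartitionStats is what the tree-level eviction introduced by this PR consumes, but the tree.rs side is not part of the diff shown here. Purely as a hypothetical sketch (names, fields, and the selection policy are assumptions, not the actual implementation), stats like these could drive eviction by dropping the heaviest partitions until shared memory fits a budget:

// Hypothetical sketch only: evict the partitions with the largest shared memory
// until the remaining total fits within a budget. Not the actual tree.rs logic.
#[derive(Debug, Clone)]
struct PartitionStats {
    partition_key: u32,
    num_rows: usize,
    shard_num: usize,
    shared_memory_size: usize,
}

/// Returns the partition keys to evict so that the remaining shared memory
/// fits within `budget_bytes`.
fn pick_evictions(mut stats: Vec<PartitionStats>, budget_bytes: usize) -> Vec<u32> {
    let mut total: usize = stats.iter().map(|s| s.shared_memory_size).sum();
    // Largest consumers first.
    stats.sort_by(|a, b| b.shared_memory_size.cmp(&a.shared_memory_size));
    let mut evicted = Vec::new();
    for stat in stats {
        if total <= budget_bytes {
            break;
        }
        total -= stat.shared_memory_size;
        evicted.push(stat.partition_key);
    }
    evicted
}

fn main() {
    let stats = vec![
        PartitionStats { partition_key: 0, num_rows: 10, shard_num: 1, shared_memory_size: 300 },
        PartitionStats { partition_key: 1, num_rows: 20, shard_num: 2, shared_memory_size: 500 },
        PartitionStats { partition_key: 2, num_rows: 5, shard_num: 1, shared_memory_size: 100 },
    ];
    // Aggregate the per-partition fields that Partition::stats() exposes.
    let total_rows: usize = stats.iter().map(|s| s.num_rows).sum();
    let total_shards: usize = stats.iter().map(|s| s.shard_num).sum();
    assert_eq!((total_rows, total_shards), (35, 4));
    // With a 300-byte budget, the two largest partitions are chosen for eviction.
    assert_eq!(pick_evictions(stats, 300), vec![1, 0]);
}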
16 changes: 8 additions & 8 deletions src/mito2/src/memtable/merge_tree/shard.rs
@@ -82,14 +82,6 @@ impl Shard {
})
}

- /// Returns the memory size of the shard part.
- pub fn shared_memory_size(&self) -> usize {
- self.key_dict
- .as_ref()
- .map(|dict| dict.shared_memory_size())
- .unwrap_or(0)
- }

/// Forks a shard.
pub fn fork(&self, metadata: RegionMetadataRef) -> Shard {
Shard {
@@ -104,6 +96,14 @@ impl Shard {
pub fn is_empty(&self) -> bool {
self.data_parts.is_empty()
}

/// Returns the memory size of the shard part.
pub(crate) fn shared_memory_size(&self) -> usize {
self.key_dict
.as_ref()
.map(|dict| dict.shared_memory_size())
.unwrap_or(0)
}
}

/// Source that returns [DataBatch].