From a598008ec3fcc2b7f4df447455852cededa380ab Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Thu, 16 Jan 2025 19:05:46 +0800
Subject: [PATCH 1/4] fix: panic when receiving invalid query string (#5366)

Signed-off-by: Ruihang Xia
---
 src/servers/src/mysql/federated.rs | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/src/servers/src/mysql/federated.rs b/src/servers/src/mysql/federated.rs
index ae4ac70ed6e5..cf5f9a744da2 100644
--- a/src/servers/src/mysql/federated.rs
+++ b/src/servers/src/mysql/federated.rs
@@ -273,8 +273,11 @@ pub(crate) fn check(
 ) -> Option<Output> {
     // INSERT doesn't need the MySQL federated check. We assume the query contains no
     // federated or driver setup commands if it starts with an 'INSERT' statement.
-    if query.len() > 6 && query[..6].eq_ignore_ascii_case("INSERT") {
-        return None;
+    let the_6th_index = query.char_indices().nth(6).map(|(i, _)| i);
+    if let Some(index) = the_6th_index {
+        if query[..index].eq_ignore_ascii_case("INSERT") {
+            return None;
+        }
     }
 
     // First, check whether the query is like "select @@variables".
@@ -295,6 +298,15 @@ mod test {
 
     use super::*;
 
+    #[test]
+    fn test_check_abnormal() {
+        let session = Arc::new(Session::new(None, Channel::Mysql, Default::default()));
+        let query = "🫣一点不正常的东西🫣";
+        let output = check(query, QueryContext::arc(), session.clone());
+
+        assert!(output.is_none());
+    }
+
     #[test]
     fn test_check() {
         let session = Arc::new(Session::new(None, Channel::Mysql, Default::default()));

From 8d5d4000e67170d67f6189f3336c4901e985eb6b Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Thu, 16 Jan 2025 19:16:56 +0800
Subject: [PATCH 2/4] feat: set default compaction parallelism (#5371)

Signed-off-by: Ruihang Xia
---
 src/mito2/src/compaction.rs               | 14 ++++++++++++++
 src/mito2/src/compaction/compactor.rs     | 14 +++++++++++---
 src/mito2/src/compaction/task.rs          |  2 +-
 src/mito2/src/engine/open_test.rs         |  1 +
 src/mito2/src/worker/handle_compaction.rs |  3 +++
 5 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index bf8df5fcec7a..6f9e5c0261ff 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -40,6 +40,7 @@ use snafu::{OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{RegionId, TableId};
 use table::predicate::Predicate;
+use task::MAX_PARALLEL_COMPACTION;
 use tokio::sync::mpsc::{self, Sender};
 
 use crate::access_layer::AccessLayerRef;
@@ -85,6 +86,7 @@ pub struct CompactionRequest {
     pub(crate) manifest_ctx: ManifestContextRef,
     pub(crate) listener: WorkerListener,
     pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
+    pub(crate) max_parallelism: usize,
 }
 
 impl CompactionRequest {
@@ -145,6 +147,7 @@ impl CompactionScheduler {
         waiter: OptionOutputTx,
         manifest_ctx: &ManifestContextRef,
         schema_metadata_manager: SchemaMetadataManagerRef,
+        max_parallelism: usize,
     ) -> Result<()> {
         if let Some(status) = self.region_status.get_mut(&region_id) {
             // Region is compacting. Add the waiter to pending list.
@@ -163,6 +166,7 @@
             manifest_ctx,
             self.listener.clone(),
             schema_metadata_manager,
+            max_parallelism,
         );
         self.region_status.insert(region_id, status);
         let result = self
@@ -193,6 +197,7 @@
             manifest_ctx,
             self.listener.clone(),
             schema_metadata_manager,
+            MAX_PARALLEL_COMPACTION,
         );
 
         // Try to schedule next compaction task for this region.
         if let Err(e) = self
@@ -264,6 +269,7 @@ impl CompactionScheduler {
             manifest_ctx,
             listener,
             schema_metadata_manager,
+            max_parallelism,
         } = request;
 
         let ttl = find_ttl(
@@ -294,6 +300,7 @@
             manifest_ctx: manifest_ctx.clone(),
             file_purger: None,
             ttl: Some(ttl),
+            max_parallelism,
         };
 
         let picker_output = {
@@ -521,6 +528,7 @@ impl CompactionStatus {
         manifest_ctx: &ManifestContextRef,
         listener: WorkerListener,
         schema_metadata_manager: SchemaMetadataManagerRef,
+        max_parallelism: usize,
     ) -> CompactionRequest {
         let current_version = CompactionVersion::from(self.version_control.current().version);
         let start_time = Instant::now();
@@ -535,6 +543,7 @@
             manifest_ctx: manifest_ctx.clone(),
             listener,
             schema_metadata_manager,
+            max_parallelism,
         };
 
         if let Some(pending) = self.pending_compaction.take() {
@@ -722,6 +731,7 @@ mod tests {
             waiter,
             &manifest_ctx,
             schema_metadata_manager.clone(),
+            1,
         )
         .await
         .unwrap();
@@ -742,6 +752,7 @@
             waiter,
             &manifest_ctx,
             schema_metadata_manager,
+            1,
         )
         .await
         .unwrap();
@@ -795,6 +806,7 @@
             OptionOutputTx::none(),
             &manifest_ctx,
             schema_metadata_manager.clone(),
+            1,
         )
         .await
         .unwrap();
@@ -825,6 +837,7 @@
             OptionOutputTx::none(),
             &manifest_ctx,
             schema_metadata_manager.clone(),
+            1,
         )
         .await
         .unwrap();
@@ -860,6 +873,7 @@
             OptionOutputTx::none(),
             &manifest_ctx,
             schema_metadata_manager,
+            1,
         )
         .await
         .unwrap();

diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index ceeb509bc17e..ae3aeea45b65 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -91,6 +91,12 @@ pub struct CompactionRegion {
     pub(crate) current_version: CompactionVersion,
     pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
     pub(crate) ttl: Option<Duration>,
+
+    /// Controls the parallelism of this compaction task. Defaults to 1.
+    ///
+    /// The parallelism applies within this compaction task, not across different
+    /// compaction tasks; e.g. different windows of the same task may be merged in parallel.
+    pub max_parallelism: usize,
 }
 
 /// OpenCompactionRegionRequest represents the request to open a compaction region.
@@ -99,6 +105,7 @@ pub struct OpenCompactionRegionRequest {
     pub region_id: RegionId,
     pub region_dir: String,
     pub region_options: RegionOptions,
+    pub max_parallelism: usize,
 }
 
 /// Open a compaction region from a compaction request.
@@ -205,6 +212,7 @@ pub async fn open_compaction_region(
         current_version,
         file_purger: Some(file_purger),
         ttl: Some(ttl),
+        max_parallelism: req.max_parallelism,
     })
 }
 
@@ -266,6 +274,7 @@ impl Compactor for DefaultCompactor {
         let mut futs = Vec::with_capacity(picker_output.outputs.len());
         let mut compacted_inputs =
             Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
+        let internal_parallelism = compaction_region.max_parallelism.max(1);
 
         for output in picker_output.outputs.drain(..) {
             compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
@@ -358,9 +367,8 @@ impl Compactor for DefaultCompactor {
         }
         let mut output_files = Vec::with_capacity(futs.len());
         while !futs.is_empty() {
-            let mut task_chunk =
-                Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION);
-            for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION {
+            let mut task_chunk = Vec::with_capacity(internal_parallelism);
+            for _ in 0..internal_parallelism {
                 if let Some(task) = futs.pop() {
                     task_chunk.push(common_runtime::spawn_compact(task));
                 }

diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs
index c76595097753..f083e09587fe 100644
--- a/src/mito2/src/compaction/task.rs
+++ b/src/mito2/src/compaction/task.rs
@@ -32,7 +32,7 @@ use crate::request::{
 use crate::worker::WorkerListener;
 
 /// Maximum number of compaction tasks in parallel.
-pub const MAX_PARALLEL_COMPACTION: usize = 8;
+pub const MAX_PARALLEL_COMPACTION: usize = 1;
 
 pub(crate) struct CompactionTaskImpl {
     pub compaction_region: CompactionRegion,

diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs
index 55bae04633f0..32b963b5067b 100644
--- a/src/mito2/src/engine/open_test.rs
+++ b/src/mito2/src/engine/open_test.rs
@@ -464,6 +464,7 @@ async fn test_open_compaction_region() {
         region_id,
         region_dir: region_dir.clone(),
         region_options: RegionOptions::default(),
+        max_parallelism: 1,
     };
 
     let compaction_region = open_compaction_region(

diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs
index 292eb237357b..6fb9f640f7c0 100644
--- a/src/mito2/src/worker/handle_compaction.rs
+++ b/src/mito2/src/worker/handle_compaction.rs
@@ -45,6 +45,8 @@ impl RegionWorkerLoop {
             sender,
             &region.manifest_ctx,
             self.schema_metadata_manager.clone(),
+            // TODO(yingwen): expose this to frontend
+            1,
         )
         .await
         {
@@ -113,6 +115,7 @@
             OptionOutputTx::none(),
             &region.manifest_ctx,
             self.schema_metadata_manager.clone(),
+            1,
         )
         .await
         {

From 3a55f5d17c714e091e2409a9e846fcf3c5e6be9f Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Thu, 16 Jan 2025 20:28:31 +0800
Subject: [PATCH 3/4] test: fix config api test (#5386)

put content_cache_page_size in the correct place
---
 tests-integration/tests/http.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index becf8cd52998..2c912aa0af60 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -904,13 +904,13 @@ min_compaction_interval = "0s"
 aux_path = ""
 staging_size = "2GiB"
 write_buffer_size = "8MiB"
+content_cache_page_size = "64KiB"
 
 [region_engine.mito.inverted_index]
 create_on_flush = "auto"
 create_on_compaction = "auto"
 apply_on_query = "auto"
 mem_threshold_on_create = "auto"
-content_cache_page_size = "64KiB"
 
 [region_engine.mito.fulltext_index]
 create_on_flush = "auto"
@@ -941,7 +941,7 @@ write_interval = "30s"
     .trim()
     .to_string();
     let body_text = drop_lines_with_inconsistent_results(res_get.text().await);
-    similar_asserts::assert_eq!(body_text, expected_toml_str);
+    similar_asserts::assert_eq!(expected_toml_str, body_text);
 }
 
 fn drop_lines_with_inconsistent_results(input: String) -> String {

From 7eaabb3ca232be4135c3d31df567751e743c8d3f Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Thu, 16 Jan 2025 20:53:03 +0800
Subject: [PATCH 4/4] fix: increase in progress scan gauge and adjust histogram buckets (#5370)

* fix: in progress scan doesn't inc

* feat(mito): adjust mito histogram buckets
* chore(metric-engine): adjust metric engine histogram bucket
---
 src/metric-engine/src/metrics.rs |  3 ++-
 src/mito2/src/metrics.rs         | 39 ++++++++++++++++++++++++--------
 src/mito2/src/read/scan_util.rs  |  1 +
 3 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/src/metric-engine/src/metrics.rs b/src/metric-engine/src/metrics.rs
index 4309097533f2..253a2d6154be 100644
--- a/src/metric-engine/src/metrics.rs
+++ b/src/metric-engine/src/metrics.rs
@@ -46,7 +46,8 @@ lazy_static! {
         "greptime_metric_engine_mito_op_elapsed",
         "metric engine's mito operation elapsed",
         &[OPERATION_LABEL],
-        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
+        // 0.01 ~ 10000
+        exponential_buckets(0.01, 10.0, 7).unwrap(),
     )
     .unwrap();
 }

diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index 5a5d76da4c0b..65a8e1dc8578 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -48,7 +48,8 @@ lazy_static! {
         "greptime_mito_handle_request_elapsed",
         "mito handle request elapsed",
         &[TYPE_LABEL],
-        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 60.0, 300.0]
+        // 0.01 ~ 10000
+        exponential_buckets(0.01, 10.0, 7).unwrap(),
     )
     .unwrap();
 
@@ -69,7 +70,8 @@
         "greptime_mito_flush_elapsed",
         "mito flush elapsed",
         &[TYPE_LABEL],
-        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
+        // 1 ~ 3125
+        exponential_buckets(1.0, 5.0, 6).unwrap(),
     )
     .unwrap();
     /// Histogram of flushed bytes.
@@ -99,7 +101,8 @@
         "greptime_mito_write_stage_elapsed",
         "mito write stage elapsed",
         &[STAGE_LABEL],
-        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
+        // 0.01 ~ 1000
+        exponential_buckets(0.01, 10.0, 6).unwrap(),
     )
     .unwrap();
     /// Counter of rows to write.
@@ -118,12 +121,18 @@
         "greptime_mito_compaction_stage_elapsed",
         "mito compaction stage elapsed",
         &[STAGE_LABEL],
-        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
+        // 1 ~ 100000
+        exponential_buckets(1.0, 10.0, 6).unwrap(),
     )
     .unwrap();
     /// Timer of whole compaction task.
     pub static ref COMPACTION_ELAPSED_TOTAL: Histogram =
-        register_histogram!("greptime_mito_compaction_total_elapsed", "mito compaction total elapsed").unwrap();
+        register_histogram!(
+            "greptime_mito_compaction_total_elapsed",
+            "mito compaction total elapsed",
+            // 1 ~ 100000
+            exponential_buckets(1.0, 10.0, 6).unwrap(),
+        ).unwrap();
     /// Counter of all requested compaction task.
     pub static ref COMPACTION_REQUEST_COUNT: IntCounter =
         register_int_counter!("greptime_mito_compaction_requests_total", "mito compaction requests total").unwrap();
@@ -145,7 +154,8 @@
         "greptime_mito_read_stage_elapsed",
         "mito read stage elapsed",
         &[STAGE_LABEL],
-        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
+        // 0.01 ~ 10000
+        exponential_buckets(0.01, 10.0, 7).unwrap(),
     )
     .unwrap();
     pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
@@ -222,6 +232,8 @@
         "mito_write_cache_download_elapsed",
         "mito write cache download elapsed",
         &[TYPE_LABEL],
+        // 0.1 ~ 10000
+        exponential_buckets(0.1, 10.0, 6).unwrap(),
     ).unwrap();
     /// Upload bytes counter.
     pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
@@ -243,6 +255,8 @@
         "greptime_index_apply_elapsed",
         "index apply elapsed",
         &[TYPE_LABEL],
+        // 0.01 ~ 1000
+        exponential_buckets(0.01, 10.0, 6).unwrap(),
     )
     .unwrap();
     /// Gauge of index apply memory usage.
@@ -256,7 +270,8 @@ lazy_static!
{ "greptime_index_create_elapsed", "index create elapsed", &[STAGE_LABEL, TYPE_LABEL], - vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] + // 0.1 ~ 10000 + exponential_buckets(0.1, 10.0, 6).unwrap(), ) .unwrap(); /// Counter of rows indexed. @@ -337,7 +352,8 @@ lazy_static! { "greptime_partition_tree_buffer_freeze_stage_elapsed", "mito partition tree data buffer freeze stage elapsed", &[STAGE_LABEL], - vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0] + // 0.01 ~ 1000 + exponential_buckets(0.01, 10.0, 6).unwrap(), ) .unwrap(); @@ -346,7 +362,8 @@ lazy_static! { "greptime_partition_tree_read_stage_elapsed", "mito partition tree read stage elapsed", &[STAGE_LABEL], - vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0] + // 0.01 ~ 1000 + exponential_buckets(0.01, 10.0, 6).unwrap(), ) .unwrap(); @@ -359,6 +376,8 @@ lazy_static! { pub static ref MANIFEST_OP_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_manifest_op_elapsed", "mito manifest operation elapsed", - &["op"] + &["op"], + // 0.01 ~ 1000 + exponential_buckets(0.01, 10.0, 6).unwrap(), ).unwrap(); } diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 77a9bb161254..8fa48d6c6259 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -82,6 +82,7 @@ impl PartitionMetrics { ) -> Self { let partition_str = partition.to_string(); let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]); + in_progress_scan.inc(); let inner = PartitionMetricsInner { region_id, partition,