
Commit

Merge branch 'main' into hy/drop_dead_code_use_cargo_workspace_unused_pub

yihong0618 authored Jan 16, 2025
2 parents f17be14 + 7eaabb3 commit 821dba7
Showing 10 changed files with 78 additions and 19 deletions.
3 changes: 2 additions & 1 deletion src/metric-engine/src/metrics.rs
@@ -46,7 +46,8 @@ lazy_static! {
"greptime_metric_engine_mito_op_elapsed",
"metric engine's mito operation elapsed",
&[OPERATION_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
// 0.01 ~ 10000
exponential_buckets(0.01, 10.0, 7).unwrap(),
)
.unwrap();
}
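
Note: range comments such as "// 0.01 ~ 10000" in this diff describe the boundaries returned by exponential_buckets(start, factor, count) from the prometheus crate: `count` boundaries starting at `start`, each multiplied by `factor`. A minimal standalone sketch, assuming that signature, to check the claimed range:

// Sketch: verify the "0.01 ~ 10000" range claimed for exponential_buckets(0.01, 10.0, 7).
// Assumes prometheus::exponential_buckets(start, factor, count) -> prometheus::Result<Vec<f64>>.
fn main() {
    let buckets = prometheus::exponential_buckets(0.01, 10.0, 7).unwrap();
    // Seven boundaries, each 10x the previous: 0.01, 0.1, 1, 10, 100, 1000, 10000.
    assert_eq!(buckets.len(), 7);
    assert!((buckets[0] - 0.01).abs() < 1e-12);
    assert!((buckets[6] - 10_000.0).abs() < 1e-6);
    println!("{buckets:?}");
}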
14 changes: 14 additions & 0 deletions src/mito2/src/compaction.rs
@@ -40,6 +40,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use table::predicate::Predicate;
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
@@ -85,6 +86,7 @@ pub struct CompactionRequest {
pub(crate) manifest_ctx: ManifestContextRef,
pub(crate) listener: WorkerListener,
pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
@@ -145,6 +147,7 @@ impl CompactionScheduler {
waiter: OptionOutputTx,
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> Result<()> {
if let Some(status) = self.region_status.get_mut(&region_id) {
// Region is compacting. Add the waiter to pending list.
@@ -163,6 +166,7 @@
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
max_parallelism,
);
self.region_status.insert(region_id, status);
let result = self
@@ -193,6 +197,7 @@ impl CompactionScheduler {
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
MAX_PARALLEL_COMPACTION,
);
// Try to schedule next compaction task for this region.
if let Err(e) = self
@@ -264,6 +269,7 @@ impl CompactionScheduler {
manifest_ctx,
listener,
schema_metadata_manager,
max_parallelism,
} = request;

let ttl = find_ttl(
@@ -294,6 +300,7 @@
manifest_ctx: manifest_ctx.clone(),
file_purger: None,
ttl: Some(ttl),
max_parallelism,
};

let picker_output = {
@@ -521,6 +528,7 @@ impl CompactionStatus {
manifest_ctx: &ManifestContextRef,
listener: WorkerListener,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> CompactionRequest {
let current_version = CompactionVersion::from(self.version_control.current().version);
let start_time = Instant::now();
@@ -535,6 +543,7 @@
manifest_ctx: manifest_ctx.clone(),
listener,
schema_metadata_manager,
max_parallelism,
};

if let Some(pending) = self.pending_compaction.take() {
@@ -722,6 +731,7 @@ mod tests {
waiter,
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
@@ -742,6 +752,7 @@
waiter,
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
@@ -795,6 +806,7 @@
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
@@ -825,6 +837,7 @@
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
@@ -860,6 +873,7 @@
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
14 changes: 11 additions & 3 deletions src/mito2/src/compaction/compactor.rs
@@ -91,6 +91,12 @@ pub struct CompactionRegion {
pub(crate) current_version: CompactionVersion,
pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
pub(crate) ttl: Option<TimeToLive>,

/// Controls the parallelism of this compaction task. Default is 1.
///
/// The parallelism applies within this compaction task, not across different
/// compaction tasks; for example, different time windows of the same task may be
/// compacted concurrently.
pub max_parallelism: usize,
}

/// OpenCompactionRegionRequest represents the request to open a compaction region.
@@ -99,6 +105,7 @@ pub struct OpenCompactionRegionRequest {
pub region_id: RegionId,
pub region_dir: String,
pub region_options: RegionOptions,
pub max_parallelism: usize,
}

/// Open a compaction region from a compaction request.
@@ -205,6 +212,7 @@ pub async fn open_compaction_region(
current_version,
file_purger: Some(file_purger),
ttl: Some(ttl),
max_parallelism: req.max_parallelism,
})
}

@@ -266,6 +274,7 @@ impl Compactor for DefaultCompactor {
let mut futs = Vec::with_capacity(picker_output.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
let internal_parallelism = compaction_region.max_parallelism.max(1);

for output in picker_output.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
@@ -358,9 +367,8 @@
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk =
Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION);
for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION {
let mut task_chunk = Vec::with_capacity(internal_parallelism);
for _ in 0..internal_parallelism {
if let Some(task) = futs.pop() {
task_chunk.push(common_runtime::spawn_compact(task));
}
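
The loop above drains the compaction futures in chunks of `internal_parallelism`, awaiting one chunk before spawning the next, so a single compaction task never runs more than `max_parallelism` sub-compactions at once. A rough standalone sketch of the same pattern (using tokio::spawn in place of common_runtime::spawn_compact; the future type and parallelism value are illustrative, not the engine's actual API):

use std::future::Future;

// Sketch of the chunked draining used by DefaultCompactor above: spawn at most
// `parallelism` tasks, await them all, then move on to the next chunk.
async fn drain_in_chunks<F>(mut futs: Vec<F>, parallelism: usize) -> Vec<u64>
where
    F: Future<Output = u64> + Send + 'static,
{
    let parallelism = parallelism.max(1); // mirrors compaction_region.max_parallelism.max(1)
    let mut outputs = Vec::with_capacity(futs.len());
    while !futs.is_empty() {
        let mut chunk = Vec::with_capacity(parallelism);
        for _ in 0..parallelism {
            if let Some(task) = futs.pop() {
                chunk.push(tokio::spawn(task));
            }
        }
        for handle in chunk {
            outputs.push(handle.await.expect("task panicked"));
        }
    }
    outputs
}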
2 changes: 1 addition & 1 deletion src/mito2/src/compaction/task.rs
@@ -32,7 +32,7 @@ use crate::request::{
use crate::worker::WorkerListener;

/// Maximum number of compaction tasks in parallel.
pub const MAX_PARALLEL_COMPACTION: usize = 8;
pub const MAX_PARALLEL_COMPACTION: usize = 1;

pub(crate) struct CompactionTaskImpl {
pub compaction_region: CompactionRegion,
1 change: 1 addition & 0 deletions src/mito2/src/engine/open_test.rs
@@ -464,6 +464,7 @@ async fn test_open_compaction_region() {
region_id,
region_dir: region_dir.clone(),
region_options: RegionOptions::default(),
max_parallelism: 1,
};

let compaction_region = open_compaction_region(
39 changes: 29 additions & 10 deletions src/mito2/src/metrics.rs
@@ -48,7 +48,8 @@ lazy_static! {
"greptime_mito_handle_request_elapsed",
"mito handle request elapsed",
&[TYPE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 60.0, 300.0]
// 0.01 ~ 10000
exponential_buckets(0.01, 10.0, 7).unwrap(),
)
.unwrap();

@@ -69,7 +70,8 @@ lazy_static! {
"greptime_mito_flush_elapsed",
"mito flush elapsed",
&[TYPE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
// 1 ~ 3125
exponential_buckets(1.0, 5.0, 6).unwrap(),
)
.unwrap();
/// Histogram of flushed bytes.
@@ -99,7 +101,8 @@ lazy_static! {
"greptime_mito_write_stage_elapsed",
"mito write stage elapsed",
&[STAGE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
// 0.01 ~ 1000
exponential_buckets(0.01, 10.0, 6).unwrap(),
)
.unwrap();
/// Counter of rows to write.
@@ -118,12 +121,18 @@ lazy_static! {
"greptime_mito_compaction_stage_elapsed",
"mito compaction stage elapsed",
&[STAGE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
// 1 ~ 100000
exponential_buckets(1.0, 10.0, 6).unwrap(),
)
.unwrap();
/// Timer of whole compaction task.
pub static ref COMPACTION_ELAPSED_TOTAL: Histogram =
register_histogram!("greptime_mito_compaction_total_elapsed", "mito compaction total elapsed").unwrap();
register_histogram!(
"greptime_mito_compaction_total_elapsed",
"mito compaction total elapsed",
// 1 ~ 100000
exponential_buckets(1.0, 10.0, 6).unwrap(),
).unwrap();
/// Counter of all requested compaction task.
pub static ref COMPACTION_REQUEST_COUNT: IntCounter =
register_int_counter!("greptime_mito_compaction_requests_total", "mito compaction requests total").unwrap();
@@ -145,7 +154,8 @@ lazy_static! {
"greptime_mito_read_stage_elapsed",
"mito read stage elapsed",
&[STAGE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
// 0.01 ~ 10000
exponential_buckets(0.01, 10.0, 7).unwrap(),
)
.unwrap();
pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
@@ -222,6 +232,8 @@ lazy_static! {
"mito_write_cache_download_elapsed",
"mito write cache download elapsed",
&[TYPE_LABEL],
// 0.1 ~ 10000
exponential_buckets(0.1, 10.0, 6).unwrap(),
).unwrap();
/// Upload bytes counter.
pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
@@ -243,6 +255,8 @@ lazy_static! {
"greptime_index_apply_elapsed",
"index apply elapsed",
&[TYPE_LABEL],
// 0.01 ~ 1000
exponential_buckets(0.01, 10.0, 6).unwrap(),
)
.unwrap();
/// Gauge of index apply memory usage.
@@ -256,7 +270,8 @@ lazy_static! {
"greptime_index_create_elapsed",
"index create elapsed",
&[STAGE_LABEL, TYPE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
// 0.1 ~ 10000
exponential_buckets(0.1, 10.0, 6).unwrap(),
)
.unwrap();
/// Counter of rows indexed.
@@ -337,7 +352,8 @@ lazy_static! {
"greptime_partition_tree_buffer_freeze_stage_elapsed",
"mito partition tree data buffer freeze stage elapsed",
&[STAGE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
// 0.01 ~ 1000
exponential_buckets(0.01, 10.0, 6).unwrap(),
)
.unwrap();

@@ -346,7 +362,8 @@ lazy_static! {
"greptime_partition_tree_read_stage_elapsed",
"mito partition tree read stage elapsed",
&[STAGE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
// 0.01 ~ 1000
exponential_buckets(0.01, 10.0, 6).unwrap(),
)
.unwrap();

@@ -359,6 +376,8 @@
pub static ref MANIFEST_OP_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_manifest_op_elapsed",
"mito manifest operation elapsed",
&["op"]
&["op"],
// 0.01 ~ 1000
exponential_buckets(0.01, 10.0, 6).unwrap(),
).unwrap();
}
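
For reference, the registration pattern used throughout this file passes the custom buckets as the last argument of register_histogram_vec!, which is why MANIFEST_OP_ELAPSED above also gains a trailing comma after its label slice. A self-contained sketch of the same pattern (metric name and label values are made up for illustration):

use lazy_static::lazy_static;
use prometheus::{exponential_buckets, register_histogram_vec, HistogramVec};

lazy_static! {
    // Illustrative histogram registered with exponential buckets (0.01 ~ 1000 seconds).
    static ref EXAMPLE_OP_ELAPSED: HistogramVec = register_histogram_vec!(
        "example_manifest_op_elapsed",
        "example manifest operation elapsed",
        &["op"],
        exponential_buckets(0.01, 10.0, 6).unwrap()
    )
    .unwrap();
}

fn main() {
    // Record one observation; the label values select the concrete time series.
    EXAMPLE_OP_ELAPSED.with_label_values(&["checkpoint"]).observe(0.42);
}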
1 change: 1 addition & 0 deletions src/mito2/src/read/scan_util.rs
@@ -82,6 +82,7 @@ impl PartitionMetrics {
) -> Self {
let partition_str = partition.to_string();
let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
in_progress_scan.inc();
let inner = PartitionMetricsInner {
region_id,
partition,
3 changes: 3 additions & 0 deletions src/mito2/src/worker/handle_compaction.rs
@@ -45,6 +45,8 @@ impl<S> RegionWorkerLoop<S> {
sender,
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
// TODO(yingwen): expose this to frontend
1,
)
.await
{
@@ -113,6 +115,7 @@ impl<S> RegionWorkerLoop<S> {
OptionOutputTx::none(),
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
1,
)
.await
{
16 changes: 14 additions & 2 deletions src/servers/src/mysql/federated.rs
@@ -273,8 +273,11 @@ pub(crate) fn check(
) -> Option<Output> {
// An INSERT statement doesn't need the MySQL federated check. We assume the query
// doesn't contain a federated or driver setup command if it starts with 'INSERT'.
if query.len() > 6 && query[..6].eq_ignore_ascii_case("INSERT") {
return None;
let the_6th_index = query.char_indices().nth(6).map(|(i, _)| i);
if let Some(index) = the_6th_index {
if query[..index].eq_ignore_ascii_case("INSERT") {
return None;
}
}

// First to check the query is like "select @@variables".
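
The old check sliced the first six bytes (query[..6]), which panics when byte 6 falls inside a multi-byte UTF-8 character; char_indices().nth(6) yields the byte offset where the seventh character starts, so the slice always ends on a char boundary. A small standalone sketch of the difference (the helper name is made up and is not the server's API):

// Sketch: char-boundary-safe prefix check, mirroring the fix above.
fn starts_with_insert(query: &str) -> bool {
    // Byte offset of the 7th char == end of the first 6 chars (a valid boundary).
    match query.char_indices().nth(6).map(|(i, _)| i) {
        Some(idx) => query[..idx].eq_ignore_ascii_case("INSERT"),
        None => false, // fewer than 7 characters cannot hold a full INSERT statement
    }
}

fn main() {
    assert!(starts_with_insert("INSERT INTO t VALUES (1)"));
    assert!(!starts_with_insert("SELECT 1"));
    // With the old byte slice, this emoji-prefixed query would panic at `query[..6]`.
    assert!(!starts_with_insert("🫣一点不正常的东西🫣"));
}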
@@ -295,6 +298,15 @@ mod test {

use super::*;

#[test]
fn test_check_abnormal() {
let session = Arc::new(Session::new(None, Channel::Mysql, Default::default()));
let query = "🫣一点不正常的东西🫣";
let output = check(query, QueryContext::arc(), session.clone());

assert!(output.is_none());
}

#[test]
fn test_check() {
let session = Arc::new(Session::new(None, Channel::Mysql, Default::default()));
4 changes: 2 additions & 2 deletions tests-integration/tests/http.rs
@@ -904,13 +904,13 @@ min_compaction_interval = "0s"
aux_path = ""
staging_size = "2GiB"
write_buffer_size = "8MiB"
content_cache_page_size = "64KiB"
[region_engine.mito.inverted_index]
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
content_cache_page_size = "64KiB"
[region_engine.mito.fulltext_index]
create_on_flush = "auto"
@@ -941,7 +941,7 @@ write_interval = "30s"
.trim()
.to_string();
let body_text = drop_lines_with_inconsistent_results(res_get.text().await);
similar_asserts::assert_eq!(body_text, expected_toml_str);
similar_asserts::assert_eq!(expected_toml_str, body_text);
}

fn drop_lines_with_inconsistent_results(input: String) -> String {