Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): add skip_wal_replay option to OpenRegionRequest #2955

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ impl DatanodeBuilder {
engine: engine.clone(),
region_dir,
options,
skip_wal_replay: false,
}),
)
.await?;
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl RegionHeartbeatResponseHandler {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),
options: region_options,
skip_wal_replay: false,
});
let result = region_server.handle_request(region_id, request).await;

Expand Down
2 changes: 2 additions & 0 deletions src/file-engine/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ mod tests {
engine: "file".to_string(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
};

let region = FileRegion::open(region_id, request, &object_store)
Expand Down Expand Up @@ -211,6 +212,7 @@ mod tests {
engine: "file".to_string(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
};
let err = FileRegion::open(region_id, request, &object_store)
.await
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ async fn test_alter_region() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down Expand Up @@ -201,6 +202,7 @@ async fn test_put_after_alter() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ async fn test_region_replay() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down
98 changes: 96 additions & 2 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ use std::time::Duration;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::test_util::{
build_rows, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
build_rows, flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
};

#[tokio::test]
Expand All @@ -42,6 +43,7 @@ async fn test_engine_open_empty() {
engine: String::new(),
region_dir: "empty".to_string(),
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down Expand Up @@ -73,6 +75,7 @@ async fn test_engine_open_existing() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down Expand Up @@ -161,6 +164,7 @@ async fn test_engine_region_open_with_options() {
engine: String::new(),
region_dir,
options: HashMap::from([("ttl".to_string(), "4d".to_string())]),
skip_wal_replay: false,
}),
)
.await
Expand Down Expand Up @@ -205,6 +209,7 @@ async fn test_engine_region_open_with_custom_store() {
engine: String::new(),
region_dir,
options: HashMap::from([("storage".to_string(), "Gcs".to_string())]),
skip_wal_replay: false,
}),
)
.await
Expand All @@ -225,3 +230,92 @@ async fn test_engine_region_open_with_custom_store() {
.await
.unwrap());
}

#[tokio::test]
async fn test_open_region_skip_wal_replay() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();

let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;

flush_region(&engine, region_id, None).await;

let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(3, 5),
};
put_rows(&engine, region_id, rows).await;

let engine = env.reopen_engine(engine, MitoConfig::default()).await;
// Skip the WAL replay .
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: region_dir.to_string(),
options: Default::default(),
skip_wal_replay: true,
}),
)
.await
.unwrap();

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());

// Replay the WAL.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
// Open the region again with options.
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: Default::default(),
skip_wal_replay: false,
}),
)
.await
.unwrap();

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
1 change: 1 addition & 0 deletions src/mito2/src/engine/parallel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async fn scan_in_parallel(
engine: String::new(),
region_dir: region_dir.to_string(),
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine/truncate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ async fn test_engine_truncate_reopen() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down Expand Up @@ -353,6 +354,7 @@ async fn test_engine_truncate_during_flush() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down
14 changes: 13 additions & 1 deletion src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub(crate) struct RegionOpener {
scheduler: SchedulerRef,
options: HashMap<String, String>,
cache_manager: Option<CacheManagerRef>,
skip_wal_replay: bool,
}

impl RegionOpener {
Expand All @@ -74,6 +75,7 @@ impl RegionOpener {
scheduler,
options: HashMap::new(),
cache_manager: None,
skip_wal_replay: false,
}
}

Expand All @@ -95,6 +97,12 @@ impl RegionOpener {
self
}

/// Sets the `skip_wal_replay`.
pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
self.skip_wal_replay = skip;
self
}

/// Writes region manifest and creates a new region if it does not exist.
/// Opens the region if it already exists.
///
Expand Down Expand Up @@ -235,7 +243,11 @@ impl RegionOpener {
.build();
let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));
replay_memtable(wal, region_id, flushed_entry_id, &version_control).await?;
if !self.skip_wal_replay {
replay_memtable(wal, region_id, flushed_entry_id, &version_control).await?;
} else {
info!("Skip the WAL replay for region: {}", region_id);
}

let region = MitoRegion {
region_id: self.region_id,
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{OpType, Row, Rows, SemanticType};
use common_datasource::compression::CompressionType;
use common_query::Output;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::prelude::ConcreteDataType;
Expand Down Expand Up @@ -697,6 +696,7 @@ pub async fn reopen_region(
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.scheduler.clone(),
)
.options(request.options)
.skip_wal_replay(request.skip_wal_replay)
.cache(Some(self.cache_manager.clone()))
.open(&self.config, &self.wal)
.await?;
Expand Down
3 changes: 3 additions & 0 deletions src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl RegionRequest {
engine: open.engine,
region_dir,
options: open.options,
skip_wal_replay: false,
}),
)])
}
Expand Down Expand Up @@ -197,6 +198,8 @@ pub struct RegionOpenRequest {
pub region_dir: String,
/// Options of the opened region.
pub options: HashMap<String, String>,
/// To skip replaying the WAL.
pub skip_wal_replay: bool,
}

/// Close region request.
Expand Down
Loading