Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tests-integration): add a naive region migration integration test #3078

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
Some((_, Instruction::OpenRegion { .. }))
| Some((_, Instruction::CloseRegion { .. }))
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
)
}

Expand Down Expand Up @@ -134,7 +135,7 @@ mod tests {
use common_meta::heartbeat::mailbox::{
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
};
use common_meta::instruction::{DowngradeRegion, OpenRegion};
use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
Expand Down Expand Up @@ -175,6 +176,44 @@ mod tests {
}
}

#[test]
fn test_is_acceptable() {
common_telemetry::init_default_ut_logging();
let region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let heartbeat_env = HeartbeatResponseTestEnv::new();
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");

// Open region
let region_id = RegionId::new(1024, 1);
let storage_path = "test";
let instruction = open_region_instruction(region_id, storage_path);
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));

// Close region
let instruction = close_region_instruction(region_id);
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));

// Downgrade region
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
});
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));

// Upgrade region
let instruction = Instruction::UpgradeRegion(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
});
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
);
}

fn close_region_instruction(region_id: RegionId) -> Instruction {
Instruction::CloseRegion(RegionIdent {
table_id: region_id.table_id(),
Expand Down
8 changes: 3 additions & 5 deletions src/datanode/src/heartbeat/handler/open_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use common_error::ext::ErrorExt;
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal::prepare_wal_option;
use futures_util::future::BoxFuture;
use store_api::path_utils::region_dir;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
Expand All @@ -26,26 +27,23 @@ impl HandlerContext {
OpenRegion {
region_ident,
region_storage_path,
region_options,
mut region_options,
region_wal_options,
skip_wal_replay,
}: OpenRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
// TODO(niebayes): extends region options with region_wal_options.
let _ = region_wal_options;
prepare_wal_option(&mut region_options, region_id, &region_wal_options);
let request = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),
options: region_options,
skip_wal_replay,
});
let result = self.region_server.handle_request(region_id, request).await;

let success = result.is_ok();
let error = result.as_ref().map_err(|e| e.output_msg()).err();

InstructionReply::OpenRegion(SimpleReply {
result: success,
error,
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
pub use manager::RegionMigrationProcedureTask;
use serde::{Deserialize, Serialize};
use snafu::{location, Location, OptionExt, ResultExt};
use store_api::storage::RegionId;
Expand Down
15 changes: 13 additions & 2 deletions src/meta-srv/src/procedure/region_migration/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,24 @@ impl Drop for RegionMigrationProcedureGuard {
}

#[derive(Debug, Clone)]
pub(crate) struct RegionMigrationProcedureTask {
pub struct RegionMigrationProcedureTask {
pub(crate) cluster_id: ClusterId,
pub(crate) region_id: RegionId,
pub(crate) from_peer: Peer,
pub(crate) to_peer: Peer,
}

impl RegionMigrationProcedureTask {
pub fn new(cluster_id: ClusterId, region_id: RegionId, from_peer: Peer, to_peer: Peer) -> Self {
Self {
cluster_id,
region_id,
from_peer,
to_peer,
}
}
}

impl Display for RegionMigrationProcedureTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down Expand Up @@ -223,7 +234,7 @@ impl RegionMigrationManager {
}

/// Submits a new region migration procedure.
pub(crate) async fn submit_procedure(
pub async fn submit_procedure(
&self,
task: RegionMigrationProcedureTask,
) -> Result<Option<ProcedureId>> {
Expand Down
46 changes: 32 additions & 14 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use servers::grpc::GrpcServer;
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use tempfile::TempDir;
use tonic::transport::Server;
use tower::service_fn;

Expand Down Expand Up @@ -75,6 +76,8 @@ pub struct GreptimeDbClusterBuilder {
datanodes: Option<u32>,
wal_config: WalConfig,
meta_wal_config: MetaWalConfig,
shared_home_dir: Option<Arc<TempDir>>,
meta_selector: Option<SelectorRef>,
}

impl GreptimeDbClusterBuilder {
Expand Down Expand Up @@ -102,6 +105,8 @@ impl GreptimeDbClusterBuilder {
datanodes: None,
wal_config: WalConfig::default(),
meta_wal_config: MetaWalConfig::default(),
shared_home_dir: None,
meta_selector: None,
}
}

Expand Down Expand Up @@ -130,6 +135,16 @@ impl GreptimeDbClusterBuilder {
self
}

pub fn with_shared_home_dir(mut self, shared_home_dir: Arc<TempDir>) -> Self {
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
self.shared_home_dir = Some(shared_home_dir);
self
}

pub fn with_meta_selector(mut self, selector: SelectorRef) -> Self {
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
self.meta_selector = Some(selector);
self
}

pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);

Expand All @@ -147,7 +162,13 @@ impl GreptimeDbClusterBuilder {
..Default::default()
};

let meta_srv = self.build_metasrv(opt, datanode_clients.clone()).await;
let meta_srv = meta_srv::mocks::mock(
opt,
self.kv_backend.clone(),
self.meta_selector.clone(),
Some(datanode_clients.clone()),
)
.await;

let (datanode_instances, storage_guards, dir_guards) =
self.build_datanodes(meta_srv.clone(), datanodes).await;
Expand Down Expand Up @@ -175,14 +196,6 @@ impl GreptimeDbClusterBuilder {
}
}

async fn build_metasrv(
&self,
opt: MetaSrvOptions,
datanode_clients: Arc<DatanodeClients>,
) -> MockInfo {
meta_srv::mocks::mock(opt, self.kv_backend.clone(), None, Some(datanode_clients)).await
}

async fn build_datanodes(
&self,
meta_srv: MockInfo,
Expand All @@ -200,10 +213,15 @@ impl GreptimeDbClusterBuilder {
let datanode_id = i as u64 + 1;
let mode = Mode::Distributed;
let mut opts = if let Some(store_config) = &self.store_config {
let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();

dir_guards.push(FileDirGuard::new(home_tmp_dir));
let home_dir = if let Some(home_dir) = &self.shared_home_dir {
home_dir.path().to_str().unwrap().to_string()
} else {
let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
dir_guards.push(FileDirGuard::new(home_tmp_dir));

home_dir
};

create_datanode_opts(
mode,
Expand Down
21 changes: 21 additions & 0 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
use common_recordbatch::util;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::warn;
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datanode::config::{
Expand All @@ -36,6 +37,7 @@ use datanode::config::{
use frontend::frontend::TomlSerializable;
use frontend::instance::Instance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use futures::future::BoxFuture;
use object_store::services::{Azblob, Gcs, Oss, S3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
Expand Down Expand Up @@ -662,3 +664,22 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
.await
.unwrap();
}

/// Runs `test` against a Kafka WAL backend, reading broker endpoints from the
/// `GT_KAFKA_ENDPOINTS` environment variable (a comma-separated list).
///
/// Skips the test with a warning when the variable is unset or empty, so
/// environments without a Kafka broker do not fail.
pub async fn run_test_with_kafka_wal<F>(test: F)
where
    F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
    // Load variables from a `.env` file if present; absence is not an error.
    let _ = dotenv::dotenv();
    let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
    if endpoints.is_empty() {
        warn!("The GT_KAFKA_ENDPOINTS environment variable is empty, skipping the test");
        return;
    }

    // Trim whitespace around each endpoint and drop empty segments
    // (e.g. produced by a trailing comma) for robustness.
    let endpoints = endpoints
        .split(',')
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect::<Vec<_>>();

    test(endpoints).await
}
14 changes: 7 additions & 7 deletions tests-integration/tests/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub async fn test_region_failover(store_type: StorageType) {

let table_id = prepare_testing_table(&cluster).await;

let results = write_datas(&frontend, logical_timer).await;
let results = insert_values(&frontend, logical_timer).await;
logical_timer += 1000;
for result in results {
assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
Expand Down Expand Up @@ -141,12 +141,12 @@ pub async fn test_region_failover(store_type: StorageType) {

// Inserts data to each datanode after failover
let frontend = cluster.frontend.clone();
let results = write_datas(&frontend, logical_timer).await;
let results = insert_values(&frontend, logical_timer).await;
for result in results {
assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
}

assert_writes(&frontend).await;
assert_values(&frontend).await;

assert!(!distribution.contains_key(&failed_region.datanode_id));

Expand Down Expand Up @@ -179,12 +179,12 @@ async fn has_route_cache(instance: &Arc<Instance>, table_id: TableId) -> bool {
.is_some()
}

async fn write_datas(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
let query_ctx = QueryContext::arc();

let mut results = Vec::new();
for range in [5, 15, 25, 55] {
let result = write_data(
let result = insert_value(
instance,
&format!("INSERT INTO my_table VALUES ({},{})", range, ts),
query_ctx.clone(),
Expand All @@ -196,15 +196,15 @@ async fn write_datas(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Ou
results
}

async fn write_data(
/// Executes a single SQL statement on the frontend instance and returns
/// the first (and only) output of the query.
async fn insert_value(
    instance: &Arc<Instance>,
    sql: &str,
    query_ctx: QueryContextRef,
) -> FrontendResult<Output> {
    let mut outputs = instance.do_query(sql, query_ctx).await;
    outputs.remove(0)
}

async fn assert_writes(instance: &Arc<Instance>) {
async fn assert_values(instance: &Arc<Instance>) {
let query_ctx = QueryContext::arc();

let result = instance
Expand Down
Loading
Loading