Skip to content

Commit

Permalink
feat(tests-integration): add a naive region migration test
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 3, 2024
1 parent 5f7cfc7 commit cdb2374
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 14 deletions.
46 changes: 32 additions & 14 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use servers::grpc::GrpcServer;
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use tempfile::TempDir;
use tonic::transport::Server;
use tower::service_fn;

Expand Down Expand Up @@ -75,6 +76,8 @@ pub struct GreptimeDbClusterBuilder {
datanodes: Option<u32>,
wal_config: WalConfig,
meta_wal_config: MetaWalConfig,
shared_home_dir: Option<Arc<TempDir>>,
meta_selector: Option<SelectorRef>,
}

impl GreptimeDbClusterBuilder {
Expand Down Expand Up @@ -102,6 +105,8 @@ impl GreptimeDbClusterBuilder {
datanodes: None,
wal_config: WalConfig::default(),
meta_wal_config: MetaWalConfig::default(),
shared_home_dir: None,
meta_selector: None,
}
}

Expand Down Expand Up @@ -130,6 +135,16 @@ impl GreptimeDbClusterBuilder {
self
}

pub fn with_shared_home_dir(mut self, shared_home_dir: Arc<TempDir>) -> Self {
self.shared_home_dir = Some(shared_home_dir);
self
}

pub fn with_meta_selector(mut self, selector: SelectorRef) -> Self {
self.meta_selector = Some(selector);
self
}

pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);

Expand All @@ -147,7 +162,13 @@ impl GreptimeDbClusterBuilder {
..Default::default()
};

let meta_srv = self.build_metasrv(opt, datanode_clients.clone()).await;
let meta_srv = meta_srv::mocks::mock(
opt,
self.kv_backend.clone(),
self.meta_selector.clone(),
Some(datanode_clients.clone()),
)
.await;

let (datanode_instances, storage_guards, dir_guards) =
self.build_datanodes(meta_srv.clone(), datanodes).await;
Expand Down Expand Up @@ -175,14 +196,6 @@ impl GreptimeDbClusterBuilder {
}
}

async fn build_metasrv(
&self,
opt: MetaSrvOptions,
datanode_clients: Arc<DatanodeClients>,
) -> MockInfo {
meta_srv::mocks::mock(opt, self.kv_backend.clone(), None, Some(datanode_clients)).await
}

async fn build_datanodes(
&self,
meta_srv: MockInfo,
Expand All @@ -200,10 +213,15 @@ impl GreptimeDbClusterBuilder {
let datanode_id = i as u64 + 1;
let mode = Mode::Distributed;
let mut opts = if let Some(store_config) = &self.store_config {
let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();

dir_guards.push(FileDirGuard::new(home_tmp_dir));
let home_dir = if let Some(home_dir) = &self.shared_home_dir {
home_dir.path().to_str().unwrap().to_string()
} else {
let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
dir_guards.push(FileDirGuard::new(home_tmp_dir));

home_dir
};

create_datanode_opts(
mode,
Expand Down
21 changes: 21 additions & 0 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
use common_recordbatch::util;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::warn;
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datanode::config::{
Expand All @@ -36,6 +37,7 @@ use datanode::config::{
use frontend::frontend::TomlSerializable;
use frontend::instance::Instance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use futures::future::BoxFuture;
use object_store::services::{Azblob, Gcs, Oss, S3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
Expand Down Expand Up @@ -662,3 +664,22 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
.await
.unwrap();
}

pub async fn run_test_with_kafka_wal<F>(test: F)
where
F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return;
}

let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();

test(endpoints).await
}
Loading

0 comments on commit cdb2374

Please sign in to comment.