From cdb2374b24c71f1c04c2773cee91cbdb07220459 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 3 Jan 2024 05:35:14 +0000 Subject: [PATCH] feat(tests-integration): add a naive region migration test --- tests-integration/src/cluster.rs | 46 +++- tests-integration/src/test_util.rs | 21 ++ tests-integration/tests/region_migration.rs | 273 ++++++++++++++++++++ 3 files changed, 326 insertions(+), 14 deletions(-) create mode 100644 tests-integration/tests/region_migration.rs diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 336d1d68d0b0..183a7fba12e7 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -43,11 +43,12 @@ use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; -use meta_srv::metasrv::{MetaSrv, MetaSrvOptions}; +use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; use meta_srv::mocks::MockInfo; use servers::grpc::GrpcServer; use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; +use tempfile::TempDir; use tonic::transport::Server; use tower::service_fn; @@ -75,6 +76,8 @@ pub struct GreptimeDbClusterBuilder { datanodes: Option, wal_config: WalConfig, meta_wal_config: MetaWalConfig, + shared_home_dir: Option>, + meta_selector: Option, } impl GreptimeDbClusterBuilder { @@ -102,6 +105,8 @@ impl GreptimeDbClusterBuilder { datanodes: None, wal_config: WalConfig::default(), meta_wal_config: MetaWalConfig::default(), + shared_home_dir: None, + meta_selector: None, } } @@ -130,6 +135,16 @@ impl GreptimeDbClusterBuilder { self } + pub fn with_shared_home_dir(mut self, shared_home_dir: Arc) -> Self { + self.shared_home_dir = Some(shared_home_dir); + self + } + + pub fn with_meta_selector(mut self, selector: SelectorRef) -> Self { + self.meta_selector = Some(selector); + self + } + pub async fn build(self) -> GreptimeDbCluster { 
let datanodes = self.datanodes.unwrap_or(4); @@ -147,7 +162,13 @@ impl GreptimeDbClusterBuilder { ..Default::default() }; - let meta_srv = self.build_metasrv(opt, datanode_clients.clone()).await; + let meta_srv = meta_srv::mocks::mock( + opt, + self.kv_backend.clone(), + self.meta_selector.clone(), + Some(datanode_clients.clone()), + ) + .await; let (datanode_instances, storage_guards, dir_guards) = self.build_datanodes(meta_srv.clone(), datanodes).await; @@ -175,14 +196,6 @@ impl GreptimeDbClusterBuilder { } } - async fn build_metasrv( - &self, - opt: MetaSrvOptions, - datanode_clients: Arc, - ) -> MockInfo { - meta_srv::mocks::mock(opt, self.kv_backend.clone(), None, Some(datanode_clients)).await - } - async fn build_datanodes( &self, meta_srv: MockInfo, @@ -200,10 +213,15 @@ impl GreptimeDbClusterBuilder { let datanode_id = i as u64 + 1; let mode = Mode::Distributed; let mut opts = if let Some(store_config) = &self.store_config { - let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name)); - let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); - - dir_guards.push(FileDirGuard::new(home_tmp_dir)); + let home_dir = if let Some(home_dir) = &self.shared_home_dir { + home_dir.path().to_str().unwrap().to_string() + } else { + let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name)); + let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); + dir_guards.push(FileDirGuard::new(home_tmp_dir)); + + home_dir + }; create_datanode_opts( mode, diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 04e31d91ca3f..441342b2fee6 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -27,6 +27,7 @@ use common_meta::key::schema_name::SchemaNameKey; use common_query::Output; use common_recordbatch::util; use common_runtime::Builder as RuntimeBuilder; +use common_telemetry::warn; use common_test_util::ports; use 
common_test_util::temp_dir::{create_temp_dir, TempDir};
 use datanode::config::{
@@ -36,6 +37,7 @@ use datanode::config::{
 use frontend::frontend::TomlSerializable;
 use frontend::instance::Instance;
 use frontend::service_config::{MysqlOptions, PostgresOptions};
+use futures::future::BoxFuture;
 use object_store::services::{Azblob, Gcs, Oss, S3};
 use object_store::test_util::TempFolder;
 use object_store::ObjectStore;
@@ -662,3 +664,31 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
         .await
         .unwrap();
 }
+
+/// Runs `test` against the Kafka brokers listed in `GT_KAFKA_ENDPOINTS`.
+///
+/// The environment variable (a `.env` file is loaded first when available) is
+/// read as a comma-separated list. Each entry is trimmed and blank entries are
+/// dropped, so a trailing comma or a whitespace-only value never produces an
+/// empty endpoint. When no endpoint remains, a warning is logged and `test` is
+/// not invoked.
+pub async fn run_test_with_kafka_wal<F>(test: F)
+where
+    F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
+{
+    // Any error from loading `.env` is ignored.
+    let _ = dotenv::dotenv();
+    let endpoints = env::var("GT_KAFKA_ENDPOINTS")
+        .unwrap_or_default()
+        .split(',')
+        .map(str::trim)
+        .filter(|endpoint| !endpoint.is_empty())
+        .map(String::from)
+        .collect::<Vec<_>>();
+    if endpoints.is_empty() {
+        warn!("The endpoints is empty, skipping the test");
+        return;
+    }
+
+    test(endpoints).await
+}
diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs
new file mode 100644
index 000000000000..5191c51591eb
--- /dev/null
+++ b/tests-integration/tests/region_migration.rs
@@ -0,0 +1,303 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_config::wal::KafkaConfig;
+use common_config::WalConfig;
+use common_meta::key::{RegionDistribution, TableMetadataManagerRef};
+use common_meta::peer::Peer;
+use common_meta::wal::kafka::KafkaConfig as MetaKafkaConfig;
+use common_meta::wal::WalConfig as MetaWalConfig;
+use common_query::Output;
+use common_telemetry::info;
+use common_test_util::temp_dir::create_temp_dir;
+use frontend::error::Result as FrontendResult;
+use frontend::instance::Instance;
+use futures::future::BoxFuture;
+use meta_srv::error::Result as MetaResult;
+use meta_srv::metasrv::SelectorContext;
+use meta_srv::procedure::region_migration::RegionMigrationProcedureTask;
+use meta_srv::selector::{Namespace, Selector, SelectorOptions};
+use servers::query_handler::sql::SqlQueryHandler;
+use session::context::{QueryContext, QueryContextRef};
+use store_api::storage::RegionId;
+use table::metadata::TableId;
+use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
+use tests_integration::test_util::{
+    check_output_stream, get_test_store_config, run_test_with_kafka_wal, StorageType,
+};
+use uuid::Uuid;
+
+/// Name of the table created and queried by this test.
+const TEST_TABLE_NAME: &str = "migration_target";
+
+/// Runs the region migration scenario on file storage.
+#[tokio::test(flavor = "multi_thread")]
+async fn test_region_migration_fs() {
+    run_test_with_kafka_wal(|endpoints| {
+        Box::pin(async move { test_region_migration(StorageType::File, endpoints).await })
+    })
+    .await
+}
+
+/// Migrates one region between two datanodes and checks the table stays writable.
+///
+/// The scenario, in order:
+/// 1. Builds a cluster of 5 datanodes on Kafka WAL (`endpoints`) that share one
+///    home directory; the metasrv selector is a [`ConstNodeSelector`] returning
+///    peers 1, 2 and 3.
+/// 2. Creates the partitioned test table and writes one row per partition.
+/// 3. Submits a migration of the first region of one peer to another peer.
+/// 4. Polls the region distribution for up to 10 seconds until it matches the
+///    expected one.
+/// 5. Writes a second batch of rows and verifies the table content.
+///
+/// # Panics
+///
+/// Panics when a step fails or the expected distribution is not reached in time.
+pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<String>) {
+    let cluster_name = "test_region_migration";
+    // Every peer is given the same address; only the id differs.
+    let peer_factory = |id| Peer {
+        id,
+        addr: "127.0.0.1:3001".to_string(),
+    };
+
+    // Prepares test cluster.
+    let (store_config, _guard) = get_test_store_config(&store_type);
+    let home_dir = create_temp_dir("test_migration_data_home");
+    let datanodes: u32 = 5;
+    let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
+    let const_selector = Arc::new(ConstNodeSelector {
+        peers: vec![peer_factory(1), peer_factory(2), peer_factory(3)],
+    });
+    let cluster = builder
+        .with_datanodes(datanodes)
+        .with_store_config(store_config)
+        .with_wal_config(WalConfig::Kafka(KafkaConfig {
+            broker_endpoints: endpoints.clone(),
+            linger: Duration::from_millis(25),
+            ..Default::default()
+        }))
+        .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
+            broker_endpoints: endpoints,
+            num_topics: 3,
+            topic_name_prefix: Uuid::new_v4().to_string(),
+            ..Default::default()
+        }))
+        .with_shared_home_dir(Arc::new(home_dir))
+        .with_meta_selector(const_selector)
+        .build()
+        .await;
+    let mut logical_timer = 1685508715000;
+    let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();
+
+    // Prepares test table.
+    let table_id = prepare_testing_table(&cluster).await;
+
+    // Inserts data
+    let results = insert_datas(&cluster.frontend, logical_timer).await;
+    logical_timer += 1000;
+    for result in results {
+        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
+    }
+
+    // The region distribution
+    let mut distribution = find_region_distribution(&table_metadata_manager, table_id).await;
+
+    // Selecting target of region migration.
+    let region_migration_manager = cluster.meta_srv.region_migration_manager();
+    let (from_peer_id, from_regions) = distribution.pop_first().unwrap();
+    info!(
+        "Selecting from peer: {from_peer_id}, and regions: {:?}",
+        from_regions
+    );
+    let (to_peer_id, mut to_regions) = distribution.pop_first().unwrap();
+    info!(
+        "Selecting to peer: {to_peer_id}, and regions: {:?}",
+        to_regions
+    );
+
+    // Trigger region migration.
+    let procedure = region_migration_manager
+        .submit_procedure(RegionMigrationProcedureTask::new(
+            0,
+            RegionId::new(table_id, from_regions[0]),
+            peer_factory(from_peer_id),
+            peer_factory(to_peer_id),
+        ))
+        .await
+        .unwrap();
+    info!("Started region procedure: {}!", procedure.unwrap());
+
+    // Prepares expected region distribution.
+    // NOTE(review): all regions of the source peer are moved here although only
+    // `from_regions[0]` is migrated; this presumes the source peer hosts exactly
+    // one region -- confirm.
+    to_regions.extend(from_regions);
+    // Keeps asc order.
+    to_regions.sort();
+    distribution.insert(to_peer_id, to_regions);
+
+    // Waits condition
+    wait_condition(
+        Duration::from_secs(10),
+        Box::pin(async move {
+            loop {
+                let region_migration =
+                    find_region_distribution(&table_metadata_manager, table_id).await;
+                if region_migration == distribution {
+                    info!("Found new distribution: {region_migration:?}");
+                    break;
+                } else {
+                    info!("Found the unexpected distribution: {region_migration:?}, expected: {distribution:?}");
+                    tokio::time::sleep(Duration::from_millis(200)).await;
+                }
+            }
+        }),
+    )
+    .await;
+
+    // Inserts more data.
+    let results = insert_datas(&cluster.frontend, logical_timer).await;
+    for result in results {
+        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
+    }
+
+    // Asserts the writes.
+    assert_writes(&cluster.frontend).await;
+}
+
+/// A [`Selector`] that ignores its arguments and always returns a copy of the
+/// peer list it holds.
+pub struct ConstNodeSelector {
+    peers: Vec<Peer>,
+}
+
+#[async_trait::async_trait]
+impl Selector for ConstNodeSelector {
+    type Context = SelectorContext;
+    type Output = Vec<Peer>;
+
+    async fn select(
+        &self,
+        _ns: Namespace,
+        _ctx: &Self::Context,
+        _opts: SelectorOptions,
+    ) -> MetaResult<Self::Output> {
+        Ok(self.peers.clone())
+    }
+}
+
+/// Awaits `condition` and panics if it does not complete within `timeout`.
+async fn wait_condition(timeout: Duration, condition: BoxFuture<'static, ()>) {
+    tokio::time::timeout(timeout, condition)
+        .await
+        .unwrap_or_else(|_| panic!("condition was not met within {timeout:?}"));
+}
+
+/// Queries the test table ordered by `i, ts` and passes the result, together
+/// with the expected six-row table, to `check_output_stream`.
+async fn assert_writes(instance: &Arc<Instance>) {
+    let query_ctx = QueryContext::arc();
+
+    let result = instance
+        .do_query(
+            &format!("select * from {TEST_TABLE_NAME} order by i, ts"),
+            query_ctx,
+        )
+        .await
+        .remove(0);
+
+    let expected = "\
++----+---------------------+
+| i  | ts                  |
++----+---------------------+
+| 5  | 2023-05-31T04:51:55 |
+| 5  | 2023-05-31T04:51:56 |
+| 15 | 2023-05-31T04:51:55 |
+| 15 | 2023-05-31T04:51:56 |
+| 55 | 2023-05-31T04:51:55 |
+| 55 | 2023-05-31T04:51:56 |
++----+---------------------+";
+    check_output_stream(result.unwrap(), expected).await;
+}
+
+/// Creates the test table, range-partitioned on `i` into three partitions, and
+/// returns the table id reported by the frontend catalog manager.
+async fn prepare_testing_table(cluster: &GreptimeDbCluster) -> TableId {
+    let sql = format!(
+        r"
+    CREATE TABLE {TEST_TABLE_NAME} (
+        i INT PRIMARY KEY,
+        ts TIMESTAMP TIME INDEX,
+    ) PARTITION BY RANGE COLUMNS (i) (
+        PARTITION r0 VALUES LESS THAN (10),
+        PARTITION r1 VALUES LESS THAN (50),
+        PARTITION r3 VALUES LESS THAN (MAXVALUE),
+    )"
+    );
+    let mut result = cluster.frontend.do_query(&sql, QueryContext::arc()).await;
+    let output = result.remove(0).unwrap();
+    assert!(matches!(output, Output::AffectedRows(0)));
+
+    let table = cluster
+        .frontend
+        .catalog_manager()
+        .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME)
+        .await
+        .unwrap()
+        .unwrap();
+    table.table_info().table_id()
+}
+
+/// Returns the region distribution stored for `table_id`, panicking when the
+/// lookup fails or yields nothing.
+async fn find_region_distribution(
+    table_metadata_manager: &TableMetadataManagerRef,
+    table_id: TableId,
+) -> RegionDistribution {
+    table_metadata_manager
+        .table_route_manager()
+        .get_region_distribution(table_id)
+        .await
+        .unwrap()
+        .unwrap()
+}
+
+/// Inserts one row per partition (`i` = 5, 15 and 55), each with timestamp `ts`,
+/// and returns the outcome of every insert in that order.
+async fn insert_datas(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
+    let query_ctx = QueryContext::arc();
+
+    let mut results = Vec::new();
+    for value in [5, 15, 55] {
+        let sql = format!("INSERT INTO {TEST_TABLE_NAME} VALUES ({value},{ts})");
+        results.push(write_data(instance, &sql, query_ctx.clone()).await);
+    }
+
+    results
+}
+
+/// Executes `sql` on `instance` and returns the first of the returned results.
+async fn write_data(
+    instance: &Arc<Instance>,
+    sql: &str,
+    query_ctx: QueryContextRef,
+) -> FrontendResult<Output> {
+    instance.do_query(sql, query_ctx).await.remove(0)
+}