diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index c7533db5423c..5eaa3ad88c93 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -96,6 +96,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { Some((_, Instruction::OpenRegion { .. })) | Some((_, Instruction::CloseRegion { .. })) | Some((_, Instruction::DowngradeRegion { .. })) + | Some((_, Instruction::UpgradeRegion { .. })) ) } @@ -134,7 +135,7 @@ mod tests { use common_meta::heartbeat::mailbox::{ HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta, }; - use common_meta::instruction::{DowngradeRegion, OpenRegion}; + use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion}; use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; use mito2::test_util::{CreateRequestBuilder, TestEnv}; @@ -175,6 +176,44 @@ mod tests { } } + #[test] + fn test_is_acceptable() { + common_telemetry::init_default_ut_logging(); + let region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let heartbeat_env = HeartbeatResponseTestEnv::new(); + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + + // Open region + let region_id = RegionId::new(1024, 1); + let storage_path = "test"; + let instruction = open_region_instruction(region_id, storage_path); + assert!(heartbeat_handler + .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))); + + // Close region + let instruction = close_region_instruction(region_id); + assert!(heartbeat_handler + .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))); + + // Downgrade region + let instruction = Instruction::DowngradeRegion(DowngradeRegion { + region_id: RegionId::new(2048, 1), + }); + assert!(heartbeat_handler + .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))); + + // Upgrade region + let instruction = Instruction::UpgradeRegion(UpgradeRegion { + region_id, + last_entry_id: None, + wait_for_replay_timeout: None, + }); + assert!( + heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction))) + ); + } + fn close_region_instruction(region_id: RegionId) -> Instruction { Instruction::CloseRegion(RegionIdent { table_id: region_id.table_id(), diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 131026be4469..f4823d8d5710 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -14,6 +14,7 @@ use common_error::ext::ErrorExt; use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply}; +use common_meta::wal::prepare_wal_option; use futures_util::future::BoxFuture; use store_api::path_utils::region_dir; use store_api::region_request::{RegionOpenRequest, RegionRequest}; @@ -26,15 +27,14 @@ impl HandlerContext { OpenRegion { region_ident, region_storage_path, - region_options, + mut region_options, region_wal_options, skip_wal_replay, }: OpenRegion, ) -> BoxFuture<'static, InstructionReply> { Box::pin(async move { let region_id = Self::region_ident_to_region_id(®ion_ident); - // TODO(niebayes): extends region options with region_wal_options. - let _ = region_wal_options; + prepare_wal_option(&mut region_options, region_id, ®ion_wal_options); let request = RegionRequest::Open(RegionOpenRequest { engine: region_ident.engine, region_dir: region_dir(®ion_storage_path, region_id), @@ -42,10 +42,8 @@ impl HandlerContext { skip_wal_replay, }); let result = self.region_server.handle_request(region_id, request).await; - let success = result.is_ok(); let error = result.as_ref().map_err(|e| e.output_msg()).err(); - InstructionReply::OpenRegion(SimpleReply { result: success, error, diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index fbbf19822555..de1dd46d4fca 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -42,6 +42,7 @@ use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; +pub use manager::RegionMigrationProcedureTask; use serde::{Deserialize, Serialize}; use snafu::{location, Location, OptionExt, ResultExt}; use store_api::storage::RegionId; diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index dd034ba3e7d2..3937d4e13905 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -56,13 +56,24 @@ impl Drop for RegionMigrationProcedureGuard { } #[derive(Debug, Clone)] -pub(crate) struct RegionMigrationProcedureTask { +pub struct RegionMigrationProcedureTask { pub(crate) cluster_id: ClusterId, pub(crate) region_id: RegionId, pub(crate) from_peer: Peer, pub(crate) to_peer: Peer, } +impl RegionMigrationProcedureTask { + pub fn new(cluster_id: ClusterId, region_id: RegionId, from_peer: Peer, to_peer: Peer) -> Self { + Self { + cluster_id, + region_id, + from_peer, + to_peer, + } + } +} + impl Display for RegionMigrationProcedureTask { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -223,7 +234,7 @@ impl RegionMigrationManager { } /// Submits a new region migration procedure. - pub(crate) async fn submit_procedure( + pub async fn submit_procedure( &self, task: RegionMigrationProcedureTask, ) -> Result> { diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 336d1d68d0b0..21276bda9892 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -43,17 +43,18 @@ use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; -use meta_srv::metasrv::{MetaSrv, MetaSrvOptions}; +use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; use meta_srv::mocks::MockInfo; use servers::grpc::GrpcServer; use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; +use tempfile::TempDir; use tonic::transport::Server; use tower::service_fn; use crate::test_util::{ self, create_datanode_opts, create_tmp_dir_and_datanode_opts, FileDirGuard, StorageGuard, - StorageType, + StorageType, PEER_PLACEHOLDER_ADDR, }; pub struct GreptimeDbCluster { @@ -75,6 +76,8 @@ pub struct GreptimeDbClusterBuilder { datanodes: Option, wal_config: WalConfig, meta_wal_config: MetaWalConfig, + shared_home_dir: Option>, + meta_selector: Option, } impl GreptimeDbClusterBuilder { @@ -102,34 +105,53 @@ impl GreptimeDbClusterBuilder { datanodes: None, wal_config: WalConfig::default(), meta_wal_config: MetaWalConfig::default(), + shared_home_dir: None, + meta_selector: None, } } + #[must_use] pub fn with_store_config(mut self, store_config: ObjectStoreConfig) -> Self { self.store_config = Some(store_config); self } + #[must_use] pub fn with_store_providers(mut self, store_providers: Vec) -> Self { self.store_providers = Some(store_providers); self } + #[must_use] pub fn with_datanodes(mut self, datanodes: u32) -> Self { self.datanodes = Some(datanodes); self } + #[must_use] pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self { self.wal_config = wal_config; self } + #[must_use] pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self { self.meta_wal_config = wal_meta; self } + #[must_use] + pub fn with_shared_home_dir(mut self, shared_home_dir: Arc) -> Self { + self.shared_home_dir = Some(shared_home_dir); + self + } + + #[must_use] + pub fn with_meta_selector(mut self, selector: SelectorRef) -> Self { + self.meta_selector = Some(selector); + self + } + pub async fn build(self) -> GreptimeDbCluster { let datanodes = self.datanodes.unwrap_or(4); @@ -147,7 +169,13 @@ impl GreptimeDbClusterBuilder { ..Default::default() }; - let meta_srv = self.build_metasrv(opt, datanode_clients.clone()).await; + let meta_srv = meta_srv::mocks::mock( + opt, + self.kv_backend.clone(), + self.meta_selector.clone(), + Some(datanode_clients.clone()), + ) + .await; let (datanode_instances, storage_guards, dir_guards) = self.build_datanodes(meta_srv.clone(), datanodes).await; @@ -175,14 +203,6 @@ impl GreptimeDbClusterBuilder { } } - async fn build_metasrv( - &self, - opt: MetaSrvOptions, - datanode_clients: Arc, - ) -> MockInfo { - meta_srv::mocks::mock(opt, self.kv_backend.clone(), None, Some(datanode_clients)).await - } - async fn build_datanodes( &self, meta_srv: MockInfo, @@ -200,10 +220,15 @@ impl GreptimeDbClusterBuilder { let datanode_id = i as u64 + 1; let mode = Mode::Distributed; let mut opts = if let Some(store_config) = &self.store_config { - let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name)); - let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); + let home_dir = if let Some(home_dir) = &self.shared_home_dir { + home_dir.path().to_str().unwrap().to_string() + } else { + let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name)); + let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); + dir_guards.push(FileDirGuard::new(home_tmp_dir)); - dir_guards.push(FileDirGuard::new(home_tmp_dir)); + home_dir + }; create_datanode_opts( mode, @@ -370,8 +395,8 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) { // Move client to an option so we can _move_ the inner value // on the first attempt to connect. All other attempts will fail. let mut client = Some(client); - // "127.0.0.1:3001" is just a placeholder, does not actually connect to it. - let addr = "127.0.0.1:3001"; + // `PEER_PLACEHOLDER_ADDR` is just a placeholder, does not actually connect to it. + let addr = PEER_PLACEHOLDER_ADDR; let channel_manager = ChannelManager::new(); let _ = channel_manager .reset_with_connector( diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 9c6956a6bbda..d79d89fef108 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -65,6 +65,7 @@ impl GreptimeDbStandaloneBuilder { } } + #[must_use] pub fn with_default_store_type(self, store_type: StorageType) -> Self { Self { default_store: Some(store_type), @@ -73,6 +74,7 @@ impl GreptimeDbStandaloneBuilder { } #[cfg(test)] + #[must_use] pub fn with_store_providers(self, store_providers: Vec) -> Self { Self { store_providers: Some(store_providers), @@ -81,6 +83,7 @@ impl GreptimeDbStandaloneBuilder { } #[cfg(test)] + #[must_use] pub fn with_plugin(self, plugin: Plugins) -> Self { Self { plugin: Some(plugin), @@ -88,11 +91,13 @@ impl GreptimeDbStandaloneBuilder { } } + #[must_use] pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self { self.wal_config = wal_config; self } + #[must_use] pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self { self.meta_wal_config = wal_meta; self diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 04e31d91ca3f..8ec3302dc7c8 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -27,6 +27,7 @@ use common_meta::key::schema_name::SchemaNameKey; use common_query::Output; use common_recordbatch::util; use common_runtime::Builder as RuntimeBuilder; +use common_telemetry::warn; use common_test_util::ports; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datanode::config::{ @@ -36,6 +37,7 @@ use datanode::config::{ use frontend::frontend::TomlSerializable; use frontend::instance::Instance; use frontend::service_config::{MysqlOptions, PostgresOptions}; +use futures::future::BoxFuture; use object_store::services::{Azblob, Gcs, Oss, S3}; use object_store::test_util::TempFolder; use object_store::ObjectStore; @@ -54,6 +56,8 @@ use session::context::QueryContext; use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder}; +pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:3001"; + #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub enum StorageType { S3, @@ -662,3 +666,22 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) { .await .unwrap(); } + +pub async fn run_test_with_kafka_wal(test: F) +where + F: FnOnce(Vec) -> BoxFuture<'static, ()>, +{ + let _ = dotenv::dotenv(); + let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default(); + if endpoints.is_empty() { + warn!("The endpoints is empty, skipping the test"); + return; + } + + let endpoints = endpoints + .split(',') + .map(|s| s.trim().to_string()) + .collect::>(); + + test(endpoints).await +} diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index c8cfdc12796f..73f1c4c33132 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -100,7 +100,7 @@ pub async fn test_region_failover(store_type: StorageType) { let table_id = prepare_testing_table(&cluster).await; - let results = write_datas(&frontend, logical_timer).await; + let results = insert_values(&frontend, logical_timer).await; logical_timer += 1000; for result in results { assert!(matches!(result.unwrap(), Output::AffectedRows(1))); @@ -141,12 +141,12 @@ pub async fn test_region_failover(store_type: StorageType) { // Inserts data to each datanode after failover let frontend = cluster.frontend.clone(); - let results = write_datas(&frontend, logical_timer).await; + let results = insert_values(&frontend, logical_timer).await; for result in results { assert!(matches!(result.unwrap(), Output::AffectedRows(1))); } - assert_writes(&frontend).await; + assert_values(&frontend).await; assert!(!distribution.contains_key(&failed_region.datanode_id)); @@ -179,12 +179,12 @@ async fn has_route_cache(instance: &Arc, table_id: TableId) -> bool { .is_some() } -async fn write_datas(instance: &Arc, ts: u64) -> Vec> { +async fn insert_values(instance: &Arc, ts: u64) -> Vec> { let query_ctx = QueryContext::arc(); let mut results = Vec::new(); for range in [5, 15, 25, 55] { - let result = write_data( + let result = insert_value( instance, &format!("INSERT INTO my_table VALUES ({},{})", range, ts), query_ctx.clone(), @@ -196,7 +196,7 @@ async fn write_datas(instance: &Arc, ts: u64) -> Vec, sql: &str, query_ctx: QueryContextRef, @@ -204,7 +204,7 @@ async fn write_data( instance.do_query(sql, query_ctx).await.remove(0) } -async fn assert_writes(instance: &Arc) { +async fn assert_values(instance: &Arc) { let query_ctx = QueryContext::arc(); let result = instance diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs new file mode 100644 index 000000000000..b267c01fc675 --- /dev/null +++ b/tests-integration/tests/region_migration.rs @@ -0,0 +1,288 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_config::wal::KafkaConfig; +use common_config::WalConfig; +use common_meta::key::{RegionDistribution, TableMetadataManagerRef}; +use common_meta::peer::Peer; +use common_meta::wal::kafka::KafkaConfig as MetaKafkaConfig; +use common_meta::wal::WalConfig as MetaWalConfig; +use common_query::Output; +use common_telemetry::info; +use common_test_util::temp_dir::create_temp_dir; +use frontend::error::Result as FrontendResult; +use frontend::instance::Instance; +use futures::future::BoxFuture; +use meta_srv::error::Result as MetaResult; +use meta_srv::metasrv::SelectorContext; +use meta_srv::procedure::region_migration::RegionMigrationProcedureTask; +use meta_srv::selector::{Namespace, Selector, SelectorOptions}; +use servers::query_handler::sql::SqlQueryHandler; +use session::context::{QueryContext, QueryContextRef}; +use store_api::storage::RegionId; +use table::metadata::TableId; +use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder}; +use tests_integration::test_util::{ + check_output_stream, get_test_store_config, run_test_with_kafka_wal, StorageType, + PEER_PLACEHOLDER_ADDR, +}; +use uuid::Uuid; + +const TEST_TABLE_NAME: &str = "migration_target"; + +#[tokio::test(flavor = "multi_thread")] +async fn test_region_migration_fs() { + common_telemetry::init_default_ut_logging(); + run_test_with_kafka_wal(|endpoints| { + Box::pin(async move { test_region_migration(StorageType::File, endpoints).await }) + }) + .await +} + +pub async fn test_region_migration(store_type: StorageType, endpoints: Vec) { + let cluster_name = "test_region_migration"; + let peer_factory = |id| Peer { + id, + addr: PEER_PLACEHOLDER_ADDR.to_string(), + }; + + // Prepares test cluster. + let (store_config, _guard) = get_test_store_config(&store_type); + let home_dir = create_temp_dir("test_migration_data_home"); + let datanodes = 5u64; + let builder = GreptimeDbClusterBuilder::new(cluster_name).await; + let const_selector = Arc::new(ConstNodeSelector { + peers: vec![peer_factory(1), peer_factory(2), peer_factory(3)], + }); + let cluster = builder + .with_datanodes(datanodes as u32) + .with_store_config(store_config) + .with_wal_config(WalConfig::Kafka(KafkaConfig { + broker_endpoints: endpoints.clone(), + linger: Duration::from_millis(25), + ..Default::default() + })) + .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig { + broker_endpoints: endpoints, + num_topics: 3, + topic_name_prefix: Uuid::new_v4().to_string(), + ..Default::default() + })) + .with_shared_home_dir(Arc::new(home_dir)) + .with_meta_selector(const_selector.clone()) + .build() + .await; + let mut logical_timer = 1685508715000; + let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone(); + + // Prepares test table. + let table_id = prepare_testing_table(&cluster).await; + + // Inserts data + let results = insert_values(&cluster.frontend, logical_timer).await; + logical_timer += 1000; + for result in results { + assert!(matches!(result.unwrap(), Output::AffectedRows(1))); + } + + // The region distribution + let mut distribution = find_region_distribution(&table_metadata_manager, table_id).await; + + // Selecting target of region migration. + let region_migration_manager = cluster.meta_srv.region_migration_manager(); + let (from_peer_id, from_regions) = distribution.pop_first().unwrap(); + info!( + "Selecting from peer: {from_peer_id}, and regions: {:?}", + from_regions + ); + let (to_peer_id, mut to_regions) = distribution.pop_first().unwrap(); + info!( + "Selecting to peer: {from_peer_id}, and regions: {:?}", + to_regions + ); + + let region_id = RegionId::new(table_id, from_regions[0]); + // Trigger region migration. + let procedure = region_migration_manager + .submit_procedure(RegionMigrationProcedureTask::new( + 0, + region_id, + peer_factory(from_peer_id), + peer_factory(to_peer_id), + )) + .await + .unwrap(); + info!("Started region procedure: {}!", procedure.unwrap()); + + // Prepares expected region distribution. + to_regions.extend(from_regions); + // Keeps asc order. + to_regions.sort(); + distribution.insert(to_peer_id, to_regions); + + // Waits condition + wait_condition( + Duration::from_secs(10), + Box::pin(async move { + loop { + let region_migration = + find_region_distribution(&table_metadata_manager, table_id).await; + if region_migration == distribution { + info!("Found new distribution: {region_migration:?}"); + break; + } else { + info!("Found the unexpected distribution: {region_migration:?}, expected: {distribution:?}"); + tokio::time::sleep(Duration::from_millis(200)).await; + } + } + }), + ) + .await; + + // Inserts more table. + let results = insert_values(&cluster.frontend, logical_timer).await; + for result in results { + assert!(matches!(result.unwrap(), Output::AffectedRows(1))); + } + + // Asserts the writes. + assert_values(&cluster.frontend).await; + + // Triggers again. + let procedure = region_migration_manager + .submit_procedure(RegionMigrationProcedureTask::new( + 0, + region_id, + peer_factory(from_peer_id), + peer_factory(to_peer_id), + )) + .await + .unwrap(); + assert!(procedure.is_none()); +} + +pub struct ConstNodeSelector { + peers: Vec, +} + +#[async_trait::async_trait] +impl Selector for ConstNodeSelector { + type Context = SelectorContext; + type Output = Vec; + + async fn select( + &self, + _ns: Namespace, + _ctx: &Self::Context, + _opts: SelectorOptions, + ) -> MetaResult { + Ok(self.peers.clone()) + } +} + +async fn wait_condition(timeout: Duration, condition: BoxFuture<'static, ()>) { + tokio::time::timeout(timeout, condition).await.unwrap(); +} + +async fn assert_values(instance: &Arc) { + let query_ctx = QueryContext::arc(); + + let result = instance + .do_query( + &format!("select * from {TEST_TABLE_NAME} order by i, ts"), + query_ctx, + ) + .await + .remove(0); + + let expected = "\ ++----+---------------------+ +| i | ts | ++----+---------------------+ +| 5 | 2023-05-31T04:51:55 | +| 5 | 2023-05-31T04:51:56 | +| 15 | 2023-05-31T04:51:55 | +| 15 | 2023-05-31T04:51:56 | +| 55 | 2023-05-31T04:51:55 | +| 55 | 2023-05-31T04:51:56 | ++----+---------------------+"; + check_output_stream(result.unwrap(), expected).await; +} + +async fn prepare_testing_table(cluster: &GreptimeDbCluster) -> TableId { + let sql = format!( + r" + CREATE TABLE {TEST_TABLE_NAME} ( + i INT PRIMARY KEY, + ts TIMESTAMP TIME INDEX, + ) PARTITION BY RANGE COLUMNS (i) ( + PARTITION r0 VALUES LESS THAN (10), + PARTITION r1 VALUES LESS THAN (50), + PARTITION r3 VALUES LESS THAN (MAXVALUE), + )" + ); + let mut result = cluster.frontend.do_query(&sql, QueryContext::arc()).await; + let output = result.remove(0).unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let table = cluster + .frontend + .catalog_manager() + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME) + .await + .unwrap() + .unwrap(); + table.table_info().table_id() +} + +async fn find_region_distribution( + table_metadata_manager: &TableMetadataManagerRef, + table_id: TableId, +) -> RegionDistribution { + table_metadata_manager + .table_route_manager() + .get_region_distribution(table_id) + .await + .unwrap() + .unwrap() +} + +async fn insert_values(instance: &Arc, ts: u64) -> Vec> { + let query_ctx = QueryContext::arc(); + + let mut results = Vec::new(); + for range in [5, 15, 55] { + let result = insert_value( + instance, + &format!("INSERT INTO {TEST_TABLE_NAME} VALUES ({},{})", range, ts), + query_ctx.clone(), + ) + .await; + results.push(result); + } + + results +} + +async fn insert_value( + instance: &Arc, + sql: &str, + query_ctx: QueryContextRef, +) -> FrontendResult { + instance.do_query(sql, query_ctx).await.remove(0) +}