From 1fc37353eea3bc1c57cea900aa4974b0bf8a4882 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 1 Dec 2023 06:29:09 +0000 Subject: [PATCH 1/3] feat: handle the downgrade region instruction --- src/datanode/src/heartbeat/handler.rs | 169 ++++++++++++-------------- src/datanode/src/region_server.rs | 15 ++- 2 files changed, 90 insertions(+), 94 deletions(-) diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index ee243942e787..e7dbc27ba86e 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -13,54 +13,104 @@ // limitations under the License. use async_trait::async_trait; -use common_error::ext::ErrorExt; -use common_error::status_code::StatusCode; use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult}; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; +use common_meta::instruction::{ + DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, OpenRegion, SimpleReply, +}; use common_meta::RegionIdent; -use common_query::Output; use common_telemetry::error; +use futures::future::BoxFuture; use snafu::OptionExt; use store_api::path_utils::region_dir; +use store_api::region_engine::SetReadonlyResponse; use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; -use crate::error::Result; use crate::region_server::RegionServer; - /// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion]. #[derive(Clone)] pub struct RegionHeartbeatResponseHandler { region_server: RegionServer, } +/// Handler of the instruction. +pub type InstructionHandler = + Box BoxFuture<'static, InstructionReply> + Send>; + impl RegionHeartbeatResponseHandler { pub fn new(region_server: RegionServer) -> Self { Self { region_server } } - fn instruction_to_request(instruction: Instruction) -> MetaResult<(RegionId, RegionRequest)> { + fn build_handler(instruction: Instruction) -> MetaResult { match instruction { Instruction::OpenRegion(OpenRegion { region_ident, region_storage_path, options, - }) => { - let region_id = Self::region_ident_to_region_id(®ion_ident); - let open_region_req = RegionRequest::Open(RegionOpenRequest { - engine: region_ident.engine, - region_dir: region_dir(®ion_storage_path, region_id), - options, - }); - Ok((region_id, open_region_req)) - } - Instruction::CloseRegion(region_ident) => { - let region_id = Self::region_ident_to_region_id(®ion_ident); - let close_region_req = RegionRequest::Close(RegionCloseRequest {}); - Ok((region_id, close_region_req)) + }) => Ok(Box::new(|region_server| { + Box::pin(async move { + let region_id = Self::region_ident_to_region_id(®ion_ident); + let request = RegionRequest::Open(RegionOpenRequest { + engine: region_ident.engine, + region_dir: region_dir(®ion_storage_path, region_id), + options, + }); + let result = region_server.handle_request(region_id, request).await; + + let success = result.is_ok(); + let error = result.as_ref().map_err(|e| e.to_string()).err(); + + InstructionReply::OpenRegion(SimpleReply { + result: success, + error, + }) + }) + })), + Instruction::CloseRegion(region_ident) => Ok(Box::new(|region_server| { + Box::pin(async move { + let region_id = Self::region_ident_to_region_id(®ion_ident); + let request = RegionRequest::Close(RegionCloseRequest {}); + let result = region_server.handle_request(region_id, request).await; + + let success = result.is_ok(); + let error = result.as_ref().map_err(|e| e.to_string()).err(); + + InstructionReply::CloseRegion(SimpleReply { + result: success, + error, + }) + }) + })), + Instruction::DowngradeRegion(DowngradeRegion { region_id }) => { + Ok(Box::new(move |region_server| { + Box::pin(async move { + match region_server.set_readonly_gracefully(region_id).await { + Ok(SetReadonlyResponse::Success { last_entry_id }) => { + InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id, + exists: true, + error: None, + }) + } + Ok(SetReadonlyResponse::NotFound) => { + InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id: None, + exists: false, + error: None, + }) + } + Err(err) => InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id: None, + exists: false, + error: Some(err.to_string()), + }), + } + }) + })) } Instruction::UpgradeRegion(_) => { todo!() @@ -68,77 +118,12 @@ impl RegionHeartbeatResponseHandler { Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => { InvalidHeartbeatResponseSnafu.fail() } - Instruction::DowngradeRegion(_) => { - // TODO(weny): add it later. - todo!() - } } } fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId { RegionId::new(region_ident.table_id, region_ident.region_number) } - - fn reply_template_from_instruction(instruction: &Instruction) -> InstructionReply { - match instruction { - Instruction::OpenRegion(_) => InstructionReply::OpenRegion(SimpleReply { - result: false, - error: None, - }), - Instruction::CloseRegion(_) => InstructionReply::CloseRegion(SimpleReply { - result: false, - error: None, - }), - Instruction::UpgradeRegion(_) => { - todo!() - } - Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => { - InstructionReply::InvalidateTableCache(SimpleReply { - result: false, - error: None, - }) - } - Instruction::DowngradeRegion(_) => { - // TODO(weny): add it later. - todo!() - } - } - } - - fn fill_reply(mut template: InstructionReply, result: Result) -> InstructionReply { - let success = result.is_ok(); - let error = result.as_ref().map_err(|e| e.to_string()).err(); - match &mut template { - InstructionReply::OpenRegion(reply) => { - reply.result = success; - reply.error = error; - } - InstructionReply::CloseRegion(reply) => match result { - Err(e) => { - if e.status_code() == StatusCode::RegionNotFound { - reply.result = true; - } - } - _ => { - reply.result = success; - reply.error = error; - } - }, - InstructionReply::UpgradeRegion(_) => { - todo!() - } - InstructionReply::InvalidateTableCache(reply) => { - reply.result = success; - reply.error = error; - } - InstructionReply::DowngradeRegion(_) => { - // TODO(weny): add it later. - todo!() - } - } - - template - } } #[async_trait] @@ -146,7 +131,9 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { matches!( ctx.incoming_message.as_ref(), - Some((_, Instruction::OpenRegion { .. })) | Some((_, Instruction::CloseRegion { .. })) + Some((_, Instruction::OpenRegion { .. })) + | Some((_, Instruction::CloseRegion { .. })) + | Some((_, Instruction::DowngradeRegion { .. })) ) } @@ -158,15 +145,11 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { let mailbox = ctx.mailbox.clone(); let region_server = self.region_server.clone(); - let reply_template = Self::reply_template_from_instruction(&instruction); - let (region_id, region_req) = Self::instruction_to_request(instruction)?; + let handler = Self::build_handler(instruction)?; let _handle = common_runtime::spawn_bg(async move { - let result = region_server.handle_request(region_id, region_req).await; + let reply = handler(region_server).await; - if let Err(e) = mailbox - .send((meta, Self::fill_reply(reply_template, result))) - .await - { + if let Err(e) = mailbox.send((meta, reply)).await { error!(e; "Failed to send reply to mailbox"); } }); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 343891b5cc20..6580fa891140 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -49,7 +49,7 @@ use servers::grpc::region_server::RegionServerHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionEngineRef, RegionRole}; +use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse}; use store_api::region_request::{RegionCloseRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; @@ -148,6 +148,19 @@ impl RegionServer { .with_context(|_| HandleRegionRequestSnafu { region_id }) } + pub async fn set_readonly_gracefully( + &self, + region_id: RegionId, + ) -> Result { + match self.inner.region_map.get(®ion_id) { + Some(engine) => Ok(engine + .set_readonly_gracefully(region_id) + .await + .with_context(|_| HandleRegionRequestSnafu { region_id })?), + None => Ok(SetReadonlyResponse::NotFound), + } + } + pub fn runtime(&self) -> Arc { self.inner.runtime.clone() } From 52eaed473a4a70c96175e350e9155864c23845bc Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 1 Dec 2023 08:07:50 +0000 Subject: [PATCH 2/3] test: add tests for RegionHeartbeatResponseHandler --- src/common/meta/src/heartbeat/handler.rs | 2 +- src/common/meta/src/heartbeat/mailbox.rs | 2 +- src/datanode/Cargo.toml | 2 + src/datanode/src/heartbeat/handler.rs | 289 ++++++++++++++++++++++- src/mito2/src/test_util.rs | 6 +- 5 files changed, 290 insertions(+), 11 deletions(-) diff --git a/src/common/meta/src/heartbeat/handler.rs b/src/common/meta/src/heartbeat/handler.rs index 9b24955af3ea..d80e3b8486c9 100644 --- a/src/common/meta/src/heartbeat/handler.rs +++ b/src/common/meta/src/heartbeat/handler.rs @@ -37,7 +37,7 @@ pub struct HeartbeatResponseHandlerContext { /// HandleControl /// /// Controls process of handling heartbeat response. -#[derive(PartialEq)] +#[derive(Debug, PartialEq)] pub enum HandleControl { Continue, Done, diff --git a/src/common/meta/src/heartbeat/mailbox.rs b/src/common/meta/src/heartbeat/mailbox.rs index 944a423c4c7f..538a81b72ca0 100644 --- a/src/common/meta/src/heartbeat/mailbox.rs +++ b/src/common/meta/src/heartbeat/mailbox.rs @@ -30,8 +30,8 @@ pub struct MessageMeta { pub from: String, } -#[cfg(test)] impl MessageMeta { + #[cfg(any(test, feature = "testing"))] pub fn new_test(id: u64, subject: &str, to: &str, from: &str) -> Self { MessageMeta { id, diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 323293ff0ee1..b3bc70ed97fd 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -77,7 +77,9 @@ uuid.workspace = true [dev-dependencies] axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" } client.workspace = true +common-meta = { workspace = true, features = ["testing"] } common-query.workspace = true common-test-util.workspace = true datafusion-common.workspace = true +mito2 = { workspace = true, features = ["test"] } session.workspace = true diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index e7dbc27ba86e..cd1d692a4dd7 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -29,6 +29,7 @@ use store_api::region_engine::SetReadonlyResponse; use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; +use crate::error; use crate::region_server::RegionServer; /// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion]. #[derive(Clone)] @@ -41,10 +42,12 @@ pub type InstructionHandler = Box BoxFuture<'static, InstructionReply> + Send>; impl RegionHeartbeatResponseHandler { + /// Returns the [RegionHeartbeatResponseHandler]. pub fn new(region_server: RegionServer) -> Self { Self { region_server } } + /// Builds the [InstructionHandler]. fn build_handler(instruction: Instruction) -> MetaResult { match instruction { Instruction::OpenRegion(OpenRegion { @@ -76,13 +79,22 @@ impl RegionHeartbeatResponseHandler { let request = RegionRequest::Close(RegionCloseRequest {}); let result = region_server.handle_request(region_id, request).await; - let success = result.is_ok(); - let error = result.as_ref().map_err(|e| e.to_string()).err(); - - InstructionReply::CloseRegion(SimpleReply { - result: success, - error, - }) + match result { + Ok(_) => InstructionReply::CloseRegion(SimpleReply { + result: true, + error: None, + }), + Err(error::Error::RegionNotFound { .. }) => { + InstructionReply::CloseRegion(SimpleReply { + result: true, + error: None, + }) + } + Err(err) => InstructionReply::CloseRegion(SimpleReply { + result: false, + error: Some(err.to_string()), + }), + } }) })), Instruction::DowngradeRegion(DowngradeRegion { region_id }) => { @@ -157,3 +169,266 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { Ok(HandleControl::Done) } } + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::collections::HashMap; + use std::sync::Arc; + + use common_meta::heartbeat::mailbox::{ + HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta, + }; + use mito2::config::MitoConfig; + use mito2::engine::MITO_ENGINE_NAME; + use mito2::test_util::{CreateRequestBuilder, TestEnv}; + use store_api::region_request::RegionRequest; + use store_api::storage::RegionId; + use tokio::sync::mpsc::{self, Receiver}; + + use super::*; + use crate::error; + use crate::tests::mock_region_server; + + pub struct HeartbeatResponseTestEnv { + mailbox: MailboxRef, + receiver: Receiver<(MessageMeta, InstructionReply)>, + } + + impl HeartbeatResponseTestEnv { + pub fn new() -> Self { + let (tx, rx) = mpsc::channel(8); + let mailbox = Arc::new(HeartbeatMailbox::new(tx)); + + HeartbeatResponseTestEnv { + mailbox, + receiver: rx, + } + } + + pub fn create_handler_ctx( + &self, + incoming_message: IncomingMessage, + ) -> HeartbeatResponseHandlerContext { + HeartbeatResponseHandlerContext { + mailbox: self.mailbox.clone(), + response: Default::default(), + incoming_message: Some(incoming_message), + } + } + } + + fn close_region_instruction(region_id: RegionId) -> Instruction { + Instruction::CloseRegion(RegionIdent { + table_id: region_id.table_id(), + region_number: region_id.region_number(), + cluster_id: 1, + datanode_id: 2, + engine: MITO_ENGINE_NAME.to_string(), + }) + } + + fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction { + Instruction::OpenRegion(OpenRegion::new( + RegionIdent { + table_id: region_id.table_id(), + region_number: region_id.region_number(), + cluster_id: 1, + datanode_id: 2, + engine: MITO_ENGINE_NAME.to_string(), + }, + path, + HashMap::new(), + )) + } + + #[tokio::test] + async fn test_close_region() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + + let mut engine_env = TestEnv::with_prefix("close-region"); + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine)); + let region_id = RegionId::new(1024, 1); + + let builder = CreateRequestBuilder::new(); + let create_req = builder.build(); + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + + // Should be ok, if we try to close it twice. + for _ in 0..2 { + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = close_region_instruction(region_id); + + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Done); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + if let InstructionReply::CloseRegion(reply) = reply { + assert!(reply.result); + assert!(reply.error.is_none()); + } else { + unreachable!() + } + + assert_matches!( + region_server.set_writable(region_id, true).unwrap_err(), + error::Error::RegionNotFound { .. } + ); + } + } + + #[tokio::test] + async fn test_open_region_ok() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + + let mut engine_env = TestEnv::with_prefix("open-region"); + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine)); + let region_id = RegionId::new(1024, 1); + + let builder = CreateRequestBuilder::new(); + let mut create_req = builder.build(); + let storage_path = "test"; + create_req.region_dir = region_dir(storage_path, region_id); + + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + + region_server + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + + // Should be ok, if we try to open it twice. + for _ in 0..2 { + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = open_region_instruction(region_id, storage_path); + + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Done); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + if let InstructionReply::OpenRegion(reply) = reply { + assert!(reply.result); + assert!(reply.error.is_none()); + } else { + unreachable!() + } + } + } + + #[tokio::test] + async fn test_open_not_exists_region() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + + let mut engine_env = TestEnv::with_prefix("open-not-exists-region"); + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine)); + let region_id = RegionId::new(1024, 1); + let storage_path = "test"; + + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = open_region_instruction(region_id, storage_path); + + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Done); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + if let InstructionReply::OpenRegion(reply) = reply { + assert!(!reply.result); + assert!(reply.error.is_some()); + } else { + unreachable!() + } + } + + #[tokio::test] + async fn test_downgrade_region() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + + let mut engine_env = TestEnv::with_prefix("downgrade-region"); + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine)); + let region_id = RegionId::new(1024, 1); + + let builder = CreateRequestBuilder::new(); + let mut create_req = builder.build(); + let storage_path = "test"; + create_req.region_dir = region_dir(storage_path, region_id); + + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + + // Should be ok, if we try to downgrade it twice. + for _ in 0..2 { + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id }); + + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Done); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + if let InstructionReply::DowngradeRegion(reply) = reply { + assert!(reply.exists); + assert!(reply.error.is_none()); + assert_eq!(reply.last_entry_id.unwrap(), 0); + } else { + unreachable!() + } + } + + // Downgrades a not exists region. + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = Instruction::DowngradeRegion(DowngradeRegion { + region_id: RegionId::new(2048, 1), + }); + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Done); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + if let InstructionReply::DowngradeRegion(reply) = reply { + assert!(!reply.exists); + assert!(reply.error.is_none()); + assert!(reply.last_entry_id.is_none()); + } else { + unreachable!() + } + } +} diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 4039045a3975..c0469a7ac97f 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -50,7 +50,7 @@ use store_api::storage::{ColumnId, RegionId}; use crate::config::MitoConfig; use crate::engine::listener::EventListenerRef; -use crate::engine::MitoEngine; +use crate::engine::{MitoEngine, MITO_ENGINE_NAME}; use crate::error::Result; use crate::flush::{WriteBufferManager, WriteBufferManagerRef}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; @@ -278,6 +278,7 @@ pub struct CreateRequestBuilder { options: HashMap, primary_key: Option>, all_not_null: bool, + engine: String, } impl Default for CreateRequestBuilder { @@ -289,6 +290,7 @@ impl Default for CreateRequestBuilder { options: HashMap::new(), primary_key: None, all_not_null: false, + engine: MITO_ENGINE_NAME.to_string(), } } } @@ -378,7 +380,7 @@ impl CreateRequestBuilder { RegionCreateRequest { // We use empty engine name as we already locates the engine. - engine: String::new(), + engine: self.engine.to_string(), column_metadatas, primary_key: self.primary_key.clone().unwrap_or(primary_key), options: self.options.clone(), From 5e6d6cece8a8a31cfe7ba514db3b5ad3c707448e Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 1 Dec 2023 08:08:45 +0000 Subject: [PATCH 3/3] refactor: remove unused code --- src/datanode/src/lib.rs | 1 - src/datanode/src/tests.rs | 52 --------------------------------------- 2 files changed, 53 deletions(-) diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 2a1265baa786..43e8ee8c2e55 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -26,5 +26,4 @@ pub mod metrics; pub mod region_server; mod store; #[cfg(test)] -#[allow(dead_code)] mod tests; diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 76e3bd1bdf2b..7a5b31771cb1 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -13,19 +13,12 @@ // limitations under the License. use std::any::Any; -use std::collections::HashMap; use std::sync::Arc; -use api::v1::meta::HeartbeatResponse; use async_trait::async_trait; use common_error::ext::BoxedError; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::FunctionRef; -use common_meta::heartbeat::handler::{ - HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, -}; -use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; -use common_meta::instruction::{Instruction, OpenRegion, RegionIdent}; use common_query::prelude::ScalarUdf; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; @@ -46,51 +39,6 @@ use tokio::sync::mpsc::{Receiver, Sender}; use crate::event_listener::NoopRegionServerEventListener; use crate::region_server::RegionServer; -pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta { - MessageMeta { - id, - subject: subject.to_string(), - to: to.to_string(), - from: from.to_string(), - } -} - -async fn handle_instruction( - executor: Arc, - mailbox: Arc, - instruction: Instruction, -) { - let response = HeartbeatResponse::default(); - let mut ctx: HeartbeatResponseHandlerContext = - HeartbeatResponseHandlerContext::new(mailbox, response); - ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction)); - executor.handle(ctx).await.unwrap(); -} - -fn close_region_instruction() -> Instruction { - Instruction::CloseRegion(RegionIdent { - table_id: 1024, - region_number: 0, - cluster_id: 1, - datanode_id: 2, - engine: "mito2".to_string(), - }) -} - -fn open_region_instruction() -> Instruction { - Instruction::OpenRegion(OpenRegion::new( - RegionIdent { - table_id: 1024, - region_number: 0, - cluster_id: 1, - datanode_id: 2, - engine: "mito2".to_string(), - }, - "path/dir", - HashMap::new(), - )) -} - pub struct MockQueryEngine; #[async_trait]