From 1e43a981e379b67360df3c74e64d29144276cd7f Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 1 Dec 2023 08:07:50 +0000 Subject: [PATCH] test: add tests for RegionHeartbeatResponseHandler --- src/common/meta/src/heartbeat/handler.rs | 2 +- src/common/meta/src/heartbeat/mailbox.rs | 2 +- src/datanode/Cargo.toml | 2 + src/datanode/src/heartbeat/handler.rs | 289 ++++++++++++++++++++++- src/mito2/src/test_util.rs | 6 +- 5 files changed, 290 insertions(+), 11 deletions(-) diff --git a/src/common/meta/src/heartbeat/handler.rs b/src/common/meta/src/heartbeat/handler.rs index 9b24955af3ea..d80e3b8486c9 100644 --- a/src/common/meta/src/heartbeat/handler.rs +++ b/src/common/meta/src/heartbeat/handler.rs @@ -37,7 +37,7 @@ pub struct HeartbeatResponseHandlerContext { /// HandleControl /// /// Controls process of handling heartbeat response. -#[derive(PartialEq)] +#[derive(Debug, PartialEq)] pub enum HandleControl { Continue, Done, diff --git a/src/common/meta/src/heartbeat/mailbox.rs b/src/common/meta/src/heartbeat/mailbox.rs index 944a423c4c7f..538a81b72ca0 100644 --- a/src/common/meta/src/heartbeat/mailbox.rs +++ b/src/common/meta/src/heartbeat/mailbox.rs @@ -30,8 +30,8 @@ pub struct MessageMeta { pub from: String, } -#[cfg(test)] impl MessageMeta { + #[cfg(any(test, feature = "testing"))] pub fn new_test(id: u64, subject: &str, to: &str, from: &str) -> Self { MessageMeta { id, diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 323293ff0ee1..b3bc70ed97fd 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -77,7 +77,9 @@ uuid.workspace = true [dev-dependencies] axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" } client.workspace = true +common-meta = { workspace = true, features = ["testing"] } common-query.workspace = true common-test-util.workspace = true datafusion-common.workspace = true +mito2 = { workspace = true, features = ["test"] } session.workspace = true diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 5c66396bf463..fe7026afa1f6 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -29,6 +29,7 @@ use store_api::region_engine::SetReadonlyResponse; use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; +use crate::error; use crate::region_server::RegionServer; /// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion]. #[derive(Clone)] @@ -41,10 +42,12 @@ pub type InstructionHandler = Box BoxFuture<'static, InstructionReply> + Send>; impl RegionHeartbeatResponseHandler { + /// Returns the [RegionHeartbeatResponseHandler]. pub fn new(region_server: RegionServer) -> Self { Self { region_server } } + /// Builds the [InstructionHandler]. fn build_handler(instruction: Instruction) -> MetaResult { match instruction { Instruction::OpenRegion(OpenRegion { @@ -76,13 +79,22 @@ impl RegionHeartbeatResponseHandler { let request = RegionRequest::Close(RegionCloseRequest {}); let result = region_server.handle_request(region_id, request).await; - let success = result.is_ok(); - let error = result.as_ref().map_err(|e| e.to_string()).err(); - - InstructionReply::CloseRegion(SimpleReply { - result: success, - error, - }) + match result { + Ok(_) => InstructionReply::CloseRegion(SimpleReply { + result: true, + error: None, + }), + Err(error::Error::RegionNotFound { .. }) => { + InstructionReply::CloseRegion(SimpleReply { + result: true, + error: None, + }) + } + Err(err) => InstructionReply::CloseRegion(SimpleReply { + result: false, + error: Some(err.to_string()), + }), + } }) })), Instruction::DowngradeRegion(DowngradeRegion { region_id }) => { @@ -154,3 +166,266 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { Ok(HandleControl::Done) } } + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::collections::HashMap; + use std::sync::Arc; + + use common_meta::heartbeat::mailbox::{ + HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta, + }; + use mito2::config::MitoConfig; + use mito2::engine::MITO_ENGINE_NAME; + use mito2::test_util::{CreateRequestBuilder, TestEnv}; + use store_api::region_request::RegionRequest; + use store_api::storage::RegionId; + use tokio::sync::mpsc::{self, Receiver}; + + use super::*; + use crate::error; + use crate::tests::mock_region_server; + + pub struct HeartbeatResponseTestEnv { + mailbox: MailboxRef, + receiver: Receiver<(MessageMeta, InstructionReply)>, + } + + impl HeartbeatResponseTestEnv { + pub fn new() -> Self { + let (tx, rx) = mpsc::channel(8); + let mailbox = Arc::new(HeartbeatMailbox::new(tx)); + + HeartbeatResponseTestEnv { + mailbox, + receiver: rx, + } + } + + pub fn create_handler_ctx( + &self, + incoming_message: IncomingMessage, + ) -> HeartbeatResponseHandlerContext { + HeartbeatResponseHandlerContext { + mailbox: self.mailbox.clone(), + response: Default::default(), + incoming_message: Some(incoming_message), + } + } + } + + fn close_region_instruction(region_id: RegionId) -> Instruction { + Instruction::CloseRegion(RegionIdent { + table_id: region_id.table_id(), + region_number: region_id.region_number(), + cluster_id: 1, + datanode_id: 2, + engine: MITO_ENGINE_NAME.to_string(), + }) + } + + fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction { + Instruction::OpenRegion(OpenRegion::new( + RegionIdent { + table_id: region_id.table_id(), + region_number: region_id.region_number(), + cluster_id: 1, + datanode_id: 2, + engine: MITO_ENGINE_NAME.to_string(), + }, + path, + HashMap::new(), + )) + } + + #[tokio::test] + async fn test_close_region() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + + let mut engine_env = TestEnv::with_prefix("close-region"); + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine)); + let region_id = RegionId::new(1024, 1); + + let builder = CreateRequestBuilder::new(); + let create_req = builder.build(); + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + + // Should be ok, if we try to close it twice. + for _ in 0..2 { + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = close_region_instruction(region_id); + + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Done); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + if let InstructionReply::CloseRegion(reply) = reply { + assert!(reply.result); + assert!(reply.error.is_none()); + } else { + unreachable!() + } + + assert_matches!( + region_server.set_writable(region_id, true).unwrap_err(), + error::Error::RegionNotFound { .. } + ); + } + } + + #[tokio::test] + async fn test_open_region_ok() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + + let mut engine_env = TestEnv::with_prefix("open-region"); + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine)); + let region_id = RegionId::new(1024, 1); + + let builder = CreateRequestBuilder::new(); + let mut create_req = builder.build(); + let storage_path = "test"; + create_req.region_dir = region_dir(storage_path, region_id); + + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + + region_server + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + + // Should be ok, if we try to open it twice. + for _ in 0..2 { + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = open_region_instruction(region_id, storage_path); + + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Done); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + if let InstructionReply::OpenRegion(reply) = reply { + assert!(reply.result); + assert!(reply.error.is_none()); + } else { + unreachable!() + } + } + } + + #[tokio::test] + async fn test_open_not_exists_region() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + + let mut engine_env = TestEnv::with_prefix("open-not-exists-region"); + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine)); + let region_id = RegionId::new(1024, 1); + let storage_path = "test"; + + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = open_region_instruction(region_id, storage_path); + + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Done); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + if let InstructionReply::OpenRegion(reply) = reply { + assert!(!reply.result); + assert!(reply.error.is_some()); + } else { + unreachable!() + } + } + + #[tokio::test] + async fn test_downgrade_region() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + + let mut engine_env = TestEnv::with_prefix("downgrade-region"); + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine)); + let region_id = RegionId::new(1024, 1); + + let builder = CreateRequestBuilder::new(); + let mut create_req = builder.build(); + let storage_path = "test"; + create_req.region_dir = region_dir(storage_path, region_id); + + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + + // Should be ok, if we try to downgrade it twice. + for _ in 0..2 { + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id }); + + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Done); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + if let InstructionReply::DowngradeRegion(reply) = reply { + assert!(reply.exists); + assert!(reply.error.is_none()); + assert_eq!(reply.last_entry_id.unwrap(), 0); + } else { + unreachable!() + } + } + + // Downgrades a not exists region. + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = Instruction::DowngradeRegion(DowngradeRegion { + region_id: RegionId::new(2048, 1), + }); + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Done); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + if let InstructionReply::DowngradeRegion(reply) = reply { + assert!(!reply.exists); + assert!(reply.error.is_none()); + assert!(reply.last_entry_id.is_none()); + } else { + unreachable!() + } + } +} diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 4039045a3975..c0469a7ac97f 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -50,7 +50,7 @@ use store_api::storage::{ColumnId, RegionId}; use crate::config::MitoConfig; use crate::engine::listener::EventListenerRef; -use crate::engine::MitoEngine; +use crate::engine::{MitoEngine, MITO_ENGINE_NAME}; use crate::error::Result; use crate::flush::{WriteBufferManager, WriteBufferManagerRef}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; @@ -278,6 +278,7 @@ pub struct CreateRequestBuilder { options: HashMap, primary_key: Option>, all_not_null: bool, + engine: String, } impl Default for CreateRequestBuilder { @@ -289,6 +290,7 @@ impl Default for CreateRequestBuilder { options: HashMap::new(), primary_key: None, all_not_null: false, + engine: MITO_ENGINE_NAME.to_string(), } } } @@ -378,7 +380,7 @@ impl CreateRequestBuilder { RegionCreateRequest { // We use empty engine name as we already locates the engine. - engine: String::new(), + engine: self.engine.to_string(), column_metadatas, primary_key: self.primary_key.clone().unwrap_or(primary_key), options: self.options.clone(),