Skip to content

Commit

Permalink
test: add tests for RegionHeartbeatResponseHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 1, 2023
1 parent 706b6f6 commit 1e43a98
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/common/meta/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct HeartbeatResponseHandlerContext {
/// HandleControl
///
/// Controls process of handling heartbeat response.
#[derive(PartialEq)]
#[derive(Debug, PartialEq)]
pub enum HandleControl {
Continue,
Done,
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/heartbeat/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub struct MessageMeta {
pub from: String,
}

#[cfg(test)]
impl MessageMeta {
#[cfg(any(test, feature = "testing"))]
pub fn new_test(id: u64, subject: &str, to: &str, from: &str) -> Self {
MessageMeta {
id,
Expand Down
2 changes: 2 additions & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ uuid.workspace = true
[dev-dependencies]
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
client.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-query.workspace = true
common-test-util.workspace = true
datafusion-common.workspace = true
mito2 = { workspace = true, features = ["test"] }
session.workspace = true
289 changes: 282 additions & 7 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use store_api::region_engine::SetReadonlyResponse;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::error;
use crate::region_server::RegionServer;
/// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion].
#[derive(Clone)]
Expand All @@ -41,10 +42,12 @@ pub type InstructionHandler =
Box<dyn FnOnce(RegionServer) -> BoxFuture<'static, InstructionReply> + Send>;

impl RegionHeartbeatResponseHandler {
/// Returns the [RegionHeartbeatResponseHandler].
pub fn new(region_server: RegionServer) -> Self {
Self { region_server }
}

/// Builds the [InstructionHandler].
fn build_handler(instruction: Instruction) -> MetaResult<InstructionHandler> {
match instruction {
Instruction::OpenRegion(OpenRegion {
Expand Down Expand Up @@ -76,13 +79,22 @@ impl RegionHeartbeatResponseHandler {
let request = RegionRequest::Close(RegionCloseRequest {});
let result = region_server.handle_request(region_id, request).await;

let success = result.is_ok();
let error = result.as_ref().map_err(|e| e.to_string()).err();

InstructionReply::CloseRegion(SimpleReply {
result: success,
error,
})
match result {
Ok(_) => InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
}),
Err(error::Error::RegionNotFound { .. }) => {
InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
})
}
Err(err) => InstructionReply::CloseRegion(SimpleReply {
result: false,
error: Some(err.to_string()),
}),
}
})
})),
Instruction::DowngradeRegion(DowngradeRegion { region_id }) => {
Expand Down Expand Up @@ -154,3 +166,266 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
Ok(HandleControl::Done)
}
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;

use common_meta::heartbeat::mailbox::{
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
};
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Receiver};

use super::*;
use crate::error;
use crate::tests::mock_region_server;

pub struct HeartbeatResponseTestEnv {
mailbox: MailboxRef,
receiver: Receiver<(MessageMeta, InstructionReply)>,
}

impl HeartbeatResponseTestEnv {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));

HeartbeatResponseTestEnv {
mailbox,
receiver: rx,
}
}

pub fn create_handler_ctx(
&self,
incoming_message: IncomingMessage,
) -> HeartbeatResponseHandlerContext {
HeartbeatResponseHandlerContext {
mailbox: self.mailbox.clone(),
response: Default::default(),
incoming_message: Some(incoming_message),
}
}
}

fn close_region_instruction(region_id: RegionId) -> Instruction {
Instruction::CloseRegion(RegionIdent {
table_id: region_id.table_id(),
region_number: region_id.region_number(),
cluster_id: 1,
datanode_id: 2,
engine: MITO_ENGINE_NAME.to_string(),
})
}

fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
table_id: region_id.table_id(),
region_number: region_id.region_number(),
cluster_id: 1,
datanode_id: 2,
engine: MITO_ENGINE_NAME.to_string(),
},
path,
HashMap::new(),
))
}

#[tokio::test]
async fn test_close_region() {
common_telemetry::init_default_ut_logging();

let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

let mut engine_env = TestEnv::with_prefix("close-region");
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);

let builder = CreateRequestBuilder::new();
let create_req = builder.build();
region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.await
.unwrap();

let mut heartbeat_env = HeartbeatResponseTestEnv::new();

// Should be ok, if we try to close it twice.
for _ in 0..2 {
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
let instruction = close_region_instruction(region_id);

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

if let InstructionReply::CloseRegion(reply) = reply {
assert!(reply.result);
assert!(reply.error.is_none());
} else {
unreachable!()
}

assert_matches!(
region_server.set_writable(region_id, true).unwrap_err(),
error::Error::RegionNotFound { .. }
);
}
}

#[tokio::test]
async fn test_open_region_ok() {
common_telemetry::init_default_ut_logging();

let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

let mut engine_env = TestEnv::with_prefix("open-region");
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);

let builder = CreateRequestBuilder::new();
let mut create_req = builder.build();
let storage_path = "test";
create_req.region_dir = region_dir(storage_path, region_id);

region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.await
.unwrap();

region_server
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
let mut heartbeat_env = HeartbeatResponseTestEnv::new();

// Should be ok, if we try to open it twice.
for _ in 0..2 {
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
let instruction = open_region_instruction(region_id, storage_path);

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

if let InstructionReply::OpenRegion(reply) = reply {
assert!(reply.result);
assert!(reply.error.is_none());
} else {
unreachable!()
}
}
}

#[tokio::test]
async fn test_open_not_exists_region() {
common_telemetry::init_default_ut_logging();

let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

let mut engine_env = TestEnv::with_prefix("open-not-exists-region");
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
let storage_path = "test";

let mut heartbeat_env = HeartbeatResponseTestEnv::new();

let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
let instruction = open_region_instruction(region_id, storage_path);

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

if let InstructionReply::OpenRegion(reply) = reply {
assert!(!reply.result);
assert!(reply.error.is_some());
} else {
unreachable!()
}
}

#[tokio::test]
async fn test_downgrade_region() {
common_telemetry::init_default_ut_logging();

let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

let mut engine_env = TestEnv::with_prefix("downgrade-region");
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);

let builder = CreateRequestBuilder::new();
let mut create_req = builder.build();
let storage_path = "test";
create_req.region_dir = region_dir(storage_path, region_id);

region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.await
.unwrap();

let mut heartbeat_env = HeartbeatResponseTestEnv::new();

// Should be ok, if we try to downgrade it twice.
for _ in 0..2 {
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id });

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.is_none());
assert_eq!(reply.last_entry_id.unwrap(), 0);
} else {
unreachable!()
}
}

// Downgrades a not exists region.
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(!reply.exists);
assert!(reply.error.is_none());
assert!(reply.last_entry_id.is_none());
} else {
unreachable!()
}
}
}
6 changes: 4 additions & 2 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use store_api::storage::{ColumnId, RegionId};

use crate::config::MitoConfig;
use crate::engine::listener::EventListenerRef;
use crate::engine::MitoEngine;
use crate::engine::{MitoEngine, MITO_ENGINE_NAME};
use crate::error::Result;
use crate::flush::{WriteBufferManager, WriteBufferManagerRef};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
Expand Down Expand Up @@ -278,6 +278,7 @@ pub struct CreateRequestBuilder {
options: HashMap<String, String>,
primary_key: Option<Vec<ColumnId>>,
all_not_null: bool,
engine: String,
}

impl Default for CreateRequestBuilder {
Expand All @@ -289,6 +290,7 @@ impl Default for CreateRequestBuilder {
options: HashMap::new(),
primary_key: None,
all_not_null: false,
engine: MITO_ENGINE_NAME.to_string(),
}
}
}
Expand Down Expand Up @@ -378,7 +380,7 @@ impl CreateRequestBuilder {

RegionCreateRequest {
// We use empty engine name as we already locates the engine.
engine: String::new(),
engine: self.engine.to_string(),
column_metadatas,
primary_key: self.primary_key.clone().unwrap_or(primary_key),
options: self.options.clone(),
Expand Down

0 comments on commit 1e43a98

Please sign in to comment.