Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add tests for region migration procedure #2857

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ impl TableRouteValue {
version: self.version + 1,
}
}

/// Returns the version of this table route value.
///
/// For test purposes only: the accessor is compiled for this crate's own
/// unit tests (`cfg(test)`) or when the `testing` feature is enabled.
// NOTE: the predicate must be `test`; a misspelled name is just an unknown
// cfg that never holds, which silently drops the method from test builds.
#[cfg(any(test, feature = "testing"))]
pub fn version(&self) -> u64 {
    self.version
}
}

impl TableMetaKey for TableRouteKey {
Expand Down
191 changes: 170 additions & 21 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::service::mailbox::{BroadcastChannel, MailboxRef};
/// It will only be updated/stored after the Red node has succeeded.
///
/// **Note: Storing too much data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
/// The Id of the cluster.
cluster_id: ClusterId,
Expand Down Expand Up @@ -263,14 +263,9 @@ impl Context {

#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
trait State: Sync + Send + Debug {
/// Yields the next state.
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>>;

/// Indicates the procedure execution status of the `State`.
fn status(&self) -> Status {
Status::Executing { persist: true }
}
pub(crate) trait State: Sync + Send + Debug {
/// Yields the next [State] and [Status].
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)>;

/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
Expand Down Expand Up @@ -340,14 +335,16 @@ impl Procedure for RegionMigrationProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;

*state = state.next(&mut self.context).await.map_err(|e| {
let (next, status) = state.next(&mut self.context).await.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())

*state = next;
Ok(status)
}

fn dump(&self) -> ProcedureResult<String> {
Expand All @@ -367,20 +364,21 @@ impl Procedure for RegionMigrationProcedure {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;

use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::{Region, RegionRoute};

use super::migration_end::RegionMigrationEnd;
use super::update_metadata::UpdateMetadata;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::procedure::region_migration::test_util::*;
use crate::service::mailbox::Channel;

fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
cluster_id: 0,
}
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
}

#[test]
Expand Down Expand Up @@ -414,20 +412,30 @@ mod tests {
assert_eq!(expected, serialized);
}

/// Checks that a previously serialized `PersistentContext` JSON payload still
/// deserializes into a value equal to the one built by the test helper.
#[test]
fn test_backward_compatibility() {
let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
// NOTE: Changing this serialized form breaks backward compatibility.
let serialized = r#"{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();

assert_eq!(persistent_ctx, deserialized);
}

/// A mock [`State`] used by the tests in this module.
///
/// Its `next` implementation increments `cluster_id` in the persistent context
/// and yields another `MockState` until `cluster_id` equals 2, at which point it
/// yields `RegionMigrationEnd`.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct MockState;

#[async_trait::async_trait]
#[typetag::serde]
impl State for MockState {
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let pc = &mut ctx.persistent_ctx;

if pc.cluster_id == 2 {
Ok(Box::new(RegionMigrationEnd))
Ok((Box::new(RegionMigrationEnd), Status::Done))
} else {
pc.cluster_id += 1;
Ok(Box::new(MockState))
Ok((Box::new(MockState), Status::executing(false)))
}
}

Expand Down Expand Up @@ -497,4 +505,145 @@ mod tests {
let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024));
}

/// Builds the ordered list of steps for a successful region migration flow.
///
/// Each `Step::next` is given a description of the expected next state, an
/// optional mocked datanode reply, and an `Assertion::simple` pair (one
/// assertion on the next state, one on the returned status).
///
/// The downgrade reply is mocked for `from_peer_id` and the upgrade reply for
/// `to_peer_id`. The comment above each step names the state being executed
/// (presumably the state whose `next` produces the asserted result).
fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
vec![
// RegionMigrationStart
Step::next(
"Should be the update metadata for downgrading",
None,
Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
),
// UpdateMetadata::Downgrade
Step::next(
"Should be the downgrade leader region",
None,
Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
),
// DowngradeLeaderRegion (needs the downgrade reply from the source peer)
Step::next(
"Should be the upgrade candidate region",
Some(mock_datanode_reply(
from_peer_id,
Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
)),
Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
),
// UpgradeCandidateRegion (needs the upgrade reply from the target peer)
Step::next(
"Should be the update metadata for upgrading",
Some(mock_datanode_reply(
to_peer_id,
Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
)),
Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
),
// UpdateMetadata::Upgrade
Step::next(
"Should be the region migration end",
None,
Assertion::simple(assert_region_migration_end, assert_done),
),
// RegionMigrationEnd
Step::next(
"Should be the region migration end again",
None,
Assertion::simple(assert_region_migration_end, assert_done),
),
]
}

/// Drives the steps from `procedure_flow_steps` once, starting from
/// `RegionMigrationStart`, then verifies the table metadata.
///
/// The region route is set up with the source peer as leader and the target
/// peer as the only follower.
#[tokio::test]
async fn test_procedure_flow() {
common_telemetry::init_default_ut_logging();

let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let state = Box::new(RegionMigrationStart);

// The table metadata.
let from_peer_id = persistent_context.from_peer.id;
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_id;
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
follower_peers: vec![to_peer],
..Default::default()
}];

let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
suite.init_table_metadata(table_info, region_routes).await;

let steps = procedure_flow_steps(from_peer_id, to_peer_id);
// Started before running so the elapsed-time check below covers the whole run.
let timer = Instant::now();

// Run the table tests.
let runner = ProcedureMigrationSuiteRunner::new(suite)
.steps(steps)
.run_once()
.await;

// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

runner.suite.verify_table_metadata().await;
}

/// Runs the full migration flow, then twice resets the state to
/// `UpdateMetadata::Downgrade` (also applying `reset_volatile_ctx`) and replays
/// every step after the first one, verifying the table metadata at the end.
#[tokio::test]
async fn test_procedure_flow_idempotent() {
    common_telemetry::init_default_ut_logging();

    let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
    let state = Box::new(RegionMigrationStart);

    // The table metadata.
    let from_peer_id = persistent_context.from_peer.id;
    let to_peer_id = persistent_context.to_peer.id;
    let from_peer = persistent_context.from_peer.clone();
    let to_peer = persistent_context.to_peer.clone();
    let region_id = persistent_context.region_id;
    let table_info = new_test_table_info(1024, vec![1]).into();
    let region_routes = vec![RegionRoute {
        region: Region::new_test(region_id),
        leader_peer: Some(from_peer),
        follower_peers: vec![to_peer],
        ..Default::default()
    }];

    let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
    suite.init_table_metadata(table_info, region_routes).await;

    let steps = procedure_flow_steps(from_peer_id, to_peer_id);
    // The steps replayed after each reset: everything but the first step.
    // Slice the vector directly instead of cloning it whole just to slice it.
    let replayed_steps = steps[1..].to_vec();
    let setup_to_latest_persisted_state = Step::setup(
        "Sets state to UpdateMetadata::Downgrade",
        merge_before_test_fn(vec![
            setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
            Arc::new(reset_volatile_ctx),
        ]),
    );

    // Full flow, then (reset + replay) twice. The last use of each value is
    // moved rather than cloned.
    let steps = [
        steps,
        vec![setup_to_latest_persisted_state.clone()],
        replayed_steps.clone(),
        vec![setup_to_latest_persisted_state],
        replayed_steps,
    ]
    .concat();
    let timer = Instant::now();

    // Run the table tests.
    let runner = ProcedureMigrationSuiteRunner::new(suite)
        .steps(steps)
        .run_once()
        .await;

    // Ensure it didn't run into the slow path.
    assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

    runner.suite.verify_table_metadata().await;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use common_meta::distributed_time_constants::{MAILBOX_RTT_SECS, REGION_LEASE_SEC
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_telemetry::warn;
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tokio::time::sleep;
Expand Down Expand Up @@ -53,18 +54,25 @@ impl Default for DowngradeLeaderRegion {
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
// Ensures the `leader_region_lease_deadline` must exist after recovering.
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
self.downgrade_region_with_retry(ctx).await;

// Safety: must exist.
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
info!(
"Running into the downgrade leader slow path, sleep until {:?}",
deadline
);
tokio::time::sleep_until(*deadline).await;
}

Ok(Box::<UpgradeCandidateRegion>::default())
Ok((
Box::<UpgradeCandidateRegion>::default(),
Status::executing(false),
))
}

fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -202,16 +210,14 @@ impl DowngradeLeaderRegion {
mod tests {
use std::assert_matches::assert_matches;

use api::v1::meta::mailbox_message::Payload;
use common_meta::peer::Peer;
use common_time::util::current_time_millis;
use store_api::storage::RegionId;
use tokio::time::Instant;

use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{
new_close_region_reply, send_mock_reply, TestingEnv,
new_close_region_reply, new_downgrade_region_reply, send_mock_reply, TestingEnv,
};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};

Expand All @@ -224,29 +230,6 @@ mod tests {
}
}

/// Builds a mock `MailboxMessage` carrying a JSON-encoded
/// `InstructionReply::DowngradeRegion` reply.
///
/// `id` becomes the message id; `last_entry_id`, `exist` and `error` populate the
/// reply. The message is stamped with the current time in milliseconds.
///
/// # Panics
///
/// Panics if the reply cannot be serialized to JSON.
fn new_downgrade_region_reply(
    id: u64,
    last_entry_id: Option<u64>,
    exist: bool,
    error: Option<String>,
) -> MailboxMessage {
    let timestamp_millis = current_time_millis();

    let reply = InstructionReply::DowngradeRegion(DowngradeRegionReply {
        last_entry_id,
        exists: exist,
        error,
    });
    let json = serde_json::to_string(&reply).unwrap();

    MailboxMessage {
        id,
        subject: "mock".to_string(),
        from: "datanode".to_string(),
        to: "meta".to_string(),
        timestamp_millis,
        payload: Some(Payload::Json(json)),
    }
}

#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = DowngradeLeaderRegion::default();
Expand Down Expand Up @@ -504,7 +487,7 @@ mod tests {
});

let timer = Instant::now();
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let elapsed = timer.elapsed().as_secs();
assert!(elapsed < REGION_LEASE_SECS / 2);
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,13 @@ impl RegionMigrationAbort {
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationAbort {
async fn next(&mut self, _: &mut Context) -> Result<Box<dyn State>> {
async fn next(&mut self, _: &mut Context) -> Result<(Box<dyn State>, Status)> {
error::MigrationAbortSnafu {
reason: &self.reason,
}
.fail()
}

fn status(&self) -> Status {
Status::Done
}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down
8 changes: 2 additions & 6 deletions src/meta-srv/src/procedure/region_migration/migration_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ pub struct RegionMigrationEnd;
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationEnd {
async fn next(&mut self, _: &mut Context) -> Result<Box<dyn State>> {
Ok(Box::new(RegionMigrationEnd))
}

fn status(&self) -> Status {
Status::Done
async fn next(&mut self, _: &mut Context) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(RegionMigrationEnd), Status::Done))
}

fn as_any(&self) -> &dyn Any {
Expand Down
Loading
Loading