diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index abd60a4947cf..c081f0a94e1b 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -32,6 +32,9 @@ use crate::pubsub::Message; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("The region migration procedure aborted, reason: {}", reason))] + MigrationAbort { location: Location, reason: String }, + #[snafu(display( "Another procedure is opening the region: {} on peer: {}", region_id, @@ -665,7 +668,8 @@ impl ErrorExt for Error { | Error::Txn { .. } | Error::TableIdChanged { .. } | Error::RegionOpeningRace { .. } - | Error::RegionRouteNotFound { .. } => StatusCode::Unexpected, + | Error::RegionRouteNotFound { .. } + | Error::MigrationAbort { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidateTableCache { source, .. } => source.status_code(), Error::RequestDatanode { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 967cddcaa805..e54740c34294 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -298,6 +298,19 @@ impl HeartbeatMailbox { serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload }) } + /// Parses the [Instruction] from [MailboxMessage]. 
+ #[cfg(test)] + pub(crate) fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> { + let Payload::Json(payload) = + msg.payload + .as_ref() + .with_context(|| UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: format!("empty payload, msg: {msg:?}"), + })?; + serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload }) + } + pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef { let mailbox = Arc::new(Self::new(pushers, sequence)); diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index d5ae7235c79d..4edc69bc3a59 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -13,6 +13,7 @@ // limitations under the License. pub(crate) mod downgrade_leader_region; +pub(crate) mod migration_abort; pub(crate) mod migration_end; pub(crate) mod migration_start; pub(crate) mod open_candidate_region; @@ -25,6 +26,8 @@ use std::any::Any; use std::fmt::Debug; use std::time::Duration; +use api::v1::meta::MailboxMessage; +use common_meta::instruction::Instruction; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; @@ -43,7 +46,7 @@ use self::migration_start::RegionMigrationStart; use crate::error::{self, Error, Result}; use crate::procedure::utils::region_lock_key; use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef}; -use crate::service::mailbox::MailboxRef; +use crate::service::mailbox::{BroadcastChannel, MailboxRef}; /// It's shared in each step and available even after recovering. /// @@ -235,6 +238,27 @@ impl Context { pub fn region_id(&self) -> RegionId { self.persistent_ctx.region_id } + + /// Broadcasts the invalidate table cache message. 
+ pub async fn invalidate_table_cache(&self) -> Result<()> { + let table_id = self.region_id().table_id(); + let instruction = Instruction::InvalidateTableIdCache(table_id); + + let msg = &MailboxMessage::json_message( + "Invalidate Table Cache", + &format!("Metasrv@{}", self.server_addr()), + "Frontend broadcast", + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + + self.mailbox + .broadcast(&BroadcastChannel::Frontend, msg) + .await + } } #[async_trait::async_trait] @@ -346,7 +370,9 @@ mod tests { use super::migration_end::RegionMigrationEnd; use super::*; + use crate::handler::HeartbeatMailbox; use crate::procedure::region_migration::test_util::TestingEnv; + use crate::service::mailbox::Channel; fn new_persistent_context() -> PersistentContext { PersistentContext { @@ -446,4 +472,29 @@ mod tests { assert_eq!(procedure.context.persistent_ctx.cluster_id, 2); assert_matches!(status.unwrap(), Status::Done); } + + #[tokio::test] + async fn test_broadcast_invalidate_table_cache() { + let mut env = TestingEnv::new(); + let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); + let ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + + // No receivers. 
+ ctx.invalidate_table_cache().await.unwrap(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Frontend(1), tx) + .await; + + ctx.invalidate_table_cache().await.unwrap(); + + let resp = rx.recv().await.unwrap().unwrap(); + let msg = resp.mailbox_message.unwrap(); + + let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap(); + assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024)); + } } diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 2c6a25fa6630..51bed22f1260 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -277,7 +277,7 @@ mod tests { let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(from_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; drop(rx); @@ -306,7 +306,7 @@ mod tests { let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(from_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; // Sends an incorrect reply. 
@@ -336,7 +336,7 @@ mod tests { let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(from_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; send_mock_reply(mailbox, rx, |id| { @@ -367,7 +367,7 @@ mod tests { let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(from_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; send_mock_reply(mailbox, rx, |id| { @@ -404,7 +404,7 @@ mod tests { let (tx, mut rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(from_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; common_runtime::spawn_bg(async move { @@ -453,7 +453,7 @@ mod tests { let (tx, mut rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(from_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; common_runtime::spawn_bg(async move { @@ -496,7 +496,7 @@ mod tests { let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(from_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; send_mock_reply(mailbox, rx, |id| { diff --git a/src/meta-srv/src/procedure/region_migration/migration_abort.rs b/src/meta-srv/src/procedure/region_migration/migration_abort.rs new file mode 100644 index 000000000000..c47864ae5bab --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/migration_abort.rs @@ -0,0 +1,54 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::Status; +use serde::{Deserialize, Serialize}; + +use crate::error::{self, Result}; +use crate::procedure::region_migration::{Context, State}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct RegionMigrationAbort { + reason: String, +} + +impl RegionMigrationAbort { + /// Returns the [RegionMigrationAbort] with `reason`. + pub fn new(reason: &str) -> Self { + Self { + reason: reason.to_string(), + } + } +} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for RegionMigrationAbort { + async fn next(&mut self, _: &mut Context) -> Result<Box<dyn State>> { + error::MigrationAbortSnafu { + reason: &self.reason, + } + .fail() + } + + fn status(&self) -> Status { + Status::Done + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 2e126625347f..33b465f46b35 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -304,7 +304,7 @@ mod tests { let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(to_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) .await; // Sends an incorrect reply. 
@@ -337,7 +337,7 @@ mod tests { let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(to_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) .await; // Sends an timeout error. @@ -372,7 +372,7 @@ mod tests { let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(to_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) .await; send_mock_reply(mailbox, rx, |id| { @@ -424,7 +424,7 @@ mod tests { let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx - .insert_heartbeat_response_receiver(to_peer_id, tx) + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) .await; send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None))); diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index cc3779b8f54d..753234985b1b 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -21,7 +21,6 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::sequence::Sequence; -use common_meta::DatanodeId; use common_procedure::{Context as ProcedureContext, ProcedureId}; use common_procedure_test::MockContextProvider; use common_time::util::current_time_millis; @@ -55,10 +54,10 @@ impl MailboxContext { - /// Inserts a pusher for `datanode_id` + /// Inserts a pusher for the given `channel` pub async fn insert_heartbeat_response_receiver( &mut self, - datanode_id: DatanodeId, + channel: Channel, tx: Sender>, ) { - let pusher_id = Channel::Datanode(datanode_id).pusher_id(); + let pusher_id = channel.pusher_id(); let pusher = Pusher::new(tx, &RequestHeader::default()); let _ = self.pushers.insert(pusher_id, pusher).await; } diff --git 
a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index ba3092548efb..7d56c51390ae 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -13,12 +13,15 @@ // limitations under the License. pub(crate) mod downgrade_leader_region; +pub(crate) mod rollback_downgraded_region; pub(crate) mod upgrade_candidate_region; use std::any::Any; +use common_telemetry::warn; use serde::{Deserialize, Serialize}; +use super::migration_abort::RegionMigrationAbort; use super::migration_end::RegionMigrationEnd; use crate::error::Result; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; @@ -31,6 +34,8 @@ pub enum UpdateMetadata { Downgrade, /// Upgrade the candidate region. Upgrade, + /// Rollback the downgraded leader region. + Rollback, } #[async_trait::async_trait] @@ -46,9 +51,21 @@ impl State for UpdateMetadata { UpdateMetadata::Upgrade => { self.upgrade_candidate_region(ctx).await?; - // TODO(weny): invalidate fe cache. 
+ if let Err(err) = ctx.invalidate_table_cache().await { + warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}"); + }; Ok(Box::new(RegionMigrationEnd)) } + UpdateMetadata::Rollback => { + self.rollback_downgraded_region(ctx).await?; + + if let Err(err) = ctx.invalidate_table_cache().await { + warn!("Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}"); + }; + Ok(Box::new(RegionMigrationAbort::new( + "Failed to upgrade the candidate region.", + ))) + } } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs new file mode 100644 index 000000000000..b9aed08ad678 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -0,0 +1,241 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::procedure::region_migration::update_metadata::UpdateMetadata; +use crate::procedure::region_migration::Context; + +impl UpdateMetadata { + /// Rollbacks the downgraded leader region if the candidate region is unreachable. + /// + /// Abort(non-retry): + /// - TableRoute is not found. 
+ /// + /// Retry: + /// - Failed to update [TableRouteValue](common_meta::key::table_route::TableRouteValue). + /// - Failed to retrieve the metadata of table. + pub async fn rollback_downgraded_region(&self, ctx: &mut Context) -> Result<()> { + let table_metadata_manager = ctx.table_metadata_manager.clone(); + let region_id = ctx.region_id(); + let table_id = region_id.table_id(); + let current_table_route_value = ctx.get_table_route_value().await?; + + if let Err(err) = table_metadata_manager + .update_leader_region_status(table_id, current_table_route_value, |route| { + if route.region.id == region_id { + Some(None) + } else { + None + } + }) + .await + .context(error::TableMetadataManagerSnafu) + { + // Drop the cached table route unconditionally; `debug_assert!` skips its argument in release builds. + let removed = ctx.remove_table_route_value(); + debug_assert!(removed); + return error::RetryLaterSnafu { + reason: format!("Failed to update the table route during the rollback downgraded leader region, error: {err}") + }.fail(); + } + + let removed = ctx.remove_table_route_value(); + debug_assert!(removed); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::key::test_utils::new_test_table_info; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use store_api::storage::RegionId; + + use crate::error::Error; + use crate::procedure::region_migration::migration_abort::RegionMigrationAbort; + use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::update_metadata::UpdateMetadata; + use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; + + fn new_persistent_context() -> PersistentContext { + test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) + } + + #[tokio::test] + async fn test_table_route_is_not_found_error() { + let state = UpdateMetadata::Rollback; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let mut ctx = env.context_factory().new_context(persistent_context); + 
+ let err = state + .rollback_downgraded_region(&mut ctx) + .await + .unwrap_err(); + + assert_matches!(err, Error::TableRouteNotFound { .. }); + + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_update_table_route_with_retry() { + let state = UpdateMetadata::Rollback; + let persistent_context = new_persistent_context(); + let from_peer = persistent_context.from_peer.clone(); + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let table_id = ctx.region_id().table_id(); + + let table_info = new_test_table_info(1024, vec![1, 2, 3]).into(); + let region_routes = vec![ + RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(from_peer.clone()), + leader_status: Some(RegionStatus::Downgraded), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 2)), + leader_peer: Some(Peer::empty(4)), + leader_status: Some(RegionStatus::Downgraded), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 3)), + leader_peer: Some(Peer::empty(5)), + ..Default::default() + }, + ]; + + let expected_region_routes = { + let mut region_routes = region_routes.clone(); + region_routes[0].leader_status = None; + region_routes[1].leader_status = None; + region_routes + }; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let old_table_route = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap(); + + // modifies the table route. 
+ table_metadata_manager + .update_leader_region_status(table_id, &old_table_route, |route| { + if route.region.id == RegionId::new(1024, 2) { + Some(None) + } else { + None + } + }) + .await + .unwrap(); + + ctx.volatile_ctx.table_route = Some(old_table_route); + + let err = state + .rollback_downgraded_region(&mut ctx) + .await + .unwrap_err(); + assert!(ctx.volatile_ctx.table_route.is_none()); + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + assert!(err.to_string().contains("Failed to update the table route")); + + state.rollback_downgraded_region(&mut ctx).await.unwrap(); + + let region_routes = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap() + .into_inner() + .region_routes; + assert_eq!(expected_region_routes, region_routes); + } + + #[tokio::test] + async fn test_next_migration_end_state() { + let mut state = Box::new(UpdateMetadata::Rollback); + let persistent_context = new_persistent_context(); + let from_peer = persistent_context.from_peer.clone(); + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let table_id = ctx.region_id().table_id(); + + let table_info = new_test_table_info(1024, vec![1, 2, 3]).into(); + let region_routes = vec![ + RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(from_peer.clone()), + leader_status: Some(RegionStatus::Downgraded), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 2)), + leader_peer: Some(Peer::empty(4)), + leader_status: Some(RegionStatus::Downgraded), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 3)), + leader_peer: Some(Peer::empty(5)), + ..Default::default() + }, + ]; + + let expected_region_routes = { + let mut region_routes = region_routes.clone(); + region_routes[0].leader_status = None; + region_routes + }; + + let table_metadata_manager = 
env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state.next(&mut ctx).await.unwrap(); + + let _ = next + .as_any() + .downcast_ref::<RegionMigrationAbort>() + .unwrap(); + + assert!(ctx.volatile_ctx.table_route.is_none()); + + let region_routes = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap() + .into_inner() + .region_routes; + assert_eq!(expected_region_routes, region_routes); + } +}