Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add upgrade candidate region step #2829

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/common/meta/src/distributed_time_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,8 @@ pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 3;

// In a lease, there are two opportunities for renewal.
/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

/// The default mailbox round-trip timeout.
pub const MAILBOX_RTT_SECS: u64 = 1;
49 changes: 49 additions & 0 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl OpenRegion {
/// The instruction of downgrading leader region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DowngradeRegion {
/// The [RegionId].
pub region_id: RegionId,
}

Expand All @@ -120,20 +121,67 @@ impl Display for DowngradeRegion {
}
}

/// Upgrades a follower region to leader region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpgradeRegion {
/// The [RegionId].
pub region_id: RegionId,
/// The `last_entry_id` of old leader region.
pub last_entry_id: Option<u64>,
/// The second of waiting for a wal replay.
///
/// `None` stands for no wait,
/// it's helpful to verify whether the leader region is ready.
pub wait_for_replay_secs: Option<u64>,
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug, Clone, Serialize, Deserialize, Display)]
pub enum Instruction {
/// Opens a region.
///
/// - Returns true if a specified region exists.
OpenRegion(OpenRegion),
/// Closes a region.
///
/// - Returns true if a specified region does not exist.
CloseRegion(RegionIdent),
/// Upgrades a region.
UpgradeRegion(UpgradeRegion),
/// Downgrades a region.
DowngradeRegion(DowngradeRegion),
/// Invalidates a specified table cache.
InvalidateTableIdCache(TableId),
/// Invalidates a specified table name index cache.
InvalidateTableNameCache(TableName),
}

/// The reply of [UpgradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
/// Returns true if `last_entry_id` has been replayed to the latest.
pub ready: bool,
/// Indicates whether the region exists.
pub exists: bool,
/// Returns error if any.
pub error: Option<String>,
}

impl Display for UpgradeRegionReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"(ready={}, exists={}, error={:?})",
self.ready, self.exists, self.error
)
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
OpenRegion(SimpleReply),
CloseRegion(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
InvalidateTableCache(SimpleReply),
DowngradeRegion(DowngradeRegionReply),
}
Expand All @@ -143,6 +191,7 @@ impl Display for InstructionReply {
match self {
Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
Self::InvalidateTableCache(reply) => {
write!(f, "InstructionReply::Invalidate({})", reply)
}
Expand Down
9 changes: 9 additions & 0 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ impl RegionHeartbeatResponseHandler {
let close_region_req = RegionRequest::Close(RegionCloseRequest {});
Ok((region_id, close_region_req))
}
Instruction::UpgradeRegion(_) => {
todo!()
}
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
InvalidHeartbeatResponseSnafu.fail()
}
Expand All @@ -86,6 +89,9 @@ impl RegionHeartbeatResponseHandler {
result: false,
error: None,
}),
Instruction::UpgradeRegion(_) => {
todo!()
}
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
InstructionReply::InvalidateTableCache(SimpleReply {
result: false,
Expand Down Expand Up @@ -118,6 +124,9 @@ impl RegionHeartbeatResponseHandler {
reply.error = error;
}
},
InstructionReply::UpgradeRegion(_) => {
todo!()
}
InstructionReply::InvalidateTableCache(reply) => {
reply.result = success;
reply.error = error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::{MAILBOX_RTT_SECS, REGION_LEASE_SECS};
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
Expand All @@ -31,7 +31,7 @@ use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

const DOWNGRADE_LEADER_REGION_TIMEOUT: Duration = Duration::from_secs(1);
const DOWNGRADE_LEADER_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS);

#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
Expand Down Expand Up @@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion {
tokio::time::sleep_until(*deadline).await;
}

Ok(Box::new(UpgradeCandidateRegion))
Ok(Box::<UpgradeCandidateRegion>::default())
}

fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -159,7 +159,7 @@ impl DowngradeLeaderRegion {
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for downgrade leader region {region_id} on Datanode {:?}",
"Mailbox received timeout for downgrade leader region {region_id} on datanode {:?}",
leader,
);
error::RetryLaterSnafu { reason }.fail()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::ddl::utils::region_storage_path;
use common_meta::distributed_time_constants::MAILBOX_RTT_SECS;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::RegionIdent;
use serde::{Deserialize, Serialize};
Expand All @@ -29,7 +30,7 @@ use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeader
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(1);
const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS);

#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;
Expand Down Expand Up @@ -152,7 +153,7 @@ impl OpenCandidateRegion {
} else {
error::RetryLaterSnafu {
reason: format!(
"Region {region_id} is not opened by Datanode {:?}, error: {error:?}",
"Region {region_id} is not opened by datanode {:?}, error: {error:?}",
candidate,
),
}
Expand All @@ -161,7 +162,7 @@ impl OpenCandidateRegion {
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for open candidate region {region_id} on Datanode {:?}",
"Mailbox received timeout for open candidate region {region_id} on datanode {:?}",
candidate,
);
error::RetryLaterSnafu { reason }.fail()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use crate::procedure::region_migration::{Context, State};
pub enum UpdateMetadata {
/// Downgrades the leader region.
Downgrade,
/// Upgrade the candidate region.
/// Upgrades the candidate region.
Upgrade,
/// Rollback the downgraded leader region.
/// Rolls back the downgraded region.
Rollback,
}

Expand Down
Loading
Loading