diff --git a/.github/actions/setup-etcd-cluster/action.yml b/.github/actions/setup-etcd-cluster/action.yml index 8a2ad9528175..cee7dd4f18df 100644 --- a/.github/actions/setup-etcd-cluster/action.yml +++ b/.github/actions/setup-etcd-cluster/action.yml @@ -2,7 +2,7 @@ name: Setup Etcd cluster description: Deploy Etcd cluster on Kubernetes inputs: etcd-replicas: - default: 3 + default: 1 description: "Etcd replicas" namespace: default: "etcd-cluster" diff --git a/src/common/meta/src/distributed_time_constants.rs b/src/common/meta/src/distributed_time_constants.rs index 040a4d816959..30e779c31f1f 100644 --- a/src/common/meta/src/distributed_time_constants.rs +++ b/src/common/meta/src/distributed_time_constants.rs @@ -33,7 +33,7 @@ pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS; pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS; /// The lease seconds of metasrv leader. -pub const META_LEASE_SECS: u64 = 3; +pub const META_LEASE_SECS: u64 = 5; /// In a lease, there are two opportunities for renewal. pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2; diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 49dbfc7c891b..bb9a9984f8b8 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -18,11 +18,12 @@ use std::time::Duration; use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; use common_telemetry::{error, info, warn}; -use etcd_client::{Client, GetOptions, PutOptions}; -use snafu::{OptionExt, ResultExt}; +use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions}; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; +use tokio::time::{timeout, MissedTickBehavior}; use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY}; use crate::error; @@ -231,44 +232,43 @@ impl Election for EtcdElection { .await .context(error::EtcdFailedSnafu)?; - let mut keep_alive_interval = - tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)); + let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS); + let mut keep_alive_interval = tokio::time::interval(keep_lease_duration); + keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { - let _ = keep_alive_interval.tick().await; - keeper.keep_alive().await.context(error::EtcdFailedSnafu)?; - - if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? { - if res.ttl() > 0 { - // Only after a successful `keep_alive` is the leader considered official. - if self - .is_leader - .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - self.infancy.store(true, Ordering::Relaxed); - - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader.clone()))) - { - error!(e; "Failed to send leader change message"); - } - } - } else { - if self.is_leader.load(Ordering::Relaxed) { - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone()))) - { - error!("Failed to send leader change message, error: {e}"); - } - } + // The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`. + match timeout( + keep_lease_duration, + self.keep_alive(&mut keeper, &mut receiver, leader), + ) + .await + { + Ok(Ok(())) => { + let _ = keep_alive_interval.tick().await; + } + Ok(Err(err)) => { + error!(err; "Failed to keep alive"); + break; + } + Err(_) => { + error!("Refresh lease timeout"); break; } } } - self.is_leader.store(false, Ordering::Relaxed); + if self + .is_leader + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone()))) + { + error!(e; "Failed to send leader change message"); + } + } } Ok(()) @@ -297,3 +297,40 @@ impl Election for EtcdElection { self.leader_watcher.subscribe() } } + +impl EtcdElection { + async fn keep_alive( + &self, + keeper: &mut LeaseKeeper, + receiver: &mut LeaseKeepAliveStream, + leader: &LeaderKey, + ) -> Result<()> { + keeper.keep_alive().await.context(error::EtcdFailedSnafu)?; + if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? { + ensure!( + res.ttl() > 0, + error::UnexpectedSnafu { + violated: "Failed to refresh the lease", + } + ); + + // Only after a successful `keep_alive` is the leader considered official. + if self + .is_leader + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + self.infancy.store(true, Ordering::Relaxed); + + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::Elected(Arc::new(leader.clone()))) + { + error!(e; "Failed to send leader change message"); + } + } + } + + Ok(()) + } +} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 9105ca048959..d92798a113b1 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -450,7 +450,7 @@ impl Metasrv { { let election = election.clone(); let started = self.started.clone(); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_write(async move { while started.load(Ordering::Relaxed) { let res = election.campaign().await; if let Err(e) = res {