diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index bb2023b3297f..ac03941bfe04 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -104,7 +104,7 @@ pub mod table_route; #[cfg(any(test, feature = "testing"))] pub mod test_utils; mod tombstone; -pub mod topic; +pub mod topic_name; pub(crate) mod txn_helper; pub mod view_info; diff --git a/src/common/meta/src/key/topic.rs b/src/common/meta/src/key/topic.rs deleted file mode 100644 index ed5747dd72c6..000000000000 --- a/src/common/meta/src/key/topic.rs +++ /dev/null @@ -1,255 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashSet; -use std::fmt::{self, Display}; - -use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; - -use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result}; -use crate::key::{ - MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX, -}; -use crate::kv_backend::KvBackendRef; -use crate::rpc::store::{BatchPutRequest, RangeRequest}; -use crate::rpc::KeyValue; - -#[derive(Debug, Clone, PartialEq)] -pub struct TopicNameKey<'a> { - pub topic: &'a str, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct TopicNameValue; - -impl<'a> TopicNameKey<'a> { - pub fn new(topic: &'a str) -> Self { - Self { topic } - } - - pub fn gen_with_id_and_prefix(id: usize, prefix: &'a str) -> String { - format!("{}_{}", prefix, id) - } - - pub fn range_start_key(prefix: &'a str) -> String { - format!("{}/{}", KAFKA_TOPIC_KEY_PREFIX, prefix) - } -} - -impl<'a> MetadataKey<'a, TopicNameKey<'a>> for TopicNameKey<'_> { - fn to_bytes(&self) -> Vec { - self.to_string().into_bytes() - } - - fn from_bytes(bytes: &'a [u8]) -> Result> { - let key = std::str::from_utf8(bytes).map_err(|e| { - InvalidMetadataSnafu { - err_msg: format!( - "TopicNameKey '{}' is not a valid UTF8 string: {e}", - String::from_utf8_lossy(bytes) - ), - } - .build() - })?; - TopicNameKey::try_from(key) - } -} - -impl Display for TopicNameKey<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}/{}", KAFKA_TOPIC_KEY_PREFIX, self.topic) - } -} - -impl<'a> TryFrom<&'a str> for TopicNameKey<'a> { - type Error = Error; - - fn try_from(value: &'a str) -> Result> { - let captures = KAFKA_TOPIC_KEY_PATTERN - .captures(value) - .context(InvalidMetadataSnafu { - err_msg: format!("Invalid topic name key: {}", value), - })?; - - // Safety: pass the regex check above - Ok(TopicNameKey { - topic: captures.get(1).unwrap().as_str(), - }) - } -} - -/// Convert a key-value pair to a topic name. 
-fn topic_decoder(kv: &KeyValue) -> Result { - let key = TopicNameKey::from_bytes(&kv.key)?; - Ok(key.topic.to_string()) -} - -pub struct TopicPool { - prefix: String, - pub(crate) topics: Vec, -} - -impl TopicPool { - pub fn new(num_topics: usize, prefix: String) -> Self { - let topics = (0..num_topics) - .map(|i| TopicNameKey::gen_with_id_and_prefix(i, &prefix)) - .collect(); - Self { prefix, topics } - } - - /// Legacy restore for compatibility. - /// Returns the topics in legacy format and the transaction to update to the new format if needed. - async fn legacy_restore(&self, kv_backend: &KvBackendRef) -> Result<()> { - if let Some(kv) = kv_backend.get(LEGACY_TOPIC_KEY_PREFIX.as_bytes()).await? { - let topics = - serde_json::from_slice::>(&kv.value).context(DecodeJsonSnafu)?; - // Should remove the legacy topics and update to the new format. - kv_backend - .delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes(), false) - .await?; - let batch_put_req = BatchPutRequest { - kvs: topics - .iter() - .map(|topic| KeyValue { - key: TopicNameKey::new(topic).to_bytes(), - value: vec![], - }) - .collect(), - prev_kv: false, - }; - kv_backend.batch_put(batch_put_req).await?; - } - Ok(()) - } - - async fn restore(&self, kv_backend: &KvBackendRef) -> Result> { - // Topics in legacy format should be updated to the new format. - self.legacy_restore(kv_backend).await?; - let req = - RangeRequest::new().with_prefix(TopicNameKey::range_start_key(&self.prefix).as_bytes()); - let resp = kv_backend.range(req).await?; - let topics = resp - .kvs - .iter() - .map(topic_decoder) - .collect::>>()?; - Ok(topics) - } - - /// Restores topics from kvbackend and return the topics that are not stored in kvbackend. 
- pub async fn to_be_created(&self, kv_backend: &KvBackendRef) -> Result> { - let topics = self.restore(kv_backend).await?; - let mut topics_set = HashSet::with_capacity(topics.len()); - topics_set.extend(topics); - - Ok(self - .topics - .iter() - .filter(|topic| !topics_set.contains(*topic)) - .collect::>()) - } - - /// Persists topics into kvbackend. - pub async fn persist(&self, kv_backend: &KvBackendRef) -> Result<()> { - let topic_name_keys = self - .topics - .iter() - .map(|topic| TopicNameKey::new(topic)) - .collect::>(); - let req = BatchPutRequest { - kvs: topic_name_keys - .iter() - .map(|key| KeyValue { - key: key.to_bytes(), - value: vec![], - }) - .collect(), - prev_kv: false, - }; - kv_backend.batch_put(req).await?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use super::*; - use crate::kv_backend::memory::MemoryKvBackend; - use crate::rpc::store::PutRequest; - - #[tokio::test] - async fn test_restore_legacy_persisted_topics() { - let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let topic_pool = TopicPool::new(16, "greptimedb_wal_topic".to_string()); - - // No legacy topics. - let mut topics_to_be_created = topic_pool.to_be_created(&kv_backend).await.unwrap(); - topics_to_be_created.sort(); - let mut expected = topic_pool.topics.iter().collect::>(); - expected.sort(); - assert_eq!(expected, topics_to_be_created); - - // A topic pool with 16 topics stored in kvbackend in legacy format. 
- let topics = "[\"greptimedb_wal_topic_0\",\"greptimedb_wal_topic_1\",\"greptimedb_wal_topic_2\",\"greptimedb_wal_topic_3\",\"greptimedb_wal_topic_4\",\"greptimedb_wal_topic_5\",\"greptimedb_wal_topic_6\",\"greptimedb_wal_topic_7\",\"greptimedb_wal_topic_8\",\"greptimedb_wal_topic_9\",\"greptimedb_wal_topic_10\",\"greptimedb_wal_topic_11\",\"greptimedb_wal_topic_12\",\"greptimedb_wal_topic_13\",\"greptimedb_wal_topic_14\",\"greptimedb_wal_topic_15\"]"; - let put_req = PutRequest { - key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(), - value: topics.as_bytes().to_vec(), - prev_kv: true, - }; - let res = kv_backend.put(put_req).await.unwrap(); - assert!(res.prev_kv.is_none()); - - let topics_to_be_created = topic_pool.to_be_created(&kv_backend).await.unwrap(); - assert!(topics_to_be_created.is_empty()); - - // Legacy topics should be deleted after restoring. - let legacy_topics = kv_backend - .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes()) - .await - .unwrap(); - assert!(legacy_topics.is_none()); - - // Then we can restore it from the new format. - let mut restored_topics = topic_pool.restore(&kv_backend).await.unwrap(); - restored_topics.sort(); - let mut expected = topic_pool.topics.clone(); - expected.sort(); - assert_eq!(expected, restored_topics); - } - - // Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend. - #[tokio::test] - async fn test_restore_persisted_topics() { - let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let topic_name_prefix = "greptimedb_wal_topic"; - let num_topics = 16; - - // Constructs mock topics. - let topic_pool = TopicPool::new(num_topics, topic_name_prefix.to_string()); - - let mut topics_to_be_created = topic_pool.to_be_created(&kv_backend).await.unwrap(); - topics_to_be_created.sort(); - let mut expected = topic_pool.topics.iter().collect::>(); - expected.sort(); - assert_eq!(expected, topics_to_be_created); - - // Persists topics to kv backend. 
- topic_pool.persist(&kv_backend).await.unwrap(); - let topics_to_be_created = topic_pool.to_be_created(&kv_backend).await.unwrap(); - assert!(topics_to_be_created.is_empty()); - } -} diff --git a/src/common/meta/src/key/topic_name.rs b/src/common/meta/src/key/topic_name.rs new file mode 100644 index 000000000000..257c72288528 --- /dev/null +++ b/src/common/meta/src/key/topic_name.rs @@ -0,0 +1,218 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::fmt::{self, Display}; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result}; +use crate::key::{ + MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX, +}; +use crate::kv_backend::memory::MemoryKvBackend; +use crate::kv_backend::txn::{Txn, TxnOp}; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::{BatchPutRequest, RangeRequest}; +use crate::rpc::KeyValue; + +#[derive(Debug, Clone, PartialEq)] +pub struct TopicNameKey<'a> { + pub topic: &'a str, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TopicNameValue; + +impl<'a> TopicNameKey<'a> { + pub fn new(topic: &'a str) -> Self { + Self { topic } + } + + pub fn gen_with_id_and_prefix(id: usize, prefix: &'a str) -> String { + format!("{}_{}", prefix, id) + } + + pub fn range_start_key() -> String { + KAFKA_TOPIC_KEY_PREFIX.to_string() + } +} + +impl<'a> MetadataKey<'a, TopicNameKey<'a>> for TopicNameKey<'_> { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result> { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidMetadataSnafu { + err_msg: format!( + "TopicNameKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + TopicNameKey::try_from(key) + } +} + +impl Display for TopicNameKey<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", KAFKA_TOPIC_KEY_PREFIX, self.topic) + } +} + +impl<'a> TryFrom<&'a str> for TopicNameKey<'a> { + type Error = Error; + + fn try_from(value: &'a str) -> Result> { + let captures = KAFKA_TOPIC_KEY_PATTERN + .captures(value) + .context(InvalidMetadataSnafu { + err_msg: format!("Invalid topic name key: {}", value), + })?; + + // Safety: pass the regex check above + Ok(TopicNameKey { + topic: captures.get(1).unwrap().as_str(), + }) + } +} + +/// Convert a 
key-value pair to a topic name. +fn topic_decoder(kv: &KeyValue) -> Result { + let key = TopicNameKey::from_bytes(&kv.key)?; + Ok(key.topic.to_string()) +} + +pub struct TopicNameKeyManager { + kv_backend: KvBackendRef, +} + +impl Default for TopicNameKeyManager { + fn default() -> Self { + Self::new(Arc::new(MemoryKvBackend::default())) + } +} + +impl TopicNameKeyManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Update the topics in legacy format to the new format. + pub async fn update_legacy_topics(&self) -> Result<()> { + if let Some(kv) = self + .kv_backend + .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes()) + .await? + { + let topics = + serde_json::from_slice::>(&kv.value).context(DecodeJsonSnafu)?; + let mut reqs = topics + .iter() + .map(|topic| { + let key = TopicNameKey::new(topic); + TxnOp::Put(key.to_bytes(), vec![]) + }) + .collect::>(); + let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec()); + reqs.push(delete_req); + let txn = Txn::new().and_then(reqs); + self.kv_backend.txn(txn).await?; + } + Ok(()) + } + + /// Range query for topics. + /// Caution: this method returns keys as String instead of values of range query since the topics are stored in keys. + pub async fn range(&self) -> Result> { + let prefix = TopicNameKey::range_start_key(); + let raw_prefix = prefix.as_bytes(); + let req = RangeRequest::new().with_prefix(raw_prefix); + let resp = self.kv_backend.range(req).await?; + resp.kvs + .iter() + .map(topic_decoder) + .collect::>>() + } + + /// Put topics into kvbackend. 
+ pub async fn batch_put(&self, topic_name_keys: Vec>) -> Result<()> { + let req = BatchPutRequest { + kvs: topic_name_keys + .iter() + .map(|key| KeyValue { + key: key.to_bytes(), + value: vec![], + }) + .collect(), + prev_kv: false, + }; + self.kv_backend.batch_put(req).await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; + use crate::rpc::store::PutRequest; + + #[tokio::test] + async fn test_topic_name_key_manager() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicNameKeyManager::new(kv_backend.clone()); + + let mut all_topics = (0..16) + .map(|i| format!("{}/{}", KAFKA_TOPIC_KEY_PREFIX, i)) + .collect::>(); + all_topics.sort(); + let topic_name_keys = all_topics + .iter() + .map(|topic| TopicNameKey::new(topic)) + .collect::>(); + + manager.batch_put(topic_name_keys.clone()).await.unwrap(); + + let topics = manager.range().await.unwrap(); + assert_eq!(topics, all_topics); + + kv_backend + .put(PutRequest { + key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(), + value: serde_json::to_vec(&all_topics).unwrap(), + prev_kv: false, + }) + .await + .unwrap(); + manager.update_legacy_topics().await.unwrap(); + let res = kv_backend + .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes()) + .await + .unwrap(); + assert!(res.is_none()); + let topics = manager.range().await.unwrap(); + assert_eq!(topics, all_topics); + + let topics = manager.range().await.unwrap(); + assert_eq!(topics, all_topics); + } +} diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index 8076ea69334e..e55f7d774691 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-pub mod kafka; +mod kafka_topic_pool; +mod kvbackend; +mod selector; use std::collections::HashMap; use std::sync::Arc; @@ -26,14 +28,14 @@ use store_api::storage::{RegionId, RegionNumber}; use crate::error::{EncodeWalOptionsSnafu, Result}; use crate::kv_backend::KvBackendRef; use crate::leadership_notifier::LeadershipChangeListener; -use crate::wal_options_allocator::kafka::topic_manager::TopicManager as KafkaTopicManager; +use crate::wal_options_allocator::kafka_topic_pool::KafkaTopicPool; /// Allocates wal options in region granularity. #[derive(Default)] pub enum WalOptionsAllocator { #[default] RaftEngine, - Kafka(KafkaTopicManager), + Kafka(KafkaTopicPool), } /// Arc wrapper of WalOptionsAllocator. @@ -45,7 +47,7 @@ impl WalOptionsAllocator { match config { MetasrvWalConfig::RaftEngine => Self::RaftEngine, MetasrvWalConfig::Kafka(kafka_config) => { - Self::Kafka(KafkaTopicManager::new(kafka_config, kv_backend)) + Self::Kafka(KafkaTopicPool::new(kafka_config, kv_backend)) } } } @@ -54,7 +56,7 @@ impl WalOptionsAllocator { pub async fn start(&self) -> Result<()> { match self { Self::RaftEngine => Ok(()), - Self::Kafka(kafka_topic_manager) => kafka_topic_manager.start().await, + Self::Kafka(kafka_topic_manager) => kafka_topic_manager.init().await, } } @@ -146,7 +148,6 @@ mod tests { use super::*; use crate::kv_backend::memory::MemoryKvBackend; - use crate::wal_options_allocator::kafka::topic_selector::RoundRobinTopicSelector; // Tests that the wal options allocator could successfully allocate raft-engine wal options. #[tokio::test] @@ -191,14 +192,10 @@ mod tests { ..Default::default() }; let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let mut topic_manager = KafkaTopicManager::new(config.clone(), kv_backend); - // Replaces the default topic pool with the constructed topics. - topic_manager.topic_pool.topics.clone_from(&topics); - // Replaces the default selector with a round-robin selector without shuffled. 
- topic_manager.topic_selector = Arc::new(RoundRobinTopicSelector::default()); + let topic_pool = KafkaTopicPool::new(config.clone(), kv_backend); // Creates an options allocator. - let allocator = WalOptionsAllocator::Kafka(topic_manager); + let allocator = WalOptionsAllocator::Kafka(topic_pool); allocator.start().await.unwrap(); let num_regions = 32; diff --git a/src/common/meta/src/wal_options_allocator/kafka.rs b/src/common/meta/src/wal_options_allocator/kafka.rs deleted file mode 100644 index cc454a3c7d1a..000000000000 --- a/src/common/meta/src/wal_options_allocator/kafka.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod topic_manager; -pub mod topic_selector; diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs deleted file mode 100644 index b383f1004ace..000000000000 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ /dev/null @@ -1,277 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use common_telemetry::{error, info}; -use common_wal::config::kafka::MetasrvKafkaConfig; -use common_wal::TopicSelectorType; -use rskafka::client::controller::ControllerClient; -use rskafka::client::error::Error as RsKafkaError; -use rskafka::client::error::ProtocolError::TopicAlreadyExists; -use rskafka::client::partition::{Compression, UnknownTopicHandling}; -use rskafka::client::{Client, ClientBuilder}; -use rskafka::record::Record; -use rskafka::BackoffConfig; -use snafu::{ensure, ResultExt}; - -use crate::error::{ - BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu, - CreateKafkaWalTopicSnafu, InvalidNumTopicsSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, - Result, TlsConfigSnafu, -}; -use crate::key::topic::TopicPool; -use crate::kv_backend::KvBackendRef; -use crate::wal_options_allocator::kafka::topic_selector::{ - RoundRobinTopicSelector, TopicSelectorRef, -}; - -// Each topic only has one partition for now. -// The `DEFAULT_PARTITION` refers to the index of the partition. -const DEFAULT_PARTITION: i32 = 0; - -/// Manages topic initialization and selection. -pub struct TopicManager { - config: MetasrvKafkaConfig, - pub(crate) topic_pool: TopicPool, - pub(crate) topic_selector: TopicSelectorRef, - kv_backend: KvBackendRef, -} - -impl TopicManager { - /// Creates a new topic manager. 
- pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self { - let selector = match config.kafka_topic.selector_type { - TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), - }; - let num_topics = config.kafka_topic.num_topics; - let prefix = config.kafka_topic.topic_name_prefix.clone(); - - Self { - config, - topic_pool: TopicPool::new(num_topics, prefix), - topic_selector: Arc::new(selector), - kv_backend, - } - } - - /// Tries to initialize the topic manager. - /// The initializer first tries to restore persisted topics from the kv backend. - /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics. - pub async fn start(&self) -> Result<()> { - // Skip creating topics. - if !self.config.auto_create_topics { - return Ok(()); - } - let num_topics = self.config.kafka_topic.num_topics; - ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); - - let topics_to_be_created = self.topic_pool.to_be_created(&self.kv_backend).await?; - - if !topics_to_be_created.is_empty() { - self.try_create_topics(&topics_to_be_created).await?; - self.topic_pool.persist(&self.kv_backend).await?; - } - Ok(()) - } - - /// Tries to create topics specified by indexes in `to_be_created`. - async fn try_create_topics(&self, topics: &[&String]) -> Result<()> { - // Builds an kafka controller client for creating topics. 
- let backoff_config = BackoffConfig { - init_backoff: self.config.backoff.init, - max_backoff: self.config.backoff.max, - base: self.config.backoff.base as f64, - deadline: self.config.backoff.deadline, - }; - let broker_endpoints = - common_wal::resolve_to_ipv4(&self.config.connection.broker_endpoints) - .await - .context(ResolveKafkaEndpointSnafu)?; - let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config); - if let Some(sasl) = &self.config.connection.sasl { - builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); - }; - if let Some(tls) = &self.config.connection.tls { - builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?) - }; - let client = builder - .build() - .await - .with_context(|_| BuildKafkaClientSnafu { - broker_endpoints: self.config.connection.broker_endpoints.clone(), - })?; - - let control_client = client - .controller_client() - .context(BuildKafkaCtrlClientSnafu)?; - - // Try to create missing topics. - let tasks = topics - .iter() - .map(|topic| async { - self.try_create_topic(topic, &control_client).await?; - self.try_append_noop_record(topic, &client).await?; - Ok(()) - }) - .collect::>(); - futures::future::try_join_all(tasks).await.map(|_| ()) - } - - /// Selects one topic from the topic pool through the topic selector. - pub fn select(&self) -> Result<&String> { - self.topic_selector.select(&self.topic_pool.topics) - } - - /// Selects a batch of topics from the topic pool through the topic selector. 
- pub fn select_batch(&self, num_topics: usize) -> Result> { - (0..num_topics) - .map(|_| self.topic_selector.select(&self.topic_pool.topics)) - .collect() - } - - async fn try_append_noop_record(&self, topic: &String, client: &Client) -> Result<()> { - let partition_client = client - .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry) - .await - .context(BuildKafkaPartitionClientSnafu { - topic, - partition: DEFAULT_PARTITION, - })?; - - partition_client - .produce( - vec![Record { - key: None, - value: None, - timestamp: chrono::Utc::now(), - headers: Default::default(), - }], - Compression::Lz4, - ) - .await - .context(ProduceRecordSnafu { topic })?; - - Ok(()) - } - - async fn try_create_topic(&self, topic: &String, client: &ControllerClient) -> Result<()> { - match client - .create_topic( - topic.clone(), - self.config.kafka_topic.num_partitions, - self.config.kafka_topic.replication_factor, - self.config.kafka_topic.create_topic_timeout.as_millis() as i32, - ) - .await - { - Ok(_) => { - info!("Successfully created topic {}", topic); - Ok(()) - } - Err(e) => { - if Self::is_topic_already_exist_err(&e) { - info!("The topic {} already exists", topic); - Ok(()) - } else { - error!("Failed to create a topic {}, error {:?}", topic, e); - Err(e).context(CreateKafkaWalTopicSnafu) - } - } - } - } - - fn is_topic_already_exist_err(e: &RsKafkaError) -> bool { - matches!( - e, - &RsKafkaError::ServerError { - protocol_error: TopicAlreadyExists, - .. - } - ) - } -} - -#[cfg(test)] -mod tests { - use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; - use common_wal::test_util::run_test_with_kafka_wal; - - use super::*; - use crate::kv_backend::memory::MemoryKvBackend; - - /// Tests that the topic manager could allocate topics correctly. - #[tokio::test] - async fn test_alloc_topics() { - run_test_with_kafka_wal(|broker_endpoints| { - Box::pin(async { - // Constructs topics that should be created. 
- let topics = (0..256) - .map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4())) - .collect::>(); - - // Creates a topic manager. - let kafka_topic = KafkaTopicConfig { - replication_factor: broker_endpoints.len() as i16, - ..Default::default() - }; - let config = MetasrvKafkaConfig { - connection: KafkaConnectionConfig { - broker_endpoints, - ..Default::default() - }, - kafka_topic, - ..Default::default() - }; - let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let mut manager = TopicManager::new(config.clone(), kv_backend); - // Replaces the default topic pool with the constructed topics. - manager.topic_pool.topics.clone_from(&topics); - // Replaces the default selector with a round-robin selector without shuffled. - manager.topic_selector = Arc::new(RoundRobinTopicSelector::default()); - manager.start().await.unwrap(); - - // Selects exactly the number of `num_topics` topics one by one. - let got = (0..topics.len()) - .map(|_| manager.select().unwrap()) - .cloned() - .collect::>(); - assert_eq!(got, topics); - - // Selects exactly the number of `num_topics` topics in a batching manner. - let got = manager - .select_batch(topics.len()) - .unwrap() - .into_iter() - .map(ToString::to_string) - .collect::>(); - assert_eq!(got, topics); - - // Selects more than the number of `num_topics` topics. 
- let got = manager - .select_batch(2 * topics.len()) - .unwrap() - .into_iter() - .map(ToString::to_string) - .collect::>(); - let expected = vec![topics.clone(); 2] - .into_iter() - .flatten() - .collect::>(); - assert_eq!(got, expected); - }) - }) - .await; - } -} diff --git a/src/common/meta/src/wal_options_allocator/kafka_topic_pool.rs b/src/common/meta/src/wal_options_allocator/kafka_topic_pool.rs new file mode 100644 index 000000000000..a837adb50673 --- /dev/null +++ b/src/common/meta/src/wal_options_allocator/kafka_topic_pool.rs @@ -0,0 +1,170 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod kafka_topic_manager; + +use std::sync::Arc; + +use common_wal::config::kafka::MetasrvKafkaConfig; +use common_wal::TopicSelectorType; +use snafu::ensure; + +use crate::error::{InvalidNumTopicsSnafu, Result}; +use crate::kv_backend::KvBackendRef; +use crate::wal_options_allocator::kafka_topic_pool::kafka_topic_manager::TopicKafkaManager; +use crate::wal_options_allocator::kvbackend::TopicKvBackendManager; +use crate::wal_options_allocator::selector::{RoundRobinTopicSelector, TopicSelectorRef}; + +/// Topic pool for kafka remote wal. +/// Responsible for: +/// 1. Persists topics in kvbackend. +/// 2. Creates topics in kafka. +/// 3. 
Selects topic +pub struct KafkaTopicPool { + topics: Vec, + topic_kvbackend_manager: TopicKvBackendManager, + topic_kafka_manager: TopicKafkaManager, + selector: TopicSelectorRef, +} + +impl KafkaTopicPool { + pub fn new(config: MetasrvKafkaConfig, kvbackend: KvBackendRef) -> Self { + let num_topics = config.kafka_topic.num_topics; + + let prefix = config.kafka_topic.topic_name_prefix.clone(); + let topics = (0..num_topics) + .map(|i| format!("{}_{}", prefix, i)) + .collect(); + + let selector = match config.kafka_topic.selector_type { + TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), + }; + + let kafka_topic_manager = TopicKafkaManager::new(config.clone()); + let kvbackend_topic_manager = TopicKvBackendManager::new(kvbackend.clone()); + + Self { + topics, + topic_kvbackend_manager: kvbackend_topic_manager, + topic_kafka_manager: kafka_topic_manager, + selector: Arc::new(selector), + } + } + + /// Tries to initialize the topic manager. + /// The initializer first tries to restore persisted topics from the kv backend. + /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics. + pub async fn init(&self) -> Result<()> { + if !self.topic_kafka_manager.config.auto_create_topics { + return Ok(()); + } + + let num_topics = self.topic_kafka_manager.config.kafka_topic.num_topics; + ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); + + let topics_to_be_created = self + .topic_kvbackend_manager + .to_be_created(&self.topics) + .await?; + + if !topics_to_be_created.is_empty() { + self.topic_kafka_manager + .try_create_topics(&topics_to_be_created) + .await?; + self.topic_kvbackend_manager.persist(&self.topics).await?; + } + Ok(()) + } + + /// Selects one topic from the topic pool through the topic selector. + pub fn select(&self) -> Result<&String> { + self.selector.select(&self.topics) + } + + /// Selects a batch of topics from the topic pool through the topic selector. 
+ pub fn select_batch(&self, num_topics: usize) -> Result> { + (0..num_topics) + .map(|_| self.selector.select(&self.topics)) + .collect() + } +} + +#[cfg(test)] +mod tests { + use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; + use common_wal::test_util::run_test_with_kafka_wal; + + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + + /// Tests that the topic manager could allocate topics correctly. + #[tokio::test] + async fn test_alloc_topics() { + run_test_with_kafka_wal(|broker_endpoints| { + Box::pin(async { + // Constructs topics that should be created. + let topics = (0..256) + .map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4())) + .collect::>(); + + // Creates a topic manager. + let kafka_topic = KafkaTopicConfig { + replication_factor: broker_endpoints.len() as i16, + ..Default::default() + }; + let config = MetasrvKafkaConfig { + connection: KafkaConnectionConfig { + broker_endpoints, + ..Default::default() + }, + kafka_topic, + ..Default::default() + }; + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + let topic_pool = KafkaTopicPool::new(config.clone(), kv_backend); + topic_pool.init().await.unwrap(); + + // Selects exactly the number of `num_topics` topics one by one. + let got = (0..topics.len()) + .map(|_| topic_pool.select().unwrap()) + .cloned() + .collect::>(); + assert_eq!(got, topics); + + // Selects exactly the number of `num_topics` topics in a batching manner. + let got = topic_pool + .select_batch(topics.len()) + .unwrap() + .into_iter() + .map(ToString::to_string) + .collect::>(); + assert_eq!(got, topics); + + // Selects more than the number of `num_topics` topics. 
+ let got = topic_pool + .select_batch(2 * topics.len()) + .unwrap() + .into_iter() + .map(ToString::to_string) + .collect::>(); + let expected = vec![topics.clone(); 2] + .into_iter() + .flatten() + .collect::>(); + assert_eq!(got, expected); + }) + }) + .await; + } +} diff --git a/src/common/meta/src/wal_options_allocator/kafka_topic_pool/kafka_topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka_topic_pool/kafka_topic_manager.rs new file mode 100644 index 000000000000..91932bf4f6e7 --- /dev/null +++ b/src/common/meta/src/wal_options_allocator/kafka_topic_pool/kafka_topic_manager.rs @@ -0,0 +1,148 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::{error, info}; +use common_wal::config::kafka::MetasrvKafkaConfig; +use rskafka::client::controller::ControllerClient; +use rskafka::client::error::Error as RsKafkaError; +use rskafka::client::error::ProtocolError::TopicAlreadyExists; +use rskafka::client::partition::{Compression, UnknownTopicHandling}; +use rskafka::client::{Client, ClientBuilder}; +use rskafka::record::Record; +use rskafka::BackoffConfig; +use snafu::ResultExt; + +use crate::error::{ + BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu, + CreateKafkaWalTopicSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, + TlsConfigSnafu, +}; + +// Each topic only has one partition for now. 
+// The `DEFAULT_PARTITION` refers to the index of the partition. +const DEFAULT_PARTITION: i32 = 0; + +/// Manages topics in kafka. +pub struct TopicKafkaManager { + pub(super) config: MetasrvKafkaConfig, +} + +impl TopicKafkaManager { + pub fn new(config: MetasrvKafkaConfig) -> Self { + Self { config } + } + + async fn try_create_topic(&self, topic: &String, client: &ControllerClient) -> Result<()> { + match client + .create_topic( + topic.clone(), + self.config.kafka_topic.num_partitions, + self.config.kafka_topic.replication_factor, + self.config.kafka_topic.create_topic_timeout.as_millis() as i32, + ) + .await + { + Ok(_) => { + info!("Successfully created topic {}", topic); + Ok(()) + } + Err(e) => { + if Self::is_topic_already_exist_err(&e) { + info!("The topic {} already exists", topic); + Ok(()) + } else { + error!("Failed to create a topic {}, error {:?}", topic, e); + Err(e).context(CreateKafkaWalTopicSnafu) + } + } + } + } + + async fn try_append_noop_record(&self, topic: &String, client: &Client) -> Result<()> { + let partition_client = client + .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry) + .await + .context(BuildKafkaPartitionClientSnafu { + topic, + partition: DEFAULT_PARTITION, + })?; + + partition_client + .produce( + vec![Record { + key: None, + value: None, + timestamp: chrono::Utc::now(), + headers: Default::default(), + }], + Compression::Lz4, + ) + .await + .context(ProduceRecordSnafu { topic })?; + + Ok(()) + } + + pub async fn try_create_topics(&self, topics: &[&String]) -> Result<()> { + // Builds an kafka controller client for creating topics. 
+ let backoff_config = BackoffConfig { + init_backoff: self.config.backoff.init, + max_backoff: self.config.backoff.max, + base: self.config.backoff.base as f64, + deadline: self.config.backoff.deadline, + }; + let broker_endpoints = + common_wal::resolve_to_ipv4(&self.config.connection.broker_endpoints) + .await + .context(ResolveKafkaEndpointSnafu)?; + let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config); + if let Some(sasl) = &self.config.connection.sasl { + builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); + }; + if let Some(tls) = &self.config.connection.tls { + builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?) + }; + let client = builder + .build() + .await + .with_context(|_| BuildKafkaClientSnafu { + broker_endpoints: self.config.connection.broker_endpoints.clone(), + })?; + + let control_client = client + .controller_client() + .context(BuildKafkaCtrlClientSnafu)?; + + // Try to create missing topics. + let tasks = topics + .iter() + .map(|topic| async { + self.try_create_topic(topic, &control_client).await?; + self.try_append_noop_record(topic, &client).await?; + Ok(()) + }) + .collect::>(); + futures::future::try_join_all(tasks).await.map(|_| ()) + } + + fn is_topic_already_exist_err(e: &RsKafkaError) -> bool { + matches!( + e, + &RsKafkaError::ServerError { + protocol_error: TopicAlreadyExists, + .. + } + ) + } +} diff --git a/src/common/meta/src/wal_options_allocator/kvbackend.rs b/src/common/meta/src/wal_options_allocator/kvbackend.rs new file mode 100644 index 000000000000..bd0bcf6320ef --- /dev/null +++ b/src/common/meta/src/wal_options_allocator/kvbackend.rs @@ -0,0 +1,155 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use crate::error::Result; +use crate::key::topic_name::{TopicNameKey, TopicNameKeyManager}; +use crate::kv_backend::KvBackendRef; + +pub struct TopicKvBackendManager { + manager: TopicNameKeyManager, +} + +impl TopicKvBackendManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { + manager: TopicNameKeyManager::new(kv_backend), + } + } + + async fn restore(&self) -> Result> { + self.manager.update_legacy_topics().await?; + let topics = self.manager.range().await?; + Ok(topics) + } + + /// Restores topics from kvbackend and return the topics that are not stored in kvbackend. + pub async fn to_be_created<'a>(&self, all_topics: &'a [String]) -> Result> { + let topics = self.restore().await?; + let topic_set = topics.iter().collect::>(); + let mut to_be_created = Vec::with_capacity(all_topics.len()); + for topic in all_topics { + if !topic_set.contains(topic) { + to_be_created.push(topic); + } + } + Ok(to_be_created) + } + + /// Persists topics into kvbackend. 
+ pub async fn persist(&self, topics: &[String]) -> Result<()> { + self.manager + .batch_put( + topics + .iter() + .map(|topic| TopicNameKey::new(topic)) + .collect(), + ) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::key::LEGACY_TOPIC_KEY_PREFIX; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::rpc::store::PutRequest; + + #[tokio::test] + async fn test_restore_legacy_persisted_topics() { + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + let topic_kvbackend_manager = TopicKvBackendManager::new(kv_backend.clone()); + + let all_topics = (0..16) + .map(|i| format!("greptimedb_wal_topic_{}", i)) + .collect::>(); + + // No legacy topics. + let mut topics_to_be_created = topic_kvbackend_manager + .to_be_created(&all_topics) + .await + .unwrap(); + topics_to_be_created.sort(); + let mut expected = all_topics.iter().collect::>(); + expected.sort(); + assert_eq!(expected, topics_to_be_created); + + // A topic pool with 16 topics stored in kvbackend in legacy format. + let topics = "[\"greptimedb_wal_topic_0\",\"greptimedb_wal_topic_1\",\"greptimedb_wal_topic_2\",\"greptimedb_wal_topic_3\",\"greptimedb_wal_topic_4\",\"greptimedb_wal_topic_5\",\"greptimedb_wal_topic_6\",\"greptimedb_wal_topic_7\",\"greptimedb_wal_topic_8\",\"greptimedb_wal_topic_9\",\"greptimedb_wal_topic_10\",\"greptimedb_wal_topic_11\",\"greptimedb_wal_topic_12\",\"greptimedb_wal_topic_13\",\"greptimedb_wal_topic_14\",\"greptimedb_wal_topic_15\"]"; + let put_req = PutRequest { + key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(), + value: topics.as_bytes().to_vec(), + prev_kv: true, + }; + let res = kv_backend.put(put_req).await.unwrap(); + assert!(res.prev_kv.is_none()); + + let topics_to_be_created = topic_kvbackend_manager + .to_be_created(&all_topics) + .await + .unwrap(); + assert!(topics_to_be_created.is_empty()); + + // Legacy topics should be deleted after restoring. 
+ let legacy_topics = kv_backend + .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes()) + .await + .unwrap(); + assert!(legacy_topics.is_none()); + + // Then we can restore it from the new format. + let mut restored_topics = topic_kvbackend_manager.restore().await.unwrap(); + restored_topics.sort(); + let mut expected = all_topics.clone(); + expected.sort(); + assert_eq!(expected, restored_topics); + } + + // Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend. + #[tokio::test] + async fn test_restore_persisted_topics() { + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + let topic_name_prefix = "greptimedb_wal_topic"; + let num_topics = 16; + + let all_topics = (0..num_topics) + .map(|i| format!("{}_{}", topic_name_prefix, i)) + .collect::>(); + + // Constructs mock topics. + let topic_kvbackend_manager = TopicKvBackendManager::new(kv_backend); + + let mut topics_to_be_created = topic_kvbackend_manager + .to_be_created(&all_topics) + .await + .unwrap(); + topics_to_be_created.sort(); + let mut expected = all_topics.iter().collect::>(); + expected.sort(); + assert_eq!(expected, topics_to_be_created); + + // Persists topics to kv backend. + topic_kvbackend_manager.persist(&all_topics).await.unwrap(); + let topics_to_be_created = topic_kvbackend_manager + .to_be_created(&all_topics) + .await + .unwrap(); + assert!(topics_to_be_created.is_empty()); + } +} diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_selector.rs b/src/common/meta/src/wal_options_allocator/selector.rs similarity index 100% rename from src/common/meta/src/wal_options_allocator/kafka/topic_selector.rs rename to src/common/meta/src/wal_options_allocator/selector.rs