From 99cc78271f77d1b928ddd0acce084ee9f7dbff12 Mon Sep 17 00:00:00 2001 From: Xanthorrhizol Date: Tue, 18 Jun 2024 14:31:39 +0900 Subject: [PATCH] feat: support replicate_subscription_state option for consumer --- src/connection.rs | 1 + src/consumer/options.rs | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/src/connection.rs b/src/connection.rs index 13eaf085..e2d5aa8d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1639,6 +1639,7 @@ pub(crate) mod messages { .collect(), read_compacted: Some(options.read_compacted.unwrap_or(false)), initial_position: Some(options.initial_position.into()), + replicate_subscription_state: options.replicate_subscription_state, schema: options.schema, start_message_id: options.start_message_id, ..Default::default() diff --git a/src/consumer/options.rs b/src/consumer/options.rs index ba2af562..1053ce2a 100644 --- a/src/consumer/options.rs +++ b/src/consumer/options.rs @@ -31,6 +31,10 @@ pub struct ConsumerOptions { /// } /// ``` pub initial_position: InitialPosition, + /// Mark the subscription as "replicated". Pulsar will make sure + /// to periodically sync the state of replicated subscriptions + /// across different clusters (when using geo-replication). + pub replicate_subscription_state: Option, } impl ConsumerOptions { @@ -76,4 +80,10 @@ impl ConsumerOptions { self.initial_position = initial_position; self } + + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + pub fn with_replicate_subscription_state(mut self, replicate_subscription_state: bool) -> Self { + self.replicate_subscription_state = Some(replicate_subscription_state); + self + } }