Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bridge: Require type field for kafka input, output #1374

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bridge/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ messaging systems:
- RabbitMQ
- Redis
- SQS
- Kafka

> Important to note that queues, exchanges, topics, etc. should be created and configured independently,
> prior to launching Bridge. Bridge will not automatically attempt to create these resources, it will only try
Expand Down
125 changes: 75 additions & 50 deletions bridge/svix-bridge-plugin-kafka/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,56 @@ use svix_bridge_types::{ReceiverOutput, SenderInput, SenderOutputOpts, Transform
use crate::{input::KafkaConsumer, KafkaProducer, Result};

#[derive(Clone, Deserialize)]
pub struct KafkaInputOpts {
/// Comma-separated list of addresses.
///
/// Example: `localhost:9094`
#[serde(rename = "kafka_bootstrap_brokers")]
pub bootstrap_brokers: String,

/// The consumer group ID, used to track the stream offset between restarts
/// (due to host maintenance, upgrades, crashes, etc.).
#[serde(rename = "kafka_group_id")]
pub group_id: String,

/// The topic to listen to.
#[serde(rename = "kafka_topic")]
pub topic: String,

/// The value for 'security.protocol' in the kafka config.
#[serde(flatten)]
pub security_protocol: KafkaSecurityProtocol,

/// The 'debug' config value for rdkafka - enables more verbose logging
/// for the selected 'contexts'
#[serde(rename = "kafka_debug_contexts")]
pub debug_contexts: Option<String>,
#[serde(tag = "type")]
pub enum KafkaInputOpts {
// Single-variant enum so we can require the "type": "kafka" field in deserialization
#[serde(rename = "kafka")]
Inner {
/// Comma-separated list of addresses.
///
/// Example: `localhost:9094`
#[serde(rename = "kafka_bootstrap_brokers")]
bootstrap_brokers: String,

/// The consumer group ID, used to track the stream offset between restarts
/// (due to host maintenance, upgrades, crashes, etc.).
#[serde(rename = "kafka_group_id")]
group_id: String,

/// The topic to listen to.
#[serde(rename = "kafka_topic")]
topic: String,

/// The value for 'security.protocol' in the kafka config.
#[serde(flatten)]
security_protocol: KafkaSecurityProtocol,

/// The 'debug' config value for rdkafka - enables more verbose logging
/// for the selected 'contexts'
#[serde(rename = "kafka_debug_contexts")]
debug_contexts: Option<String>,
},
}

impl KafkaInputOpts {
pub(crate) fn create_consumer(self) -> KafkaResult<StreamConsumer> {
let Self::Inner {
bootstrap_brokers,
group_id,
security_protocol,
debug_contexts,
..
} = self;

let mut config = ClientConfig::new();
config
.set("group.id", self.group_id)
.set("bootstrap.servers", self.bootstrap_brokers)
.set("group.id", group_id)
.set("bootstrap.servers", bootstrap_brokers)
// messages are committed manually after webhook delivery was successful.
.set("enable.auto.commit", "false");

self.security_protocol.apply(&mut config);
if let Some(debug_contexts) = self.debug_contexts {
security_protocol.apply(&mut config);
if let Some(debug_contexts) = debug_contexts {
if !debug_contexts.is_empty() {
config.set("debug", debug_contexts);
}
Expand All @@ -54,34 +67,46 @@ impl KafkaInputOpts {
}

#[derive(Clone, Deserialize)]
pub struct KafkaOutputOpts {
/// Comma-separated list of addresses.
///
/// Example: `localhost:9094`
#[serde(rename = "kafka_bootstrap_brokers")]
pub bootstrap_brokers: String,

/// The topic to listen to.
#[serde(rename = "kafka_topic")]
pub topic: String,

/// The value for 'security.protocol' in the kafka config.
#[serde(flatten)]
pub security_protocol: KafkaSecurityProtocol,

/// The 'debug' config value for rdkafka - enables more verbose logging
/// for the selected 'contexts'
#[serde(rename = "kafka_debug_contexts")]
pub debug_contexts: Option<String>,
#[serde(tag = "type")]
pub enum KafkaOutputOpts {
// Single-variant enum so we can require the "type": "kafka" field in deserialization
#[serde(rename = "kafka")]
Inner {
/// Comma-separated list of addresses.
///
/// Example: `localhost:9094`
#[serde(rename = "kafka_bootstrap_brokers")]
bootstrap_brokers: String,

/// The topic to listen to.
#[serde(rename = "kafka_topic")]
topic: String,

/// The value for 'security.protocol' in the kafka config.
#[serde(flatten)]
security_protocol: KafkaSecurityProtocol,

/// The 'debug' config value for rdkafka - enables more verbose logging
/// for the selected 'contexts'
#[serde(rename = "kafka_debug_contexts")]
debug_contexts: Option<String>,
},
}

impl KafkaOutputOpts {
pub(crate) fn create_producer(self) -> KafkaResult<FutureProducer> {
let Self::Inner {
bootstrap_brokers,
security_protocol,
debug_contexts,
..
} = self;

let mut config = ClientConfig::new();
config.set("bootstrap.servers", self.bootstrap_brokers);
config.set("bootstrap.servers", bootstrap_brokers);

self.security_protocol.apply(&mut config);
if let Some(debug_contexts) = self.debug_contexts {
security_protocol.apply(&mut config);
if let Some(debug_contexts) = debug_contexts {
if !debug_contexts.is_empty() {
config.set("debug", debug_contexts);
}
Expand Down
17 changes: 9 additions & 8 deletions bridge/svix-bridge-plugin-kafka/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,14 @@ impl KafkaConsumer {
mut post_options,
} = payload;

let KafkaInputOpts::Inner {
group_id, topic, ..
} = &self.opts;

// If committing the message fails or the process crashes after posting the webhook but
// before committing, this makes sure that the next run of this fn with the same kafka
// message doesn't end up creating a duplicate webhook in svix.
let idempotency_key = format!(
"svix_bridge_kafka_{}_{}_{}",
self.opts.group_id,
self.opts.topic,
msg.offset()
);
let idempotency_key = format!("svix_bridge_kafka_{group_id}_{topic}_{}", msg.offset());
post_options
.get_or_insert_with(Default::default)
.idempotency_key = Some(idempotency_key);
Expand Down Expand Up @@ -124,7 +123,8 @@ impl KafkaConsumer {
// `ClientConfig::create` does blocking I/O.
// Same for subscribe, most likely.
let consumer = spawn_blocking(move || {
let topic = opts.topic.clone();
let KafkaInputOpts::Inner { topic, .. } = &opts;
let topic = topic.clone();

let consumer = opts.create_consumer()?;
tracing::debug!("Created StreamConsumer");
Expand Down Expand Up @@ -197,7 +197,8 @@ impl SenderInput for KafkaConsumer {
let mut fails: u64 = 0;
let mut last_fail = Instant::now();

tracing::info!(topic = self.opts.topic, "Starting to listen for messages");
let KafkaInputOpts::Inner { topic, .. } = &self.opts;
tracing::info!(topic, "Starting to listen for messages");

loop {
if let Err(e) = self.run_inner().await {
Expand Down
3 changes: 2 additions & 1 deletion bridge/svix-bridge-plugin-kafka/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub struct KafkaProducer {

impl KafkaProducer {
pub fn new(name: String, opts: KafkaOutputOpts) -> Result<Self, KafkaError> {
let topic = opts.topic.clone();
let KafkaOutputOpts::Inner { topic, .. } = &opts;
let topic = topic.clone();
let producer = opts.create_producer()?;

Ok(Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn get_test_plugin(
) -> KafkaConsumer {
KafkaConsumer::new(
"test".into(),
KafkaInputOpts {
KafkaInputOpts::Inner {
bootstrap_brokers: BROKER_HOST.to_owned(),
// All tests use different topics, so it's fine to have only one consumer group ID
group_id: "svix_bridge_test_group_id".to_owned(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn test_produce_ok() {
// Only then actually send a message
let producer = KafkaProducer::new(
"test".into(),
KafkaOutputOpts {
KafkaOutputOpts::Inner {
bootstrap_brokers: BROKER_HOST.to_owned(),
topic: topic.to_owned(),
security_protocol: svix_bridge_plugin_kafka::KafkaSecurityProtocol::Plaintext,
Expand Down
1 change: 1 addition & 0 deletions bridge/svix-bridge.example.receivers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ receivers:
verification:
type: "none"
output:
type: "kafka"
kafka_bootstrap_brokers: "localhost:9094"
kafka_topic: "foobar"
# Other valid values: "plaintext", "ssl"
Expand Down
1 change: 1 addition & 0 deletions bridge/svix-bridge.example.senders.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ senders:
# Kafka Consumer
- name: "kafka-example"
input:
type: "kafka"
kafka_bootstrap_brokers: "localhost:9094"
kafka_group_id: "kafka_example_consumer_group"
kafka_topic: "foobar"
Expand Down
2 changes: 2 additions & 0 deletions bridge/svix-bridge/src/config/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ senders:
# Kafka Consumer
- name: "kafka-example"
input:
type: "kafka"
kafka_bootstrap_brokers: "localhost:9094"
kafka_group_id: "kafka_example_consumer_group"
kafka_topic: "foobar"
Expand Down Expand Up @@ -272,6 +273,7 @@ receivers:
verification:
type: "none"
output:
type: "kafka"
kafka_bootstrap_brokers: "localhost:9094"
kafka_topic: "foobar"
# Other valid values: "plaintext", "ssl"
Expand Down