Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bridge: More config cleanup #1330

Merged
merged 3 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 48 additions & 56 deletions bridge/svix-bridge-plugin-queue/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,38 @@ pub use crate::{
sqs::{SqsInputOpts, SqsOutputOpts},
};

#[derive(Deserialize)]
pub struct QueueSenderConfig {
pub name: String,
pub input: SenderInputOpts,
#[serde(default)]
pub transformation: Option<TransformationConfig>,
pub output: SenderOutputOpts,
}

impl QueueSenderConfig {
pub fn into_sender_input(self) -> Result<Box<dyn SenderInput>, &'static str> {
// FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think?
if matches!(self.input, SenderInputOpts::Redis(_))
&& self
.transformation
.as_ref()
.map(|t| t.format() != TransformerInputFormat::Json)
.unwrap_or_default()
{
return Err("redis only supports json formatted transformations");
}

Ok(Box::new(QueueSender::new(
self.name,
self.input,
self.transformation,
self.output,
)))
pub fn into_sender_input(
name: String,
input_opts: QueueInputOpts,
transformation: Option<TransformationConfig>,
output: SenderOutputOpts,
) -> Result<Box<dyn SenderInput>, &'static str> {
// FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think?
if matches!(input_opts, QueueInputOpts::Redis(_))
&& transformation
.as_ref()
.map(|t| t.format() != TransformerInputFormat::Json)
.unwrap_or_default()
{
return Err("redis only supports json formatted transformations");
}

Ok(Box::new(QueueSender::new(
name,
input_opts,
transformation,
output,
)))
}

pub async fn into_receiver_output(
name: String,
opts: ReceiverOutputOpts,
opts: QueueOutputOpts,
// Annoying to have to pass this, but certain backends (redis) only work with certain transformations (json).
transformation: Option<&TransformationConfig>,
) -> Result<Box<dyn ReceiverOutput>, crate::Error> {
// FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think?
if matches!(opts, ReceiverOutputOpts::Redis(_))
if matches!(opts, QueueOutputOpts::Redis(_))
&& transformation
.as_ref()
.map(|t| t.format() != TransformerInputFormat::Json)
Expand All @@ -68,7 +61,7 @@ pub async fn into_receiver_output(
// TODO: feature flag the variants, thread the features down through to generic-queue
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum SenderInputOpts {
pub enum QueueInputOpts {
#[serde(rename = "gcp-pubsub")]
GCPPubSub(GCPPubSubInputOpts),
RabbitMQ(RabbitMqInputOpts),
Expand All @@ -78,7 +71,7 @@ pub enum SenderInputOpts {

#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum ReceiverOutputOpts {
pub enum QueueOutputOpts {
#[serde(rename = "gcp-pubsub")]
GCPPubSub(GCPPubSubOutputOpts),
RabbitMQ(RabbitMqOutputOpts),
Expand All @@ -92,51 +85,50 @@ mod tests {
SenderOutputOpts, SvixSenderOutputOpts, TransformationConfig, TransformerInputFormat,
};

use super::{into_receiver_output, QueueSenderConfig};
use super::{into_receiver_output, into_sender_input};
use crate::{
config::{ReceiverOutputOpts, SenderInputOpts},
config::{QueueInputOpts, QueueOutputOpts},
redis::{RedisInputOpts, RedisOutputOpts},
};

// FIXME: can't support raw payload access for redis because it requires JSON internally.
// Revisit after `omniqueue` adoption.
#[test]
fn redis_sender_with_string_transformation_is_err() {
let cfg = QueueSenderConfig {
name: "redis-with-string-transformation".to_string(),
input: SenderInputOpts::Redis(RedisInputOpts {
dsn: "".to_string(),
max_connections: 0,
reinsert_on_nack: false,
queue_key: "".to_string(),
delayed_queue_key: None,
consumer_group: "".to_string(),
consumer_name: "".to_string(),
ack_deadline_ms: 2_000,
}),
transformation: Some(TransformationConfig::Explicit {
let input_opts = QueueInputOpts::Redis(RedisInputOpts {
dsn: "".to_string(),
max_connections: 0,
reinsert_on_nack: false,
queue_key: "".to_string(),
delayed_queue_key: None,
consumer_group: "".to_string(),
consumer_name: "".to_string(),
ack_deadline_ms: 2_000,
});

let err = into_sender_input(
"redis-with-string-transformation".to_owned(),
input_opts,
Some(TransformationConfig::Explicit {
format: TransformerInputFormat::String,
src: String::new(),
}),
output: SenderOutputOpts::Svix(SvixSenderOutputOpts {
SenderOutputOpts::Svix(SvixSenderOutputOpts {
token: "".to_string(),
options: None,
}),
};

assert_eq!(
cfg.into_sender_input()
.err()
.expect("invalid config didn't result in error"),
"redis only supports json formatted transformations"
)
.err()
.expect("invalid config didn't result in error");

assert_eq!(err, "redis only supports json formatted transformations")
}

// FIXME: can't support raw payload access for redis because it requires JSON internally.
// Revisit after `omniqueue` adoption.
#[tokio::test]
async fn test_redis_receiver_string_transform_is_err() {
let redis_out = ReceiverOutputOpts::Redis(RedisOutputOpts {
let redis_out = QueueOutputOpts::Redis(RedisOutputOpts {
dsn: "".to_string(),
max_connections: 0,
queue_key: "".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion bridge/svix-bridge-plugin-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ mod redis;
pub mod sender_input;
mod sqs;

use error::Error;
pub use self::config::{into_receiver_output, into_sender_input};
use self::error::Error;

/// Newtype for [`omniqueue::queue::Delivery`].
///
Expand Down
12 changes: 6 additions & 6 deletions bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use omniqueue::DynProducer;
use svix_bridge_types::{async_trait, ForwardRequest, ReceiverOutput};

use crate::{config::ReceiverOutputOpts, error::Result};
use crate::{config::QueueOutputOpts, error::Result};

#[derive(Clone)]
pub struct QueueForwarder {
Expand All @@ -16,13 +16,13 @@ pub struct QueueForwarder {
impl QueueForwarder {
pub async fn from_receiver_output_opts(
name: String,
opts: ReceiverOutputOpts,
opts: QueueOutputOpts,
) -> Result<QueueForwarder> {
let sender = match opts {
ReceiverOutputOpts::GCPPubSub(cfg) => crate::gcp_pubsub::producer(&cfg).await?,
ReceiverOutputOpts::RabbitMQ(cfg) => crate::rabbitmq::producer(&cfg).await?,
ReceiverOutputOpts::Redis(cfg) => crate::redis::producer(&cfg).await?,
ReceiverOutputOpts::SQS(cfg) => crate::sqs::producer(&cfg).await?,
QueueOutputOpts::GCPPubSub(cfg) => crate::gcp_pubsub::producer(&cfg).await?,
QueueOutputOpts::RabbitMQ(cfg) => crate::rabbitmq::producer(&cfg).await?,
QueueOutputOpts::Redis(cfg) => crate::redis::producer(&cfg).await?,
QueueOutputOpts::SQS(cfg) => crate::sqs::producer(&cfg).await?,
};
Ok(QueueForwarder {
name,
Expand Down
36 changes: 17 additions & 19 deletions bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ use svix_bridge_types::{
TransformerTx,
};

use crate::{
config::SenderInputOpts, error::Error, gcp_pubsub, rabbitmq, run_inner, sqs, Consumer,
};
use crate::{config::QueueInputOpts, error::Error, gcp_pubsub, rabbitmq, run_inner, sqs, Consumer};

pub struct QueueSender {
name: String,
source: String,
system: String,
input_opts: SenderInputOpts,
input_opts: QueueInputOpts,
transformation: Option<TransformationConfig>,
transformer_tx: Option<TransformerTx>,
svix_client: Svix,
Expand All @@ -24,28 +22,28 @@ impl std::fmt::Debug for QueueSender {
}
}

fn system_name(opts: &SenderInputOpts) -> &'static str {
fn system_name(opts: &QueueInputOpts) -> &'static str {
match opts {
SenderInputOpts::GCPPubSub(_) => "gcp-pubsub",
SenderInputOpts::RabbitMQ(_) => "rabbitmq",
SenderInputOpts::Redis(_) => "redis",
SenderInputOpts::SQS(_) => "sqs",
QueueInputOpts::GCPPubSub(_) => "gcp-pubsub",
QueueInputOpts::RabbitMQ(_) => "rabbitmq",
QueueInputOpts::Redis(_) => "redis",
QueueInputOpts::SQS(_) => "sqs",
}
}

fn source_name(opts: &SenderInputOpts) -> &str {
fn source_name(opts: &QueueInputOpts) -> &str {
match opts {
SenderInputOpts::GCPPubSub(opts) => &opts.subscription_id,
SenderInputOpts::RabbitMQ(opts) => &opts.queue_name,
SenderInputOpts::Redis(opts) => &opts.queue_key,
SenderInputOpts::SQS(opts) => &opts.queue_dsn,
QueueInputOpts::GCPPubSub(opts) => &opts.subscription_id,
QueueInputOpts::RabbitMQ(opts) => &opts.queue_name,
QueueInputOpts::Redis(opts) => &opts.queue_key,
QueueInputOpts::SQS(opts) => &opts.queue_dsn,
}
}

impl QueueSender {
pub fn new(
name: String,
input: SenderInputOpts,
input: QueueInputOpts,
transformation: Option<TransformationConfig>,
output: SenderOutputOpts,
) -> Self {
Expand Down Expand Up @@ -89,10 +87,10 @@ impl Consumer for QueueSender {

async fn consumer(&self) -> std::io::Result<DynConsumer> {
Ok(match &self.input_opts {
SenderInputOpts::GCPPubSub(cfg) => gcp_pubsub::consumer(cfg).await,
SenderInputOpts::RabbitMQ(cfg) => rabbitmq::consumer(cfg).await,
SenderInputOpts::Redis(cfg) => crate::redis::consumer(cfg).await,
SenderInputOpts::SQS(cfg) => sqs::consumer(cfg).await,
QueueInputOpts::GCPPubSub(cfg) => gcp_pubsub::consumer(cfg).await,
QueueInputOpts::RabbitMQ(cfg) => rabbitmq::consumer(cfg).await,
QueueInputOpts::Redis(cfg) => crate::redis::consumer(cfg).await,
QueueInputOpts::SQS(cfg) => sqs::consumer(cfg).await,
}
.map_err(Error::from)?)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use google_cloud_pubsub::{
};
use serde_json::json;
use svix_bridge_plugin_queue::{
config::{GCPPubSubInputOpts, SenderInputOpts},
config::{GCPPubSubInputOpts, QueueInputOpts},
sender_input::QueueSender,
};
use svix_bridge_types::{
Expand All @@ -35,7 +35,7 @@ fn get_test_plugin(
) -> QueueSender {
QueueSender::new(
"test".into(),
SenderInputOpts::GCPPubSub(GCPPubSubInputOpts {
QueueInputOpts::GCPPubSub(GCPPubSubInputOpts {
subscription_id,
credentials_file: None,
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use lapin::{
};
use serde_json::json;
use svix_bridge_plugin_queue::{
config::{RabbitMqInputOpts, SenderInputOpts},
config::{QueueInputOpts, RabbitMqInputOpts},
sender_input::QueueSender,
};
use svix_bridge_types::{
Expand All @@ -31,7 +31,7 @@ fn get_test_plugin(
) -> QueueSender {
QueueSender::new(
"test".into(),
SenderInputOpts::RabbitMQ(RabbitMqInputOpts {
QueueInputOpts::RabbitMQ(RabbitMqInputOpts {
uri: mq_uri.to_string(),
queue_name: queue_name.to_string(),
consumer_tag: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
use redis::{AsyncCommands, Client};
use serde_json::json;
use svix_bridge_plugin_queue::{
config::{RedisInputOpts, SenderInputOpts},
config::{QueueInputOpts, RedisInputOpts},
sender_input::QueueSender,
};
use svix_bridge_types::{
Expand All @@ -26,7 +26,7 @@ fn get_test_plugin(
) -> QueueSender {
QueueSender::new(
"test".into(),
SenderInputOpts::Redis(RedisInputOpts {
QueueInputOpts::Redis(RedisInputOpts {
dsn: "redis://localhost/".to_owned(),
max_connections: 8,
reinsert_on_nack: false,
Expand Down
4 changes: 2 additions & 2 deletions bridge/svix-bridge-plugin-queue/tests/it/sqs_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::time::Duration;
use aws_sdk_sqs::Client;
use serde_json::json;
use svix_bridge_plugin_queue::{
config::{SenderInputOpts, SqsInputOpts},
config::{QueueInputOpts, SqsInputOpts},
sender_input::QueueSender,
};
use svix_bridge_types::{
Expand All @@ -35,7 +35,7 @@ fn get_test_plugin(
) -> QueueSender {
QueueSender::new(
"test".into(),
SenderInputOpts::SQS(SqsInputOpts {
QueueInputOpts::SQS(SqsInputOpts {
queue_dsn,
override_endpoint: true,
}),
Expand Down
10 changes: 2 additions & 8 deletions bridge/svix-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9"
svix-ksuid = "0.7.0"
svix-bridge-plugin-queue = { optional=true, path = "../svix-bridge-plugin-queue" }
svix-bridge-plugin-queue = { path = "../svix-bridge-plugin-queue" }
svix-bridge-types = { path = "../svix-bridge-types" }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
Expand All @@ -41,11 +41,5 @@ chrono = "0.4"
tower = "0.4"

[features]
default = ["gcp-pubsub", "rabbitmq", "redis", "sqs", "jemalloc"]

gcp-pubsub = ["generic-queue"]
generic-queue = ["dep:svix-bridge-plugin-queue"]
rabbitmq = ["generic-queue"]
redis = ["generic-queue"]
sqs = ["generic-queue"]
default = ["jemalloc"]
jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
Loading