Skip to content

Commit

Permalink
Bridge: rebuild rabbitmq producer on error
Browse files Browse the repository at this point in the history
Previously there were two defects related to RabbitMQ connection
handling as a receiver output:

1. When unable to connect to rabbitmq on startup, bridge would exit
   immediately.
2. When forwarding to rabbitmq resulted in an error, the connection
   could be left in a bad state and not recover leaving all subsequent
   deliveries to the node to fail until bridge was restarted.

To address both of these issues, `QueueForwarder` now initializes
clients lazily, on demand, and will tear down the client when errors are
encountered.

When the client is discarded, the output will re-initialize another
client using the same connection parameters at the time of the next
delivery to that output.

For webhook receiver and poller inputs, the HTTP client POSTing the input
will see a 500 response in the case the publish to rabbit fails and can retry
at their convenience. Whenever they do, the rabbitmq producer should
either re-initialize (or fail trying). Rinse, repeat.
  • Loading branch information
svix-onelson committed Aug 2, 2024
1 parent 190870a commit 536450e
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 11 deletions.
62 changes: 51 additions & 11 deletions bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,47 @@
use std::sync::Arc;

use omniqueue::DynProducer;
use omniqueue::{DynProducer, QueueError};
use svix_bridge_types::{async_trait, BoxError, ForwardRequest, ReceiverOutput};
use tokio::sync::Mutex;

use crate::{config::QueueOutputOpts, error::Result};

#[derive(Clone)]
pub struct QueueForwarder {
name: String,
// FIXME: if we retain things like the queue name we can show this in the Debug impl
opts: QueueOutputOpts,
// FIXME: raw payloads not yet supported for receivers, but probably should be.
sender: Arc<DynProducer>,
// FIXME: `RwLock` might be better for throughput, but more likely to drive up complexity in
// `QueueForwarder::handle`. A better option might be to handle connection recovery inside
// omniqueue.
sender: Arc<Mutex<Option<DynProducer>>>,
}

impl QueueForwarder {
async fn build_sender(opts: &QueueOutputOpts) -> Result<DynProducer> {
Ok(match opts {
QueueOutputOpts::GCPPubSub(cfg) => crate::gcp_pubsub::producer(cfg).await?,
QueueOutputOpts::RabbitMQ(cfg) => crate::rabbitmq::producer(cfg).await?,
QueueOutputOpts::Redis(cfg) => crate::redis::producer(cfg).await?,
QueueOutputOpts::SQS(cfg) => crate::sqs::producer(cfg).await?,
})
}

pub async fn from_receiver_output_opts(
name: String,
opts: QueueOutputOpts,
) -> Result<QueueForwarder> {
let sender = match opts {
QueueOutputOpts::GCPPubSub(cfg) => crate::gcp_pubsub::producer(&cfg).await?,
QueueOutputOpts::RabbitMQ(cfg) => crate::rabbitmq::producer(&cfg).await?,
QueueOutputOpts::Redis(cfg) => crate::redis::producer(&cfg).await?,
QueueOutputOpts::SQS(cfg) => crate::sqs::producer(&cfg).await?,
};
Ok(QueueForwarder {
name,
sender: Arc::new(sender),
opts,
sender: Arc::new(Mutex::new(None)),
})
}
}

impl std::fmt::Debug for QueueForwarder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// TODO: see what fields from `opts` we can expose here, branched by the variant.
f.debug_struct("QueueForwarder").finish()
}
}
Expand All @@ -44,7 +53,38 @@ impl ReceiverOutput for QueueForwarder {
}

async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError> {
self.sender.send_serde_json(&request.payload).await?;
let mut sender = self.sender.lock().await;
// `QueueForwarder` initializes its sender lazily, but also the sender can be discarded if
// it gets in a bad state.
// When None, rebuild the sender.
if sender.is_none() {
*sender = Some(Self::build_sender(&self.opts).await?);
}

let res = sender
.as_ref()
.expect("non-none sender")
.send_serde_json(&request.payload)
.await;

// Certain clients (like rabbitmq) require intervention when the connection to the remote
// is interrupted. Check for those failure cases and invalidate the sender so it gets
// rebuilt next time.
//
// `QueueError::Generic` boxes the inner error so either we'd need to have a direct dep on
// `lapin` so we can downcast to inspect specifics, or otherwise we'd need omniqueue to
// re-export the error types.
//
// In practice multiple error types can show up that all mean the same thing so to start,
// just recycle for any rabbitmq error.
if let (Err(QueueError::Generic(_err)), QueueOutputOpts::RabbitMQ(_cfg)) =
(&res, &self.opts)
{
let _ = sender.take();
}

res?;

Ok(())
}
}
1 change: 1 addition & 0 deletions bridge/svix-bridge-plugin-queue/tests/it/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod gcp_pubsub_consumer;
mod rabbitmq_consumer;
mod rabbitmq_receiver;
mod redis_stream_consumer;
mod sqs_consumer;
120 changes: 120 additions & 0 deletions bridge/svix-bridge-plugin-queue/tests/it/rabbitmq_receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
//! Requires a rabbitmq node to be running on localhost:5672 (the default port) and using the
//! default guest/guest credentials.
//! Try using the `testing-docker-compose.yml` in the repo root to get this going.
use std::time::Duration;

use lapin::{
options::QueueDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties,
Queue,
};
use serde_json::json;
use svix_bridge_plugin_queue::config::{QueueForwarder, QueueOutputOpts, RabbitMqOutputOpts};
use svix_bridge_types::{ForwardRequest, ReceiverOutput};
use tokio::{
io::copy_bidirectional,
net::{TcpListener, TcpStream, ToSocketAddrs},
task::JoinHandle,
};

async fn declare_queue(name: &str, channel: &Channel) -> Queue {
channel
.queue_declare(
name,
QueueDeclareOptions {
auto_delete: true,
..Default::default()
},
FieldTable::default(),
)
.await
.unwrap()
}

async fn mq_connection(uri: &str) -> Connection {
let options = ConnectionProperties::default()
.with_connection_name("test".into())
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
Connection::connect(uri, options).await.unwrap()
}

const WAIT_MS: u64 = 200;

/// These tests assume a "vanilla" rabbitmq instance, using the default port, creds, exchange...
const MQ_URI: &str = "amqp://guest:guest@localhost:5672/%2f";

/// TCP proxy. Useful for giving us control over the connection to rabbit inside our tests.
async fn proxy(
listener: TcpListener,
server_addr: impl ToSocketAddrs + Clone + Sync + Send + 'static,
) -> Result<JoinHandle<()>, ()> {
let handle = tokio::task::spawn(async move {
while let Ok((mut inbound, _)) = listener.accept().await {
let mut outbound = TcpStream::connect(server_addr.clone()).await.unwrap();
if let Err(e) = copy_bidirectional(&mut inbound, &mut outbound).await {
eprintln!("Failed to transfer; error={}", e);
}
}
});
Ok(handle)
}

#[tokio::test]
async fn test_connection_recovery() {
let mq_conn = mq_connection(MQ_URI).await;
let channel = mq_conn.create_channel().await.unwrap();
// setup the queue before running the consumer or the consumer will error out
let queue = declare_queue("", &channel).await;
let queue_name = queue.name().as_str();

let proxy_listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
let port = proxy_listener.local_addr().unwrap().port();
// Start the proxy
let proxy_handle = proxy(proxy_listener, "127.0.0.0:5672").await.unwrap();

// Configure the receiver output to connect to the proxy so we can interrupt the connection as needed.
let proxied_mq_uri = format!("amqp://guest:guest@localhost:{port}/%2f");

let opts = QueueOutputOpts::RabbitMQ(RabbitMqOutputOpts {
uri: proxied_mq_uri.clone(),
exchange: "".to_string(),
routing_key: queue_name.to_string(),
publish_options: Default::default(),
publish_properties: Default::default(),
});

let output = QueueForwarder::from_receiver_output_opts(String::from("test"), opts)
.await
.unwrap();

let req = ForwardRequest {
payload: json!({"test": true}),
};

assert!(
output.handle(req.clone()).await.is_ok(),
"expected ok when rabbit available"
);

// Disconnect the proxy
proxy_handle.abort();
// Sleep a beat to give time for the proxy tear down.
tokio::time::sleep(Duration::from_millis(WAIT_MS)).await;

assert!(
output.handle(req.clone()).await.is_err(),
"expected err when rabbit unavailable"
);

// Reconnect the proxy on the same port
let proxy_listener = TcpListener::bind(("127.0.0.1", port)).await.unwrap();
let proxy_handle = proxy(proxy_listener, "127.0.0.0:5672").await.unwrap();

assert!(
output.handle(req.clone()).await.is_ok(),
"expected ok when rabbit available"
);

proxy_handle.abort();
}

0 comments on commit 536450e

Please sign in to comment.