Skip to content

Commit

Permalink
Bridge: add /events poller (#1363)
Browse files Browse the repository at this point in the history
Helps with svix/monorepo-private#8654

Depends on client lib updates (not yet available).

I'd call this "quick and dirty" but it was actually a bit of a hefty
lift to pull together. It is however fairly dirty.

The want is to be able to pull messages from `/events` and feed them
into a receiver output.
The flow we'd see with `SenderInput` paired with `ReceiverOutput` is
what we want, but the existing typing didn't allow for this.

Refactors were needed to:
- lift out the parts from the HTTP server that run payloads through
transformations then forward to an output.
- update config to account for a new type of receiver: a "poller".
- new traits/types for the poller to differentiate it from the standard
webhook receiver (to ensure we don't accidentally pass one through and
try to bind it to a URL).
- Lots of "connective tissue" in the form of converters between config
values and concrete ones that can actually do things.

Some of the "connective tissue" exists purely to mimic bits and pieces
that existed for either the other receivers or senders (remember, this
case is odd in that it's similar to both).

Refactorings aside, the poller itself boasts an exponential backoff for
both error cases (either from `/events` or from the output) as well as
for the case where the `/events` iterator is "done."

This diff comes with a promise that we will (soon) give these additions
another look at clean up the stuff that doesn't make sense or feels
redundant.
  • Loading branch information
svix-onelson authored Jul 11, 2024
2 parents 286aacb + 5536e9a commit 269153d
Show file tree
Hide file tree
Showing 8 changed files with 509 additions and 62 deletions.
107 changes: 80 additions & 27 deletions bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bridge/svix-bridge-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ async-trait = "0.1"
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
svix = "1.17.0"
svix = { version = "1.25.0", features = ["svix_beta"] }
26 changes: 23 additions & 3 deletions bridge/svix-bridge-types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

pub use async_trait::async_trait;
use serde::{Deserialize, Serialize};
pub use svix;
Expand Down Expand Up @@ -140,6 +142,13 @@ pub trait SenderInput: Send {
async fn run(&self);
}

#[async_trait]
pub trait PollerInput: Send {
fn name(&self) -> &str;
fn set_transformer(&mut self, _tx: Option<TransformerTx>) {}
async fn run(&self);
}

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Represents something we can hand a webhook payload to.
Expand Down Expand Up @@ -186,16 +195,27 @@ impl ReceiverInputOpts {
}

// N.b. the codegen types we get from openapi don't impl Deserialize so we need our own version.
#[derive(Debug, Default, Deserialize)]
#[derive(Clone, Debug, Default, Deserialize)]
pub struct SvixOptions {
#[serde(default)]
pub debug: bool,
pub server_url: Option<String>,
pub timeout_secs: Option<u64>,
}

impl From<SvixOptions> for _SvixOptions {
fn from(SvixOptions { debug, server_url }: SvixOptions) -> Self {
_SvixOptions { debug, server_url }
fn from(
SvixOptions {
debug,
server_url,
timeout_secs,
}: SvixOptions,
) -> Self {
_SvixOptions {
debug,
server_url,
timeout: timeout_secs.map(Duration::from_secs),
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions bridge/svix-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ publish = false

[dependencies]
anyhow = "1"
base64 = "0.13.1"
clap = { version = "4.2.4", features = ["env", "derive"] }
axum = { version = "0.6", features = ["macros"] }
enum_dispatch = "0.3"
itertools = "0.12.1"
http = "0.2"
once_cell = "1.18.0"
opentelemetry = "0.22.0"
Expand Down
Loading

0 comments on commit 269153d

Please sign in to comment.