Skip to content

Commit

Permalink
fix: limit subscription watcher count (#392)
Browse files Browse the repository at this point in the history
* fix: limit subscription watcher count

* fix: remove unnecessary struct
  • Loading branch information
chris13524 authored Mar 6, 2024
1 parent 5d42c7d commit 0e674b6
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX subscription_watcher_address ON subscription_watcher (get_address_lower(account));
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub enum NotifyServerError {
AccountNotAuthorized,

#[error("sqlx error: {0}")]
SqlxError(#[from] sqlx::error::Error),
Sqlx(#[from] sqlx::error::Error),

#[error("sqlx migration error: {0}")]
SqlxMigrationError(#[from] sqlx::migrate::MigrateError),
Expand Down
79 changes: 53 additions & 26 deletions src/model/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use {
x25519_dalek::StaticSecret,
};

// Import not part of group above because it breaks formatting: https://github.com/rust-lang/rustfmt/issues/4746
use crate::services::public_http_server::handlers::relay_webhook::handlers::notify_watch_subscriptions::SUBSCRIPTION_WATCHER_LIMIT;

#[derive(Debug, FromRow)]
pub struct ProjectWithPublicKeys {
pub authentication_public_key: String,
Expand Down Expand Up @@ -685,6 +688,15 @@ pub async fn get_subscriptions_by_account_and_maybe_app(
result
}

#[derive(Debug, thiserror::Error)]
pub enum UpsertSubscriptionWatcherError {
#[error("Subscription watcher limit reached")]
LimitReached,

#[error("SQL error: {0}")]
Sqlx(#[from] sqlx::error::Error),
}

#[instrument(skip(postgres, metrics))]
pub async fn upsert_subscription_watcher(
account: AccountId,
Expand All @@ -694,33 +706,48 @@ pub async fn upsert_subscription_watcher(
expiry: DateTime<Utc>,
postgres: &PgPool,
metrics: Option<&Metrics>,
) -> Result<(), sqlx::error::Error> {
) -> Result<(), UpsertSubscriptionWatcherError> {
let query = "
INSERT INTO subscription_watcher (
account,
project,
did_key,
sym_key,
expiry
)
SELECT $1, $2, $3, $4, $5 WHERE (
SELECT COUNT(*)
FROM subscription_watcher
WHERE get_address_lower(account)=get_address_lower($1)
AND project=$2
) < $6
ON CONFLICT (did_key) DO UPDATE SET
updated_at=now(),
account=$1,
project=$2,
sym_key=$4,
expiry=$5
RETURNING *
";
let start = Instant::now();
let _ = sqlx::query::<Postgres>(
"
INSERT INTO subscription_watcher (
account,
project,
did_key,
sym_key,
expiry
)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (did_key) DO UPDATE SET
updated_at=now(),
account=$1,
project=$2,
sym_key=$4,
expiry=$5
",
)
.bind(account.as_ref())
.bind(project)
.bind(did_key)
.bind(sym_key)
.bind(expiry)
.execute(postgres)
.await?;
let mut txn = postgres.begin().await?;
// https://stackoverflow.com/a/48730873
sqlx::query::<Postgres>("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") // TODO serialization errors not handled
.execute(&mut *txn)
.await?;
let result = sqlx::query_as::<Postgres, ()>(query)
.bind(account.as_ref())
.bind(project)
.bind(did_key)
.bind(sym_key)
.bind(expiry)
.bind(SUBSCRIPTION_WATCHER_LIMIT)
.fetch_optional(&mut *txn)
.await?;
if result.is_none() {
return Err(UpsertSubscriptionWatcherError::LimitReached);
}
txn.commit().await?;
if let Some(metrics) = metrics {
metrics.postgres_query("upsert_subscription_watcher", start);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub enum RelayMessageClientError {
#[error("Received 4010 on wrong topic: {0}")]
WrongNotifyWatchSubscriptionsTopic(Topic),

#[error("Subscription watcher limit reached")]
SubscriptionWatcherLimitReached,

#[error("Received 4008 on unrecognized topic: {0}")]
WrongNotifyUpdateTopic(Topic),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use {
helpers::{
get_project_by_app_domain, get_subscription_watchers_for_account_by_app_or_all_app,
get_subscriptions_by_account_and_maybe_app, upsert_subscription_watcher,
SubscriberWithProject, SubscriptionWatcherQuery,
SubscriberWithProject, SubscriptionWatcherQuery, UpsertSubscriptionWatcherError,
},
types::AccountId,
},
Expand Down Expand Up @@ -46,6 +46,8 @@ use {
x25519_dalek::PublicKey,
};

pub const SUBSCRIPTION_WATCHER_LIMIT: i32 = 25;

#[instrument(name = "wc_notifyWatchSubscriptions", skip_all)]
pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), RelayMessageError> {
if msg.topic != state.notify_keys.key_agreement_topic {
Expand Down Expand Up @@ -164,7 +166,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
state.metrics.as_ref(),
)
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?
.map_err(|e| match e {
UpsertSubscriptionWatcherError::LimitReached => {
RelayMessageError::Client(RelayMessageClientError::SubscriptionWatcherLimitReached)
}
UpsertSubscriptionWatcherError::Sqlx(e) => RelayMessageError::Server(
RelayMessageServerError::NotifyServerError(NotifyServerError::Sqlx(e)),
),
})?;

{
let now = Utc::now();
Expand Down
Loading

0 comments on commit 0e674b6

Please sign in to comment.