Skip to content

Commit

Permalink
fix: use blocking subscribe everywhere (#296)
Browse files Browse the repository at this point in the history
* fix: use blocking relay subscribe everywhere

* chore: add Grafana metrics

* chore: increase timeouts

* chore: log expected tag and topic

* chore: switch to prod relay

* fix: replication lag

* chore: reduce replication lag sleep for now

* chore: increase message delivery timeout

* fix: update tokio to fix PoolTimedOut errors: launchbadge/sqlx#2881 (comment)

* chore: remove test parallel

* fix: use relay HTTP client for tests (#316)

* fix: use relay HTTP client for tests

* fix: run CI when tests are changed
  • Loading branch information
chris13524 authored Jan 23, 2024
1 parent b654f98 commit e762a83
Show file tree
Hide file tree
Showing 17 changed files with 715 additions and 643 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/event_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ jobs:
- uses: actions/checkout@v3
- uses: WalletConnect/actions/github/paths-filter/@2.2.1
id: filter
with:
path-app: . # run CI when tests are changed
outputs:
infra: ${{ steps.filter.outputs.infra }}
app: ${{ steps.filter.outputs.app }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/sub-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,4 @@ jobs:
uses: WalletConnect/actions-rs/[email protected]
with:
command: test
args: --test integration
args: --test integration -- --test-threads=1
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test-all:

test-integration:
@echo '==> Testing integration'
RUST_BACKTRACE=1 ANSI_LOGS=true cargo test --test integration -- {{test}}
RUST_BACKTRACE=1 ANSI_LOGS=true cargo test --test integration -- {{test}} --test-threads=1

# Clean build artifacts
clean:
Expand Down
52 changes: 52 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub struct Metrics {
relay_outgoing_message_failures: Counter<u64>,
relay_outgoing_message_latency: Histogram<u64>,
relay_outgoing_message_publish_latency: Histogram<u64>,
relay_subscribes: Counter<u64>,
relay_subscribe_failures: Counter<u64>,
relay_subscribe_latency: Histogram<u64>,
relay_subscribe_request_latency: Histogram<u64>,
postgres_queries: Counter<u64>,
postgres_query_latency: Histogram<u64>,
keys_server_requests: Counter<u64>,
Expand Down Expand Up @@ -112,6 +116,26 @@ impl Metrics {
.with_description("The latency publishing relay messages")
.init();

let relay_subscribes: Counter<u64> = meter
.u64_counter("relay_subscribes")
.with_description("The number of subscribes to relay topics (not including retries)")
.init();

let relay_subscribe_failures: Counter<u64> = meter
.u64_counter("relay_subscribe_failures")
.with_description("The number of failures to subscribe to relay topics")
.init();

let relay_subscribe_latency: Histogram<u64> = meter
.u64_histogram("relay_subscribe_latency")
.with_description("The latency subscribing to relay topics w/ built-in retry")
.init();

let relay_subscribe_request_latency: Histogram<u64> = meter
.u64_histogram("relay_subscribe_request_latency")
.with_description("The latency subscribing to relay topics")
.init();

let postgres_queries: Counter<u64> = meter
.u64_counter("postgres_queries")
.with_description("The number of Postgres queries executed")
Expand Down Expand Up @@ -194,6 +218,10 @@ impl Metrics {
relay_outgoing_message_failures,
relay_outgoing_message_latency,
relay_outgoing_message_publish_latency,
relay_subscribes,
relay_subscribe_failures,
relay_subscribe_latency,
relay_subscribe_request_latency,
postgres_queries,
postgres_query_latency,
keys_server_requests,
Expand Down Expand Up @@ -286,6 +314,30 @@ impl Metrics {
);
}

/// Records the outcome of one subscribe operation.
///
/// Increments the `relay_subscribes` counter by one and records the
/// milliseconds elapsed since `start` in the `relay_subscribe_latency`
/// histogram. Both data points carry a `success` attribute holding
/// `"true"` or `"false"`.
pub fn relay_subscribe(&self, success: bool, start: Instant) {
    // Take the measurement before doing any metric bookkeeping.
    let latency_ms = start.elapsed().as_millis() as u64;

    let otel_ctx = Context::current();
    let outcome = [KeyValue::new("success", success.to_string())];

    self.relay_subscribes.add(&otel_ctx, 1, &outcome);
    self.relay_subscribe_latency
        .record(&otel_ctx, latency_ms, &outcome);
}

/// Counts a single failed subscribe attempt.
///
/// Adds one to the `relay_subscribe_failures` counter, tagged with an
/// `is_permenant` attribute (`"true"` / `"false"`). The attribute key keeps
/// its existing spelling because it is part of the emitted metric.
pub fn relay_subscribe_failure(&self, is_permenant: bool) {
    let labels = [KeyValue::new("is_permenant", is_permenant.to_string())];
    self.relay_subscribe_failures
        .add(&Context::current(), 1, &labels);
}

/// Records the latency of one individual subscribe request.
///
/// The milliseconds elapsed since `start` are written to the
/// `relay_subscribe_request_latency` histogram with no attributes.
pub fn relay_subscribe_request(&self, start: Instant) {
    let latency_ms = start.elapsed().as_millis() as u64;
    self.relay_subscribe_request_latency
        .record(&Context::current(), latency_ms, &[]);
}

pub fn postgres_query(&self, query_name: &'static str, start: Instant) {
let elapsed = start.elapsed();

Expand Down
57 changes: 56 additions & 1 deletion src/publish_relay_message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use {
crate::metrics::Metrics,
relay_client::{error::Error, http::Client},
relay_rpc::rpc::{msg_id::MsgId, Publish},
relay_rpc::{
domain::Topic,
rpc::{msg_id::MsgId, Publish},
},
std::time::{Duration, Instant},
tokio::time::sleep,
tracing::{error, instrument, warn},
Expand Down Expand Up @@ -79,3 +82,55 @@ pub async fn publish_relay_message(
}
Ok(())
}

/// Subscribes to `topic` through the relay WebSocket client, retrying on failure.
///
/// Every attempt calls `subscribe_blocking` with a clone of `topic`. A failed
/// attempt is followed by a one-second pause and another try; the tenth failure
/// is treated as permanent and its error is returned. After a successful
/// subscribe the function sleeps for 250 ms before returning, to allow for
/// replication lag.
///
/// When `metrics` is `Some`, the function records the latency of each request,
/// every failure (flagged permanent or not), and the overall outcome measured
/// from function entry. The overall latency is recorded before the final
/// 250 ms sleep.
///
/// # Errors
///
/// Returns the error of the last attempt once ten attempts have failed.
#[instrument(skip_all)]
pub async fn subscribe_relay_topic(
    relay_ws_client: &relay_client::websocket::Client,
    topic: &Topic,
    metrics: Option<&Metrics>,
) -> Result<(), Error> {
    let overall_start = Instant::now();

    let mut tries = 0;
    loop {
        // One subscribe request, timed on its own regardless of outcome.
        let request_start = Instant::now();
        let attempt = relay_ws_client.subscribe_blocking(topic.clone()).await;
        if let Some(metrics) = metrics {
            metrics.relay_subscribe_request(request_start);
        }

        let e = match attempt {
            Ok(_) => break,
            Err(e) => e,
        };

        tries += 1;
        let is_permenant = tries >= 10;
        if let Some(metrics) = metrics {
            metrics.relay_subscribe_failure(is_permenant);
        }

        if is_permenant {
            error!("Permenant error subscribing to topic {topic}, took {tries} tries: {e:?}");

            if let Some(metrics) = metrics {
                // TODO make DRY with end-of-function call
                metrics.relay_subscribe(false, overall_start);
            }
            return Err(e);
        }

        let retry_in = Duration::from_secs(1);
        warn!(
            "Temporary error subscribing to topic {topic}, retrying attempt {tries} in {retry_in:?}: {e:?}"
        );
        sleep(retry_in).await;
    }

    if let Some(metrics) = metrics {
        metrics.relay_subscribe(true, overall_start);
    }

    // Sleep to account for some replication lag. Without this, the subscription may not be active on all nodes
    sleep(Duration::from_millis(250)).await;

    Ok(())
}
3 changes: 2 additions & 1 deletion src/services/public_http_server/handlers/subscribe_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use {
crate::{
error::NotifyServerError,
model::helpers::upsert_project,
publish_relay_message::subscribe_relay_topic,
rate_limit::{self, Clock, RateLimitError},
registry::{extractor::AuthedProjectId, storage::redis::Redis},
state::AppState,
Expand Down Expand Up @@ -101,7 +102,7 @@ pub async fn handler(
// Don't call subscribe if we are already subscribed in a previous request
if project.topic == topic.as_ref() {
info!("Subscribing to project topic: {topic}");
state.relay_ws_client.subscribe(topic).await?;
subscribe_relay_topic(&state.relay_ws_client, &topic, state.metrics.as_ref()).await?;
}

Ok(Json(SubscribeTopicResponseBody {
Expand Down
14 changes: 8 additions & 6 deletions src/services/websocket_server/handlers/notify_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
},
error::NotifyServerError,
model::helpers::{get_project_by_topic, get_welcome_notification, upsert_subscriber},
publish_relay_message::publish_relay_message,
publish_relay_message::{publish_relay_message, subscribe_relay_topic},
rate_limit::{self, Clock, RateLimitError},
registry::storage::redis::Redis,
services::{
Expand Down Expand Up @@ -195,11 +195,13 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay
.map_err(RelayMessageServerError::NotifyServerError)?; // TODO change to client error?

info!("Timing: Subscribing to notify_topic: {notify_topic}");
state
.relay_ws_client
.subscribe(notify_topic.clone())
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?
subscribe_relay_topic(
&state.relay_ws_client,
&notify_topic,
state.metrics.as_ref(),
)
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?;
info!("Timing: Finished subscribing to topic");

info!("Timing: Recording SubscriberUpdateParams");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ pub async fn prepare_subscription_watchers(
Ok((source_subscriptions, watchers_with_subscriptions))
}

#[instrument(skip_all)]
pub async fn send_to_subscription_watchers(
watchers_with_subscriptions: Vec<(SubscriptionWatcherQuery, Vec<NotifyServerSubscription>)>,
authentication_secret: &ed25519_dalek::SigningKey,
Expand Down
4 changes: 4 additions & 0 deletions terraform/monitoring/dashboard.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ dashboard.new(
panels.app.registry_request_rate(ds, vars) {gridPos: pos._6 },
panels.app.registry_request_latency(ds, vars) {gridPos: pos._6 },

panels.app.relay_subscribe_rate(ds, vars) {gridPos: pos._6 },
panels.app.relay_subscribe_latency(ds, vars) {gridPos: pos._6 },
panels.app.relay_subscribe_failures(ds, vars) {gridPos: pos._6 },

row.new('Application publisher subservice'),
panels.app.publishing_workers_count(ds, vars) {gridPos: pos._5 },
panels.app.publishing_workers_errors(ds, vars) {gridPos: pos._5 },
Expand Down
50 changes: 50 additions & 0 deletions terraform/monitoring/panels/app/relay_subscribe_failures.libsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;

// Time-series panel of relay subscribe failures, with one series for permanent
// failures and one for temporary failures, plus an alert that fires when the
// permanent-failure series rises above zero.
{
  new(ds, vars)::
    panels.timeseries(
      title = 'Relay Subscribe Errors',
      datasource = ds.prometheus,
    )
    .configure(defaults.configuration.timeseries)

    .setAlert(vars.environment, grafana.alert.new(
      namespace = vars.namespace,
      name = '%(env)s - Failed to subscribe to relay topic' % { env: vars.environment },
      message = '%(env)s - Failed to subscribe to relay topic' % { env: vars.environment },
      notifications = vars.notifications,
      noDataState = 'no_data',
      period = '0m',
      conditions = [
        grafana.alertCondition.new(
          evaluatorParams = [ 0 ],
          evaluatorType = 'gt',
          operatorType = 'or',
          // Must equal the refId of the permanent-failures target below;
          // previously it named a refId that no target defined.
          queryRefId = 'RelaySubscribePermenantFailures',
          queryTimeStart = '5m',
          queryTimeEnd = 'now',
          reducerType = grafana.alert_reducers.Avg
        ),
      ],
    ))

    // The `is_permenant` label spelling matches the attribute emitted by the service.
    .addTarget(targets.prometheus(
      datasource = ds.prometheus,
      expr = 'sum by (aws_ecs_task_revision) (increase(relay_subscribe_failures_total{is_permenant="true"}[$__rate_interval]))',
      legendFormat = 'Permanent r{{aws_ecs_task_revision}}',
      exemplar = true,
      refId = 'RelaySubscribePermenantFailures',
    ))

    .addTarget(targets.prometheus(
      datasource = ds.prometheus,
      expr = 'sum by (aws_ecs_task_revision) (increase(relay_subscribe_failures_total{is_permenant="false"}[$__rate_interval]))',
      legendFormat = 'Temporary r{{aws_ecs_task_revision}}',
      exemplar = true,
      refId = 'RelaySubscribeTemporaryFailures',
    ))
}
33 changes: 33 additions & 0 deletions terraform/monitoring/panels/app/relay_subscribe_latency.libsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;

// Time-series panel of average relay subscribe latency in milliseconds:
// one series for the whole subscribe operation including retries, and one for
// individual subscribe requests.
{
  new(ds, vars)::
    panels.timeseries(
      title = 'Relay Subscribe Latency',
      datasource = ds.prometheus,
    )
    .configure(
      defaults.configuration.timeseries
      .withUnit('ms')
    )

    // Average = rate of histogram sum / rate of histogram count.
    .addTarget(targets.prometheus(
      datasource = ds.prometheus,
      expr = 'sum by (aws_ecs_task_revision) (rate(relay_subscribe_latency_sum[$__rate_interval])) / sum by (aws_ecs_task_revision) (rate(relay_subscribe_latency_count[$__rate_interval]))',
      // Legend previously read "Publish", although this series is subscribe latency.
      legendFormat = 'Subscribe w/ retries r{{aws_ecs_task_revision}}',
      exemplar = false,
      refId = 'RelaySubscribeLatency',
    ))

    .addTarget(targets.prometheus(
      datasource = ds.prometheus,
      expr = 'sum by (aws_ecs_task_revision) (rate(relay_subscribe_request_latency_sum[$__rate_interval])) / sum by (aws_ecs_task_revision) (rate(relay_subscribe_request_latency_count[$__rate_interval]))',
      legendFormat = 'Individual RPC r{{aws_ecs_task_revision}}',
      exemplar = false,
      refId = 'RelaySubscribeRequestLatency',
    ))
}
25 changes: 25 additions & 0 deletions terraform/monitoring/panels/app/relay_subscribe_rate.libsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;

// Time-series panel of the relay subscribe rate, split by outcome.
{
  new(ds, vars)::
    panels.timeseries(
      title = 'Relay Subscribe Rate',
      datasource = ds.prometheus,
    )
    .configure(
      defaults.configuration.timeseries
      .withUnit('cps')
    )

    // `relay_subscribes` is emitted with a `success` attribute, not `tag`,
    // so group and label by `success` to separate successes from failures.
    .addTarget(targets.prometheus(
      datasource = ds.prometheus,
      expr = 'sum by (aws_ecs_task_revision, success) (rate(relay_subscribes_total[$__rate_interval]))',
      legendFormat = 'success={{success}} r{{aws_ecs_task_revision}}',
      exemplar = true,
      refId = 'RelaySubscribesRate',
    ))
}
3 changes: 3 additions & 0 deletions terraform/monitoring/panels/panels.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ local docdb_mem_threshold = units.size_bin(GiB = docdb_mem * 0.1);
relay_outgoing_message_rate: (import 'app/relay_outgoing_message_rate.libsonnet' ).new,
relay_outgoing_message_latency: (import 'app/relay_outgoing_message_latency.libsonnet' ).new,
relay_outgoing_message_failures: (import 'app/relay_outgoing_message_failures.libsonnet' ).new,
relay_subscribe_rate: (import 'app/relay_subscribe_rate.libsonnet' ).new,
relay_subscribe_latency: (import 'app/relay_subscribe_latency.libsonnet' ).new,
relay_subscribe_failures: (import 'app/relay_subscribe_failures.libsonnet' ).new,
postgres_query_rate: (import 'app/postgres_query_rate.libsonnet' ).new,
postgres_query_latency: (import 'app/postgres_query_latency.libsonnet' ).new,
keys_server_request_rate: (import 'app/keys_server_request_rate.libsonnet' ).new,
Expand Down
Loading

0 comments on commit e762a83

Please sign in to comment.