Skip to content

Commit

Permalink
Improve RemoteSequencer tracing and logging
Browse files Browse the repository at this point in the history
Also avoid sending over the open connection if the
inflight commits channel is closed
  • Loading branch information
muhamadazmy committed Nov 13, 2024
1 parent 0f579f6 commit ced7de2
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{
};

use tokio::sync::{mpsc, Mutex, OwnedSemaphorePermit, Semaphore};
use tracing::{debug, instrument, Span};

use restate_core::{
network::{
Expand All @@ -33,7 +34,6 @@ use restate_types::{
replicated_loglet::ReplicatedLogletParams,
GenerationalNodeId,
};
use tracing::instrument;

use super::rpc_routers::SequencersRpc;
use crate::loglet::{
Expand Down Expand Up @@ -107,7 +107,7 @@ where
}

#[instrument(
level="trace",
level="debug",
skip_all,
fields(
otel.name = "replicated_loglet::remote_sequencer: append",
Expand Down Expand Up @@ -148,16 +148,14 @@ where
Ok(token) => break token,
Err(err) => {
match err.source {
NetworkError::ConnectError(_)
| NetworkError::ConnectionClosed(_)
| NetworkError::Timeout(_) => {
// we retry to re-connect one time
err @ NetworkError::Full => return Err(err.into()),
_ => {
// we retry on any other network error
connection = self.renew_connection(connection).await?;

msg = err.original;
continue;
}
err => return Err(err.into()),
}
}
};
Expand All @@ -170,6 +168,7 @@ where
}

/// Gets or starts a new remote sequencer connection
#[instrument(level = "debug", skip_all)]
async fn get_connection(&self) -> Result<RemoteSequencerConnection, NetworkError> {
let mut guard = self.connection.lock().await;
if let Some(connection) = guard.deref() {
Expand All @@ -190,6 +189,7 @@ where

/// Renew a connection to a remote sequencer. This guarantees that only a single connection
/// to the sequencer is available.
#[instrument(level = "debug", skip_all, fields(renewed = false))]
async fn renew_connection(
&self,
old: RemoteSequencerConnection,
Expand All @@ -207,6 +207,8 @@ where
.node_connection(self.params.sequencer)
.await?;

Span::current().record("renewed", true);

let connection =
RemoteSequencerConnection::start(self.known_global_tail.clone(), connection)?;

Expand All @@ -232,6 +234,7 @@ struct RemoteSequencerConnection {
}

impl RemoteSequencerConnection {
#[instrument(level = "debug", name = "connection_start", skip_all)]
fn start(
known_global_tail: TailOffsetWatch,
connection: WeakConnection,
Expand Down Expand Up @@ -260,6 +263,16 @@ impl RemoteSequencerConnection {
sequencer: GenerationalNodeId,
msg: Append,
) -> Result<RpcToken<Appended>, NetworkSendError<Append>> {
// there are other reasons that can render this connection unusable
// even if the underlying connection is still valid.
// if the channel is closed, this connection cannot be used anymore
if self.tx.is_closed() {
return Err(NetworkSendError::new(
msg,
NetworkError::Unavailable("Inflight commits channel is closed".into()),
));
}

let outgoing = Outgoing::new(sequencer, msg).assign_connection(self.inner.clone());

rpc_router
Expand All @@ -283,6 +296,7 @@ impl RemoteSequencerConnection {
if let Err(err) = self.tx.send(inflight_append) {
// if we failed to push this to be processed by the connection reactor task
// then we need to notify the caller
debug!("Inflight channel closed. Resolve commit as connection closed");
err.0
.commit_resolver
.error(AppendError::retryable(NetworkError::ConnectionClosed(
Expand All @@ -296,6 +310,7 @@ impl RemoteSequencerConnection {
/// This task will run until the [`AppendStream`] is dropped. Once dropped
/// all pending commits will be resolved with an error. it's up to the enqueuer
/// to retry if needed.
#[instrument(level = "debug", skip_all)]
async fn handle_appended_responses(
known_global_tail: TailOffsetWatch,
connection: WeakConnection,
Expand All @@ -315,6 +330,7 @@ impl RemoteSequencerConnection {
inflight
}
_ = &mut closed => {
debug!("Sequencer Connection closed while waiting for next inflight append");
break AppendError::retryable(NetworkError::ConnectionClosed(connection.peer()));
}
};
Expand All @@ -335,6 +351,7 @@ impl RemoteSequencerConnection {
incoming.map_err(AppendError::Shutdown)
},
_ = &mut closed => {
debug!("Sequencer Connection closed while waiting for response");
Err(AppendError::retryable(NetworkError::ConnectionClosed(connection.peer())))
}
};
Expand Down Expand Up @@ -375,6 +392,7 @@ impl RemoteSequencerConnection {
// While the UnknownLoglet status is non-terminal for the connection
// (since only one request is bad),
// the AppendError for the caller is terminal
debug!(error=%err, "Resolve commit with error");
commit_resolver.error(AppendError::other(err));
}
}
Expand All @@ -394,10 +412,15 @@ impl RemoteSequencerConnection {
//
// For now this should not be a problem since they can (possibly) retry
// to do the write again later.
debug!(cause=%err, "Draining inflight channel");
let mut count = 0;
while let Some(inflight) = rx.recv().await {
inflight.commit_resolver.error(err.clone());
count += 1;
}

debug!("Drained/Cancelled {count} inflight commits");

Ok(())
}
}
Expand Down

0 comments on commit ced7de2

Please sign in to comment.