Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional tracing instrumentation. #97

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions src/cmd/get.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Connection, Db, Frame, Parse};
use crate::{Connection, Db, Frame, Parse, ParseError};

use bytes::Bytes;
use tracing::{debug, instrument};
Expand Down Expand Up @@ -47,7 +47,8 @@ impl Get {
/// ```text
/// GET key
/// ```
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Get> {
#[instrument(level = "trace", name = "Get::parse_frames", skip(parse))]
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Self, ParseError> {
// The `GET` string has already been consumed. The next value is the
// name of the key to get. If the next value is not a string or the
// input is fully consumed, then an error is returned.
Expand All @@ -60,7 +61,14 @@ impl Get {
///
/// The response is written to `dst`. This is called by the server in order
/// to execute a received command.
#[instrument(skip(self, db, dst))]
#[instrument(
level = "trace",
name = "Get::apply",
skip(self, db, dst),
fields(
key = self.key.as_str(),
),
)]
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
// Get the value from the shared database state
let response = if let Some(value) = db.get(&self.key) {
Expand Down
25 changes: 25 additions & 0 deletions src/cmd/invalid.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use crate::{Connection, Frame, ParseError};

use tracing::instrument;

/// Represents a malformed frame. This is not a real `Redis` command.
#[derive(Debug)]
pub struct Invalid {
error: ParseError,
}

impl Invalid {
/// Create a new `Invalid` command which responds to frames that could not
/// be successfully parsed as commands.
pub(crate) fn new(error: ParseError) -> Self {
Self { error }
}

/// Responds to the client, indicating the command could not be parsed.
#[instrument(level = "trace", name = "ParseError::apply", skip(dst))]
pub(crate) async fn apply(self, dst: &mut Connection) -> crate::Result<()> {
let response = Frame::Error(self.error.to_string());
dst.write_frame(&response).await?;
Ok(())
}
}
34 changes: 30 additions & 4 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ pub use ping::Ping;
mod unknown;
pub use unknown::Unknown;

mod invalid;
pub use invalid::Invalid;

use crate::{Connection, Db, Frame, Parse, ParseError, Shutdown};
use tracing::instrument;

/// Enumeration of supported Redis commands.
///
Expand All @@ -30,6 +34,7 @@ pub enum Command {
Unsubscribe(Unsubscribe),
Ping(Ping),
Unknown(Unknown),
Invalid(Invalid),
}

impl Command {
Expand All @@ -41,7 +46,13 @@ impl Command {
/// # Returns
///
/// On success, the command value is returned, otherwise, `Err` is returned.
pub fn from_frame(frame: Frame) -> crate::Result<Command> {
///
/// # Traces
///
/// Generates a TRACE-level span named `Command::from_frame` that includes
/// the `Debug`-representation of `frame` as a field.
#[instrument(level = "trace", name = "Command::from_frame")]
pub fn from_frame(frame: Frame) -> Result<Command, ParseError> {
// The frame value is decorated with `Parse`. `Parse` provides a
// "cursor" like API which makes parsing the command easier.
//
Expand All @@ -51,8 +62,10 @@ impl Command {

// All redis commands begin with the command name as a string. The name
// is read and converted to lower cases in order to do case sensitive
// matching.
let command_name = parse.next_string()?.to_lowercase();
// matching. Doing this in-place with `str::make_ascii_lowercase` is
// substantially more performant than `str::to_lowercase`.
let mut command_name = parse.next_string()?;
command_name.make_ascii_lowercase();

// Match the command name, delegating the rest of the parsing to the
// specific command.
Expand Down Expand Up @@ -83,10 +96,21 @@ impl Command {
Ok(command)
}

/// Construct an `Invalid` response command from a `ParseError`.
pub(crate) fn from_error(err: ParseError) -> Command {
Command::Invalid(invalid::Invalid::new(err))
}

/// Apply the command to the specified `Db` instance.
///
/// The response is written to `dst`. This is called by the server in order
/// to execute a received command.
///
/// # Traces
///
/// Generates a `TRACE`-level span that includes the `Debug`-serializaiton
/// of `self` (the `Command` being applied) as a field.
#[instrument(level = "trace", name = "Command::apply", skip(db, dst, shutdown))]
pub(crate) async fn apply(
self,
db: &Db,
Expand All @@ -102,9 +126,10 @@ impl Command {
Subscribe(cmd) => cmd.apply(db, dst, shutdown).await,
Ping(cmd) => cmd.apply(dst).await,
Unknown(cmd) => cmd.apply(dst).await,
Invalid(cmd) => cmd.apply(dst).await,
// `Unsubscribe` cannot be applied. It may only be received from the
// context of a `Subscribe` command.
Unsubscribe(_) => Err("`Unsubscribe` is unsupported in this context".into()),
Unsubscribe(_) => Result::Err("`Unsubscribe` is unsupported in this context".into()),
}
}

Expand All @@ -118,6 +143,7 @@ impl Command {
Command::Unsubscribe(_) => "unsubscribe",
Command::Ping(_) => "ping",
Command::Unknown(cmd) => cmd.get_name(),
Command::Invalid(_) => "err",
}
}
}
11 changes: 9 additions & 2 deletions src/cmd/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ impl Ping {
/// ```text
/// PING [message]
/// ```
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Ping> {
#[instrument(level = "trace", name = "Ping::parse_frames", skip(parse))]
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Self, ParseError> {
match parse.next_string() {
Ok(msg) => Ok(Ping::new(Some(msg))),
Err(ParseError::EndOfStream) => Ok(Ping::default()),
Expand All @@ -51,7 +52,13 @@ impl Ping {
///
/// The response is written to `dst`. This is called by the server in order
/// to execute a received command.
#[instrument(skip(self, dst))]
#[instrument(
name = "Ping::apply",
skip(self, dst),
fields(
msg = ?self.msg,
),
)]
pub(crate) async fn apply(self, dst: &mut Connection) -> crate::Result<()> {
let response = match self.msg {
None => Frame::Simple("PONG".to_string()),
Expand Down
14 changes: 12 additions & 2 deletions src/cmd/publish.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{Connection, Db, Frame, Parse};
use crate::{Connection, Db, Frame, Parse, ParseError};

use bytes::Bytes;
use tracing::instrument;

/// Posts a message to the given channel.
///
Expand Down Expand Up @@ -47,7 +48,8 @@ impl Publish {
/// ```text
/// PUBLISH channel message
/// ```
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Publish> {
#[instrument(level = "trace", name = "Publish::parse_frames", skip(parse))]
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Self, ParseError> {
// The `PUBLISH` string has already been consumed. Extract the `channel`
// and `message` values from the frame.
//
Expand All @@ -64,6 +66,14 @@ impl Publish {
///
/// The response is written to `dst`. This is called by the server in order
/// to execute a received command.
#[instrument(
level = "trace",
name = "Publish::apply",
skip(self, db, dst),
fields(
channel = self.channel.as_str(),
),
)]
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
// The shared state contains the `tokio::sync::broadcast::Sender` for
// all active channels. Calling `db.publish` dispatches the message into
Expand Down
16 changes: 12 additions & 4 deletions src/cmd/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{Connection, Db, Frame};

use bytes::Bytes;
use std::time::Duration;
use tracing::{debug, instrument};
use tracing::instrument;

/// Set `key` to hold the string `value`.
///
Expand Down Expand Up @@ -77,7 +77,8 @@ impl Set {
/// ```text
/// SET key value [EX seconds|PX milliseconds]
/// ```
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Set> {
#[instrument(level = "trace", name = "Set::parse_frames", skip(parse))]
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Self, ParseError> {
use ParseError::EndOfStream;

// Read the key to set. This is a required field
Expand Down Expand Up @@ -124,14 +125,21 @@ impl Set {
///
/// The response is written to `dst`. This is called by the server in order
/// to execute a received command.
#[instrument(skip(self, db, dst))]
#[instrument(
level = "trace",
name = "Set::apply",
skip(self, db, dst),
fields(
key = self.key.as_str(),
expire = ?self.expire.as_ref(),
),
)]
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
// Set the value in the shared database state.
db.set(self.key, self.value, self.expire);

// Create a success response and write it to `dst`.
let response = Frame::Simple("OK".to_string());
debug!(?response);
dst.write_frame(&response).await?;

Ok(())
Expand Down
12 changes: 11 additions & 1 deletion src/cmd/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::pin::Pin;
use tokio::select;
use tokio::sync::broadcast;
use tokio_stream::{Stream, StreamExt, StreamMap};
use tracing::instrument;

/// Subscribes the client to one or more channels.
///
Expand Down Expand Up @@ -60,7 +61,8 @@ impl Subscribe {
/// ```text
/// SUBSCRIBE channel [channel ...]
/// ```
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Subscribe> {
#[instrument(level = "trace", name = "Subscribe::parse_frames", skip(parse))]
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Self, ParseError> {
use ParseError::EndOfStream;

// The `SUBSCRIBE` string has already been consumed. At this point,
Expand Down Expand Up @@ -99,6 +101,14 @@ impl Subscribe {
/// are updated accordingly.
///
/// [here]: https://redis.io/topics/pubsub
#[instrument(
level = "trace",
name = "Suscribe::apply",
skip(self, db, dst),
fields(
channels = "UNIMPLEMENTED", // FIXME
),
)]
pub(crate) async fn apply(
mut self,
db: &Db,
Expand Down
9 changes: 8 additions & 1 deletion src/cmd/unknown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@ impl Unknown {
/// Responds to the client, indicating the command is not recognized.
///
/// This usually means the command is not yet implemented by `mini-redis`.
#[instrument(skip(self, dst))]
#[instrument(
level = "trace",
name = "Unknown::apply",
skip(self, dst),
fields(
command_name = self.command_name.as_str(),
),
)]
pub(crate) async fn apply(self, dst: &mut Connection) -> crate::Result<()> {
let response = Frame::Error(format!("ERR unknown command '{}'", self.command_name));

Expand Down
56 changes: 44 additions & 12 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::frame::{self, Frame};

use bytes::{Buf, BytesMut};
use std::io::{self, Cursor};
use std::io::{self, Cursor, ErrorKind::ConnectionReset};
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
use tracing::warn;

/// Send and receive `Frame` values from a remote peer.
///
Expand All @@ -28,6 +30,16 @@ pub struct Connection {
buffer: BytesMut,
}

/// The result of [`Connection::maybe_read_bytes`].
enum ConnectionState {
/// The connection was gracefully closed when reading was attempted.
Closed,
/// The connection was open when reading was attempted.
Open,
/// The connection was abruptly reset by the peer when reading was attempted.
Reset,
}

impl Connection {
/// Create a new `Connection`, backed by `socket`. Read and write buffers
/// are initialized.
Expand All @@ -42,6 +54,11 @@ impl Connection {
}
}

/// Returns the remote address that this connection is bound to.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.stream.get_ref().peer_addr()
}

/// Read a single `Frame` value from the underlying stream.
///
/// The function waits until it has retrieved enough data to parse a frame.
Expand All @@ -63,23 +80,38 @@ impl Connection {

// There is not enough buffered data to read a frame. Attempt to
// read more data from the socket.
//
// On success, the number of bytes is returned. `0` indicates "end
// of stream".
if 0 == self.stream.read_buf(&mut self.buffer).await? {
// The remote closed the connection. For this to be a clean
// shutdown, there should be no data in the read buffer. If
// there is, this means that the peer closed the socket while
// sending a frame.
if self.buffer.is_empty() {
match self.maybe_read_bytes().await? {
ConnectionState::Open => continue,
ConnectionState::Closed | ConnectionState::Reset => {
if !self.buffer.is_empty() {
warn!(
incomplete = ?self.buffer,
"connection closed with incomplete frame",
);
}
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
}
}
}

/// Attempt to read bytes from the connection.
async fn maybe_read_bytes(&mut self) -> io::Result<ConnectionState> {
match self.stream.read_buf(&mut self.buffer).await {
// the connection was closed gracefully
Ok(0) => Ok(ConnectionState::Closed),
// the connection is still open
Ok(_) => Ok(ConnectionState::Open),
// the connection was closed abruptly by the peer
Err(e) if e.kind() == ConnectionReset => {
warn!("connection closed abruptly by peer");
Ok(ConnectionState::Reset)
}
// reading failed for some other reason
Err(err) => Err(err),
}
}

/// Tries to parse a frame from the buffer. If the buffer contains enough
/// data, the frame is returned and the data removed from the buffer. If not
/// enough data has been buffered yet, `Ok(None)` is returned. If the
Expand Down
Loading