Skip to content

Commit

Permalink
feat(pack): add flat byte buffer stopgap crate
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Sep 11, 2024
1 parent d5a62bd commit 54b1e40
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 0 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions crates/pack/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "wrpc-pack"
version = "0.1.0"
description = "Temporary stopgap solution for encoding fully-synchronous wRPC values"

authors.workspace = true
categories.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
anyhow = { workspace = true }
bytes = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
wrpc-transport = { workspace = true }
113 changes: 113 additions & 0 deletions crates/pack/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! Stopgap solution for packing and unpacking wRPC values to/from singular, flat byte buffers.
//!
//! All APIs in this crate are to be considered unstable and everything may break arbitrarily.
//!
//! This crate will never reach 1.0 and will be deprecated once <https://github.com/bytecodealliance/wrpc/issues/25> is complete
//!
//! This crate is maintained on a best-effort basis.
use core::pin::Pin;
use core::task::{Context, Poll};

use bytes::BytesMut;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Decoder, Encoder};
use wrpc_transport::{Decode, Deferred as _, Encode};

/// A stream, which fails on each operation, this type should only ever be used in trait bounds
pub struct NoopStream;

impl AsyncRead for NoopStream {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"should not be called",
)))
}
}

impl AsyncWrite for NoopStream {
fn poll_write(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &[u8],
) -> Poll<std::io::Result<usize>> {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"should not be called",
)))
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"should not be called",
)))
}

fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"should not be called",
)))
}
}

impl wrpc_transport::Index<Self> for NoopStream {
fn index(&self, _: &[usize]) -> anyhow::Result<Self> {
anyhow::bail!("should not be called")
}
}

/// Pack a [`wrpc_transport::Encode`] into a singular byte buffer `dst`.
///
/// This function does not support asynchronous values and will return an error is such a value is
/// passed in.
///
/// This is unstable API, which will be deprecated once feature-complete "packing" functionality is available in [`wrpc_transport`].
/// Track <https://github.com/bytecodealliance/wrpc/issues/25> for updates.
pub fn pack<T: Encode<NoopStream>>(
v: T,
dst: &mut BytesMut,
) -> Result<(), <T::Encoder as Encoder<T>>::Error> {
let mut enc = T::Encoder::default();
enc.encode(v, dst)?;
if enc.take_deferred().is_some() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"value contains pending asynchronous values and cannot be packed",
)
.into());
}
Ok(())
}

/// Unpack a [`wrpc_transport::Decode`] from a byte buffer `dst`.
///
/// This function does not support asynchronous values and will return an error if `buf` contains pending async values.
///
/// If this function returns an error, contents of `buf` are undefined.
///
/// This is unstable API, which will be deprecated once feature-complete "unpacking" functionality is available in [`wrpc_transport`].
/// Track <https://github.com/bytecodealliance/wrpc/issues/25> for updates.
pub fn unpack<T: Decode<NoopStream>>(
buf: &mut BytesMut,
) -> Result<T, <T::Decoder as Decoder>::Error> {
let mut dec = T::Decoder::default();
let v = dec.decode(buf)?;
let v = v.ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "buffer is incomplete")
})?;
if dec.take_deferred().is_some() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"buffer contains pending asynchronous values and cannot be unpacked",
)
.into());
}
Ok(v)
}

0 comments on commit 54b1e40

Please sign in to comment.