Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support writers without static lifetimes #55

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn compress_with_gzip(num_threads: usize, buffer_size: usize, compression_level:
let dir = tempdir().unwrap();
let output_file = File::create(dir.path().join("shakespeare_gzip.txt.gz")).unwrap();

let mut writer: Box<dyn ZWriter> = if num_threads > 0 {
let mut writer: Box<dyn ZWriter<_>> = if num_threads > 0 {
Box::new(
ParCompressBuilder::<Gzip>::new()
.num_threads(num_threads)
Expand Down Expand Up @@ -50,7 +50,7 @@ fn compress_with_gzip(num_threads: usize, buffer_size: usize, compression_level:
fn compress_with_snap(num_threads: usize, buffer_size: usize) {
let dir = tempdir().unwrap();
let output_file = File::create(dir.path().join("shakespeare_gzip.txt.gz")).unwrap();
let mut writer: Box<dyn ZWriter> = if num_threads > 0 {
let mut writer: Box<dyn ZWriter<_>> = if num_threads > 0 {
Box::new(
ParCompressBuilder::<Snap>::new()
.num_threads(num_threads)
Expand Down
2 changes: 1 addition & 1 deletion examples/test1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod example {
pub fn main() {
let file = env::args().skip(1).next().unwrap();
let writer = File::create(file).unwrap();
let mut parz: ParCompress<Gzip> = ParCompressBuilder::new().from_writer(writer);
let mut parz: ParCompress<Gzip, _> = ParCompressBuilder::new().from_writer(writer);
parz.write_all(b"This is a first test line\n").unwrap();
parz.write_all(b"This is a second test line\n").unwrap();
parz.finish().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/test2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod example {
let chunksize = 64 * (1 << 10) * 2;

let stdout = std::io::stdout();
let mut writer: ParCompress<Gzip> = ParCompressBuilder::new()
let mut writer: ParCompress<Gzip, _> = ParCompressBuilder::new()
.compression_level(Compression::new(6))
.from_writer(stdout);

Expand Down
19 changes: 13 additions & 6 deletions src/bgzf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ where
#[cfg(not(feature = "libdeflate"))]
compressor: Compress,
/// The inner writer
writer: W,
writer: Option<W>,
}

impl<W> BgzfSyncWriter<W>
Expand All @@ -130,9 +130,16 @@ where
blocksize,
compression_level,
compressor,
writer,
writer: Some(writer),
}
}

pub(crate) fn finish(mut self) -> io::Result<W> {
self.flush()?;
self.writer
.take()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Writer already taken"))
}
}

/// Decompress a block of bytes
Expand Down Expand Up @@ -315,7 +322,7 @@ where
let b = self.buffer.split_to(self.blocksize).freeze();
let compressed = compress(&b[..], &mut self.compressor, self.compression_level)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.writer.write_all(&compressed)?;
self.writer.as_mut().unwrap().write_all(&compressed)?;
}
Ok(buf.len())
}
Expand All @@ -329,10 +336,10 @@ where
.freeze();
let compressed = compress(&b[..], &mut self.compressor, self.compression_level)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.writer.write_all(&compressed)?;
self.writer.write_all(BGZF_EOF)?; // this is an empty block
self.writer.as_mut().unwrap().write_all(&compressed)?;
self.writer.as_mut().unwrap().write_all(BGZF_EOF)?; // this is an empty block
}
self.writer.flush()
self.writer.as_mut().unwrap().flush()
}
}

Expand Down
96 changes: 64 additions & 32 deletions src/deflate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! use gzp::{deflate::Zlib, par::compress::{ParCompress, ParCompressBuilder}, ZWriter};
//!
//! let mut writer = vec![];
//! let mut parz: ParCompress<Zlib> = ParCompressBuilder::new().from_writer(writer);
//! let mut parz: ParCompress<Zlib,_> = ParCompressBuilder::new().from_writer(writer);
//! parz.write_all(b"This is a first test line\n").unwrap();
//! parz.write_all(b"This is a second test line\n").unwrap();
//! parz.finish().unwrap();
Expand Down Expand Up @@ -153,10 +153,9 @@ where
}
}

impl<W: Write> ZWriter for SyncZ<GzEncoder<W>> {
fn finish(&mut self) -> Result<(), GzpError> {
self.inner.take().unwrap().finish()?;
Ok(())
impl<W: Write> ZWriter<W> for SyncZ<GzEncoder<W>> {
fn finish(&mut self) -> Result<W, GzpError> {
Ok(self.inner.take().unwrap().finish()?)
}
}

Expand Down Expand Up @@ -265,10 +264,9 @@ where
}

#[cfg(feature = "any_zlib")]
impl<W: Write> ZWriter for SyncZ<ZlibEncoder<W>> {
fn finish(&mut self) -> Result<(), GzpError> {
self.inner.take().unwrap().finish()?;
Ok(())
impl<W: Write> ZWriter<W> for SyncZ<ZlibEncoder<W>> {
fn finish(&mut self) -> Result<W, GzpError> {
Ok(self.inner.take().unwrap().finish()?)
}
}

Expand Down Expand Up @@ -344,10 +342,9 @@ where
}
}

impl<W: Write> ZWriter for SyncZ<DeflateEncoder<W>> {
fn finish(&mut self) -> Result<(), GzpError> {
self.inner.take().unwrap().finish()?;
Ok(())
impl<W: Write> ZWriter<W> for SyncZ<DeflateEncoder<W>> {
fn finish(&mut self) -> Result<W, GzpError> {
Ok(self.inner.take().unwrap().finish()?)
}
}

Expand Down Expand Up @@ -494,10 +491,9 @@ where
}
}

impl<W: Write> ZWriter for SyncZ<MgzipSyncWriter<W>> {
fn finish(&mut self) -> Result<(), GzpError> {
self.inner.take().unwrap().flush()?;
Ok(())
impl<W: Write> ZWriter<W> for SyncZ<MgzipSyncWriter<W>> {
fn finish(&mut self) -> Result<W, GzpError> {
Ok(self.inner.take().unwrap().finish()?)
}
}

Expand Down Expand Up @@ -650,10 +646,9 @@ where
}
}

impl<W: Write> ZWriter for SyncZ<BgzfSyncWriter<W>> {
fn finish(&mut self) -> Result<(), GzpError> {
self.inner.take().unwrap().flush()?;
Ok(())
impl<W: Write> ZWriter<W> for SyncZ<BgzfSyncWriter<W>> {
fn finish(&mut self) -> Result<W, GzpError> {
Ok(self.inner.take().unwrap().finish()?)
}
}

Expand Down Expand Up @@ -695,7 +690,7 @@ mod test {
";

// Compress input to output
let mut par_gz: ParCompress<Mgzip> = ParCompressBuilder::new().from_writer(out_writer);
let mut par_gz: ParCompress<Mgzip, _> = ParCompressBuilder::new().from_writer(out_writer);
par_gz.write_all(input).unwrap();
par_gz.finish().unwrap();

Expand Down Expand Up @@ -728,7 +723,7 @@ mod test {
";

// Compress input to output
let mut par_gz: ParCompress<Gzip> = ParCompressBuilder::new().from_writer(out_writer);
let mut par_gz: ParCompress<Gzip, _> = ParCompressBuilder::new().from_writer(out_writer);
par_gz.write_all(input).unwrap();
par_gz.finish().unwrap();

Expand Down Expand Up @@ -761,7 +756,7 @@ mod test {
";

// Compress input to output
let mut par_gz: ParCompress<Gzip> = ParCompressBuilder::new().from_writer(out_writer);
let mut par_gz: ParCompress<Gzip, _> = ParCompressBuilder::new().from_writer(out_writer);
par_gz.write_all(input).unwrap();
drop(par_gz);

Expand Down Expand Up @@ -861,7 +856,7 @@ mod test {
";

// Compress input to output
let mut par_gz: ParCompress<Zlib> = ParCompressBuilder::new().from_writer(out_writer);
let mut par_gz: ParCompress<Zlib, _> = ParCompressBuilder::new().from_writer(out_writer);
par_gz.write_all(input).unwrap();
par_gz.finish().unwrap();

Expand All @@ -879,6 +874,43 @@ mod test {
assert_eq!(input.to_vec(), bytes);
}

#[test]
#[cfg(feature = "any_zlib")]
fn test_scoped_simple_zlib() {
let dir = tempdir().unwrap();

// Create output file
let output_file = dir.path().join("output.txt");
let mut out_writer = BufWriter::new(File::create(&output_file).unwrap());

// Define input bytes
let input = b"\
This is a longer test than normal to come up with a bunch of text.\n\
We'll read just a few lines at a time.\n\
";

std::thread::scope(|s| {
// Compress input to output
let mut par_gz: ParCompress<Zlib, _> =
ParCompressBuilder::new().from_borrowed_writer(&mut out_writer, s);
par_gz.write_all(input).unwrap();
par_gz.finish().unwrap();
});

// Read output back in
let mut reader = BufReader::new(File::open(output_file).unwrap());
let mut result = vec![];
reader.read_to_end(&mut result).unwrap();

// Decompress it
let mut gz = ZlibDecoder::new(&result[..]);
let mut bytes = vec![];
gz.read_to_end(&mut bytes).unwrap();

// Assert decompressed output is equal to input
assert_eq!(input.to_vec(), bytes);
}

#[test]
#[cfg(feature = "any_zlib")]
fn test_simple_zlib_sync() {
Expand Down Expand Up @@ -938,7 +970,7 @@ mod test {
];

// Compress input to output
let mut par_gz: ParCompress<Gzip> = ParCompressBuilder::new()
let mut par_gz: ParCompress<Gzip, _> = ParCompressBuilder::new()
.buffer_size(DICT_SIZE)
.unwrap()
.from_writer(out_writer);
Expand Down Expand Up @@ -974,7 +1006,7 @@ mod test {
";

// Compress input to output
let mut par_gz: ParCompress<Mgzip> = ParCompressBuilder::new().from_writer(out_writer);
let mut par_gz: ParCompress<Mgzip, _> = ParCompressBuilder::new().from_writer(out_writer);
par_gz.write_all(input).unwrap();
par_gz.finish().unwrap();

Expand Down Expand Up @@ -1003,7 +1035,7 @@ mod test {
";

// Compress input to output
let mut par_gz: ParCompress<Bgzf> = ParCompressBuilder::new().from_writer(out_writer);
let mut par_gz: ParCompress<Bgzf, _> = ParCompressBuilder::new().from_writer(out_writer);
par_gz.write_all(input).unwrap();
par_gz.finish().unwrap();

Expand Down Expand Up @@ -1040,7 +1072,7 @@ mod test {


// Compress input to output
let mut par_gz: Box<dyn ZWriter> = if num_threads > 0 {
let mut par_gz: Box<dyn ZWriter<_>> = if num_threads > 0 {
Box::new(ParCompressBuilder::<Gzip>::new()
.buffer_size(buf_size).unwrap()
.num_threads(num_threads).unwrap()
Expand Down Expand Up @@ -1086,7 +1118,7 @@ mod test {


// Compress input to output
let mut par_gz: Box<dyn ZWriter> = if num_threads > 0 {
let mut par_gz: Box<dyn ZWriter<_>> = if num_threads > 0 {
Box::new(ParCompressBuilder::<Mgzip>::new()
.buffer_size(buf_size).unwrap()
.num_threads(num_threads).unwrap()
Expand Down Expand Up @@ -1175,7 +1207,7 @@ mod test {


// Compress input to output
let mut par_gz: Box<dyn ZWriter> = if num_threads > 0 {
let mut par_gz: Box<dyn ZWriter<_>> = if num_threads > 0 {
Box::new(ParCompressBuilder::<Bgzf>::new()
.buffer_size(buf_size).unwrap()
.num_threads(num_threads).unwrap()
Expand Down Expand Up @@ -1301,7 +1333,7 @@ mod test {


// Compress input to output
let mut par_gz: Box<dyn ZWriter> = if num_threads > 0 {
let mut par_gz: Box<dyn ZWriter<_>> = if num_threads > 0 {
Box::new(ParCompressBuilder::<Zlib>::new()
.buffer_size(buf_size).unwrap()
.num_threads(num_threads).unwrap()
Expand Down
13 changes: 7 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//! use gzp::{deflate::Gzip, par::compress::{ParCompress, ParCompressBuilder}, ZWriter};
//!
//! let mut writer = vec![];
//! let mut parz: ParCompress<Gzip> = ParCompressBuilder::new().from_writer(writer);
//! let mut parz: ParCompress<Gzip, _> = ParCompressBuilder::new().from_writer(writer);
//! parz.write_all(b"This is a first test line\n").unwrap();
//! parz.write_all(b"This is a second test line\n").unwrap();
//! parz.finish().unwrap();
Expand Down Expand Up @@ -162,9 +162,10 @@ pub enum GzpError {
}

/// Trait that unifies sync and async writer
pub trait ZWriter: Write {
/// Cleans up resources, writes footers
fn finish(&mut self) -> Result<(), GzpError>;
pub trait ZWriter<W>: Write {
/// Cleans up resources, writes footers,
/// and returns the underlying writer.
fn finish(&mut self) -> Result<W, GzpError>;
}

/// Create a synchronous writer wrapping the input `W` type.
Expand Down Expand Up @@ -232,9 +233,9 @@ where

/// Create a [`ZWriter`] trait object from a writer.
#[allow(clippy::missing_panics_doc)]
pub fn from_writer(self, writer: W) -> Box<dyn ZWriter>
pub fn from_writer(self, writer: W) -> Box<dyn ZWriter<W>>
where
SyncZ<<F as SyncWriter<W>>::OutputWriter>: ZWriter + Send,
SyncZ<<F as SyncWriter<W>>::OutputWriter>: ZWriter<W> + Send,
{
if self.num_threads > 1 {
Box::new(
Expand Down
17 changes: 12 additions & 5 deletions src/mgzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ where
#[cfg(not(feature = "libdeflate"))]
compressor: Compress,
/// The inner writer
writer: W,
writer: Option<W>,
}

impl<W> MgzipSyncWriter<W>
Expand All @@ -116,9 +116,16 @@ where
blocksize,
compression_level,
compressor,
writer,
writer: Some(writer),
}
}

pub(crate) fn finish(mut self) -> io::Result<W> {
self.flush()?;
self.writer
.take()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Writer already taken"))
}
}

/// Decompress a block of bytes
Expand Down Expand Up @@ -288,7 +295,7 @@ where
let b = self.buffer.split_to(self.blocksize).freeze();
let compressed = compress(&b[..], &mut self.compressor, self.compression_level)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.writer.write_all(&compressed)?;
self.writer.as_mut().unwrap().write_all(&compressed)?;
}
Ok(buf.len())
}
Expand All @@ -299,9 +306,9 @@ where
if !b.is_empty() {
let compressed = compress(&b[..], &mut self.compressor, self.compression_level)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.writer.write_all(&compressed)?;
self.writer.as_mut().unwrap().write_all(&compressed)?;
}
self.writer.flush()
self.writer.as_mut().unwrap().flush()
}
}

Expand Down
Loading