diff --git a/Cargo.lock b/Cargo.lock index d30251dcb7c9..c5b8fc016a5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1812,10 +1812,12 @@ name = "common-base" version = "0.9.3" dependencies = [ "anymap", + "async-trait", "bitvec", "bytes", "common-error", "common-macro", + "futures", "paste", "serde", "snafu 0.8.4", diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index 38f677dd3fd6..5afbc3b88c29 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -9,10 +9,12 @@ workspace = true [dependencies] anymap = "1.0.0-beta.2" +async-trait.workspace = true bitvec = "1.0" bytes.workspace = true common-error.workspace = true common-macro.workspace = true +futures.workspace = true paste = "1.0" serde = { version = "1.0", features = ["derive"] } snafu.workspace = true diff --git a/src/common/base/src/buffer.rs b/src/common/base/src/buffer.rs deleted file mode 100644 index bce39842e1f5..000000000000 --- a/src/common/base/src/buffer.rs +++ /dev/null @@ -1,242 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::io::{Read, Write}; - -use bytes::{Buf, BufMut, BytesMut}; -use common_error::ext::ErrorExt; -use common_macro::stack_trace_debug; -use paste::paste; -use snafu::{ensure, Location, ResultExt, Snafu}; - -#[derive(Snafu)] -#[snafu(visibility(pub))] -#[stack_trace_debug] -pub enum Error { - #[snafu(display( - "Destination buffer overflow, src_len: {}, dst_len: {}", - src_len, - dst_len - ))] - Overflow { - src_len: usize, - dst_len: usize, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Buffer underflow"))] - Underflow { - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("IO operation reach EOF"))] - Eof { - #[snafu(source)] - error: std::io::Error, - #[snafu(implicit)] - location: Location, - }, -} - -pub type Result = std::result::Result; - -impl ErrorExt for Error { - fn as_any(&self) -> &dyn Any { - self - } -} - -macro_rules! impl_read_le { - ( $($num_ty: ty), *) => { - $( - paste!{ - // TODO(hl): default implementation requires allocating a - // temp buffer. maybe use more efficient impls in concrete buffers. - // see https://github.com/GrepTimeTeam/greptimedb/pull/97#discussion_r930798941 - fn [](&mut self) -> Result<$num_ty> { - let mut buf = [0u8; std::mem::size_of::<$num_ty>()]; - self.read_to_slice(&mut buf)?; - Ok($num_ty::from_le_bytes(buf)) - } - - fn [](&mut self) -> Result<$num_ty> { - let mut buf = [0u8; std::mem::size_of::<$num_ty>()]; - self.peek_to_slice(&mut buf)?; - Ok($num_ty::from_le_bytes(buf)) - } - } - )* - } -} - -macro_rules! impl_write_le { - ( $($num_ty: ty), *) => { - $( - paste!{ - fn [](&mut self, n: $num_ty) -> Result<()> { - self.write_from_slice(&n.to_le_bytes())?; - Ok(()) - } - } - )* - } -} - -pub trait Buffer { - /// Returns remaining data size for read. - fn remaining_size(&self) -> usize; - - /// Returns true if buffer has no data for read. - fn is_empty(&self) -> bool { - self.remaining_size() == 0 - } - - /// Peeks data into dst. This method should not change internal cursor, - /// invoke `advance_by` if needed. - /// # Panics - /// This method **may** panic if buffer does not have enough data to be copied to dst. - fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()>; - - /// Reads data into dst. This method will change internal cursor. - /// # Panics - /// This method **may** panic if buffer does not have enough data to be copied to dst. - fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> { - self.peek_to_slice(dst)?; - self.advance_by(dst.len()); - Ok(()) - } - - /// Advances internal cursor for next read. - /// # Panics - /// This method **may** panic if the offset after advancing exceeds the length of underlying buffer. - fn advance_by(&mut self, by: usize); - - impl_read_le![u8, i8, u16, i16, u32, i32, u64, i64, f32, f64]; -} - -macro_rules! impl_buffer_for_bytes { - ( $($buf_ty:ty), *) => { - $( - impl Buffer for $buf_ty { - fn remaining_size(&self) -> usize{ - self.len() - } - - fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()> { - let dst_len = dst.len(); - ensure!(self.remaining() >= dst.len(), OverflowSnafu { - src_len: self.remaining_size(), - dst_len, - } - ); - dst.copy_from_slice(&self[0..dst_len]); - Ok(()) - } - - #[inline] - fn advance_by(&mut self, by: usize) { - self.advance(by); - } - } - )* - }; -} - -impl_buffer_for_bytes![bytes::Bytes, bytes::BytesMut]; - -impl Buffer for &[u8] { - fn remaining_size(&self) -> usize { - self.len() - } - - fn peek_to_slice(&self, dst: &mut [u8]) -> Result<()> { - let dst_len = dst.len(); - ensure!( - self.len() >= dst.len(), - OverflowSnafu { - src_len: self.remaining_size(), - dst_len, - } - ); - dst.copy_from_slice(&self[0..dst_len]); - Ok(()) - } - - fn read_to_slice(&mut self, dst: &mut [u8]) -> Result<()> { - ensure!( - self.len() >= dst.len(), - OverflowSnafu { - src_len: self.remaining_size(), - dst_len: dst.len(), - } - ); - self.read_exact(dst).context(EofSnafu) - } - - fn advance_by(&mut self, by: usize) { - *self = &self[by..]; - } -} - -/// Mutable buffer. -pub trait BufferMut { - fn as_slice(&self) -> &[u8]; - - fn write_from_slice(&mut self, src: &[u8]) -> Result<()>; - - impl_write_le![i8, u8, i16, u16, i32, u32, i64, u64, f32, f64]; -} - -impl BufferMut for BytesMut { - fn as_slice(&self) -> &[u8] { - self - } - - fn write_from_slice(&mut self, src: &[u8]) -> Result<()> { - self.put_slice(src); - Ok(()) - } -} - -impl BufferMut for &mut [u8] { - fn as_slice(&self) -> &[u8] { - self - } - - fn write_from_slice(&mut self, src: &[u8]) -> Result<()> { - // see std::io::Write::write_all - // https://doc.rust-lang.org/src/std/io/impls.rs.html#363 - self.write_all(src).map_err(|_| { - OverflowSnafu { - src_len: src.len(), - dst_len: self.as_slice().len(), - } - .build() - }) - } -} - -impl BufferMut for Vec { - fn as_slice(&self) -> &[u8] { - self - } - - fn write_from_slice(&mut self, src: &[u8]) -> Result<()> { - self.extend_from_slice(src); - Ok(()) - } -} diff --git a/src/common/base/src/bytes.rs b/src/common/base/src/bytes.rs index aec2dfd9edbf..ea08a9f0b022 100644 --- a/src/common/base/src/bytes.rs +++ b/src/common/base/src/bytes.rs @@ -44,6 +44,12 @@ impl From> for Bytes { } } +impl From for Vec { + fn from(bytes: Bytes) -> Vec { + bytes.0.into() + } +} + impl Deref for Bytes { type Target = [u8]; diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 539da1ba8cef..62a801d9462d 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -13,9 +13,9 @@ // limitations under the License. pub mod bit_vec; -pub mod buffer; pub mod bytes; pub mod plugins; +pub mod range_read; #[allow(clippy::all)] pub mod readable_size; pub mod secrets; diff --git a/src/common/base/src/range_read.rs b/src/common/base/src/range_read.rs new file mode 100644 index 000000000000..920b2e1f8c79 --- /dev/null +++ b/src/common/base/src/range_read.rs @@ -0,0 +1,80 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::ops::Range; + +use async_trait::async_trait; +use bytes::{BufMut, Bytes}; +use futures::{AsyncReadExt, AsyncSeekExt}; + +/// `Metadata` contains the metadata of a source. +pub struct Metadata { + /// The length of the source in bytes. + pub content_length: u64, +} + +/// `RangeReader` reads a range of bytes from a source. +#[async_trait] +pub trait RangeReader: Send + Unpin { + /// Returns the metadata of the source. + async fn metadata(&mut self) -> io::Result; + + /// Reads the bytes in the given range. + async fn read(&mut self, range: Range) -> io::Result; + + /// Reads the bytes in the given range into the buffer. + /// + /// Handles the buffer based on its capacity: + /// - If the buffer is insufficient to hold the bytes, it will either: + /// - Allocate additional space (e.g., for `Vec`) + /// - Panic (e.g., for `&mut [u8]`) + async fn read_into( + &mut self, + range: Range, + buf: &mut (impl BufMut + Send), + ) -> io::Result<()> { + let bytes = self.read(range).await?; + buf.put_slice(&bytes); + Ok(()) + } + + /// Reads the bytes in the given ranges. + async fn read_vec(&mut self, ranges: &[Range]) -> io::Result> { + let mut result = Vec::with_capacity(ranges.len()); + for range in ranges { + result.push(self.read(range.clone()).await?); + } + Ok(result) + } +} + +/// Implement `RangeReader` for a type that implements `AsyncRead + AsyncSeek`. +/// +/// TODO(zhongzc): It's a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`. +/// Until the codebase is fully ported to `RangeReader`, remove this implementation. +#[async_trait] +impl RangeReader for R { + async fn metadata(&mut self) -> io::Result { + let content_length = self.seek(io::SeekFrom::End(0)).await?; + Ok(Metadata { content_length }) + } + + async fn read(&mut self, range: Range) -> io::Result { + let mut buf = vec![0; (range.end - range.start) as usize]; + self.seek(io::SeekFrom::Start(range.start)).await?; + self.read_exact(&mut buf).await?; + Ok(Bytes::from(buf)) + } +} diff --git a/src/common/base/tests/buffer_tests.rs b/src/common/base/tests/buffer_tests.rs deleted file mode 100644 index a59bde64b6a0..000000000000 --- a/src/common/base/tests/buffer_tests.rs +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![feature(assert_matches)] - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - - use bytes::{Buf, Bytes, BytesMut}; - use common_base::buffer::Error::Overflow; - use common_base::buffer::{Buffer, BufferMut}; - use paste::paste; - - #[test] - pub fn test_buffer_read_write() { - let mut buf = BytesMut::with_capacity(16); - buf.write_u64_le(1234u64).unwrap(); - let result = buf.peek_u64_le().unwrap(); - assert_eq!(1234u64, result); - buf.advance_by(8); - - buf.write_from_slice("hello, world".as_bytes()).unwrap(); - let mut content = vec![0u8; 5]; - buf.peek_to_slice(&mut content).unwrap(); - let read = String::from_utf8_lossy(&content); - assert_eq!("hello", read); - buf.advance_by(5); - // after read, buffer should still have 7 bytes to read. - assert_eq!(7, buf.remaining()); - - let mut content = vec![0u8; 6]; - buf.read_to_slice(&mut content).unwrap(); - let read = String::from_utf8_lossy(&content); - assert_eq!(", worl", read); - // after read, buffer should still have 1 byte to read. - assert_eq!(1, buf.remaining()); - } - - #[test] - pub fn test_buffer_read() { - let mut bytes = Bytes::from_static("hello".as_bytes()); - assert_eq!(5, bytes.remaining_size()); - assert_eq!(b'h', bytes.peek_u8_le().unwrap()); - bytes.advance_by(1); - assert_eq!(4, bytes.remaining_size()); - } - - macro_rules! test_primitive_read_write { - ( $($num_ty: ty), *) => { - $( - paste!{ - #[test] - fn []() { - assert_eq!($num_ty::MAX,(&mut $num_ty::MAX.to_le_bytes() as &[u8]).[]().unwrap()); - assert_eq!($num_ty::MIN,(&mut $num_ty::MIN.to_le_bytes() as &[u8]).[]().unwrap()); - } - } - )* - } - } - - test_primitive_read_write![u8, u16, u32, u64, i8, i16, i32, i64, f32, f64]; - - #[test] - pub fn test_read_write_from_slice_buffer() { - let mut buf = "hello".as_bytes(); - assert_eq!(104, buf.peek_u8_le().unwrap()); - buf.advance_by(1); - assert_eq!(101, buf.peek_u8_le().unwrap()); - buf.advance_by(1); - assert_eq!(108, buf.peek_u8_le().unwrap()); - buf.advance_by(1); - assert_eq!(108, buf.peek_u8_le().unwrap()); - buf.advance_by(1); - assert_eq!(111, buf.peek_u8_le().unwrap()); - buf.advance_by(1); - assert_matches!(buf.peek_u8_le(), Err(Overflow { .. })); - } - - #[test] - pub fn test_read_u8_from_slice_buffer() { - let mut buf = "hello".as_bytes(); - assert_eq!(104, buf.read_u8_le().unwrap()); - assert_eq!(101, buf.read_u8_le().unwrap()); - assert_eq!(108, buf.read_u8_le().unwrap()); - assert_eq!(108, buf.read_u8_le().unwrap()); - assert_eq!(111, buf.read_u8_le().unwrap()); - assert_matches!(buf.read_u8_le(), Err(Overflow { .. })); - } - - #[test] - pub fn test_read_write_numbers() { - let mut buf: Vec = vec![]; - buf.write_u64_le(1234).unwrap(); - assert_eq!(1234, (&buf[..]).read_u64_le().unwrap()); - - buf.write_u32_le(4242).unwrap(); - let mut p = &buf[..]; - assert_eq!(1234, p.read_u64_le().unwrap()); - assert_eq!(4242, p.read_u32_le().unwrap()); - } - - macro_rules! test_primitive_vec_read_write { - ( $($num_ty: ty), *) => { - $( - paste!{ - #[test] - fn []() { - let mut buf = vec![]; - let _ = buf.[]($num_ty::MAX).unwrap(); - assert_eq!($num_ty::MAX, buf.as_slice().[]().unwrap()); - } - } - )* - } - } - - test_primitive_vec_read_write![u8, u16, u32, u64, i8, i16, i32, i64, f32, f64]; - - #[test] - pub fn test_peek_write_from_vec_buffer() { - let mut buf: Vec = vec![]; - buf.write_from_slice("hello".as_bytes()).unwrap(); - let mut slice = buf.as_slice(); - assert_eq!(104, slice.peek_u8_le().unwrap()); - slice.advance_by(1); - assert_eq!(101, slice.peek_u8_le().unwrap()); - slice.advance_by(1); - assert_eq!(108, slice.peek_u8_le().unwrap()); - slice.advance_by(1); - assert_eq!(108, slice.peek_u8_le().unwrap()); - slice.advance_by(1); - assert_eq!(111, slice.peek_u8_le().unwrap()); - slice.advance_by(1); - assert_matches!(slice.read_u8_le(), Err(Overflow { .. })); - } - - macro_rules! test_primitive_bytes_read_write { - ( $($num_ty: ty), *) => { - $( - paste!{ - #[test] - fn []() { - let mut bytes = bytes::Bytes::from($num_ty::MAX.to_le_bytes().to_vec()); - assert_eq!($num_ty::MAX, bytes.[]().unwrap()); - - let mut bytes = bytes::Bytes::from($num_ty::MIN.to_le_bytes().to_vec()); - assert_eq!($num_ty::MIN, bytes.[]().unwrap()); - } - } - )* - } - } - - test_primitive_bytes_read_write![u8, u16, u32, u64, i8, i16, i32, i64, f32, f64]; - - #[test] - pub fn test_write_overflow() { - let mut buf = [0u8; 4]; - assert_matches!( - (&mut buf[..]).write_from_slice("hell".as_bytes()), - Ok { .. } - ); - - assert_matches!( - (&mut buf[..]).write_from_slice("hello".as_bytes()), - Err(common_base::buffer::Error::Overflow { .. }) - ); - } -} diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index 3a6274f5f90b..5da70e37489c 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -12,15 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::SeekFrom; use std::sync::Arc; use async_trait::async_trait; -use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use common_base::range_read::RangeReader; use greptime_proto::v1::index::InvertedIndexMetas; use snafu::{ensure, ResultExt}; -use crate::inverted_index::error::{ReadSnafu, Result, SeekSnafu, UnexpectedBlobSizeSnafu}; +use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu}; use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader; use crate::inverted_index::format::reader::InvertedIndexReader; use crate::inverted_index::format::MIN_BLOB_SIZE; @@ -49,28 +48,28 @@ impl InvertedIndexBlobReader { } #[async_trait] -impl InvertedIndexReader for InvertedIndexBlobReader { +impl InvertedIndexReader for InvertedIndexBlobReader { async fn read_all(&mut self, dest: &mut Vec) -> Result { + let metadata = self.source.metadata().await.context(CommonIoSnafu)?; self.source - .seek(SeekFrom::Start(0)) + .read_into(0..metadata.content_length, dest) .await - .context(SeekSnafu)?; - self.source.read_to_end(dest).await.context(ReadSnafu) + .context(CommonIoSnafu)?; + Ok(metadata.content_length as usize) } async fn seek_read(&mut self, offset: u64, size: u32) -> Result> { - self.source - .seek(SeekFrom::Start(offset)) + let buf = self + .source + .read(offset..offset + size as u64) .await - .context(SeekSnafu)?; - let mut buf = vec![0u8; size as usize]; - self.source.read(&mut buf).await.context(ReadSnafu)?; - Ok(buf) + .context(CommonIoSnafu)?; + Ok(buf.into()) } async fn metadata(&mut self) -> Result> { - let end = SeekFrom::End(0); - let blob_size = self.source.seek(end).await.context(SeekSnafu)?; + let metadata = self.source.metadata().await.context(CommonIoSnafu)?; + let blob_size = metadata.content_length; Self::validate_blob_size(blob_size)?; let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size); diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index 478352ee685a..244973669b88 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -12,32 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::SeekFrom; - -use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use common_base::range_read::RangeReader; use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; use prost::Message; use snafu::{ensure, ResultExt}; use crate::inverted_index::error::{ - DecodeProtoSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu, + CommonIoSnafu, DecodeProtoSnafu, Result, UnexpectedFooterPayloadSizeSnafu, UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu, }; use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE; /// InvertedIndeFooterReader is for reading the footer section of the blob. -pub struct InvertedIndeFooterReader { - source: R, +pub struct InvertedIndeFooterReader<'a, R> { + source: &'a mut R, blob_size: u64, } -impl InvertedIndeFooterReader { - pub fn new(source: R, blob_size: u64) -> Self { +impl<'a, R> InvertedIndeFooterReader<'a, R> { + pub fn new(source: &'a mut R, blob_size: u64) -> Self { Self { source, blob_size } } } -impl InvertedIndeFooterReader { +impl<'a, R: RangeReader> InvertedIndeFooterReader<'a, R> { pub async fn metadata(&mut self) -> Result { let payload_size = self.read_payload_size().await?; let metas = self.read_payload(payload_size).await?; @@ -45,26 +43,26 @@ impl InvertedIndeFooterReader { } async fn read_payload_size(&mut self) -> Result { - let size_offset = SeekFrom::Start(self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE); - self.source.seek(size_offset).await.context(SeekSnafu)?; - let size_buf = &mut [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize]; - self.source.read_exact(size_buf).await.context(ReadSnafu)?; - - let payload_size = u32::from_le_bytes(*size_buf) as u64; + let mut size_buf = [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize]; + let end = self.blob_size; + let start = end - FOOTER_PAYLOAD_SIZE_SIZE; + self.source + .read_into(start..end, &mut &mut size_buf[..]) + .await + .context(CommonIoSnafu)?; + + let payload_size = u32::from_le_bytes(size_buf) as u64; self.validate_payload_size(payload_size)?; Ok(payload_size) } async fn read_payload(&mut self, payload_size: u64) -> Result { - let payload_offset = - SeekFrom::Start(self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size); - self.source.seek(payload_offset).await.context(SeekSnafu)?; - - let payload = &mut vec![0u8; payload_size as usize]; - self.source.read_exact(payload).await.context(ReadSnafu)?; + let end = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE; + let start = end - payload_size; + let bytes = self.source.read(start..end).await.context(CommonIoSnafu)?; - let metas = InvertedIndexMetas::decode(&payload[..]).context(DecodeProtoSnafu)?; + let metas = InvertedIndexMetas::decode(&*bytes).context(DecodeProtoSnafu)?; self.validate_metas(&metas, payload_size)?; Ok(metas) @@ -144,7 +142,8 @@ mod tests { let payload_buf = create_test_payload(meta); let blob_size = payload_buf.len() as u64; - let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size); + let mut cursor = Cursor::new(payload_buf); + let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size); let payload_size = reader.read_payload_size().await.unwrap(); let metas = reader.read_payload(payload_size).await.unwrap(); @@ -164,7 +163,8 @@ mod tests { let mut payload_buf = create_test_payload(meta); payload_buf.push(0xff); // Add an extra byte to corrupt the footer let blob_size = payload_buf.len() as u64; - let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size); + let mut cursor = Cursor::new(payload_buf); + let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size); let payload_size_result = reader.read_payload_size().await; assert!(payload_size_result.is_err()); @@ -181,7 +181,8 @@ mod tests { let payload_buf = create_test_payload(meta); let blob_size = payload_buf.len() as u64; - let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size); + let mut cursor = Cursor::new(payload_buf); + let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size); let payload_size = reader.read_payload_size().await.unwrap(); let payload_result = reader.read_payload(payload_size).await;