diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index 78a5f38b40..be1929b762 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -7,7 +7,7 @@ use vortex::stream::ArrayStream; use vortex::validity::Validity; use vortex::{Array, ArrayDType, IntoArray}; use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; use crate::io::VortexWrite; use crate::layouts::read::{ChunkedLayoutSpec, ColumnLayoutSpec}; @@ -79,36 +79,33 @@ impl<W: VortexWrite> LayoutWriter<W> { where S: Stream<Item = VortexResult<Array>> + Unpin, { - let column_row_offset = self - .column_chunks - .get(column_idx) - .and_then(|c| c.row_offsets.back()) - .copied() - .unwrap_or(0u64); + let mut row_offsets: Vec<u64> = Vec::new(); let mut byte_offsets = vec![self.msgs.tell()]; - let mut row_offsets = VecDeque::new(); - row_offsets.push_front(column_row_offset); + + let mut n_rows_written = match self.column_chunks.get(column_idx) { + None => { + row_offsets.push(0); + 0 + } + Some(x) => { + let last = x.row_offsets.last(); + *last.vortex_expect("row offsets is non-empty") + } + }; while let Some(chunk) = stream.try_next().await? { - row_offsets.push_back( - row_offsets - .back() - .map(|off| off + chunk.len() as u64) - .ok_or_else(|| vortex_err!("Row offsets should be initialized with a value"))?, - ); + n_rows_written += chunk.len() as u64; + row_offsets.push(n_rows_written); self.msgs.write_batch(chunk).await?; byte_offsets.push(self.msgs.tell()); } if let Some(batches) = self.column_chunks.get_mut(column_idx) { - // Remove first entry from the list as it would be the same as last entry of the previous chunk - row_offsets.pop_front(); - - batches.batch_byte_offsets.push(byte_offsets); batches.row_offsets.extend(row_offsets); + batches.batch_byte_offsets.push(byte_offsets); } else { self.column_chunks - .push(BatchOffsets::new(vec![byte_offsets], row_offsets)); + .push(BatchOffsets::new(row_offsets, vec![byte_offsets])); } Ok(()) @@ -130,10 +127,11 @@ impl<W: VortexWrite> LayoutWriter<W> { let len = chunk.row_offsets.len() - 1; chunk.row_offsets.truncate(len); - let offset_vec: Vec<u64> = chunk.row_offsets.into(); + assert!(chunks.len() == chunk.row_offsets.len()); + let metadata_array = StructArray::try_new( ["row_offset".into()].into(), - vec![offset_vec.into_array()], + vec![chunk.row_offsets.into_array()], len, Validity::NonNullable, )?; @@ -185,15 +183,15 @@ impl<W: VortexWrite> LayoutWriter<W> { #[derive(Clone, Debug)] pub struct BatchOffsets { + pub row_offsets: Vec<u64>, pub batch_byte_offsets: Vec<Vec<u64>>, - pub row_offsets: VecDeque<u64>, } impl BatchOffsets { - pub fn new(batch_byte_offsets: Vec<Vec<u64>>, row_offsets: VecDeque<u64>) -> Self { + pub fn new(row_offsets: Vec<u64>, batch_byte_offsets: Vec<Vec<u64>>) -> Self { Self { - batch_byte_offsets, row_offsets, + batch_byte_offsets, } } }