Skip to content

Commit

Permalink
refactor: make write logic slightly more simple (#1026)
Browse files Browse the repository at this point in the history
Got nerd-sniped a bit. It just felt harder to follow than I wanted. I
made the mutation order consistent (rows then bytes), used a variable
instead of getting the back of `row_offsets`, and avoided the Deque by
handling the leading zero in the branch.
  • Loading branch information
danking authored Oct 14, 2024
1 parent 06298f8 commit 59895dc
Showing 1 changed file with 23 additions and 25 deletions.
48 changes: 23 additions & 25 deletions vortex-serde/src/layouts/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use vortex::stream::ArrayStream;
use vortex::validity::Validity;
use vortex::{Array, ArrayDType, IntoArray};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};

use crate::io::VortexWrite;
use crate::layouts::read::{ChunkedLayoutSpec, ColumnLayoutSpec};
Expand Down Expand Up @@ -79,36 +79,33 @@ impl<W: VortexWrite> LayoutWriter<W> {
where
S: Stream<Item = VortexResult<Array>> + Unpin,
{
let column_row_offset = self
.column_chunks
.get(column_idx)
.and_then(|c| c.row_offsets.back())
.copied()
.unwrap_or(0u64);
let mut row_offsets: Vec<u64> = Vec::new();
let mut byte_offsets = vec![self.msgs.tell()];
let mut row_offsets = VecDeque::new();
row_offsets.push_front(column_row_offset);

let mut n_rows_written = match self.column_chunks.get(column_idx) {
None => {
row_offsets.push(0);
0
}
Some(x) => {
let last = x.row_offsets.last();
*last.vortex_expect("row offsets is non-empty")
}
};

while let Some(chunk) = stream.try_next().await? {
row_offsets.push_back(
row_offsets
.back()
.map(|off| off + chunk.len() as u64)
.ok_or_else(|| vortex_err!("Row offsets should be initialized with a value"))?,
);
n_rows_written += chunk.len() as u64;
row_offsets.push(n_rows_written);
self.msgs.write_batch(chunk).await?;
byte_offsets.push(self.msgs.tell());
}

if let Some(batches) = self.column_chunks.get_mut(column_idx) {
// Remove first entry from the list as it would be the same as last entry of the previous chunk
row_offsets.pop_front();

batches.batch_byte_offsets.push(byte_offsets);
batches.row_offsets.extend(row_offsets);
batches.batch_byte_offsets.push(byte_offsets);
} else {
self.column_chunks
.push(BatchOffsets::new(vec![byte_offsets], row_offsets));
.push(BatchOffsets::new(row_offsets, vec![byte_offsets]));
}

Ok(())
Expand All @@ -130,10 +127,11 @@ impl<W: VortexWrite> LayoutWriter<W> {
let len = chunk.row_offsets.len() - 1;
chunk.row_offsets.truncate(len);

let offset_vec: Vec<u64> = chunk.row_offsets.into();
assert!(chunks.len() == chunk.row_offsets.len());

let metadata_array = StructArray::try_new(
["row_offset".into()].into(),
vec![offset_vec.into_array()],
vec![chunk.row_offsets.into_array()],
len,
Validity::NonNullable,
)?;
Expand Down Expand Up @@ -185,15 +183,15 @@ impl<W: VortexWrite> LayoutWriter<W> {

#[derive(Clone, Debug)]
pub struct BatchOffsets {
pub row_offsets: Vec<u64>,
pub batch_byte_offsets: Vec<Vec<u64>>,
pub row_offsets: VecDeque<u64>,
}

impl BatchOffsets {
pub fn new(batch_byte_offsets: Vec<Vec<u64>>, row_offsets: VecDeque<u64>) -> Self {
pub fn new(row_offsets: Vec<u64>, batch_byte_offsets: Vec<Vec<u64>>) -> Self {
Self {
batch_byte_offsets,
row_offsets,
batch_byte_offsets,
}
}
}
Expand Down

0 comments on commit 59895dc

Please sign in to comment.