From 3537205e86f05d2a6352855b67f08d23c77f3abd Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Mon, 21 Oct 2024 14:15:45 +0100 Subject: [PATCH 01/51] Filter pushdown --- Cargo.lock | 2 + pyvortex/python/vortex/dataset.py | 16 +- pyvortex/src/dataset.rs | 36 +- pyvortex/src/io.rs | 4 +- pyvortex/test/test_dataset.py | 2 +- vortex-datafusion/src/persistent/execution.rs | 1 - vortex-datafusion/src/persistent/opener.rs | 5 - vortex-expr/src/lib.rs | 2 +- vortex-serde/Cargo.toml | 2 + vortex-serde/src/layouts/read/batch.rs | 291 ++++++++- vortex-serde/src/layouts/read/buffered.rs | 297 +++++++-- vortex-serde/src/layouts/read/builder.rs | 47 +- vortex-serde/src/layouts/read/cache.rs | 20 +- vortex-serde/src/layouts/read/context.rs | 9 +- .../src/layouts/read/filter_project.rs | 71 +++ vortex-serde/src/layouts/read/filtering.rs | 52 +- vortex-serde/src/layouts/read/footer.rs | 14 +- .../src/layouts/read/layouts/chunked.rs | 602 +++++++++++++++++- .../src/layouts/read/layouts/column.rs | 586 +++++++++++++++-- vortex-serde/src/layouts/read/layouts/flat.rs | 397 +++++++++++- .../src/layouts/read/layouts/inline_dtype.rs | 96 ++- vortex-serde/src/layouts/read/layouts/mod.rs | 3 + .../src/layouts/read/layouts/test_read.rs | 104 +++ vortex-serde/src/layouts/read/mod.rs | 42 +- vortex-serde/src/layouts/read/selection.rs | 150 +++++ vortex-serde/src/layouts/read/stream.rs | 173 ++--- vortex-serde/src/layouts/tests.rs | 13 +- vortex-serde/src/layouts/write/mod.rs | 1 + vortex-serde/src/stream_writer/mod.rs | 5 + 29 files changed, 2635 insertions(+), 408 deletions(-) create mode 100644 vortex-serde/src/layouts/read/filter_project.rs create mode 100644 vortex-serde/src/layouts/read/layouts/test_read.rs create mode 100644 vortex-serde/src/layouts/read/selection.rs diff --git a/Cargo.lock b/Cargo.lock index bb20b9f92f..1ecf6541c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4680,6 +4680,7 @@ dependencies = [ "arrow-select", "bytes", "criterion", + "croaring", "flatbuffers", "futures", "futures-executor", @@ -4691,6 +4692,7 @@ dependencies = [ "once_cell", "pin-project", "rand", + "rstest", "simplelog", "tokio", "url", diff --git a/pyvortex/python/vortex/dataset.py b/pyvortex/python/vortex/dataset.py index 7f9d8d5d3b..4e2be124da 100644 --- a/pyvortex/python/vortex/dataset.py +++ b/pyvortex/python/vortex/dataset.py @@ -109,11 +109,7 @@ def head( del memory_pool if filter is not None: filter = arrow_to_vortex_expr(filter, self.schema) - return ( - self._dataset.to_array(columns=columns, batch_size=batch_size, row_filter=filter) - .slice(0, num_rows) - .to_arrow_table() - ) + return self._dataset.to_array(columns=columns, row_filter=filter).slice(0, num_rows).to_arrow_table() def join( self, @@ -232,11 +228,7 @@ def take( table : :class:`.pyarrow.Table` """ - return ( - self._dataset.to_array(columns=columns, batch_size=batch_size, row_filter=filter) - .take(encoding.array(indices)) - .to_arrow_table() - ) + return self._dataset.to_array(columns=columns, row_filter=filter).take(encoding.array(indices)).to_arrow_table() def to_record_batch_reader( self, @@ -289,7 +281,7 @@ def to_record_batch_reader( del memory_pool if filter is not None: filter = arrow_to_vortex_expr(filter, self.schema) - return self._dataset.to_record_batch_reader(columns=columns, batch_size=batch_size, row_filter=filter) + return self._dataset.to_record_batch_reader(columns=columns, row_filter=filter) def to_batches( self, @@ -396,7 +388,7 @@ def to_table( del memory_pool if filter is not None: filter = arrow_to_vortex_expr(filter, self.schema) - return self._dataset.to_array(columns=columns, batch_size=batch_size, row_filter=filter).to_arrow_table() + return self._dataset.to_array(columns=columns, row_filter=filter).to_arrow_table() def from_path(path: str) -> VortexDataset: diff --git a/pyvortex/src/dataset.rs b/pyvortex/src/dataset.rs index 92664dcf54..47b3128687 100644 --- a/pyvortex/src/dataset.rs +++ b/pyvortex/src/dataset.rs @@ -27,7 +27,6 @@ use crate::{PyArray, TOKIO_RUNTIME}; pub async fn layout_stream_from_reader( reader: T, projection: Projection, - batch_size: Option, row_filter: Option, ) -> VortexResult> { let mut builder = LayoutReaderBuilder::new( @@ -39,10 +38,6 @@ pub async fn layout_stream_from_reader( ) .with_projection(projection); - if let Some(batch_size) = batch_size { - builder = builder.with_batch_size(batch_size); - } - if let Some(row_filter) = row_filter { builder = builder.with_row_filter(row_filter); } @@ -53,10 +48,9 @@ pub async fn layout_stream_from_reader( pub async fn read_array_from_reader( reader: T, projection: Projection, - batch_size: Option, row_filter: Option, ) -> VortexResult { - layout_stream_from_reader(reader, projection, batch_size, row_filter) + layout_stream_from_reader(reader, projection, row_filter) .await? .read_all() .await @@ -124,13 +118,11 @@ impl TokioFileDataset { async fn async_to_array( &self, columns: Option>>, - batch_size: Option, row_filter: Option<&Bound<'_, PyExpr>>, ) -> PyResult { let inner = read_array_from_reader( self.file().await?, projection_from_python(columns)?, - batch_size, row_filter_from_python(row_filter), ) .await?; @@ -140,13 +132,11 @@ impl TokioFileDataset { async fn async_to_record_batch_reader( self_: PyRef<'_, Self>, columns: Option>>, - batch_size: Option, row_filter: Option<&Bound<'_, PyExpr>>, ) -> PyResult { let layout_reader = layout_stream_from_reader( self_.file().await?, projection_from_python(columns)?, - batch_size, row_filter_from_python(row_filter), ) .await?; @@ -164,25 +154,23 @@ impl TokioFileDataset { self_.schema.clone().to_pyarrow(self_.py()) } - #[pyo3(signature = (*, columns=None, batch_size=None, row_filter=None))] + #[pyo3(signature = (*, columns=None, row_filter=None))] pub fn to_array( &self, columns: Option>>, - batch_size: Option, row_filter: Option<&Bound<'_, PyExpr>>, ) -> PyResult { - TOKIO_RUNTIME.block_on(self.async_to_array(columns, batch_size, row_filter)) + TOKIO_RUNTIME.block_on(self.async_to_array(columns, row_filter)) } - #[pyo3(signature = (*, columns=None, batch_size=None, row_filter=None))] + #[pyo3(signature = (*, columns=None, row_filter=None))] pub fn to_record_batch_reader( self_: PyRef, columns: Option>>, - batch_size: Option, row_filter: Option<&Bound<'_, PyExpr>>, ) -> PyResult { TOKIO_RUNTIME.block_on(Self::async_to_record_batch_reader( - self_, columns, batch_size, row_filter, + self_, columns, row_filter, )) } } @@ -208,13 +196,11 @@ impl ObjectStoreUrlDataset { async fn async_to_array( &self, columns: Option>>, - batch_size: Option, row_filter: Option<&Bound<'_, PyExpr>>, ) -> PyResult { let inner = read_array_from_reader( self.reader().await?, projection_from_python(columns)?, - batch_size, row_filter_from_python(row_filter), ) .await?; @@ -224,13 +210,11 @@ impl ObjectStoreUrlDataset { async fn async_to_record_batch_reader( self_: PyRef<'_, Self>, columns: Option>>, - batch_size: Option, row_filter: Option<&Bound<'_, PyExpr>>, ) -> PyResult { let layout_reader = layout_stream_from_reader( self_.reader().await?, projection_from_python(columns)?, - batch_size, row_filter_from_python(row_filter), ) .await?; @@ -248,25 +232,23 @@ impl ObjectStoreUrlDataset { self_.schema.clone().to_pyarrow(self_.py()) } - #[pyo3(signature = (*, columns=None, batch_size=None, row_filter=None))] + #[pyo3(signature = (*, columns=None, row_filter=None))] pub fn to_array( &self, columns: Option>>, - batch_size: Option, row_filter: Option<&Bound<'_, PyExpr>>, ) -> PyResult { - TOKIO_RUNTIME.block_on(self.async_to_array(columns, batch_size, row_filter)) + TOKIO_RUNTIME.block_on(self.async_to_array(columns, row_filter)) } - #[pyo3(signature = (*, columns=None, batch_size=None, row_filter=None))] + #[pyo3(signature = (*, columns=None, row_filter=None))] pub fn to_record_batch_reader( self_: PyRef, columns: Option>>, - batch_size: Option, row_filter: Option<&Bound<'_, PyExpr>>, ) -> PyResult { TOKIO_RUNTIME.block_on(Self::async_to_record_batch_reader( - self_, columns, batch_size, row_filter, + self_, columns, row_filter, )) } } diff --git a/pyvortex/src/io.rs b/pyvortex/src/io.rs index d93350df95..aaaa1f95de 100644 --- a/pyvortex/src/io.rs +++ b/pyvortex/src/io.rs @@ -132,7 +132,7 @@ pub fn read_path( row_filter: Option<&Bound>, ) -> PyResult { let dataset = TOKIO_RUNTIME.block_on(TokioFileDataset::try_new(path.extract()?))?; - dataset.to_array(projection, None, row_filter) + dataset.to_array(projection, row_filter) } /// Read a vortex struct array from a URL. @@ -184,7 +184,7 @@ pub fn read_url( row_filter: Option<&Bound>, ) -> PyResult { let dataset = TOKIO_RUNTIME.block_on(ObjectStoreUrlDataset::try_new(url.extract()?))?; - dataset.to_array(projection, None, row_filter) + dataset.to_array(projection, row_filter) } /// Write a vortex struct array to the local filesystem. diff --git a/pyvortex/test/test_dataset.py b/pyvortex/test/test_dataset.py index efa296eb86..f2cdb1c514 100644 --- a/pyvortex/test/test_dataset.py +++ b/pyvortex/test/test_dataset.py @@ -53,7 +53,7 @@ def test_to_batches(ds): chunk0 = next(ds.to_batches(columns=["string", "bool"])) assert chunk0.to_struct_array() == pa.array( - [record(x, columns=["string", "bool"]) for x in range(1 << 16)], type=schema + [record(x, columns=["string", "bool"]) for x in range(1_000_000)], type=schema ) diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 43ff1a7a48..ac91c8b8c5 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -98,7 +98,6 @@ impl ExecutionPlan for VortexExec { ctx: self.ctx.clone(), object_store, projection: self.file_scan_config.projection.clone(), - batch_size: None, predicate: self.predicate.clone(), arrow_schema, }; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 73e1ea08c0..30a608ec86 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -17,7 +17,6 @@ use vortex_serde::layouts::{ pub struct VortexFileOpener { pub ctx: Arc, pub object_store: Arc, - pub batch_size: Option, pub projection: Option>, pub predicate: Option>, pub arrow_schema: SchemaRef, @@ -33,10 +32,6 @@ impl FileOpener for VortexFileOpener { LayoutDeserializer::new(self.ctx.clone(), Arc::new(LayoutContext::default())), ); - if let Some(batch_size) = self.batch_size { - builder = builder.with_batch_size(batch_size); - } - let row_filter = self .predicate .clone() diff --git a/vortex-expr/src/lib.rs b/vortex-expr/src/lib.rs index 58e7c82b63..fcf7335a54 100644 --- a/vortex-expr/src/lib.rs +++ b/vortex-expr/src/lib.rs @@ -61,7 +61,7 @@ fn split_inner(expr: &Arc, exprs: &mut Vec>) } // Taken from apache-datafusion, necessary since you can't require VortexExpr implement PartialEq -pub(crate) fn unbox_any(any: &dyn Any) -> &dyn Any { +pub fn unbox_any(any: &dyn Any) -> &dyn Any { if any.is::>() { any.downcast_ref::>() .vortex_expect("any.is::> returned true but downcast_ref failed") diff --git a/vortex-serde/Cargo.toml b/vortex-serde/Cargo.toml index 573fa51b05..d61f8e7675 100644 --- a/vortex-serde/Cargo.toml +++ b/vortex-serde/Cargo.toml @@ -18,6 +18,7 @@ arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-schema = { workspace = true } bytes = { workspace = true } +croaring = { workspace = true } flatbuffers = { workspace = true } futures = { workspace = true } futures-executor = { workspace = true } @@ -47,6 +48,7 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } criterion = { workspace = true, features = ["async_futures"] } rand = { workspace = true } +rstest = { workspace = true } simplelog = { workspace = true } tokio = { workspace = true, features = ["full"] } vortex-alp = { path = "../encodings/alp" } diff --git a/vortex-serde/src/layouts/read/batch.rs b/vortex-serde/src/layouts/read/batch.rs index ed50cdb6e8..23be6e843a 100644 --- a/vortex-serde/src/layouts/read/batch.rs +++ b/vortex-serde/src/layouts/read/batch.rs @@ -2,29 +2,90 @@ use std::mem; use std::sync::Arc; use vortex::array::StructArray; +use vortex::validity::Validity; use vortex::{Array, IntoArray}; -use vortex_error::{vortex_err, VortexResult}; +use vortex_dtype::FieldNames; +use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; +use vortex_expr::VortexExpr; +use crate::layouts::read::selection::RowSelector; use crate::layouts::read::{LayoutReader, ReadResult}; +use crate::layouts::{Message, RangeResult}; #[derive(Debug)] -pub struct BatchReader { - names: Arc<[Arc]>, +pub struct ColumnBatchReader { + names: FieldNames, children: Vec>, + read_ranges: Vec>, arrays: Vec>, } -impl BatchReader { - pub fn new(names: Arc<[Arc]>, children: Vec>) -> Self { +impl ColumnBatchReader { + pub fn new(names: FieldNames, children: Vec>) -> Self { let arrays = vec![None; children.len()]; + let read_ranges = vec![None; children.len()]; Self { names, children, + read_ranges, arrays, } } - pub(crate) fn read(&mut self) -> VortexResult> { + pub fn next_range(&mut self) -> VortexResult { + let mut messages = Vec::new(); + for (i, child_selector) in self + .read_ranges + .iter_mut() + .enumerate() + .filter(|(_, a)| a.is_none()) + { + match self.children[i].next_range()? { + RangeResult::ReadMore(m) => messages.extend(m), + RangeResult::Rows(s) => match s { + None => return Ok(RangeResult::Rows(None)), + Some(rs) => { + if rs.is_empty() { + return self.advance(rs.end()).map(RangeResult::ReadMore); + } + *child_selector = Some(rs); + } + }, + } + } + + if messages.is_empty() { + let ranges = mem::replace(&mut self.read_ranges, vec![None; self.children.len()]); + let mut ranges_iter = ranges.iter().enumerate(); + let mut final_range: Option = + ranges_iter.next().and_then(|(_, ri)| ri.clone()); + for (i, range) in ranges_iter { + let Some(column_range) = range else { + vortex_bail!("Finished reading all columns but column {i} didn't produce range") + }; + final_range = final_range.and_then(|fr| { + let intersection = fr.intersect(column_range); + if intersection.is_empty() { + None + } else { + Some(intersection) + } + }) + } + + if let Some(fr) = final_range.as_ref() { + self.read_ranges = ranges + .into_iter() + .map(|rs| rs.and_then(|r| r.advance(fr.end()))) + .collect(); + } + Ok(RangeResult::Rows(final_range)) + } else { + Ok(RangeResult::ReadMore(messages)) + } + } + + pub fn read_next(&mut self, selection: RowSelector) -> VortexResult> { let mut messages = Vec::new(); for (i, child_array) in self .arrays @@ -32,17 +93,19 @@ impl BatchReader { .enumerate() .filter(|(_, a)| a.is_none()) { - match self.children[i].read_next()? { + match self.children[i].read_next(selection.clone())? { Some(rr) => match rr { ReadResult::ReadMore(message) => { messages.extend(message); } ReadResult::Batch(a) => *child_array = Some(a), + ReadResult::Selector(_) => unreachable!("Can only produce batches"), }, None => { debug_assert!( self.arrays.iter().all(Option::is_none), - "Expected layout to produce an array but it was empty" + "Expected layout {}({i}) to produce an array but it was empty", + self.names[i] ); return Ok(None); } @@ -53,14 +116,222 @@ impl BatchReader { let child_arrays = mem::replace(&mut self.arrays, vec![None; self.children.len()]) .into_iter() .enumerate() - .map(|(i, a)| a.ok_or_else(|| vortex_err!("Missing child array at index {}", i))) + .map(|(i, a)| a.ok_or_else(|| vortex_err!("Missing child array at index {i}"))) .collect::>>()?; + let len = child_arrays + .first() + .map(|l| l.len()) + .unwrap_or(selection.len()); Ok(Some(ReadResult::Batch( - StructArray::from_fields(&self.names.iter().zip(child_arrays).collect::>())? + StructArray::try_new(self.names.clone(), child_arrays, len, Validity::NonNullable)? .into_array(), ))) } else { Ok(Some(ReadResult::ReadMore(messages))) } } + + pub fn advance(&mut self, up_to_row: usize) -> VortexResult> { + self.arrays = vec![None; self.children.len()]; + self.read_ranges = mem::take(&mut self.read_ranges) + .into_iter() + .map(|s| s.and_then(|rs| rs.advance(up_to_row))) + .collect(); + + let mut messages = Vec::new(); + for c in self.children.iter_mut() { + messages.extend(c.advance(up_to_row)?); + } + Ok(messages) + } +} + +#[derive(Debug)] +pub struct FilterLayoutReader { + reader: ColumnBatchReader, + filter: Arc, + row_offset: usize, +} + +impl FilterLayoutReader { + pub fn new(reader: ColumnBatchReader, filter: Arc, row_offset: usize) -> Self { + Self { + reader, + filter, + row_offset, + } + } +} + +impl LayoutReader for FilterLayoutReader { + fn next_range(&mut self) -> VortexResult { + self.reader.next_range() + } + + fn read_next(&mut self, selection: RowSelector) -> VortexResult> { + match self.reader.read_next(selection)? { + None => Ok(None), + Some(rr) => match rr { + ReadResult::ReadMore(m) => Ok(Some(ReadResult::ReadMore(m))), + ReadResult::Batch(b) => { + let filter_result = self.filter.evaluate(&b)?; + let selector = RowSelector::from_array( + &filter_result, + self.row_offset, + self.row_offset + filter_result.len(), + )?; + self.row_offset += b.len(); + Ok(Some(ReadResult::Selector(selector))) + } + ReadResult::Selector(_) => unreachable!("Can only produce batches"), + }, + } + } + + fn advance(&mut self, up_to_row: usize) -> VortexResult> { + self.row_offset = up_to_row; + self.reader.advance(up_to_row) + } +} + +#[derive(Debug)] +pub struct ColumnBatchFilter { + children: Vec>, + read_ranges: Vec>, + filters: Vec>, +} + +impl ColumnBatchFilter { + pub fn new(children: Vec>) -> Self { + let read_ranges = vec![None; children.len()]; + let filters = vec![None; children.len()]; + Self { + children, + read_ranges, + filters, + } + } + + pub fn next_range(&mut self) -> VortexResult { + let mut messages = Vec::new(); + for (i, child_selector) in self + .read_ranges + .iter_mut() + .enumerate() + .filter(|(_, a)| a.is_none()) + { + match self.children[i].next_range()? { + RangeResult::ReadMore(m) => messages.extend(m), + RangeResult::Rows(s) => match s { + None => return Ok(RangeResult::Rows(None)), + Some(rs) => { + if rs.is_empty() { + return self.advance(rs.end()).map(RangeResult::ReadMore); + } + *child_selector = Some(rs); + } + }, + } + } + + if messages.is_empty() { + let ranges = mem::replace(&mut self.read_ranges, vec![None; self.children.len()]); + let mut ranges_iter = ranges.iter().enumerate(); + let mut final_range: Option = + ranges_iter.next().and_then(|(_, ri)| ri.clone()); + for (i, range) in ranges_iter { + let Some(column_range) = range else { + vortex_bail!("Finished reading all columns but column {i} didn't produce range") + }; + final_range = final_range.and_then(|fr| { + let intersection = fr.intersect(column_range); + if intersection.is_empty() { + None + } else { + Some(intersection) + } + }) + } + + if let Some(fr) = final_range.as_ref() { + self.read_ranges = ranges + .into_iter() + .map(|rs| rs.and_then(|r| r.advance(fr.end()))) + .collect(); + } + Ok(RangeResult::Rows(final_range)) + } else { + Ok(RangeResult::ReadMore(messages)) + } + } + + pub fn read_next(&mut self, selection: RowSelector) -> VortexResult> { + let mut messages = Vec::new(); + for (i, child_selector) in self + .filters + .iter_mut() + .enumerate() + .filter(|(_, a)| a.is_none()) + { + match self.children[i].read_next(selection.clone())? { + Some(rr) => match rr { + ReadResult::ReadMore(msgs) => messages.extend(msgs), + ReadResult::Selector(s) => { + if s.is_empty() { + return self.advance(s.end()).map(ReadResult::ReadMore).map(Some); + } + *child_selector = Some(s) + } + ReadResult::Batch(_) => unreachable!("Can only produce selectors"), + }, + None => { + debug_assert!( + self.filters.iter().all(Option::is_none), + "Expected layout to produce an array but it was empty" + ); + return Ok(None); + } + } + } + + if messages.is_empty() { + let selectors = mem::replace(&mut self.filters, vec![None; self.children.len()]) + .into_iter() + .enumerate() + .map(|(i, a)| a.ok_or_else(|| vortex_err!("Missing child array at index {i}"))) + .collect::>>()?; + let mut selector_iter = selectors.into_iter(); + // TODO(robert): Handle empty projections + let mut current = selector_iter + .next() + .vortex_expect("Must have at least one child"); + for next_filter in selector_iter { + if current.is_empty() { + return self.read_next(selection); + } + current = current.intersect(&next_filter); + } + Ok(Some(ReadResult::Selector(current))) + } else { + Ok(Some(ReadResult::ReadMore(messages))) + } + } + + pub fn advance(&mut self, up_to_row: usize) -> VortexResult> { + self.filters = mem::take(&mut self.filters) + .into_iter() + .map(|s| s.and_then(|rs| rs.advance(up_to_row))) + .collect(); + + self.read_ranges = mem::take(&mut self.read_ranges) + .into_iter() + .map(|s| s.and_then(|rs| rs.advance(up_to_row))) + .collect(); + + let mut messages = Vec::new(); + for c in self.children.iter_mut() { + messages.extend(c.advance(up_to_row)?); + } + Ok(messages) + } } diff --git a/vortex-serde/src/layouts/read/buffered.rs b/vortex-serde/src/layouts/read/buffered.rs index 486bc67a54..9003f529c7 100644 --- a/vortex-serde/src/layouts/read/buffered.rs +++ b/vortex-serde/src/layouts/read/buffered.rs @@ -1,25 +1,32 @@ use std::collections::VecDeque; +use std::mem; +use croaring::Bitmap; use vortex::array::ChunkedArray; use vortex::compute::slice; use vortex::{Array, ArrayDType, IntoArray}; -use vortex_error::VortexResult; +use vortex_error::{vortex_bail, VortexResult}; +use crate::layouts::read::selection::RowSelector; use crate::layouts::read::{LayoutReader, ReadResult}; +use crate::layouts::{Message, RangeResult}; + +pub type RangedLayoutReader = ((usize, usize), Box); +pub type RangedArray = ((usize, usize), Array); #[derive(Debug)] -pub struct BufferedReader { - layouts: VecDeque>, - arrays: VecDeque, - batch_size: usize, +pub struct BufferedArrayReader { + layouts: VecDeque, + arrays: VecDeque, + next_range_offset: usize, } -impl BufferedReader { - pub fn new(layouts: VecDeque>, batch_size: usize) -> Self { +impl BufferedArrayReader { + pub fn new(layouts: VecDeque) -> Self { Self { layouts, - arrays: Default::default(), - batch_size, + arrays: VecDeque::new(), + next_range_offset: 0, } } @@ -27,79 +34,253 @@ impl BufferedReader { self.layouts.is_empty() && self.arrays.is_empty() } - fn buffered_row_count(&self) -> usize { - self.arrays.iter().map(Array::len).sum() - } + pub fn next_range(&mut self) -> VortexResult { + if self.next_range_offset == self.layouts.len() { + return Ok(RangeResult::Rows(None)); + } - fn buffer(&mut self) -> VortexResult> { - while self.buffered_row_count() < self.batch_size { - if let Some(mut layout) = self.layouts.pop_front() { - if let Some(rr) = layout.read_next()? { - self.layouts.push_front(layout); - match rr { - read_more @ ReadResult::ReadMore(..) => { - return Ok(Some(read_more)); - } - ReadResult::Batch(a) => self.arrays.push_back(a), + match self.layouts[self.next_range_offset].1.next_range()? { + RangeResult::ReadMore(m) => Ok(RangeResult::ReadMore(m)), + RangeResult::Rows(r) => match r { + None => { + self.next_range_offset += 1; + self.next_range() + } + Some(rs) => { + let layout_range = self.layouts[self.next_range_offset].0; + let offset = rs.offset(-(layout_range.0 as i64)); + if offset.end() == layout_range.1 { + self.next_range_offset += 1; } - } else { - continue; + Ok(RangeResult::Rows(Some(offset))) } - } else { - return Ok(None); - } + }, } - Ok(None) } - pub fn read(&mut self) -> VortexResult> { + pub fn read_next(&mut self, selection: RowSelector) -> VortexResult> { if self.is_empty() { return Ok(None); } - if let Some(rr) = self.buffer()? { + if let Some(rr) = buffer_read(&mut self.layouts, selection, |range, read| match read { + ReadResult::ReadMore(_) => unreachable!("Handled by outside closure"), + ReadResult::Selector(_) => unreachable!("Can't be a selector"), + ReadResult::Batch(a) => self.arrays.push_back((range, a)), + })? { match rr { read_more @ ReadResult::ReadMore(..) => return Ok(Some(read_more)), - ReadResult::Batch(_) => { - unreachable!("Batches should be handled inside the buffer call") + ReadResult::Batch(_) | ReadResult::Selector(_) => { + unreachable!("Buffer should only produce ReadMore") } } } + self.next_range_offset = 0; + + let mut result = mem::take(&mut self.arrays); + match result.len() { + 0 | 1 => Ok(result.pop_front().map(|(_, a)| a).map(ReadResult::Batch)), + _ => { + let dtype = result[0].1.dtype().clone(); + Ok(Some(ReadResult::Batch( + ChunkedArray::try_new(result.into_iter().map(|(_, a)| a).collect(), dtype)? + .into_array(), + ))) + } + } + } - let mut rows_to_read = if self.layouts.is_empty() { - usize::min(self.batch_size, self.buffered_row_count()) + pub fn advance(&mut self, up_to_row: usize) -> VortexResult> { + if self + .arrays + .front() + .map(|((begin, _), _)| up_to_row < *begin) + .or_else(|| { + self.layouts + .front() + .map(|((begin, _), _)| up_to_row < *begin) + }) + .unwrap_or(false) + { + vortex_bail!("Can't advance backwards {up_to_row}") + } + + let mut new_arrays = mem::take(&mut self.arrays) + .into_iter() + .skip_while(|((_, end), _)| *end < up_to_row) + .collect::>(); + if let Some(((begin, end), carr)) = new_arrays.pop_front() { + let slice_end = carr.len(); + let sliced = slice(carr, slice_end - (end - up_to_row), slice_end)?; + + new_arrays.push_front(((begin, end), sliced)); + }; + self.arrays = new_arrays; + + let mut new_layouts = mem::take(&mut self.layouts) + .into_iter() + .skip_while(|((_, end), _)| *end < up_to_row) + .collect::>(); + let res = if let Some(((begin, end), mut l)) = new_layouts.pop_front() { + let advance = l.advance(up_to_row - begin); + new_layouts.push_front(((begin, end), l)); + advance } else { - self.batch_size + Ok(vec![]) }; + self.next_range_offset = 0; + self.layouts = new_layouts; + res + } +} - let mut result = Vec::new(); - - while rows_to_read != 0 { - match self.arrays.pop_front() { - None => break, - Some(array) => { - if array.len() > rows_to_read { - let taken = slice(&array, 0, rows_to_read)?; - let leftover = slice(&array, rows_to_read, array.len())?; - self.arrays.push_front(leftover); - rows_to_read -= taken.len(); - result.push(taken); - } else { - rows_to_read -= array.len(); - result.push(array); - } +fn buffer_read( + layouts: &mut VecDeque, + selection: RowSelector, + mut consumer: F, +) -> VortexResult> { + while let Some(((begin, end), mut layout)) = layouts.pop_front() { + // This selection doesn't know about rows in this chunk, we should put it back and wait for another request with different range + if selection.end() <= begin { + layouts.push_front(((begin, end), layout)); + return Ok(None); + } + let layout_selection = + RowSelector::new(Bitmap::from_range(begin as u32..end as u32), begin, end) + .intersect(&selection) + .offset(begin as i64); + if let Some(rr) = layout.read_next(layout_selection)? { + layouts.push_front(((begin, end), layout)); + match rr { + read_more @ ReadResult::ReadMore(..) => { + return Ok(Some(read_more)); } + ReadResult::Batch(a) => consumer((begin, end), ReadResult::Batch(a)), + ReadResult::Selector(s) => consumer((begin, end), ReadResult::Selector(s)), } + } else { + if end > selection.end() && begin < selection.end() { + layouts.push_front(((begin, end), layout)); + return Ok(None); + } + continue; } + } + Ok(None) +} - match result.len() { - 0 | 1 => Ok(result.pop().map(ReadResult::Batch)), - _ => { - let dtype = result[0].dtype().clone(); - Ok(Some(ReadResult::Batch( - ChunkedArray::try_new(result, dtype)?.into_array(), - ))) +#[derive(Debug)] +pub struct BufferedSelectorReader { + layouts: VecDeque, + selectors: VecDeque, + next_range_offset: usize, +} + +impl BufferedSelectorReader { + pub fn new(layouts: VecDeque) -> Self { + Self { + layouts, + selectors: VecDeque::new(), + next_range_offset: 0, + } + } + + fn is_empty(&self) -> bool { + self.layouts.is_empty() && self.selectors.is_empty() + } + + pub fn next_range(&mut self) -> VortexResult { + if self.next_range_offset == self.layouts.len() { + return Ok(RangeResult::Rows(None)); + } + + match self.layouts[self.next_range_offset].1.next_range()? { + RangeResult::ReadMore(m) => Ok(RangeResult::ReadMore(m)), + RangeResult::Rows(r) => match r { + None => { + self.next_range_offset += 1; + self.next_range() + } + Some(rs) => { + let layout_range = self.layouts[self.next_range_offset].0; + let offset = rs.offset(-(layout_range.0 as i64)); + if offset.end() == layout_range.1 { + self.next_range_offset += 1; + } + Ok(RangeResult::Rows(Some(offset))) + } + }, + } + } + + pub fn read_next(&mut self, selection: RowSelector) -> VortexResult> { + if self.is_empty() { + return Ok(None); + } + + if let Some(rr) = buffer_read( + &mut self.layouts, + selection, + |(begin, _), read| match read { + ReadResult::ReadMore(_) => unreachable!("Handled by outside closure"), + ReadResult::Selector(s) => self.selectors.push_back(s.offset(-(begin as i64))), + ReadResult::Batch(_) => unreachable!("Can't be an array"), + }, + )? { + match rr { + read_more @ ReadResult::ReadMore(..) => return Ok(Some(read_more)), + ReadResult::Batch(_) | ReadResult::Selector(_) => { + unreachable!("Buffer should only produce ReadMore") + } } } + self.next_range_offset = 0; + + Ok(mem::take(&mut self.selectors) + .into_iter() + .reduce(|acc, a| acc.concatenate(&a)) + .map(ReadResult::Selector)) + } + + pub fn advance(&mut self, up_to_row: usize) -> VortexResult> { + if self + .selectors + .front() + .map(|s| up_to_row < s.begin()) + .or_else(|| { + self.layouts + .front() + .map(|((begin, _), _)| up_to_row < *begin) + }) + .unwrap_or(false) + { + vortex_bail!("Can't advance backwards to {up_to_row}") + } + + let mut new_selectors = mem::take(&mut self.selectors) + .into_iter() + .skip_while(|s| s.end() < up_to_row) + .collect::>(); + if let Some(s) = new_selectors.pop_front() { + if let Some(rs) = s.advance(up_to_row) { + new_selectors.push_front(rs); + } + }; + self.selectors = new_selectors; + + let mut new_layouts = mem::take(&mut self.layouts) + .into_iter() + .skip_while(|((_, end), _)| *end < up_to_row) + .collect::>(); + let res = if let Some(((begin, end), mut l)) = new_layouts.pop_front() { + let advance = l.advance(up_to_row - begin); + new_layouts.push_front(((begin, end), l)); + advance + } else { + Ok(vec![]) + }; + self.next_range_offset = 0; + self.layouts = new_layouts; + res } } diff --git a/vortex-serde/src/layouts/read/builder.rs b/vortex-serde/src/layouts/read/builder.rs index e82030eaee..46ca76931a 100644 --- a/vortex-serde/src/layouts/read/builder.rs +++ b/vortex-serde/src/layouts/read/builder.rs @@ -2,6 +2,7 @@ use std::sync::{Arc, RwLock}; use vortex::{Array, ArrayDType}; use vortex_error::VortexResult; +use vortex_expr::Select; use vortex_schema::projection::Projection; use crate::io::VortexReadAt; @@ -10,7 +11,7 @@ use crate::layouts::read::context::LayoutDeserializer; use crate::layouts::read::filtering::RowFilter; use crate::layouts::read::footer::LayoutDescriptorReader; use crate::layouts::read::stream::LayoutBatchStream; -use crate::layouts::read::{Scan, DEFAULT_BATCH_SIZE}; +use crate::layouts::read::Scan; pub struct LayoutReaderBuilder { reader: R, @@ -19,7 +20,6 @@ pub struct LayoutReaderBuilder { size: Option, indices: Option, row_filter: Option, - batch_size: Option, } impl LayoutReaderBuilder { @@ -31,7 +31,6 @@ impl LayoutReaderBuilder { row_filter: None, size: None, indices: None, - batch_size: None, } } @@ -60,16 +59,11 @@ impl LayoutReaderBuilder { self } - pub fn with_batch_size(mut self, batch_size: usize) -> Self { - self.batch_size = Some(batch_size); - self - } - pub async fn build(self) -> VortexResult> { let footer = LayoutDescriptorReader::new(self.layout_serde.clone()) - .read_footer(&self.reader, self.size().await as u64) + .read_footer(&self.reader, self.size().await) .await?; - let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE); + let row_count = footer.row_count()?; // TODO(robert): Propagate projection immediately instead of delegating to layouts, needs more restructuring let footer_dtype = Arc::new(LazyDeserializedDType::from_bytes( footer.dtype_bytes()?, @@ -77,39 +71,34 @@ impl LayoutReaderBuilder { )); let read_projection = self.projection.unwrap_or_default(); - let filter_projection = self - .row_filter - .as_ref() - .map(|f| f.references().into_iter().cloned().collect::>()) - .map(Projection::from); - let projected_dtype = match read_projection { Projection::All => footer.dtype()?, Projection::Flat(ref projection) => footer.projected_dtype(projection)?, }; - let scan = Scan { - filter: self.row_filter.clone(), - batch_size, - projection: read_projection, - indices: self.indices, + let read_scan = Scan { + expr: match read_projection { + Projection::All => None, + Projection::Flat(p) => Some(Arc::new(Select::include(p))), + }, }; let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let data_reader = footer.layout( - scan.clone(), + row_count, + read_scan.clone(), RelativeLayoutCache::new(message_cache.clone(), footer_dtype.clone()), )?; - let filter_reader = filter_projection - .map(|projection| { + let filter_reader = self + .row_filter + .as_ref() + .map(|filter| { footer.layout( + row_count, Scan { - filter: self.row_filter, - batch_size, - projection, - indices: None, + expr: Some(Arc::new(filter.clone())), }, RelativeLayoutCache::new(message_cache.clone(), footer_dtype), ) @@ -122,7 +111,7 @@ impl LayoutReaderBuilder { filter_reader, message_cache, projected_dtype, - scan, + row_count, )) } diff --git a/vortex-serde/src/layouts/read/cache.rs b/vortex-serde/src/layouts/read/cache.rs index 3f6868ae63..9e9d0641bd 100644 --- a/vortex-serde/src/layouts/read/cache.rs +++ b/vortex-serde/src/layouts/read/cache.rs @@ -7,7 +7,7 @@ use vortex::aliases::hash_map::HashMap; use vortex_dtype::field::Field; use vortex_dtype::flatbuffers::{deserialize_and_project, resolve_field}; use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult}; use vortex_flatbuffers::message; use vortex_schema::projection::Projection; @@ -135,7 +135,7 @@ impl LazyDeserializedDType { #[derive(Debug)] pub struct RelativeLayoutCache { root: Arc>, - dtype: Arc, + dtype: Option>, path: MessageId, } @@ -143,7 +143,7 @@ impl RelativeLayoutCache { pub fn new(root: Arc>, dtype: Arc) -> Self { Self { root, - dtype, + dtype: Some(dtype), path: Vec::new(), } } @@ -154,7 +154,17 @@ impl RelativeLayoutCache { Self { root: self.root.clone(), path: new_path, - dtype, + dtype: Some(dtype), + } + } + + pub fn stored_dtype(&self, id: LayoutPartId) -> Self { + let mut new_path = self.path.clone(); + new_path.push(id); + Self { + root: self.root.clone(), + path: new_path, + dtype: None, } } @@ -185,7 +195,7 @@ impl RelativeLayoutCache { } pub fn dtype(&self) -> &Arc { - &self.dtype + self.dtype.as_ref().vortex_expect("Must have dtype") } pub fn absolute_id(&self, path: &[LayoutPartId]) -> MessageId { diff --git a/vortex-serde/src/layouts/read/context.rs b/vortex-serde/src/layouts/read/context.rs index 933943d2d0..f1eda64afe 100644 --- a/vortex-serde/src/layouts/read/context.rs +++ b/vortex-serde/src/layouts/read/context.rs @@ -8,7 +8,9 @@ use vortex_error::{vortex_err, VortexResult}; use vortex_flatbuffers::footer as fb; use crate::layouts::read::cache::RelativeLayoutCache; -use crate::layouts::read::layouts::{ChunkedLayoutSpec, ColumnLayoutSpec, FlatLayoutSpec}; +use crate::layouts::read::layouts::{ + ChunkedLayoutSpec, ColumnLayoutSpec, FlatLayoutSpec, InlineDTypeLayoutSpec, +}; use crate::layouts::read::{LayoutReader, Scan}; #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] @@ -27,6 +29,7 @@ pub trait LayoutSpec: Debug + Send + Sync { &self, fb_bytes: Bytes, fb_loc: usize, + length: u64, scan: Scan, layout_reader: LayoutDeserializer, message_cache: RelativeLayoutCache, @@ -56,6 +59,7 @@ impl Default for LayoutContext { [ &ColumnLayoutSpec as LayoutSpecRef, &ChunkedLayoutSpec, + &InlineDTypeLayoutSpec, &FlatLayoutSpec, ] .into_iter() @@ -80,6 +84,7 @@ impl LayoutDeserializer { &self, fb_bytes: Bytes, fb_loc: usize, + length: u64, scan: Scan, message_cache: RelativeLayoutCache, ) -> VortexResult> { @@ -92,7 +97,7 @@ impl LayoutDeserializer { .layout_ctx .lookup_layout(&layout_id) .ok_or_else(|| vortex_err!("Unknown layout definition {layout_id}"))? - .layout(fb_bytes, fb_loc, scan, self.clone(), message_cache)) + .layout(fb_bytes, fb_loc, length, scan, self.clone(), message_cache)) } pub(crate) fn ctx(&self) -> Arc { diff --git a/vortex-serde/src/layouts/read/filter_project.rs b/vortex-serde/src/layouts/read/filter_project.rs new file mode 100644 index 0000000000..93a1a5a1ac --- /dev/null +++ b/vortex-serde/src/layouts/read/filter_project.rs @@ -0,0 +1,71 @@ +use std::sync::Arc; + +use vortex_dtype::field::Field; +use vortex_expr::{BinaryExpr, Column, Identity, Literal, Operator, Select, VortexExpr}; + +use crate::layouts::RowFilter; + +pub fn filter_project( + filter: &Arc, + projection: &[Field], +) -> Option> { + if let Some(rf) = filter.as_any().downcast_ref::() { + rf.only_fields(projection).map(|rf| Arc::new(rf) as _) + } else if filter.as_any().downcast_ref::().is_some() { + Some(filter.clone()) + } else if let Some(s) = filter.as_any().downcast_ref::() { + match se { + Select::Include(i) => Ok(i.clone()), + Select::Exclude(_) => vortex_bail!("Select::Exclude not supported"), + } + } else { + Ok(e.references().into_iter().cloned().collect::>()) + } + }) + .transpose() + } + + fn read_init(&mut self) -> VortexResult<()> { + if let Some(expr) = self.scan.expr.as_ref() { + if expr.as_any().is::() { + self.state = ColumnLayoutState::Filtering(self.filter_reader()?); + } else { + self.state = ColumnLayoutState::Reading(self.read_children()?); + } + } else { + self.state = ColumnLayoutState::Reading(self.read_children()?); + } + Ok(()) } } impl LayoutReader for ColumnLayout { - fn read_next(&mut self) -> VortexResult> { - if let Some(br) = &mut self.reader { - br.read() - } else { - let result_lazy_dtype = self.lazy_dtype()?; - let DType::Struct(s, _) = result_lazy_dtype.value()? else { - vortex_bail!("DType was not a struct") - }; + fn next_range(&mut self) -> VortexResult { + match &mut self.state { + ColumnLayoutState::Init => { + self.read_init()?; + self.next_range() + } + ColumnLayoutState::Filtering(fc) => fc.next_range(), + ColumnLayoutState::Reading(rc) => rc.next_range(), + } + } - let fb_children = self - .flatbuffer() - .children() - .ok_or_else(|| vortex_err!("Missing children"))?; - - let child_layouts = match &self.scan.projection { - Projection::All => (0..fb_children.len()) - .zip_eq(s.dtypes().iter()) - .map(|(index, dtype)| self.read_child(index, fb_children, dtype.clone())) - .collect::>>()?, - Projection::Flat(proj) => proj - .iter() - .map(|f| result_lazy_dtype.resolve_field(f)) - .zip(s.dtypes().iter().cloned()) - .map(|(child_idx, dtype)| self.read_child(child_idx?, fb_children, dtype)) - .collect::>>()?, - }; + fn read_next(&mut self, selector: RowSelector) -> VortexResult> { + match &mut self.state { + ColumnLayoutState::Init => { + self.read_init()?; + self.read_next(selector) + } + ColumnLayoutState::Filtering(fc) => fc.read_next(selector), + ColumnLayoutState::Reading(rc) => rc.read_next(selector), + } + } - self.reader = Some(BatchReader::new(s.names().clone(), child_layouts)); - self.read_next() + fn advance(&mut self, up_to_row: usize) -> VortexResult> { + match &mut self.state { + ColumnLayoutState::Filtering(fr) => fr.advance(up_to_row), + ColumnLayoutState::Reading(br) => br.advance(up_to_row), + _ => { + self.offset = up_to_row; + Ok(vec![]) + } } } } + +#[cfg(test)] +mod tests { + use std::collections::VecDeque; + use std::iter; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, RwLock}; + + use bytes::Bytes; + use vortex::accessor::ArrayAccessor; + use vortex::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray}; + use vortex::validity::Validity; + use vortex::{ArrayDType, IntoArray, IntoArrayVariant}; + use vortex_dtype::field::Field; + use vortex_dtype::{DType, Nullability}; + use vortex_expr::{BinaryExpr, Column, Literal, Operator}; + use vortex_schema::projection::Projection; + + use crate::layouts::read::cache::{LazyDeserializedDType, RelativeLayoutCache}; + use crate::layouts::read::layouts::test_read::{ + filter_read_layout, read_filters, read_layout, read_layout_data, read_layout_ranges, + }; + use crate::layouts::{ + LayoutDescriptorReader, LayoutDeserializer, LayoutMessageCache, LayoutReader, LayoutWriter, + RowFilter, Scan, + }; + + async fn layout_and_bytes( + cache: Arc>, + scan: Scan, + ) -> (Box, Box, Bytes) { + let int_array = PrimitiveArray::from((0..100).collect::>()).into_array(); + let int_dtype = int_array.dtype().clone(); + let chunked = ChunkedArray::try_new(iter::repeat(int_array).take(5).collect(), int_dtype) + .unwrap() + .into_array(); + let str_array = VarBinArray::from_vec( + iter::repeat("test text").take(500).collect(), + DType::Utf8(Nullability::NonNullable), + ) + .into_array(); + let len = chunked.len(); + let struct_arr = StructArray::try_new( + vec!["ints".into(), "strs".into()].into(), + vec![chunked, str_array], + len, + Validity::NonNullable, + ) + .unwrap() + .into_array(); + + let mut writer = LayoutWriter::new(Vec::new()); + writer = writer.write_array_columns(struct_arr).await.unwrap(); + let written = writer.finalize().await.unwrap(); + + let footer = LayoutDescriptorReader::new(LayoutDeserializer::default()) + .read_footer(&written, written.len() as u64) + .await + .unwrap(); + + let dtype = Arc::new(LazyDeserializedDType::from_bytes( + footer.dtype_bytes().unwrap(), + Projection::All, + )); + let len = len as u64; + ( + footer + .layout( + len, + scan, + RelativeLayoutCache::new(cache.clone(), dtype.clone()), + ) + .unwrap(), + footer + .layout(len, Scan::new(None), RelativeLayoutCache::new(cache, dtype)) + .unwrap(), + Bytes::from(written), + ) + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn read_range() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut filter_layout, mut project_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Column::new(Field::from("ints"))), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + let arr = filter_read_layout(filter_layout.as_mut(), project_layout.as_mut(), cache, &buf) + .pop_front(); + + assert!(arr.is_some()); + let prim_arr = arr + .as_ref() + .unwrap() + .with_dyn(|a| a.as_struct_array_unchecked().field(0)) + .unwrap() + .into_primitive() + .unwrap(); + let str_arr = arr + .as_ref() + .unwrap() + .with_dyn(|a| a.as_struct_array_unchecked().field(1)) + .unwrap() + .into_varbinview() + .unwrap(); + assert_eq!( + prim_arr.maybe_null_slice::(), + &(11..100).collect::>() + ); + assert_eq!( + str_arr + .with_iterator(|iter| iter + .flatten() + .map(|s| unsafe { String::from_utf8_unchecked(s.to_vec()) }) + .collect::>()) + .unwrap(), + iter::repeat("test text").take(89).collect::>() + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn read_range_no_filter() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (_, mut project_layout, buf) = layout_and_bytes(cache.clone(), Scan::new(None)).await; + let arr = read_layout(project_layout.as_mut(), cache, &buf).pop_front(); + + assert!(arr.is_some()); + let prim_arr = arr + .as_ref() + .unwrap() + .with_dyn(|a| a.as_struct_array_unchecked().field(0)) + .unwrap() + .into_primitive() + .unwrap(); + let str_arr = arr + .as_ref() + .unwrap() + .with_dyn(|a| a.as_struct_array_unchecked().field(1)) + .unwrap() + .into_varbinview() + .unwrap(); + assert_eq!( + prim_arr.maybe_null_slice::(), + (0..100).collect::>() + ); + assert_eq!( + str_arr + .with_iterator(|iter| iter + .flatten() + .map(|s| unsafe { String::from_utf8_unchecked(s.to_vec()) }) + .collect::>()) + .unwrap(), + iter::repeat("test text").take(100).collect::>() + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn advance_read_range() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut filter_layout, mut project_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Column::new(Field::from("ints"))), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + filter_layout.advance(50).unwrap(); + let arr = filter_read_layout(filter_layout.as_mut(), project_layout.as_mut(), cache, &buf) + .pop_front(); + + assert!(arr.is_some()); + let arr = arr + .unwrap() + .with_dyn(|a| a.as_struct_array_unchecked().field(0)) + .unwrap() + .into_primitive() + .unwrap(); + assert_eq!( + arr.into_primitive().unwrap().maybe_null_slice::(), + &(50..100).collect::>() + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn advance_skipped() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut filter_layout, mut project_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Column::new(Field::from("ints"))), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + filter_layout.advance(500).unwrap(); + let arr = filter_read_layout(filter_layout.as_mut(), project_layout.as_mut(), cache, &buf); + + assert!(arr.is_empty()); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn advance_after_filter() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut filter_layout, mut project_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Column::new(Field::from("ints"))), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + let selectors = read_layout_ranges(filter_layout.as_mut(), cache.clone(), &buf) + .into_iter() + .flat_map(|s| read_filters(filter_layout.as_mut(), cache.clone(), &buf, s)) + .collect::>(); + + project_layout.advance(50).unwrap(); + let arr = selectors + .into_iter() + .flat_map(|s| read_layout_data(project_layout.as_mut(), cache.clone(), &buf, s)) + .collect::>(); + + assert_eq!( + arr.first() + .unwrap() + .with_dyn(|a| a.as_struct_array_unchecked().field(0)) + .unwrap() + .into_primitive() + .unwrap() + .maybe_null_slice::(), + &(50..100).collect::>() + ); + assert_eq!( + arr[4] + .with_dyn(|a| a.as_struct_array_unchecked().field(0)) + .unwrap() + .into_primitive() + .unwrap() + .maybe_null_slice::(), + &(11..100).collect::>() + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn advance_mid_read() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut filter_layout, mut project_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Column::new(Field::from("ints"))), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + let selectors = read_layout_ranges(filter_layout.as_mut(), cache.clone(), &buf) + .into_iter() + .flat_map(|s| read_filters(filter_layout.as_mut(), cache.clone(), &buf, s)) + .collect::>(); + let advanced = AtomicBool::new(false); + let mut arr = selectors + .into_iter() + .flat_map(|s| { + let a = read_layout_data(project_layout.as_mut(), cache.clone(), &buf, s); + if advanced + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + project_layout.advance(321).unwrap(); + } + a + }) + .collect::>(); + + assert_eq!(arr.len(), 3); + assert_eq!( + arr.pop_front() + .unwrap() + .with_dyn(|a| a.as_struct_array_unchecked().field(0)) + .unwrap() + .into_primitive() + .unwrap() + .maybe_null_slice::(), + &(11..100).collect::>() + ); + assert_eq!( + arr.pop_front() + .unwrap() + .with_dyn(|a| a.as_struct_array_unchecked().field(0)) + .unwrap() + .into_primitive() + .unwrap() + .maybe_null_slice::(), + &(21..100).collect::>() + ); + assert_eq!( + arr.pop_front() + .unwrap() + .with_dyn(|a| a.as_struct_array_unchecked().field(0)) + .unwrap() + .into_primitive() + .unwrap() + .maybe_null_slice::(), + &(11..100).collect::>() + ); + } +} diff --git a/vortex-serde/src/layouts/read/layouts/flat.rs b/vortex-serde/src/layouts/read/layouts/flat.rs index 70f6f0f474..dd434f7475 100644 --- a/vortex-serde/src/layouts/read/layouts/flat.rs +++ b/vortex-serde/src/layouts/read/layouts/flat.rs @@ -1,16 +1,17 @@ -use std::cmp::min; use std::sync::Arc; use bytes::Bytes; +use croaring::Bitmap; use vortex::compute::slice; use vortex::{Array, Context}; -use vortex_error::{vortex_err, VortexResult, VortexUnwrap}; +use vortex_error::{vortex_bail, vortex_err, VortexResult, VortexUnwrap}; use vortex_flatbuffers::footer; use crate::layouts::read::cache::RelativeLayoutCache; +use crate::layouts::read::selection::RowSelector; use crate::layouts::{ - LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, Message, ReadResult, Scan, - FLAT_LAYOUT_ID, + LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, Message, RangeResult, ReadResult, + RowFilter, Scan, FLAT_LAYOUT_ID, }; use crate::message_reader::ArrayBufferReader; use crate::stream_writer::ByteRange; @@ -27,6 +28,7 @@ impl LayoutSpec for FlatLayoutSpec { &self, fb_bytes: Bytes, fb_loc: usize, + length: u64, scan: Scan, layout_serde: LayoutDeserializer, message_cache: RelativeLayoutCache, @@ -43,6 +45,7 @@ impl LayoutSpec for FlatLayoutSpec { Box::new(FlatLayout::new( ByteRange::new(buf.begin(), buf.end()), + length, scan, layout_serde.ctx(), message_cache, @@ -53,30 +56,47 @@ impl LayoutSpec for FlatLayoutSpec { #[derive(Debug)] pub struct FlatLayout { range: ByteRange, + length: u64, scan: Scan, ctx: Arc, cache: RelativeLayoutCache, - done: bool, - cached_array: Option, + offset: usize, + sent_range: bool, } impl FlatLayout { pub fn new( range: ByteRange, + length: u64, scan: Scan, ctx: Arc, cache: RelativeLayoutCache, ) -> Self { Self { range, + length, scan, ctx, cache, - done: false, - cached_array: None, + offset: 0, + sent_range: false, } } + fn skipped(&self) -> bool { + self.offset as u64 == self.length + } + + fn own_range(&self) -> Option { + (self.offset as u64 != self.length).then(|| { + RowSelector::new( + Bitmap::from_range(self.offset as u32..self.length as u32), + self.offset, + self.length as usize, + ) + }) + } + fn own_message(&self) -> Message { (self.cache.absolute_id(&[]), self.range) } @@ -92,28 +112,357 @@ impl FlatLayout { } impl LayoutReader for FlatLayout { - fn read_next(&mut self) -> VortexResult> { - if self.done { + fn next_range(&mut self) -> VortexResult { + if self.sent_range { + Ok(RangeResult::Rows(None)) + } else { + self.sent_range = true; + Ok(RangeResult::Rows(self.own_range())) + } + } + + fn read_next(&mut self, selection: RowSelector) -> VortexResult> { + if self.skipped() || selection.end() <= self.offset { return Ok(None); } - if let Some(array) = self.cached_array.take() { - let array = if array.len() > self.scan.batch_size { - let rows_to_read = min(self.scan.batch_size, array.len()); - let taken = slice(&array, 0, rows_to_read)?; - let leftover = slice(&array, rows_to_read, array.len())?; - self.cached_array = Some(leftover); - taken + if let Some(buf) = self.cache.get(&[]) { + let array = self.array_from_bytes(buf)?; + let selection_end = selection.end(); + let selected = selection.offset(self.offset as i64).filter_array(slice( + &array, + self.offset, + selection_end, + )?)?; + if let Some(selected) = selected { + let result = if let Some(f) = self.scan.expr.as_ref() { + let evaluated = f.evaluate(&selected)?; + if f.as_any().is::() { + ReadResult::Selector(RowSelector::from_array( + &evaluated, + self.offset, + selection_end, + )?) + } else { + ReadResult::Batch(evaluated) + } + } else { + ReadResult::Batch(selected) + }; + self.offset = selection_end; + Ok(Some(result)) } else { - self.done = true; - array - }; - Ok(Some(ReadResult::Batch(array))) - } else if let Some(buf) = self.cache.get(&[]) { - self.cached_array = Some(self.array_from_bytes(buf)?); - self.read_next() + self.offset = selection_end; + Ok(None) + } } else { Ok(Some(ReadResult::ReadMore(vec![self.own_message()]))) } } + + fn advance(&mut self, up_to_row: usize) -> VortexResult> { + if up_to_row < self.offset { + vortex_bail!("Can't advance backwards") + } + + self.offset = up_to_row; + if self.skipped() { + Ok(vec![]) + } else { + Ok(vec![self.own_message()]) + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::VecDeque; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, RwLock}; + + use bytes::Bytes; + use croaring::Bitmap; + use vortex::array::PrimitiveArray; + use vortex::{Array, Context, IntoArray, IntoArrayVariant}; + use vortex_dtype::PType; + use vortex_expr::{BinaryExpr, Identity, Literal, Operator}; + + use crate::layouts::read::cache::{LazyDeserializedDType, RelativeLayoutCache}; + use crate::layouts::read::layouts::flat::FlatLayout; + use crate::layouts::read::layouts::test_read::{ + filter_read_layout, read_filters, read_layout, read_layout_data, read_layout_ranges, + }; + use crate::layouts::read::selection::RowSelector; + use crate::layouts::{LayoutMessageCache, LayoutReader, RowFilter, Scan}; + use crate::message_writer::MessageWriter; + use crate::stream_writer::ByteRange; + + async fn read_only_layout( + cache: Arc>, + ) -> (FlatLayout, Bytes, u64, Arc) { + let mut writer = MessageWriter::new(Vec::new()); + let array = PrimitiveArray::from((0..100).collect::>()).into_array(); + let len = array.len() as u64; + writer.write_batch(array).await.unwrap(); + let written = writer.into_inner(); + + let projection_scan = Scan::new(None); + let dtype = Arc::new(LazyDeserializedDType::from_dtype(PType::I32.into())); + + ( + FlatLayout::new( + ByteRange::new(0, written.len() as u64), + len, + projection_scan, + Arc::new(Context::default()), + RelativeLayoutCache::new(cache, dtype.clone()), + ), + Bytes::from(written), + len, + dtype, + ) + } + + async fn layout_and_bytes( + cache: Arc>, + scan: Scan, + ) -> (FlatLayout, FlatLayout, Bytes) { + let (read_layout, bytes, len, dtype) = read_only_layout(cache.clone()).await; + + ( + FlatLayout::new( + ByteRange::new(0, bytes.len() as u64), + len, + scan, + Arc::new(Context::default()), + RelativeLayoutCache::new(cache, dtype), + ), + read_layout, + bytes, + ) + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn read_range() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut filter_layout, mut projection_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Identity), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + let arr = + filter_read_layout(&mut filter_layout, &mut projection_layout, cache, &buf).pop_front(); + + assert!(arr.is_some()); + let arr = arr.unwrap(); + assert_eq!( + arr.into_primitive().unwrap().maybe_null_slice::(), + &(11..100).collect::>() + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn read_range_no_filter() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut data_layout, buf, ..) = read_only_layout(cache.clone()).await; + let arr = read_layout(&mut data_layout, cache, &buf).pop_front(); + + assert!(arr.is_some()); + let arr = arr.unwrap(); + assert_eq!( + arr.into_primitive().unwrap().maybe_null_slice::(), + &(0..100).collect::>() + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn read_empty() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut filter_layout, mut projection_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Identity), + Operator::Gt, + Arc::new(Literal::new(101.into())), + )))))), + ) + .await; + let arr = + filter_read_layout(&mut filter_layout, &mut projection_layout, cache, &buf).pop_front(); + + assert!(arr.is_none()); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn advance_read_range() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut filter_layout, mut projection_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Identity), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + filter_layout.advance(50).unwrap(); + let arr = + filter_read_layout(&mut filter_layout, &mut projection_layout, cache, &buf).pop_front(); + + assert!(arr.is_some()); + let arr = arr.unwrap(); + assert_eq!( + arr.into_primitive().unwrap().maybe_null_slice::(), + &(50..100).collect::>() + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn advance_skipped() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut filter_layout, mut projection_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Identity), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + filter_layout.advance(100).unwrap(); + let arr = + filter_read_layout(&mut filter_layout, &mut projection_layout, cache, &buf).pop_front(); + + assert!(arr.is_none()); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn read_multiple_selectors() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (_, mut projection_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Identity), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + let mut arr = [ + RowSelector::new(Bitmap::from_range(11..50), 0, 50), + RowSelector::new(Bitmap::from_range(50..100), 50, 100), + ] + .into_iter() + .flat_map(|s| read_layout_data(&mut projection_layout, cache.clone(), &buf, s)) + .collect::>(); + + assert_eq!( + arr.pop_front() + .unwrap() + .into_primitive() + .unwrap() + .maybe_null_slice::(), + &(11..50).collect::>() + ); + assert_eq!( + arr.pop_front() + .unwrap() + .into_primitive() + .unwrap() + .maybe_null_slice::(), + &(50..100).collect::>() + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn advance_after_filter() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (mut filter_layout, mut projection_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Identity), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + let selector = read_layout_ranges(&mut filter_layout, cache.clone(), &buf) + .into_iter() + .flat_map(|s| read_filters(&mut filter_layout, cache.clone(), &buf, s)) + .collect::>(); + + projection_layout.advance(50).unwrap(); + let mut arr: Vec = selector + .into_iter() + .flat_map(|s| read_layout_data(&mut projection_layout, cache.clone(), &buf, s)) + .collect(); + + assert_eq!( + arr.remove(0) + .into_primitive() + .unwrap() + .maybe_null_slice::(), + &(50..100).collect::>() + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn advance_mid_read() { + let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); + let (_, mut projection_layout, buf) = layout_and_bytes( + cache.clone(), + Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new( + Arc::new(Identity), + Operator::Gt, + Arc::new(Literal::new(10.into())), + )))))), + ) + .await; + let advanced = AtomicBool::new(false); + let mut arr = [ + RowSelector::new(Bitmap::from_range(11..50), 0, 50), + RowSelector::new(Bitmap::from_range(50..100), 50, 100), + ] + .into_iter() + .flat_map(|s| { + let a = read_layout_data(&mut projection_layout, cache.clone(), &buf, s); + if advanced + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + projection_layout.advance(90).unwrap(); + } + a + }) + .collect::>(); + + assert_eq!( + arr.pop_front() + .unwrap() + .into_primitive() + .unwrap() + .maybe_null_slice::(), + &(11..50).collect::>() + ); + assert_eq!( + arr.pop_front() + .unwrap() + .into_primitive() + .unwrap() + .maybe_null_slice::(), + &(90..100).collect::>() + ); + } } diff --git a/vortex-serde/src/layouts/read/layouts/inline_dtype.rs b/vortex-serde/src/layouts/read/layouts/inline_dtype.rs index ee969b69d1..983c5a4395 100644 --- a/vortex-serde/src/layouts/read/layouts/inline_dtype.rs +++ b/vortex-serde/src/layouts/read/layouts/inline_dtype.rs @@ -7,8 +7,9 @@ use vortex_error::{vortex_err, VortexResult}; use vortex_flatbuffers::{footer, message}; use crate::layouts::read::cache::{LazyDeserializedDType, RelativeLayoutCache}; +use crate::layouts::read::selection::RowSelector; use crate::layouts::{ - LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, Message, ReadResult, Scan, + LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, Message, RangeResult, ReadResult, Scan, INLINE_SCHEMA_LAYOUT_ID, }; use crate::stream_writer::ByteRange; @@ -25,6 +26,7 @@ impl LayoutSpec for InlineDTypeLayoutSpec { &self, fb_bytes: Bytes, fb_loc: usize, + length: u64, scan: Scan, layout_reader: LayoutDeserializer, message_cache: RelativeLayoutCache, @@ -32,6 +34,7 @@ impl LayoutSpec for InlineDTypeLayoutSpec { Box::new(InlineDTypeLayout::new( fb_bytes, fb_loc, + length, scan, layout_reader, message_cache, @@ -43,6 +46,8 @@ impl LayoutSpec for InlineDTypeLayoutSpec { pub struct InlineDTypeLayout { fb_bytes: Bytes, fb_loc: usize, + length: u64, + offset: usize, scan: Scan, layout_builder: LayoutDeserializer, message_cache: RelativeLayoutCache, @@ -58,6 +63,7 @@ impl InlineDTypeLayout { pub fn new( fb_bytes: Bytes, fb_loc: usize, + length: u64, scan: Scan, layout_builder: LayoutDeserializer, message_cache: RelativeLayoutCache, @@ -65,6 +71,8 @@ impl InlineDTypeLayout { Self { fb_bytes, fb_loc, + length, + offset: 0, scan, layout_builder, message_cache, @@ -81,11 +89,9 @@ impl InlineDTypeLayout { fn dtype(&self) -> VortexResult { if let Some(dt_bytes) = self.message_cache.get(&[0]) { - let msg = root::(&dt_bytes)? + let msg = root::(&dt_bytes[4..])? .header_as_schema() - .ok_or_else(|| { - vortex_err!("Expected schema message; this was checked earlier in the function") - })?; + .ok_or_else(|| vortex_err!("Expected schema message"))?; Ok(DTypeReadResult::DType( DType::try_from( @@ -106,34 +112,70 @@ impl InlineDTypeLayout { )])) } } + + /// Returns None when child reader has been created + fn child_reader(&mut self) -> VortexResult> { + match self.dtype()? { + DTypeReadResult::ReadMore(m) => Ok(Some(ReadResult::ReadMore(m))), + DTypeReadResult::DType(d) => { + let layout = self + .flatbuffer() + .children() + .ok_or_else(|| vortex_err!("No children"))? + .get(0); + + let mut child_layout = self.layout_builder.read_layout( + self.fb_bytes.clone(), + layout._tab.loc(), + self.length, + self.scan.clone(), + self.message_cache + .relative(1u16, Arc::new(LazyDeserializedDType::from_dtype(d))), + )?; + if self.offset != 0 { + child_layout.advance(self.offset)?; + } + self.child_layout = Some(child_layout); + Ok(None) + } + } + } } impl LayoutReader for InlineDTypeLayout { - fn read_next(&mut self) -> VortexResult> { + fn next_range(&mut self) -> VortexResult { if let Some(cr) = self.child_layout.as_mut() { - cr.read_next() + cr.next_range() } else { - match self.dtype()? { - DTypeReadResult::ReadMore(m) => Ok(Some(ReadResult::ReadMore(m))), - DTypeReadResult::DType(d) => { - let layout = self - .flatbuffer() - .children() - .ok_or_else(|| vortex_err!("No children"))? - .get(0); - - self.child_layout = Some( - self.layout_builder.read_layout( - self.fb_bytes.clone(), - layout._tab.loc(), - self.scan.clone(), - self.message_cache - .relative(1u16, Arc::new(LazyDeserializedDType::from_dtype(d))), - )?, - ); - self.read_next() - } + match self.child_reader()? { + Some(r) => match r { + ReadResult::ReadMore(rm) => Ok(RangeResult::ReadMore(rm)), + ReadResult::Selector(_) | ReadResult::Batch(_) => { + unreachable!("Child reader will only return ReadMore") + } + }, + None => self.next_range(), + } + } + } + + fn read_next(&mut self, selector: RowSelector) -> VortexResult> { + if let Some(cr) = self.child_layout.as_mut() { + cr.read_next(selector) + } else { + match self.child_reader()? { + Some(r) => Ok(Some(r)), + None => self.read_next(selector), } } } + + fn advance(&mut self, up_to_row: usize) -> VortexResult> { + if let Some(cr) = self.child_layout.as_mut() { + cr.advance(up_to_row) + } else { + self.offset = up_to_row; + Ok(vec![]) + } + } } diff --git a/vortex-serde/src/layouts/read/layouts/mod.rs b/vortex-serde/src/layouts/read/layouts/mod.rs index 17498ca164..552db171c3 100644 --- a/vortex-serde/src/layouts/read/layouts/mod.rs +++ b/vortex-serde/src/layouts/read/layouts/mod.rs @@ -2,7 +2,10 @@ mod chunked; mod column; mod flat; mod inline_dtype; +#[cfg(test)] +mod test_read; pub use chunked::ChunkedLayoutSpec; pub use column::ColumnLayoutSpec; pub use flat::FlatLayoutSpec; +pub use inline_dtype::InlineDTypeLayoutSpec; diff --git a/vortex-serde/src/layouts/read/layouts/test_read.rs b/vortex-serde/src/layouts/read/layouts/test_read.rs new file mode 100644 index 0000000000..ef0ff449b4 --- /dev/null +++ b/vortex-serde/src/layouts/read/layouts/test_read.rs @@ -0,0 +1,104 @@ +use std::collections::VecDeque; +use std::sync::{Arc, RwLock}; + +use bytes::Bytes; +use vortex::Array; + +use crate::layouts::read::selection::RowSelector; +use crate::layouts::{LayoutMessageCache, LayoutReader, RangeResult, ReadResult}; + +pub fn read_layout_ranges( + layout: &mut dyn LayoutReader, + cache: Arc>, + buf: &Bytes, +) -> Vec { + let mut s = Vec::new(); + loop { + match layout.next_range().unwrap() { + RangeResult::ReadMore(m) => { + let mut write_cache_guard = cache.write().unwrap(); + for (id, range) in m { + write_cache_guard.set(id, buf.slice(range.to_range())); + } + } + RangeResult::Rows(rs) => { + if let Some(r) = rs { + s.push(r); + } else { + break; + } + } + } + } + s +} + +pub fn read_layout_data( + layout: &mut dyn LayoutReader, + cache: Arc>, + buf: &Bytes, + selector: RowSelector, +) -> Vec { + let mut arr = Vec::new(); + while let Some(rr) = layout.read_next(selector.clone()).unwrap() { + match rr { + ReadResult::ReadMore(m) => { + let mut write_cache_guard = cache.write().unwrap(); + for (id, range) in m { + write_cache_guard.set(id, buf.slice(range.to_range())); + } + } + ReadResult::Batch(a) => arr.push(a), + ReadResult::Selector(_) => {} + } + } + + arr +} + +pub fn read_filters( + layout: &mut dyn LayoutReader, + cache: Arc>, + buf: &Bytes, + selector: RowSelector, +) -> Vec { + let mut sels = Vec::new(); + while let Some(rr) = layout.read_next(selector.clone()).unwrap() { + match rr { + ReadResult::ReadMore(m) => { + let mut write_cache_guard = cache.write().unwrap(); + for (id, range) in m { + write_cache_guard.set(id, buf.slice(range.to_range())); + } + } + ReadResult::Batch(_) => unreachable!("Can't produce batches from filter"), + ReadResult::Selector(s) => sels.push(s), + } + } + + sels +} + +pub fn filter_read_layout( + filter_layout: &mut dyn LayoutReader, + layout: &mut dyn LayoutReader, + cache: Arc>, + buf: &Bytes, +) -> VecDeque { + read_layout_ranges(filter_layout, cache.clone(), buf) + .into_iter() + .flat_map(|s| read_filters(filter_layout, cache.clone(), buf, s)) + .flat_map(|s| read_layout_data(layout, cache.clone(), buf, s)) + .collect() +} + +pub fn read_layout( + layout: &mut dyn LayoutReader, + cache: Arc>, + buf: &Bytes, +) -> VecDeque { + read_layout_ranges(layout, cache.clone(), buf) + .into_iter() + .flat_map(|s| read_layout_data(layout, cache.clone(), buf, s)) + .collect() +} diff --git a/vortex-serde/src/layouts/read/mod.rs b/vortex-serde/src/layouts/read/mod.rs index 9d93505bc0..78712afcce 100644 --- a/vortex-serde/src/layouts/read/mod.rs +++ b/vortex-serde/src/layouts/read/mod.rs @@ -1,4 +1,5 @@ use std::fmt::Debug; +use std::sync::Arc; use arrow_buffer::BooleanBuffer; use vortex::array::BoolArray; @@ -11,10 +12,12 @@ mod buffered; mod builder; mod cache; mod context; +mod filter_project; mod filtering; mod footer; mod layouts; mod recordbatchreader; +mod selection; mod stream; pub use builder::LayoutReaderBuilder; @@ -24,22 +27,25 @@ pub use filtering::RowFilter; pub use footer::LayoutDescriptorReader; pub use recordbatchreader::{AsyncRuntime, VortexRecordBatchReader}; pub use stream::LayoutBatchStream; +use vortex_expr::VortexExpr; pub use vortex_schema::projection::Projection; pub use vortex_schema::Schema; +use crate::layouts::read::selection::RowSelector; use crate::stream_writer::ByteRange; // Recommended read-size according to the AWS performance guide pub const INITIAL_READ_SIZE: usize = 8 * 1024 * 1024; -pub const DEFAULT_BATCH_SIZE: usize = 65536; -#[allow(dead_code)] #[derive(Debug, Clone)] pub struct Scan { - indices: Option, - projection: Projection, - filter: Option, - batch_size: usize, + expr: Option>, +} + +impl Scan { + pub fn new(expr: Option>) -> Self { + Self { expr } + } } /// Unique identifier for a message within a layout @@ -50,10 +56,22 @@ pub type Message = (MessageId, ByteRange); #[derive(Debug)] pub enum ReadResult { ReadMore(Vec), + Selector(RowSelector), Batch(Array), } +#[derive(Debug)] +pub enum RangeResult { + ReadMore(Vec), + Rows(Option), +} + pub trait LayoutReader: Debug + Send { + /// Produce sets of row ranges to read from underlying layouts. + /// + /// Range ending at the end of the layout length indicates layout is done producing ranges + fn next_range(&mut self) -> VortexResult; + /// Reads the data from the underlying layout /// /// The layout can either return a batch data, i.e. an Array or ask for more layout messages to @@ -61,14 +79,10 @@ pub trait LayoutReader: Debug + Send { /// and then call back into this function. /// /// The layout is finished reading when it returns None - fn read_next(&mut self) -> VortexResult>; - - // TODO(robert): Support stats pruning via planning. Requires propagating all the metadata - // to top level and then pushing down the result of it - // Try to use metadata of the layout to perform pruning given the passed `Scan` object. - // - // The layout should perform any planning that's cheap and doesn't require reading the data. - // fn plan(&mut self, scan: Scan) -> VortexResult>; + fn read_next(&mut self, selector: RowSelector) -> VortexResult>; + + /// Advance readers to global row offset + fn advance(&mut self, up_to_row: usize) -> VortexResult>; } pub fn null_as_false(array: BoolArray) -> VortexResult { diff --git a/vortex-serde/src/layouts/read/selection.rs b/vortex-serde/src/layouts/read/selection.rs new file mode 100644 index 0000000000..4629ee1e3c --- /dev/null +++ b/vortex-serde/src/layouts/read/selection.rs @@ -0,0 +1,150 @@ +#![allow(dead_code)] +use std::cmp::{max, min}; + +use arrow_buffer::{BooleanBuffer, MutableBuffer}; +use croaring::Bitmap; +use vortex::array::BoolArray; +use vortex::compute::filter; +use vortex::validity::Validity; +use vortex::Array; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; + +/// Bitmap of selected row ranges +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct RowSelector { + values: Bitmap, + begin: usize, + end: usize, +} + +impl RowSelector { + pub fn new(values: Bitmap, begin: usize, end: usize) -> Self { + Self { values, begin, end } + } + + pub fn from_array(array: &Array, begin: usize, end: usize) -> VortexResult { + array.with_dyn(|a| { + a.as_bool_array() + .ok_or_else(|| vortex_err!("Must be a bool array")) + .map(|b| { + let mut bitmap = Bitmap::new(); + for (sb, se) in b.maybe_null_slices_iter() { + bitmap.add_range((sb + begin) as u32..(se + begin) as u32); + } + RowSelector::new(bitmap, 0, end) + }) + }) + } + + pub fn is_empty(&self) -> bool { + self.values.is_empty() + } + + pub fn begin(&self) -> usize { + self.begin + } + + pub fn end(&self) -> usize { + self.end + } + + pub fn len(&self) -> usize { + self.end - self.begin + } + + pub fn intersect(mut self, other: &RowSelector) -> Self { + self.values.and_inplace(&other.values); + RowSelector::new( + self.values, + max(self.begin, other.begin), + min(self.end, other.end), + ) + } + + pub fn concatenate(mut self, other: &RowSelector) -> Self { + assert_eq!( + self.end, other.begin, + "Can only concatenate consecutive selectors" + ); + self.values.or_inplace(&other.values); + RowSelector::new( + self.values, + min(self.begin, other.begin), + max(self.end, other.end), + ) + } + + pub fn filter_array(&self, array: impl AsRef) -> VortexResult> { + if self.begin != 0 { + vortex_bail!("Cannot filter arrays with absolute row selections") + } + + if self.values.cardinality() == 0 { + return Ok(None); + } + + let bitset = self + .values + .to_bitset() + .ok_or_else(|| vortex_err!("Couldn't create bitset for RowSelection"))?; + + let byte_length = self.len().div_ceil(8); + let mut buffer = MutableBuffer::with_capacity(byte_length); + buffer.extend_from_slice(bitset.as_slice()); + if byte_length > bitset.size_in_bytes() { + buffer.extend_zeros(byte_length - bitset.size_in_bytes()); + } + let predicate = BoolArray::try_new( + BooleanBuffer::new(buffer.into(), 0, self.len()), + Validity::NonNullable, + )?; + filter(array, predicate).map(Some) + } + + pub fn offset(self, offset: i64) -> RowSelector { + if offset == 0 { + self + } else { + RowSelector::new( + self.values.add_offset(-offset), + if self.begin as i64 > offset { + (self.begin as i64 - offset) as usize + } else { + 0 + }, + (self.end as i64 - offset) as usize, + ) + } + } + + pub fn advance(mut self, up_to_row: usize) -> Option { + if up_to_row >= self.end { + None + } else { + self.values.remove_range(0..up_to_row as u32); + Some(RowSelector::new(self.values, self.begin, self.end)) + } + } +} + +#[cfg(test)] +mod tests { + use croaring::Bitmap; + use rstest::rstest; + + use crate::layouts::read::selection::RowSelector; + + #[rstest] + #[case(RowSelector::new((0..2).chain(9..10).collect(),0, 10), RowSelector::new((0..1).collect(),0, 10), RowSelector::new((0..1).collect(),0, 10))] + #[case(RowSelector::new((5..8).chain(9..10).collect(),0, 10), RowSelector::new((2..5).collect(),0, 10), RowSelector::new(Bitmap::new(),0, 10))] + #[case(RowSelector::new((0..4).collect(),0, 10), RowSelector::new((0..1).chain(2..3).chain(3..5).collect(),0, 10), RowSelector::new((0..1).chain(2..4).collect(),0, 10))] + #[case(RowSelector::new((0..3).chain(5..6).collect(),0, 10), RowSelector::new((2..6).collect(),0, 10), RowSelector::new((2..3).chain(5..6).collect(),0, 10))] + #[cfg_attr(miri, ignore)] + fn intersection( + #[case] first: RowSelector, + #[case] second: RowSelector, + #[case] expected: RowSelector, + ) { + assert_eq!(first.intersect(&second), expected); + } +} diff --git a/vortex-serde/src/layouts/read/stream.rs b/vortex-serde/src/layouts/read/stream.rs index bcbe449390..da438df72c 100644 --- a/vortex-serde/src/layouts/read/stream.rs +++ b/vortex-serde/src/layouts/read/stream.rs @@ -7,8 +7,6 @@ use futures::Stream; use futures_util::future::BoxFuture; use futures_util::{stream, FutureExt, StreamExt, TryStreamExt}; use vortex::array::ChunkedArray; -use vortex::compute::filter; -use vortex::stats::ArrayStatistics; use vortex::Array; use vortex_dtype::DType; use vortex_error::{vortex_err, vortex_panic, VortexError, VortexExpect, VortexResult}; @@ -16,18 +14,20 @@ use vortex_schema::Schema; use crate::io::VortexReadAt; use crate::layouts::read::cache::LayoutMessageCache; -use crate::layouts::read::{LayoutReader, MessageId, ReadResult, Scan}; +use crate::layouts::read::selection::RowSelector; +use crate::layouts::read::{LayoutReader, MessageId, ReadResult}; +use crate::layouts::RangeResult; use crate::stream_writer::ByteRange; pub struct LayoutBatchStream { + dtype: DType, + row_count: u64, input: Option, layout_reader: Box, filter_reader: Option>, - scan: Scan, messages_cache: Arc>, + current_selector: Option, state: StreamingState, - dtype: DType, - cached_mask: Option, } impl LayoutBatchStream { @@ -37,23 +37,17 @@ impl LayoutBatchStream { filter_reader: Option>, messages_cache: Arc>, dtype: DType, - scan: Scan, + row_count: u64, ) -> Self { - let state = if filter_reader.is_some() { - StreamingState::FilterInit - } else { - StreamingState::Init - }; - LayoutBatchStream { + dtype, + row_count, input: Some(input), layout_reader, filter_reader, - scan, messages_cache, - dtype, - state, - cached_mask: None, + current_selector: None, + state: StreamingState::Init, } } @@ -74,107 +68,132 @@ impl LayoutBatchStream { type StreamStateFuture = BoxFuture<'static, VortexResult<(R, Vec<(MessageId, Bytes)>)>>; -#[derive(Default)] +#[derive(Debug, Clone, Copy)] +enum NextStreamState { + Init, + Filter(bool), + Read(bool), +} + enum StreamingState { - #[default] Init, - FilterInit, - Reading(StreamStateFuture), - FilterReading(StreamStateFuture), - Decoding(Array), + Filter(bool), + Read(bool), + Reading(StreamStateFuture, NextStreamState), Error, } -impl Stream for LayoutBatchStream { +impl Stream for LayoutBatchStream { type Item = VortexResult; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match &mut self.state { StreamingState::Init => { - if let Some(read) = self.layout_reader.read_next()? { + let next_range = self + .filter_reader + .as_mut() + .map(|fr| fr.next_range()) + .unwrap_or_else(|| self.layout_reader.as_mut().next_range())?; + match next_range { + RangeResult::ReadMore(messages) => { + let reader = self.input.take().ok_or_else(|| { + vortex_err!("Invalid state transition - reader dropped") + })?; + let read_future = read_ranges(reader, messages).boxed(); + self.state = + StreamingState::Reading(read_future, NextStreamState::Init); + } + RangeResult::Rows(rs) => { + if let Some(s) = rs { + let read_more = s.end() as u64 != self.row_count; + self.current_selector = Some(s); + self.state = if self.filter_reader.is_some() { + StreamingState::Filter(read_more) + } else { + StreamingState::Read(read_more) + }; + } else { + return Poll::Ready(None); + } + } + } + } + StreamingState::Read(read_more) => { + let read_more = *read_more; + let selector = self + .current_selector + .clone() + .vortex_expect("Must have asked for range"); + if let Some(read) = self.layout_reader.read_next(selector)? { match read { ReadResult::ReadMore(messages) => { let reader = self.input.take().ok_or_else(|| { vortex_err!("Invalid state transition - reader dropped") })?; let read_future = read_ranges(reader, messages).boxed(); - self.state = StreamingState::Reading(read_future); + self.state = StreamingState::Reading( + read_future, + NextStreamState::Read(read_more), + ); + } + ReadResult::Batch(a) => return Poll::Ready(Some(Ok(a))), + ReadResult::Selector(_) => { + unreachable!("Reading data cannot produce selector") } - ReadResult::Batch(a) => self.state = StreamingState::Decoding(a), } + } else if read_more { + self.state = StreamingState::Init; } else { return Poll::Ready(None); } } - StreamingState::FilterInit => { - if let Some(read) = self + StreamingState::Filter(read_more) => { + let read_more = *read_more; + let selector = self + .current_selector + .clone() + .vortex_expect("Must have asked for range"); + if let Some(fr) = self .filter_reader .as_mut() .vortex_expect("Can't filter without reader") - .read_next()? + .read_next(selector)? { - match read { + match fr { ReadResult::ReadMore(messages) => { let reader = self.input.take().ok_or_else(|| { vortex_err!("Invalid state transition - reader dropped") })?; let read_future = read_ranges(reader, messages).boxed(); - self.state = StreamingState::FilterReading(read_future); + self.state = StreamingState::Reading( + read_future, + NextStreamState::Filter(read_more), + ); + } + ReadResult::Selector(rs) => { + self.current_selector = Some(rs); + self.state = StreamingState::Read(true); } - ReadResult::Batch(a) => { - let mask = self - .scan - .filter - .as_ref() - .vortex_expect("Cant filter without filter") - .evaluate(&a)?; - self.cached_mask = Some(mask); - self.state = StreamingState::Init; + ReadResult::Batch(_) => { + unreachable!("Filtering data cannot produce batches") } } } else { - return Poll::Ready(None); + self.state = StreamingState::Init; } } - StreamingState::Decoding(arr) => { - let mut batch = arr.clone(); - - if let Some(mask) = self.cached_mask.take() { - if mask.statistics().compute_true_count().unwrap_or_default() == 0 { - self.state = StreamingState::Init; - continue; - } - - batch = filter(batch, mask)?; - } - - let goto_state = if self.filter_reader.is_some() { - StreamingState::FilterInit - } else { - StreamingState::Init - }; - self.state = goto_state; - return Poll::Ready(Some(Ok(batch))); - } - StreamingState::Reading(f) => match ready!(f.poll_unpin(cx)) { - Ok((input, messages)) => { - self.store_messages(messages); - self.input = Some(input); - - self.state = StreamingState::Init - } - Err(e) => { - self.state = StreamingState::Error; - return Poll::Ready(Some(Err(e))); - } - }, - StreamingState::FilterReading(f) => match ready!(f.poll_unpin(cx)) { + StreamingState::Reading(f, next_state) => match ready!(f.poll_unpin(cx)) { Ok((input, messages)) => { + let next_state = *next_state; self.store_messages(messages); self.input = Some(input); - self.state = StreamingState::FilterInit + self.state = match next_state { + NextStreamState::Init => StreamingState::Init, + NextStreamState::Filter(rm) => StreamingState::Filter(rm), + NextStreamState::Read(rm) => StreamingState::Read(rm), + }; } Err(e) => { self.state = StreamingState::Error; @@ -187,7 +206,7 @@ impl Stream for LayoutBatchStream { } } -impl LayoutBatchStream { +impl LayoutBatchStream { pub async fn read_all(self) -> VortexResult { let dtype = self.schema().clone().into(); let vecs: Vec = self.try_collect().await?; diff --git a/vortex-serde/src/layouts/tests.rs b/vortex-serde/src/layouts/tests.rs index 4af61c58b7..ca07892d87 100644 --- a/vortex-serde/src/layouts/tests.rs +++ b/vortex-serde/src/layouts/tests.rs @@ -1,5 +1,3 @@ -#![allow(clippy::panic)] - use std::iter; use std::sync::Arc; @@ -11,6 +9,7 @@ use vortex::variants::StructArrayTrait; use vortex::{ArrayDType, IntoArray, IntoArrayVariant}; use vortex_dtype::field::Field; use vortex_dtype::{DType, Nullability, PType, StructDType}; +use vortex_error::vortex_panic; use vortex_expr::{BinaryExpr, Column, Literal, Operator}; use crate::layouts::write::LayoutWriter; @@ -58,7 +57,6 @@ async fn test_read_simple() { let written = writer.finalize().await.unwrap(); let mut stream = LayoutReaderBuilder::new(written, LayoutDeserializer::default()) - .with_batch_size(5) .build() .await .unwrap(); @@ -102,7 +100,6 @@ async fn test_read_projection() { let array = LayoutReaderBuilder::new(written.clone(), LayoutDeserializer::default()) .with_projection(Projection::new([0])) - .with_batch_size(5) .build() .await .unwrap() @@ -134,7 +131,6 @@ async fn test_read_projection() { let array = LayoutReaderBuilder::new(written.clone(), LayoutDeserializer::default()) .with_projection(Projection::Flat(vec![Field::Name("strings".to_string())])) - .with_batch_size(5) .build() .await .unwrap() @@ -166,7 +162,6 @@ async fn test_read_projection() { let array = LayoutReaderBuilder::new(written.clone(), LayoutDeserializer::default()) .with_projection(Projection::new([1])) - .with_batch_size(5) .build() .await .unwrap() @@ -194,7 +189,6 @@ async fn test_read_projection() { let array = LayoutReaderBuilder::new(written.clone(), LayoutDeserializer::default()) .with_projection(Projection::Flat(vec![Field::Name("numbers".to_string())])) - .with_batch_size(5) .build() .await .unwrap() @@ -243,7 +237,6 @@ async fn unequal_batches() { let written = writer.finalize().await.unwrap(); let mut stream = LayoutReaderBuilder::new(written, LayoutDeserializer::default()) - .with_batch_size(5) .build() .await .unwrap(); @@ -261,11 +254,11 @@ async fn unequal_batches() { let numbers = numbers.into_primitive().unwrap(); assert_eq!(numbers.ptype(), PType::U32); } else { - panic!("Expected column doesn't exist") + vortex_panic!("Expected column doesn't exist") } } assert_eq!(item_count, 10); - assert_eq!(batch_count, 2); + assert_eq!(batch_count, 3); } #[tokio::test] diff --git a/vortex-serde/src/layouts/write/mod.rs b/vortex-serde/src/layouts/write/mod.rs index c6dbe070d4..0bcd2fc901 100644 --- a/vortex-serde/src/layouts/write/mod.rs +++ b/vortex-serde/src/layouts/write/mod.rs @@ -1,3 +1,4 @@ +pub use layouts::Layout; pub use writer::LayoutWriter; mod footer; diff --git a/vortex-serde/src/stream_writer/mod.rs b/vortex-serde/src/stream_writer/mod.rs index c16f525a13..7ce1e461f8 100644 --- a/vortex-serde/src/stream_writer/mod.rs +++ b/vortex-serde/src/stream_writer/mod.rs @@ -1,4 +1,5 @@ use std::fmt::{Display, Formatter}; +use std::ops::Range; use futures_util::{Stream, TryStreamExt}; use vortex::array::ChunkedArray; @@ -122,6 +123,10 @@ impl ByteRange { pub fn is_empty(&self) -> bool { self.begin == self.end } + + pub fn to_range(&self) -> Range { + self.begin as usize..self.end as usize + } } #[derive(Clone, Debug)] From 8b99cc56b51b684530341d9ffebb6a8d8ed07de4 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 24 Oct 2024 13:56:01 +0100 Subject: [PATCH 02/51] fix --- vortex-serde/src/layouts/read/buffered.rs | 2 +- vortex-serde/src/layouts/read/selection.rs | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/vortex-serde/src/layouts/read/buffered.rs b/vortex-serde/src/layouts/read/buffered.rs index 9003f529c7..dfcac38125 100644 --- a/vortex-serde/src/layouts/read/buffered.rs +++ b/vortex-serde/src/layouts/read/buffered.rs @@ -141,7 +141,7 @@ fn buffer_read( ) -> VortexResult> { while let Some(((begin, end), mut layout)) = layouts.pop_front() { // This selection doesn't know about rows in this chunk, we should put it back and wait for another request with different range - if selection.end() <= begin { + if selection.end() <= begin || selection.begin() > end { layouts.push_front(((begin, end), layout)); return Ok(None); } diff --git a/vortex-serde/src/layouts/read/selection.rs b/vortex-serde/src/layouts/read/selection.rs index 4629ee1e3c..7a5234d00e 100644 --- a/vortex-serde/src/layouts/read/selection.rs +++ b/vortex-serde/src/layouts/read/selection.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] use std::cmp::{max, min}; +use std::fmt::{Display, Formatter}; use arrow_buffer::{BooleanBuffer, MutableBuffer}; use croaring::Bitmap; @@ -17,6 +18,12 @@ pub struct RowSelector { end: usize, } +impl Display for RowSelector { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "RowSelector [{}..{}]", self.begin, self.end) + } +} + impl RowSelector { pub fn new(values: Bitmap, begin: usize, end: usize) -> Self { Self { values, begin, end } From a5719551fe1983bf97cc1e137fe90af77510dc9a Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 24 Oct 2024 15:48:04 +0100 Subject: [PATCH 03/51] shortcircuit all true --- vortex-serde/src/layouts/read/selection.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/vortex-serde/src/layouts/read/selection.rs b/vortex-serde/src/layouts/read/selection.rs index 7a5234d00e..03f21811cf 100644 --- a/vortex-serde/src/layouts/read/selection.rs +++ b/vortex-serde/src/layouts/read/selection.rs @@ -90,6 +90,12 @@ impl RowSelector { return Ok(None); } + let array = array.as_ref(); + + if self.values.cardinality() == array.len() as u64 { + return Ok(Some(array.clone())); + } + let bitset = self .values .to_bitset() From c4026375835b76c681567b1d7053d315aa315481 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 24 Oct 2024 16:01:06 +0100 Subject: [PATCH 04/51] extract --- vortex-serde/src/layouts/read/selection.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/vortex-serde/src/layouts/read/selection.rs b/vortex-serde/src/layouts/read/selection.rs index 03f21811cf..7ae329c1c2 100644 --- a/vortex-serde/src/layouts/read/selection.rs +++ b/vortex-serde/src/layouts/read/selection.rs @@ -86,13 +86,14 @@ impl RowSelector { vortex_bail!("Cannot filter arrays with absolute row selections") } - if self.values.cardinality() == 0 { + let true_count = self.values.cardinality(); + if true_count == 0 { return Ok(None); } let array = array.as_ref(); - if self.values.cardinality() == array.len() as u64 { + if true_count == array.len() as u64 { return Ok(Some(array.clone())); } From a8622526ae9623c7656f5561c940e69de26eb325 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 24 Oct 2024 22:38:32 +0100 Subject: [PATCH 05/51] something --- vortex-serde/src/layouts/read/batch.rs | 53 +------------------ .../src/layouts/read/filter_project.rs | 25 ++++++--- vortex-serde/src/layouts/read/filtering.rs | 41 +++++++++++++- .../src/layouts/read/layouts/column.rs | 2 +- vortex-serde/src/layouts/read/layouts/flat.rs | 38 ++++++------- vortex-serde/src/layouts/read/mod.rs | 44 +-------------- 6 files changed, 75 insertions(+), 128 deletions(-) diff --git a/vortex-serde/src/layouts/read/batch.rs b/vortex-serde/src/layouts/read/batch.rs index 23be6e843a..8929723edb 100644 --- a/vortex-serde/src/layouts/read/batch.rs +++ b/vortex-serde/src/layouts/read/batch.rs @@ -1,12 +1,10 @@ use std::mem; -use std::sync::Arc; use vortex::array::StructArray; use vortex::validity::Validity; use vortex::{Array, IntoArray}; use vortex_dtype::FieldNames; use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; -use vortex_expr::VortexExpr; use crate::layouts::read::selection::RowSelector; use crate::layouts::read::{LayoutReader, ReadResult}; @@ -99,7 +97,6 @@ impl ColumnBatchReader { messages.extend(message); } ReadResult::Batch(a) => *child_array = Some(a), - ReadResult::Selector(_) => unreachable!("Can only produce batches"), }, None => { debug_assert!( @@ -146,54 +143,6 @@ impl ColumnBatchReader { } } -#[derive(Debug)] -pub struct FilterLayoutReader { - reader: ColumnBatchReader, - filter: Arc, - row_offset: usize, -} - -impl FilterLayoutReader { - pub fn new(reader: ColumnBatchReader, filter: Arc, row_offset: usize) -> Self { - Self { - reader, - filter, - row_offset, - } - } -} - -impl LayoutReader for FilterLayoutReader { - fn next_range(&mut self) -> VortexResult { - self.reader.next_range() - } - - fn read_next(&mut self, selection: RowSelector) -> VortexResult> { - match self.reader.read_next(selection)? { - None => Ok(None), - Some(rr) => match rr { - ReadResult::ReadMore(m) => Ok(Some(ReadResult::ReadMore(m))), - ReadResult::Batch(b) => { - let filter_result = self.filter.evaluate(&b)?; - let selector = RowSelector::from_array( - &filter_result, - self.row_offset, - self.row_offset + filter_result.len(), - )?; - self.row_offset += b.len(); - Ok(Some(ReadResult::Selector(selector))) - } - ReadResult::Selector(_) => unreachable!("Can only produce batches"), - }, - } - } - - fn advance(&mut self, up_to_row: usize) -> VortexResult> { - self.row_offset = up_to_row; - self.reader.advance(up_to_row) - } -} - #[derive(Debug)] pub struct ColumnBatchFilter { children: Vec>, @@ -282,7 +231,7 @@ impl ColumnBatchFilter { } *child_selector = Some(s) } - ReadResult::Batch(_) => unreachable!("Can only produce selectors"), + ReadResult::Batch(a) => unreachable!("Can only produce selectors"), }, None => { debug_assert!( diff --git a/vortex-serde/src/layouts/read/filter_project.rs b/vortex-serde/src/layouts/read/filter_project.rs index 93a1a5a1ac..28134acd12 100644 --- a/vortex-serde/src/layouts/read/filter_project.rs +++ b/vortex-serde/src/layouts/read/filter_project.rs @@ -15,19 +15,28 @@ pub fn filter_project( Some(filter.clone()) } else if let Some(s) = filter.as_any().downcast_ref::() { match s { Select::Include(i) => { - let mut fields = i + let fields = i .iter() .filter(|f| projection.contains(f)) .cloned() .collect::>(); match fields.len() { - 0 | 1 => fields.pop().map(Column::new).map(|c| Arc::new(c) as _), + 0 => None, + 1 => Some(Arc::new(Identity)), _ => Some(Arc::new(Select::include(fields))), } } Select::Exclude(e) => { - let mut fields = projection + let fields = projection .iter() .filter(|f| !e.contains(f)) .cloned() .collect::>(); match fields.len() { - 0 | 1 => fields.pop().map(Column::new).map(|c| Arc::new(c) as _), + 0 => None, + 1 => Some(Arc::new(Identity)), _ => Some(Arc::new(Select::include(fields))), } } diff --git a/vortex-serde/src/layouts/read/layouts/chunked.rs b/vortex-serde/src/layouts/read/layouts/chunked.rs index 80674ac049..645756c44c 100644 --- a/vortex-serde/src/layouts/read/layouts/chunked.rs +++ b/vortex-serde/src/layouts/read/layouts/chunked.rs @@ -43,10 +43,10 @@ impl LayoutSpec for ChunkedLayoutSpec { } #[derive(Debug)] -pub enum ChunkedLayoutState { +pub enum MetadataState { Init, ReadMetadata((Box, usize)), - ReadChunks(BufferedArrayReader), + Array(Array), } /// In memory representation of Chunked NestedLayout. @@ -62,8 +62,8 @@ pub struct ChunkedLayout { scan: Scan, layout_builder: LayoutDeserializer, message_cache: RelativeLayoutCache, - state: ChunkedLayoutState, - metadata_array: Option, + chunk_reader: Option, + metadata_array: MetadataState, } impl ChunkedLayout { @@ -83,8 +83,8 @@ impl ChunkedLayout { scan, layout_builder, message_cache, - state: ChunkedLayoutState::Init, - metadata_array: None, + chunk_reader: None, + metadata_array: MetadataState::Init, } } @@ -96,7 +96,7 @@ impl ChunkedLayout { } fn child_ranges(&self) -> VortexResult> { - let Some(m) = self.metadata_array.as_ref() else { + let MetadataState::Array(ref m) = self.metadata_array else { vortex_bail!("Must fetch metadata before") }; @@ -168,97 +168,99 @@ impl ChunkedLayout { impl LayoutReader for ChunkedLayout { fn next_range(&mut self) -> VortexResult { - match &mut self.state { - ChunkedLayoutState::Init => { - self.state = ChunkedLayoutState::ReadMetadata(self.metadata_layout()?); - self.next_range() - } - ChunkedLayoutState::ReadMetadata((r, nchildren)) => { - match read_metadata(r.as_mut(), *nchildren)? { - None => { - self.state = ChunkedLayoutState::ReadChunks(BufferedArrayReader::new( - self.ranged_children()?, - )); - self.next_range() - } - Some(mr) => match mr { - MetadataResult::ReadMore(m) => Ok(RangeResult::ReadMore(m)), - MetadataResult::Batch(r) => { - if self.metadata_array.is_some() { - vortex_bail!("Metadata is not chunked for now"); - } else { - self.metadata_array = Some(r); - } - self.next_range() + if let Some(br) = &mut self.chunk_reader { + br.next_range() + } else { + match &mut self.metadata_array { + MetadataState::Init => { + self.metadata_array = MetadataState::ReadMetadata(self.metadata_layout()?); + self.next_range() + } + MetadataState::ReadMetadata((mr, nchildren)) => { + match mr.read_next(RowSelector::new( + Bitmap::from_range(0..*nchildren as u32), + 0, + *nchildren, + ))? { + None => { + unreachable!( + "Metadata isn't chunked, will terminate after first batch result" + ) } - }, + Some(mr) => match mr { + ReadResult::ReadMore(m) => Ok(RangeResult::ReadMore(m)), + ReadResult::Batch(r) => { + if matches!(self.metadata_array, MetadataState::Array(_)) { + vortex_bail!("Metadata is not chunked for now"); + } else { + self.metadata_array = MetadataState::Array(r); + self.chunk_reader = + Some(BufferedArrayReader::new(self.ranged_children()?)); + } + self.next_range() + } + }, + } + } + MetadataState::Array(_) => { + unreachable!("Already fetched metadata but didn't create reader") } } - ChunkedLayoutState::ReadChunks(rc) => rc.next_range(), } } fn read_next(&mut self, selector: RowSelector) -> VortexResult> { - match &mut self.state { - ChunkedLayoutState::Init => { - self.state = ChunkedLayoutState::ReadMetadata(self.metadata_layout()?); - self.read_next(selector) - } - ChunkedLayoutState::ReadMetadata((r, nchildren)) => { - match read_metadata(r.as_mut(), *nchildren)? { - None => { - self.state = ChunkedLayoutState::ReadChunks(BufferedArrayReader::new( - self.ranged_children()?, - )); - self.read_next(selector) - } - Some(mr) => match mr { - MetadataResult::ReadMore(m) => Ok(Some(ReadResult::ReadMore(m))), - MetadataResult::Batch(r) => { - if self.metadata_array.is_some() { - vortex_bail!("Metadata is not chunked for now"); - } else { - self.metadata_array = Some(r); - } - self.read_next(selector) + if let Some(br) = &mut self.chunk_reader { + br.read_next(selector) + } else { + match &mut self.metadata_array { + MetadataState::Init => { + self.metadata_array = MetadataState::ReadMetadata(self.metadata_layout()?); + self.read_next(selector) + } + MetadataState::ReadMetadata((mr, nchildren)) => { + match mr.read_next(RowSelector::new( + Bitmap::from_range(0..*nchildren as u32), + 0, + *nchildren, + ))? { + None => { + unreachable!( + "Metadata isn't chunked, will terminate after first batch result" + ) } - }, + Some(mr) => match mr { + ReadResult::ReadMore(m) => Ok(Some(ReadResult::ReadMore(m))), + ReadResult::Batch(r) => { + if matches!(self.metadata_array, MetadataState::Array(_)) { + vortex_bail!("Metadata is not chunked for now"); + } else { + self.metadata_array = MetadataState::Array(r); + self.chunk_reader = + Some(BufferedArrayReader::new(self.ranged_children()?)); + } + self.read_next(selector) + } + }, + } + } + MetadataState::Array(_) => { + unreachable!("Already fetched metadata but didn't create reader") } } - ChunkedLayoutState::ReadChunks(rc) => rc.read_next(selector), } } fn advance(&mut self, up_to_row: usize) -> VortexResult> { - match &mut self.state { - ChunkedLayoutState::ReadChunks(br) => br.advance(up_to_row), - _ => { - self.offset = up_to_row; - Ok(vec![]) - } + if let Some(br) = &mut self.chunk_reader { + br.advance(up_to_row) + } else { + self.offset = up_to_row; + Ok(Vec::new()) } } } -enum MetadataResult { - Batch(Array), - ReadMore(Vec), -} - -fn read_metadata( - reader: &mut dyn LayoutReader, - nchildren: usize, -) -> VortexResult> { - let selector = RowSelector::new(Bitmap::from_range(0..nchildren as u32), 0, nchildren); - match reader.read_next(selector)? { - None => Ok(None), - Some(rr) => match rr { - ReadResult::ReadMore(m) => Ok(Some(MetadataResult::ReadMore(m))), - ReadResult::Batch(a) => Ok(Some(MetadataResult::Batch(a))), - }, - } -} - #[cfg(test)] mod tests { use std::collections::VecDeque; diff --git a/vortex-serde/src/layouts/read/layouts/column.rs b/vortex-serde/src/layouts/read/layouts/column.rs index 3c9bdb8a9a..10ac6b2f59 100644 --- a/vortex-serde/src/layouts/read/layouts/column.rs +++ b/vortex-serde/src/layouts/read/layouts/column.rs @@ -1,11 +1,10 @@ use std::sync::Arc; use bytes::Bytes; -use flatbuffers::{ForwardsUOffset, Vector}; use itertools::Itertools; use vortex_dtype::field::Field; use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_expr::{Column, Select}; use vortex_flatbuffers::footer; @@ -89,43 +88,19 @@ impl ColumnLayout { } } - fn read_child( - &self, - idx: usize, - children: Vector>, - dtype: DType, - ) -> VortexResult> { - let mut layout = self.layout_serde.read_layout( - self.fb_bytes.clone(), - children.get(idx)._tab.loc(), - self.length, - // TODO(robert): Changes this once we support nested projections - Scan::new(None), - self.message_cache.relative( - idx as u16, - Arc::new(LazyDeserializedDType::from_dtype(dtype)), - ), - )?; - if self.offset != 0 { - layout.advance(self.offset)?; - } - Ok(layout) - } - - fn filter_reader(&mut self) -> VortexResult { - let Some(ref rf) = self.scan.expr else { - vortex_bail!("Must have scan expression"); - }; - - let filter_refs = self - .scan_fields()? - .vortex_expect("Can't be an empty filter"); - let lazy_dtype = self.message_cache.dtype().project(&filter_refs)?; - + fn column_reader(&mut self) -> VortexResult { let fb_children = self .flatbuffer() .children() .ok_or_else(|| vortex_err!("Missing children"))?; + let field_refs = self.scan_fields()?; + let lazy_dtype = field_refs + .as_ref() + .map(|e| self.message_cache.dtype().project(e)) + .unwrap_or_else(|| Ok(self.message_cache.dtype().clone()))?; + + let refs = field_refs.unwrap_or_else(|| (0..fb_children.len()).map(Field::from).collect()); + let filter_dtype = lazy_dtype.value()?; let DType::Struct(s, ..) = filter_dtype else { vortex_bail!("Column layout must have struct dtype") @@ -136,12 +111,20 @@ impl ColumnLayout { let mut handled_children = Vec::new(); let mut handled_names = Vec::new(); - for (idx, field) in filter_refs.into_iter().enumerate() { + for (field, (name, dtype)) in refs + .into_iter() + .zip_eq(s.names().iter().cloned().zip_eq(s.dtypes().iter().cloned())) + { let resolved_child = lazy_dtype.resolve_field(&field)?; let child_loc = fb_children.get(resolved_child)._tab.loc(); - let filter = filter_project(rf, &[field]); + let filter = self + .scan + .expr + .as_ref() + .and_then(|e| filter_project(e, &[field])); - let has_filter = filter.is_some(); + let handled = + self.scan.expr.is_none() || (self.scan.expr.is_some() && filter.is_some()); let mut child = self.layout_serde.read_layout( self.fb_bytes.clone(), @@ -150,7 +133,7 @@ impl ColumnLayout { Scan::new(filter), self.message_cache.relative( resolved_child as u16, - Arc::new(LazyDeserializedDType::from_dtype(s.dtypes()[idx].clone())), + Arc::new(LazyDeserializedDType::from_dtype(dtype)), ), )?; @@ -158,25 +141,36 @@ impl ColumnLayout { child.advance(self.offset)?; } - if has_filter { + if handled { handled_children.push(child); - handled_names.push(s.names()[idx].clone()); + handled_names.push(name); } else { unhandled_children.push(child); - unhandled_children_names.push(s.names()[idx].clone()); + unhandled_children_names.push(name); } } if !unhandled_children_names.is_empty() { - let Some(prf) = filter_project( - rf, - &unhandled_children_names - .iter() - .map(|n| Field::from(n.as_ref())) - .collect::>(), - ) else { - vortex_bail!("Must be able to project filter into unhandled space") - }; + let prf = self + .scan + .expr + .as_ref() + .and_then(|e| { + filter_project( + e, + &unhandled_children_names + .iter() + .map(|n| Field::from(n.as_ref())) + .collect::>(), + ) + }) + .ok_or_else(|| { + vortex_err!( + "Must be able to project {:?} filter into unhandled space {}", + self.scan.expr.as_ref(), + unhandled_children_names.iter().format(",") + ) + })?; handled_children.push(Box::new(ColumnBatchReader::new( unhandled_children_names.into(), @@ -187,54 +181,26 @@ impl ColumnLayout { handled_names.push("unhandled".into()); } - let filter = Some(Arc::new(RowFilter::from_conjunction( - handled_names - .iter() - .map(|f| Arc::new(Column::new(Field::from(&**f))) as _) - .collect(), - )) as _); + let filter = self + .scan + .expr + .as_ref() + .map(|e| e.as_any().downcast_ref::().is_some()) + .unwrap_or(false) + .then(|| { + Arc::new(RowFilter::from_conjunction( + handled_names + .iter() + .map(|f| Arc::new(Column::new(Field::from(&**f))) as _) + .collect(), + )) as _ + }); + let shortcircuit_siblings = filter.is_some(); Ok(ColumnBatchReader::new( handled_names.into(), handled_children, filter, - true, - )) - } - - fn read_children(&mut self) -> VortexResult { - let lazy_dtype = self - .scan_fields()? - .map(|e| self.message_cache.dtype().project(&e)) - .unwrap_or_else(|| Ok(self.message_cache.dtype().clone()))?; - let DType::Struct(s, _) = lazy_dtype.value()? else { - vortex_bail!("DType was not a struct") - }; - - let fb_children = self - .flatbuffer() - .children() - .ok_or_else(|| vortex_err!("Missing children"))?; - - let expr_fields = self.scan_fields()?; - - let child_layouts = match expr_fields { - None => (0..fb_children.len()) - .zip_eq(s.dtypes().iter()) - .map(|(index, dtype)| self.read_child(index, fb_children, dtype.clone())) - .collect::>>()?, - Some(e) => e - .into_iter() - .map(|f| lazy_dtype.resolve_field(&f)) - .zip(s.dtypes().iter().cloned()) - .map(|(child_idx, dtype)| self.read_child(child_idx?, fb_children, dtype)) - .collect::>>()?, - }; - - Ok(ColumnBatchReader::new( - s.names().clone(), - child_layouts, - None, - false, + shortcircuit_siblings, )) } @@ -246,7 +212,7 @@ impl ColumnLayout { if let Some(se) = e.as_any().downcast_ref::() { - match se { - Select::Include(i) => Ok(i.clone()), - Select::Exclude(_) => vortex_bail!("Select::Exclude is not supported"), - } - } else { - Ok(e.references().into_iter().cloned().collect::>()) + .map(|e| self.message_cache.dtype().project(e)) + .unwrap_or_else(|| Ok(self.message_cache.dtype().clone()))?; + + Ok(( + field_refs.unwrap_or_else(|| (0..fb_children.len()).map(Field::from).collect()), + lazy_dtype, + )) + } + + /// Get fields referenced by scan expression preserving order if we're using select to project + fn scan_fields(&self) -> Option> { + self.scan.expr.as_ref().map(|e| { + if let Some(se) = e.as_any().downcast_ref::() { + } else if expr.as_any().downcast_ref::().is_some() { + Some(expr.clone()) + } else if let Some(s) = expr.as_any().downcast_ref::