Filter pushdown over layouts #1124

Merged · 51 commits · Nov 1, 2024

Commits (all by robert3005)
3537205  Filter pushdown (Oct 21, 2024)
8b99cc5  fix (Oct 24, 2024)
a571955  shortcircuit all true (Oct 24, 2024)
c402637  extract (Oct 24, 2024)
a862252  something (Oct 24, 2024)
75bbc3d  compiles (Oct 24, 2024)
75aa351  less (Oct 25, 2024)
4d1fc8b  less (Oct 25, 2024)
da63388  fixes (Oct 25, 2024)
a9995b9  fixes (Oct 25, 2024)
59b2608  clearer (Oct 25, 2024)
448b81c  print (Oct 25, 2024)
697d4f9  error (Oct 25, 2024)
ca6f501  dead (Oct 27, 2024)
ebf53e1  fixes (Oct 28, 2024)
412e7bc  more (Oct 28, 2024)
b6cfde0  inline lengths (Oct 29, 2024)
9b8a605  less (Oct 29, 2024)
eb839b7  less (Oct 30, 2024)
b90fda6  less (Oct 30, 2024)
773acad  fix (Oct 30, 2024)
277f2e0  less (Oct 30, 2024)
12ab636  clippy (Oct 30, 2024)
63a7e54  less (Oct 31, 2024)
b5c59a0  refactor (Oct 31, 2024)
3084223  assert (Oct 31, 2024)
930293a  renames (Oct 31, 2024)
f8d9dd6  changes (Oct 31, 2024)
f0836ad  nounwrap (Oct 31, 2024)
8c055bf  limit (Oct 31, 2024)
24f22ab  simpler (Oct 31, 2024)
0928585  less (Oct 31, 2024)
d56403f  docs (Oct 31, 2024)
bee7fe1  rename (Oct 31, 2024)
fe04b1f  fixes (Oct 31, 2024)
20e6583  comment (Oct 31, 2024)
0f8ad20  no (Oct 31, 2024)
19134d4  move (Nov 1, 2024)
49e2b8f  nounwrap (Nov 1, 2024)
f83443e  shuffle (Nov 1, 2024)
33d659c  less (Nov 1, 2024)
bec27b9  fixes (Nov 1, 2024)
32511a6  less (Nov 1, 2024)
2fa9b4a  comment (Nov 1, 2024)
2e3cd95  more (Nov 1, 2024)
5be421b  more (Nov 1, 2024)
78fd9d6  fix (Nov 1, 2024)
92f3475  miri (Nov 1, 2024)
52c6b83  more (Nov 1, 2024)
e0172d0  skip (Nov 1, 2024)
ca3e936  for real (Nov 1, 2024)
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

22 changes: 10 additions & 12 deletions pyvortex/python/vortex/dataset.py
@@ -96,6 +96,8 @@ def head(
         table : :class:`.pyarrow.Table`

         """
+        if batch_size is not None:
+            raise ValueError("batch_size is not supported")
         if batch_readahead is not None:
             raise ValueError("batch_readahead not supported")
         if fragment_readahead is not None:
@@ -109,11 +111,7 @@ def head(
         del memory_pool
         if filter is not None:
             filter = arrow_to_vortex_expr(filter, self.schema)
-        return (
-            self._dataset.to_array(columns=columns, batch_size=batch_size, row_filter=filter)
-            .slice(0, num_rows)
-            .to_arrow_table()
-        )
+        return self._dataset.to_array(columns=columns, row_filter=filter).slice(0, num_rows).to_arrow_table()

     def join(
         self,
@@ -232,11 +230,7 @@ def take(
         table : :class:`.pyarrow.Table`

         """
-        return (
-            self._dataset.to_array(columns=columns, batch_size=batch_size, row_filter=filter)
-            .take(encoding.array(indices))
-            .to_arrow_table()
-        )
+        return self._dataset.to_array(columns=columns, row_filter=filter).take(encoding.array(indices)).to_arrow_table()

     def to_record_batch_reader(
         self,
@@ -276,6 +270,8 @@ def to_record_batch_reader(
         table : :class:`.pyarrow.Table`

         """
+        if batch_size is not None:
+            raise ValueError("batch_size is not supported")
         if batch_readahead is not None:
             raise ValueError("batch_readahead not supported")
         if fragment_readahead is not None:
@@ -289,7 +285,7 @@ def to_record_batch_reader(
         del memory_pool
         if filter is not None:
             filter = arrow_to_vortex_expr(filter, self.schema)
-        return self._dataset.to_record_batch_reader(columns=columns, batch_size=batch_size, row_filter=filter)
+        return self._dataset.to_record_batch_reader(columns=columns, row_filter=filter)

     def to_batches(
         self,
@@ -383,6 +379,8 @@ def to_table(
         table : :class:`.pyarrow.Table`

         """
+        if batch_size is not None:
+            raise ValueError("batch_size is not supported")
         if batch_readahead is not None:
             raise ValueError("batch_readahead not supported")
         if fragment_readahead is not None:
@@ -396,7 +394,7 @@ def to_table(
         del memory_pool
         if filter is not None:
             filter = arrow_to_vortex_expr(filter, self.schema)
-        return self._dataset.to_array(columns=columns, batch_size=batch_size, row_filter=filter).to_arrow_table()
+        return self._dataset.to_array(columns=columns, row_filter=filter).to_arrow_table()


 def from_path(path: str) -> VortexDataset:
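
As a quick illustration of the Python surface after this change, a minimal sketch (the file path and data are hypothetical, the module path for from_path is assumed from this file's location, and filter is assumed to take a pyarrow compute expression, which the wrapper converts with arrow_to_vortex_expr):

    import pyarrow.compute as pc
    from vortex.dataset import from_path

    ds = from_path("/tmp/example.vortex")  # hypothetical file

    # The predicate is pushed down to the layout reader as a row_filter.
    table = ds.to_table(columns=["string", "bool"], filter=pc.field("bool"))

    # batch_size now raises instead of being silently accepted.
    try:
        ds.to_table(batch_size=1024)
    except ValueError as err:
        print(err)  # batch_size is not supported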
36 changes: 9 additions & 27 deletions pyvortex/src/dataset.rs
@@ -27,7 +27,6 @@ use crate::{PyArray, TOKIO_RUNTIME};
 pub async fn layout_stream_from_reader<T: VortexReadAt + Unpin>(
     reader: T,
     projection: Projection,
-    batch_size: Option<usize>,
     row_filter: Option<RowFilter>,
 ) -> VortexResult<LayoutBatchStream<T>> {
     let mut builder = LayoutReaderBuilder::new(
@@ -39,10 +38,6 @@ pub async fn layout_stream_from_reader<T: VortexReadAt + Unpin>(
     )
     .with_projection(projection);

-    if let Some(batch_size) = batch_size {
-        builder = builder.with_batch_size(batch_size);
-    }
-
     if let Some(row_filter) = row_filter {
         builder = builder.with_row_filter(row_filter);
     }
@@ -53,10 +48,9 @@ pub async fn layout_stream_from_reader<T: VortexReadAt + Unpin>(
 pub async fn read_array_from_reader<T: VortexReadAt + Unpin + 'static>(
     reader: T,
     projection: Projection,
-    batch_size: Option<usize>,
     row_filter: Option<RowFilter>,
 ) -> VortexResult<Array> {
-    layout_stream_from_reader(reader, projection, batch_size, row_filter)
+    layout_stream_from_reader(reader, projection, row_filter)
         .await?
         .read_all()
         .await
@@ -124,13 +118,11 @@ impl TokioFileDataset {
     async fn async_to_array(
         &self,
         columns: Option<Vec<Bound<'_, PyAny>>>,
-        batch_size: Option<usize>,
         row_filter: Option<&Bound<'_, PyExpr>>,
     ) -> PyResult<PyArray> {
         let inner = read_array_from_reader(
             self.file().await?,
             projection_from_python(columns)?,
-            batch_size,
             row_filter_from_python(row_filter),
         )
         .await?;
@@ -140,13 +132,11 @@ impl TokioFileDataset {
     async fn async_to_record_batch_reader(
         self_: PyRef<'_, Self>,
         columns: Option<Vec<Bound<'_, PyAny>>>,
-        batch_size: Option<usize>,
         row_filter: Option<&Bound<'_, PyExpr>>,
     ) -> PyResult<PyObject> {
         let layout_reader = layout_stream_from_reader(
             self_.file().await?,
             projection_from_python(columns)?,
-            batch_size,
             row_filter_from_python(row_filter),
         )
         .await?;
@@ -164,25 +154,23 @@ impl TokioFileDataset {
         self_.schema.clone().to_pyarrow(self_.py())
     }

-    #[pyo3(signature = (*, columns=None, batch_size=None, row_filter=None))]
+    #[pyo3(signature = (*, columns=None, row_filter=None))]
     pub fn to_array(
         &self,
         columns: Option<Vec<Bound<'_, PyAny>>>,
-        batch_size: Option<usize>,
         row_filter: Option<&Bound<'_, PyExpr>>,
     ) -> PyResult<PyArray> {
-        TOKIO_RUNTIME.block_on(self.async_to_array(columns, batch_size, row_filter))
+        TOKIO_RUNTIME.block_on(self.async_to_array(columns, row_filter))
     }

-    #[pyo3(signature = (*, columns=None, batch_size=None, row_filter=None))]
+    #[pyo3(signature = (*, columns=None, row_filter=None))]
     pub fn to_record_batch_reader(
         self_: PyRef<Self>,
         columns: Option<Vec<Bound<'_, PyAny>>>,
-        batch_size: Option<usize>,
         row_filter: Option<&Bound<'_, PyExpr>>,
     ) -> PyResult<PyObject> {
         TOKIO_RUNTIME.block_on(Self::async_to_record_batch_reader(
-            self_, columns, batch_size, row_filter,
+            self_, columns, row_filter,
         ))
     }
 }
@@ -208,13 +196,11 @@ impl ObjectStoreUrlDataset {
     async fn async_to_array(
         &self,
         columns: Option<Vec<Bound<'_, PyAny>>>,
-        batch_size: Option<usize>,
         row_filter: Option<&Bound<'_, PyExpr>>,
     ) -> PyResult<PyArray> {
         let inner = read_array_from_reader(
             self.reader().await?,
             projection_from_python(columns)?,
-            batch_size,
             row_filter_from_python(row_filter),
         )
         .await?;
@@ -224,13 +210,11 @@ impl ObjectStoreUrlDataset {
     async fn async_to_record_batch_reader(
         self_: PyRef<'_, Self>,
         columns: Option<Vec<Bound<'_, PyAny>>>,
-        batch_size: Option<usize>,
         row_filter: Option<&Bound<'_, PyExpr>>,
     ) -> PyResult<PyObject> {
         let layout_reader = layout_stream_from_reader(
             self_.reader().await?,
             projection_from_python(columns)?,
-            batch_size,
             row_filter_from_python(row_filter),
         )
         .await?;
@@ -248,25 +232,23 @@ impl ObjectStoreUrlDataset {
         self_.schema.clone().to_pyarrow(self_.py())
     }

-    #[pyo3(signature = (*, columns=None, batch_size=None, row_filter=None))]
+    #[pyo3(signature = (*, columns=None, row_filter=None))]
     pub fn to_array(
         &self,
         columns: Option<Vec<Bound<'_, PyAny>>>,
-        batch_size: Option<usize>,
         row_filter: Option<&Bound<'_, PyExpr>>,
     ) -> PyResult<PyArray> {
-        TOKIO_RUNTIME.block_on(self.async_to_array(columns, batch_size, row_filter))
+        TOKIO_RUNTIME.block_on(self.async_to_array(columns, row_filter))
     }

-    #[pyo3(signature = (*, columns=None, batch_size=None, row_filter=None))]
+    #[pyo3(signature = (*, columns=None, row_filter=None))]
     pub fn to_record_batch_reader(
         self_: PyRef<Self>,
         columns: Option<Vec<Bound<'_, PyAny>>>,
-        batch_size: Option<usize>,
         row_filter: Option<&Bound<'_, PyExpr>>,
     ) -> PyResult<PyObject> {
         TOKIO_RUNTIME.block_on(Self::async_to_record_batch_reader(
-            self_, columns, batch_size, row_filter,
+            self_, columns, row_filter,
         ))
     }
 }
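
On the Rust side, the reader entry points now take only a projection and an optional RowFilter. A minimal sketch of a caller, assuming it lives in this module where read_array_from_reader, Projection, RowFilter, Array, and VortexResult are in scope (Projection::All is an assumption about the vortex-serde API, not something this diff confirms):

    // Hypothetical helper: read a whole file, letting the layouts decide
    // batch boundaries now that there is no batch_size knob.
    async fn read_all_rows<T: VortexReadAt + Unpin + 'static>(reader: T) -> VortexResult<Array> {
        read_array_from_reader(reader, Projection::All, None).await
    }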
4 changes: 2 additions & 2 deletions pyvortex/src/io.rs
@@ -132,7 +132,7 @@ pub fn read_path(
     row_filter: Option<&Bound<PyExpr>>,
 ) -> PyResult<PyArray> {
     let dataset = TOKIO_RUNTIME.block_on(TokioFileDataset::try_new(path.extract()?))?;
-    dataset.to_array(projection, None, row_filter)
+    dataset.to_array(projection, row_filter)
 }

 /// Read a vortex struct array from a URL.
@@ -184,7 +184,7 @@ pub fn read_url(
     row_filter: Option<&Bound<PyExpr>>,
 ) -> PyResult<PyArray> {
     let dataset = TOKIO_RUNTIME.block_on(ObjectStoreUrlDataset::try_new(url.extract()?))?;
-    dataset.to_array(projection, None, row_filter)
+    dataset.to_array(projection, row_filter)
 }

 /// Write a vortex struct array to the local filesystem.
2 changes: 1 addition & 1 deletion pyvortex/test/test_dataset.py
@@ -53,7 +53,7 @@ def test_to_batches(ds):

     chunk0 = next(ds.to_batches(columns=["string", "bool"]))
     assert chunk0.to_struct_array() == pa.array(
-        [record(x, columns=["string", "bool"]) for x in range(1 << 16)], type=schema
+        [record(x, columns=["string", "bool"]) for x in range(1_000_000)], type=schema
     )

1 change: 0 additions & 1 deletion vortex-datafusion/src/persistent/execution.rs
@@ -98,7 +98,6 @@ impl ExecutionPlan for VortexExec {
             ctx: self.ctx.clone(),
             object_store,
             projection: self.file_scan_config.projection.clone(),
-            batch_size: None,
             predicate: self.predicate.clone(),
             arrow_schema,
         };
5 changes: 0 additions & 5 deletions vortex-datafusion/src/persistent/opener.rs
@@ -17,7 +17,6 @@ use vortex_serde::layouts::{
 pub struct VortexFileOpener {
     pub ctx: Arc<Context>,
     pub object_store: Arc<dyn ObjectStore>,
-    pub batch_size: Option<usize>,
     pub projection: Option<Vec<usize>>,
     pub predicate: Option<Arc<dyn PhysicalExpr>>,
     pub arrow_schema: SchemaRef,
@@ -33,10 +32,6 @@ impl FileOpener for VortexFileOpener {
             LayoutDeserializer::new(self.ctx.clone(), Arc::new(LayoutContext::default())),
         );

-        if let Some(batch_size) = self.batch_size {
-            builder = builder.with_batch_size(batch_size);
-        }
-
         let row_filter = self
             .predicate
             .clone()
2 changes: 1 addition & 1 deletion vortex-expr/src/identity.rs
@@ -22,7 +22,7 @@ impl PartialEq<dyn Any> for Identity {
     fn eq(&self, other: &dyn Any) -> bool {
         unbox_any(other)
             .downcast_ref::<Self>()
-            .map(|x| x == other)
+            .map(|x| x == self)
             .unwrap_or(false)
     }
 }
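
The one-character fix above matters: in the old body, `x == other` compared the downcast `&Identity` against the original `&dyn Any`, which resolves back through this same `PartialEq<dyn Any>` impl and recurses instead of ever comparing concrete values; `x == self` compares the two `Identity` values directly.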
2 changes: 1 addition & 1 deletion vortex-expr/src/lib.rs
@@ -61,7 +61,7 @@ fn split_inner(expr: &Arc<dyn VortexExpr>, exprs: &mut Vec<Arc<dyn VortexExpr>>)
 }

 // Taken from apache-datafusion, necessary since you can't require VortexExpr implement PartialEq<dyn VortexExpr>
-pub(crate) fn unbox_any(any: &dyn Any) -> &dyn Any {
+pub fn unbox_any(any: &dyn Any) -> &dyn Any {
     if any.is::<Arc<dyn VortexExpr>>() {
         any.downcast_ref::<Arc<dyn VortexExpr>>()
             .vortex_expect("any.is::<Arc<dyn VortexExpr>> returned true but downcast_ref failed")
1 change: 1 addition & 0 deletions vortex-flatbuffers/flatbuffers/vortex-serde/footer.fbs
@@ -7,6 +7,7 @@ table Layout {
   encoding: uint16;
   buffers: [Buffer];
   children: [Layout];
+  length: uint64;
   metadata: [ubyte];
 }

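
Note that the new length field is declared before metadata, so flatbuffers reassigns metadata's vtable slot from 10 to 12 (visible in the regenerated code below); footers written before this change are therefore not binary-compatible with the new schema.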
19 changes: 18 additions & 1 deletion vortex-flatbuffers/src/generated/footer.rs
@@ -152,7 +152,8 @@ impl<'a> Layout<'a> {
     pub const VT_ENCODING: flatbuffers::VOffsetT = 4;
     pub const VT_BUFFERS: flatbuffers::VOffsetT = 6;
     pub const VT_CHILDREN: flatbuffers::VOffsetT = 8;
-    pub const VT_METADATA: flatbuffers::VOffsetT = 10;
+    pub const VT_LENGTH: flatbuffers::VOffsetT = 10;
+    pub const VT_METADATA: flatbuffers::VOffsetT = 12;

     #[inline]
     pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
@@ -164,6 +165,7 @@ impl<'a> Layout<'a> {
         args: &'args LayoutArgs<'args>
     ) -> flatbuffers::WIPOffset<Layout<'bldr>> {
         let mut builder = LayoutBuilder::new(_fbb);
+        builder.add_length(args.length);
         if let Some(x) = args.metadata { builder.add_metadata(x); }
         if let Some(x) = args.children { builder.add_children(x); }
         if let Some(x) = args.buffers { builder.add_buffers(x); }
@@ -194,6 +196,13 @@ impl<'a> Layout<'a> {
         unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout>>>>(Layout::VT_CHILDREN, None)}
     }
     #[inline]
+    pub fn length(&self) -> u64 {
+        // Safety:
+        // Created from valid Table for this object
+        // which contains a valid value in this slot
+        unsafe { self._tab.get::<u64>(Layout::VT_LENGTH, Some(0)).unwrap()}
+    }
+    #[inline]
     pub fn metadata(&self) -> Option<flatbuffers::Vector<'a, u8>> {
         // Safety:
         // Created from valid Table for this object
@@ -212,6 +221,7 @@ impl flatbuffers::Verifiable for Layout<'_> {
             .visit_field::<u16>("encoding", Self::VT_ENCODING, false)?
             .visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, Buffer>>>("buffers", Self::VT_BUFFERS, false)?
             .visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, flatbuffers::ForwardsUOffset<Layout>>>>("children", Self::VT_CHILDREN, false)?
+            .visit_field::<u64>("length", Self::VT_LENGTH, false)?
             .visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("metadata", Self::VT_METADATA, false)?
             .finish();
         Ok(())
@@ -221,6 +231,7 @@ pub struct LayoutArgs<'a> {
     pub encoding: u16,
     pub buffers: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, Buffer>>>,
     pub children: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout<'a>>>>>,
+    pub length: u64,
     pub metadata: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
 }
 impl<'a> Default for LayoutArgs<'a> {
@@ -230,6 +241,7 @@ impl<'a> Default for LayoutArgs<'a> {
             encoding: 0,
             buffers: None,
             children: None,
+            length: 0,
             metadata: None,
         }
     }
@@ -253,6 +265,10 @@ impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> LayoutBuilder<'a, 'b, A> {
         self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(Layout::VT_CHILDREN, children);
     }
     #[inline]
+    pub fn add_length(&mut self, length: u64) {
+        self.fbb_.push_slot::<u64>(Layout::VT_LENGTH, length, 0);
+    }
+    #[inline]
     pub fn add_metadata(&mut self, metadata: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
         self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(Layout::VT_METADATA, metadata);
     }
@@ -277,6 +293,7 @@ impl core::fmt::Debug for Layout<'_> {
         ds.field("encoding", &self.encoding());
         ds.field("buffers", &self.buffers());
         ds.field("children", &self.children());
+        ds.field("length", &self.length());
         ds.field("metadata", &self.metadata());
         ds.finish()
     }
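
To see the regenerated API end to end, a small round-trip sketch (assuming the generated Layout and LayoutArgs types are in scope, e.g. via the vortex-flatbuffers crate; the encoding id and length value are arbitrary):

    use flatbuffers::FlatBufferBuilder;

    fn layout_length_roundtrip() {
        let mut fbb = FlatBufferBuilder::new();
        let layout = Layout::create(
            &mut fbb,
            &LayoutArgs {
                encoding: 1,
                buffers: None,
                children: None,
                length: 42, // new field; footers written before it read back as the default 0
                metadata: None,
            },
        );
        fbb.finish_minimal(layout);

        // Re-read the buffer and check the field survived the round trip.
        let layout = flatbuffers::root::<Layout>(fbb.finished_data()).expect("valid Layout");
        assert_eq!(layout.length(), 42);
    }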