From 53599efc5afbb013630528f6d4324cc883e95f34 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Jan 2025 06:45:17 +0000 Subject: [PATCH 01/61] DataFusion Layouts --- Cargo.lock | 1 + vortex-datafusion/Cargo.toml | 1 + vortex-datafusion/src/persistent/cache.rs | 22 ++++++---- vortex-datafusion/src/persistent/execution.rs | 6 +-- vortex-datafusion/src/persistent/format.rs | 43 ++++++++++++------- vortex-datafusion/src/persistent/opener.rs | 4 +- vortex-file/src/v2/file.rs | 21 +++++++-- vortex-file/src/v2/footer/file_layout.rs | 17 +++++++- vortex-file/src/v2/footer/mod.rs | 2 +- vortex-file/src/v2/mod.rs | 2 +- vortex-file/src/v2/open.rs | 17 ++++++++ 11 files changed, 98 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 707e9d7d75..37ba55c815 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4966,6 +4966,7 @@ dependencies = [ "vortex-expr", "vortex-file", "vortex-io", + "vortex-layout", ] [[package]] diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 29e26bc1b3..2c40ae189e 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -42,6 +42,7 @@ vortex-error = { workspace = true, features = ["datafusion"] } vortex-expr = { workspace = true, features = ["datafusion"] } vortex-file = { workspace = true, features = ["object_store"] } vortex-io = { workspace = true, features = ["object_store", "tokio"] } +vortex-layout = { workspace = true } [dev-dependencies] anyhow = { workspace = true } diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index 7a6c174f22..7b5a497ce2 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -4,13 +4,15 @@ use chrono::{DateTime, Utc}; use moka::future::Cache; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; +use vortex_array::ContextRef; use vortex_error::{vortex_err, VortexError, VortexResult}; -use vortex_file::{read_initial_bytes, InitialRead}; +use vortex_file::v2::footer::FileLayout; +use vortex_file::v2::OpenOptions; use vortex_io::ObjectStoreReadAt; #[derive(Debug, Clone)] -pub struct InitialReadCache { - inner: Cache, +pub struct FileLayoutCache { + inner: Cache, } #[derive(Hash, Eq, PartialEq, Debug)] @@ -28,28 +30,30 @@ impl From<&ObjectMeta> for Key { } } -impl InitialReadCache { +impl FileLayoutCache { pub fn new(size_mb: usize) -> Self { let inner = Cache::builder() - .weigher(|k: &Key, v: &InitialRead| (k.location.as_ref().len() + v.buf.len()) as u32) .max_capacity(size_mb as u64 * (2 << 20)) - .eviction_listener(|k, _v, cause| { + .eviction_listener(|k: Arc, _v, cause| { log::trace!("Removed {} due to {:?}", k.location, cause); }) .build(); Self { inner } } + pub async fn try_get( &self, object: &ObjectMeta, store: Arc, - ) -> VortexResult { + ) -> VortexResult { self.inner .try_get_with(Key::from(object), async { let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); - let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?; - VortexResult::Ok(initial_read) + let vxf = OpenOptions::new(ContextRef::default()) + .open(os_read_at) + .await?; + VortexResult::Ok(vxf.file_layout()) }) .await .map_err(|e: Arc| match Arc::try_unwrap(e) { diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 40514c1b90..94e28bdc60 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -13,7 +13,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Plan use itertools::Itertools; use vortex_array::ContextRef; -use super::cache::InitialReadCache; +use super::cache::FileLayoutCache; use crate::persistent::opener::VortexFileOpener; #[derive(Debug, Clone)] @@ -24,7 +24,7 @@ pub struct VortexExec { plan_properties: PlanProperties, projected_statistics: Statistics, ctx: ContextRef, - initial_read_cache: InitialReadCache, + initial_read_cache: FileLayoutCache, } impl VortexExec { @@ -33,7 +33,7 @@ impl VortexExec { metrics: ExecutionPlanMetricsSet, predicate: Option>, ctx: ContextRef, - initial_read_cache: InitialReadCache, + initial_read_cache: FileLayoutCache, ) -> DFResult { let projected_schema = project_schema( &file_scan_config.file_schema, diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 8ab8d72248..50d5cc9b86 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -23,12 +23,13 @@ use vortex_array::arrow::infer_schema; use vortex_array::ContextRef; use vortex_error::VortexResult; use vortex_file::metadata::fetch_metadata; +use vortex_file::v2::OpenOptions; use vortex_file::{ LayoutContext, LayoutDeserializer, LayoutMessageCache, LayoutPath, Scan, VORTEX_FILE_EXTENSION, }; use vortex_io::{IoDispatcher, ObjectStoreReadAt}; -use super::cache::InitialReadCache; +use super::cache::FileLayoutCache; use super::execution::VortexExec; use super::statistics::{array_to_col_statistics, uncompressed_col_size}; use crate::can_be_pushed_down; @@ -36,7 +37,7 @@ use crate::can_be_pushed_down; #[derive(Debug)] pub struct VortexFormat { context: ContextRef, - initial_read_cache: InitialReadCache, + file_layout_cache: FileLayoutCache, opts: VortexFormatOptions, } @@ -61,7 +62,7 @@ impl Default for VortexFormat { Self { context: Default::default(), - initial_read_cache: InitialReadCache::new(opts.cache_size_mb), + file_layout_cache: FileLayoutCache::new(opts.cache_size_mb), opts, } } @@ -72,7 +73,7 @@ impl VortexFormat { let opts = VortexFormatOptions::default(); Self { context, - initial_read_cache: InitialReadCache::new(opts.cache_size_mb), + file_layout_cache: FileLayoutCache::new(opts.cache_size_mb), opts, } } @@ -109,12 +110,10 @@ impl FileFormat for VortexFormat { let file_schemas = stream::iter(objects.iter().cloned()) .map(|o| { let store = store.clone(); - let cache = self.initial_read_cache.clone(); + let cache = self.file_layout_cache.clone(); async move { - let initial_read = cache.try_get(&o, store).await?; - let lazy_dtype = initial_read.lazy_dtype(); - let s = infer_schema(lazy_dtype.value()?)?; - + let file_layout = cache.try_get(&o, store).await?; + let s = infer_schema(file_layout.dtype())?; VortexResult::Ok(s) } }) @@ -134,22 +133,34 @@ impl FileFormat for VortexFormat { table_schema: SchemaRef, object: &ObjectMeta, ) -> DFResult { - let initial_read = self - .initial_read_cache + let file_layout = self + .file_layout_cache .try_get(object, store.clone()) .await?; - let layout = initial_read.fb_layout(); - let row_count = layout.row_count(); + // Re-open the vortex file using the cached file layout + let vxf = OpenOptions::new(self.context.clone()) + .with_file_layout(file_layout) + .open(ObjectStoreReadAt::new( + store.clone(), + object.location.clone(), + )) + .await?; + + // Now we have to compute the column statistics for the table. + // If we assume a top-level struct DType (which is true for a DataFusion/Vortex + // integration), then we need some way to ask the layouts to fetch and compute the stats. + + let row_count = file_layout.row_count(); let layout_deserializer = LayoutDeserializer::new(self.context.clone(), LayoutContext::default().into()); let root_layout = layout_deserializer.read_layout( LayoutPath::default(), - initial_read.fb_layout(), + file_layout.fb_layout(), Scan::empty(), - initial_read.lazy_dtype().into(), + file_layout.lazy_dtype().into(), )?; let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); @@ -199,7 +210,7 @@ impl FileFormat for VortexFormat { metrics, filters.cloned(), self.context.clone(), - self.initial_read_cache.clone(), + self.file_layout_cache.clone(), )? .into_arc(); diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index b5ce6bfe96..8cf286ef76 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -13,7 +13,7 @@ use vortex_expr::RowFilter; use vortex_file::{LayoutContext, LayoutDeserializer, Projection, VortexReadBuilder}; use vortex_io::{IoDispatcher, ObjectStoreReadAt}; -use super::cache::InitialReadCache; +use super::cache::FileLayoutCache; /// Share an IO dispatcher across all DataFusion instances. static IO_DISPATCHER: LazyLock> = @@ -26,7 +26,7 @@ pub struct VortexFileOpener { pub projection: Option>, pub predicate: Option>, pub arrow_schema: SchemaRef, - pub(crate) initial_read_cache: InitialReadCache, + pub(crate) initial_read_cache: FileLayoutCache, } impl FileOpener for VortexFileOpener { diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 039b21f2d8..3ddce8011f 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -9,7 +9,7 @@ use vortex_io::VortexReadAt; use vortex_layout::scanner::{Poll, Scan}; use vortex_layout::{LayoutData, RowMask}; -use crate::v2::footer::Segment; +use crate::v2::footer::{FileLayout, Segment}; use crate::v2::segments::SegmentCache; pub struct VortexFile { @@ -20,18 +20,31 @@ pub struct VortexFile { pub(crate) segment_cache: SegmentCache, } -/// Async implementation of Vortex File. -impl VortexFile { +impl VortexFile { /// Returns the number of rows in the file. pub fn row_count(&self) -> u64 { self.layout.row_count() } - /// Returns the DType of the file. + /// Returns the [`DType`] of the file. pub fn dtype(&self) -> &DType { self.layout.dtype() } + /// Returns the [`FileLayout`] of the file. + /// + /// This can be passed to [`vortex_file::v2::VortexOpenOptions`] to reconstruct a + /// [`VortexFile`] without re-reading the footer. + pub fn file_layout(&self) -> FileLayout { + FileLayout { + root_layout: self.layout.clone(), + segments: self.segments.clone(), + } + } +} + +/// Async implementation of Vortex File. +impl VortexFile { /// Performs a scan operation over the file. pub fn scan(&self, scan: Scan) -> VortexResult { let layout_scan = self.layout.new_scan(scan, self.ctx.clone())?; diff --git a/vortex-file/src/v2/footer/file_layout.rs b/vortex-file/src/v2/footer/file_layout.rs index 03bb5cb984..32ab6b5793 100644 --- a/vortex-file/src/v2/footer/file_layout.rs +++ b/vortex-file/src/v2/footer/file_layout.rs @@ -1,15 +1,28 @@ +use vortex_dtype::DType; use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer}; use vortex_layout::LayoutData; use crate::v2::footer::segment::Segment; /// Captures the layout information of a Vortex file. -#[derive(Clone)] -pub(crate) struct FileLayout { +#[derive(Clone, Debug)] +pub struct FileLayout { pub(crate) root_layout: LayoutData, pub(crate) segments: Vec, } +impl FileLayout { + /// The [`DType`] of the file. + pub fn dtype(&self) -> &DType { + &self.root_layout.dtype() + } + + /// The row count of the file. + pub fn row_count(&self) -> u64 { + self.root_layout.row_count() + } +} + impl FlatBufferRoot for FileLayout {} impl WriteFlatBuffer for FileLayout { diff --git a/vortex-file/src/v2/footer/mod.rs b/vortex-file/src/v2/footer/mod.rs index a1448e0b5e..3e020e680f 100644 --- a/vortex-file/src/v2/footer/mod.rs +++ b/vortex-file/src/v2/footer/mod.rs @@ -2,6 +2,6 @@ mod file_layout; mod postscript; mod segment; -pub(crate) use file_layout::*; +pub use file_layout::*; pub(crate) use postscript::*; pub(crate) use segment::*; diff --git a/vortex-file/src/v2/mod.rs b/vortex-file/src/v2/mod.rs index a380f37e2f..93f635d6b0 100644 --- a/vortex-file/src/v2/mod.rs +++ b/vortex-file/src/v2/mod.rs @@ -1,5 +1,5 @@ mod file; -mod footer; +pub mod footer; mod open; mod segments; mod strategy; diff --git a/vortex-file/src/v2/open.rs b/vortex-file/src/v2/open.rs index 48906da71b..e821fcac6f 100644 --- a/vortex-file/src/v2/open.rs +++ b/vortex-file/src/v2/open.rs @@ -53,6 +53,12 @@ impl OpenOptions { self.initial_read_size = initial_read_size; Ok(self) } + + /// Configure a pre-existing file layout for the Vortex file. + pub fn with_file_layout(mut self, file_layout: FileLayout) -> Self { + self.file_layout = Some(file_layout); + self + } } impl OpenOptions { @@ -63,6 +69,17 @@ impl OpenOptions { /// Open the Vortex file using asynchronous IO. pub async fn open(self, read: R) -> VortexResult> { + // If we already have the file layout, we can skip the initial read entirely. + if let Some(file_layout) = self.file_layout { + return Ok(VortexFile { + read, + ctx: self.ctx.clone(), + layout: file_layout.root_layout, + segments: file_layout.segments, + segment_cache: Default::default(), + }); + } + // Fetch the file size and perform the initial read. let file_size = read.size().await?; let initial_read_size = self.initial_read_size.min(file_size); From e01cedcf42876c7a6de97e1ce7d28d5db05093d2 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Jan 2025 07:34:33 +0000 Subject: [PATCH 02/61] DataFusion Layouts --- .history | 11 ++ vortex-datafusion/src/persistent/execution.rs | 2 +- vortex-datafusion/src/persistent/format.rs | 143 ++++++++---------- vortex-datafusion/src/persistent/opener.rs | 87 +++++------ vortex-file/src/v2/file.rs | 10 +- vortex-file/src/v2/open.rs | 14 +- vortex-file/src/v2/tests.rs | 9 +- vortex-layout/src/segments/mod.rs | 1 + 8 files changed, 138 insertions(+), 139 deletions(-) create mode 100644 .history diff --git a/.history b/.history new file mode 100644 index 0000000000..f95c77bcde --- /dev/null +++ b/.history @@ -0,0 +1,11 @@ +#V2 +copy (select arrow_cast(1, 'Int8') as x) to '/tmp/foo.parquet'; +describe '/tmp/foo.parquet'; +explain select x = arrow_cast(10000, 'Int32') from '/tmp/foo.parquet'; +select x + 10000 from '/tmp/foo.parquet'; +explain select x + 10000 from '/tmp/foo.parquet'; +explain select x = 10000 from '/tmp/foo.parquet'; +explain select x = arrow_cast(10000, "Int32") from '/tmp/foo.parquet'; +explain select x = cast(10000 AS Int32) from '/tmp/foo.parquet'; +explain select x = cast(10000 AS Int) from '/tmp/foo.parquet'; +explain select x = cast(10000 AS int) from '/tmp/foo.parquet'; diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 94e28bdc60..233eb10182 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -122,7 +122,7 @@ impl ExecutionPlan for VortexExec { object_store, projection: self.file_scan_config.projection.clone(), predicate: self.predicate.clone(), - initial_read_cache: self.initial_read_cache.clone(), + file_layout_cache: self.initial_read_cache.clone(), arrow_schema, }; let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?; diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 50d5cc9b86..1f06a22a08 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -8,30 +8,20 @@ use datafusion::datasource::file_format::{FileFormat, FilePushdownSupport}; use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use datafusion::execution::SessionState; use datafusion_common::parsers::CompressionTypeVariant; -use datafusion_common::stats::Precision; -use datafusion_common::{ - not_impl_err, ColumnStatistics, DataFusionError, Result as DFResult, Statistics, -}; +use datafusion_common::{not_impl_err, DataFusionError, Result as DFResult, Statistics}; use datafusion_expr::Expr; use datafusion_physical_expr::{LexRequirement, PhysicalExpr}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::ExecutionPlan; use futures::{stream, StreamExt as _, TryStreamExt as _}; use object_store::{ObjectMeta, ObjectStore}; -use vortex_array::array::StructArray; use vortex_array::arrow::infer_schema; use vortex_array::ContextRef; use vortex_error::VortexResult; -use vortex_file::metadata::fetch_metadata; -use vortex_file::v2::OpenOptions; -use vortex_file::{ - LayoutContext, LayoutDeserializer, LayoutMessageCache, LayoutPath, Scan, VORTEX_FILE_EXTENSION, -}; -use vortex_io::{IoDispatcher, ObjectStoreReadAt}; +use vortex_file::VORTEX_FILE_EXTENSION; use super::cache::FileLayoutCache; use super::execution::VortexExec; -use super::statistics::{array_to_col_statistics, uncompressed_col_size}; use crate::can_be_pushed_down; #[derive(Debug)] @@ -129,72 +119,73 @@ impl FileFormat for VortexFormat { async fn infer_stats( &self, _state: &SessionState, - store: &Arc, + _store: &Arc, table_schema: SchemaRef, - object: &ObjectMeta, + _object: &ObjectMeta, ) -> DFResult { - let file_layout = self - .file_layout_cache - .try_get(object, store.clone()) - .await?; - - // Re-open the vortex file using the cached file layout - let vxf = OpenOptions::new(self.context.clone()) - .with_file_layout(file_layout) - .open(ObjectStoreReadAt::new( - store.clone(), - object.location.clone(), - )) - .await?; - - // Now we have to compute the column statistics for the table. - // If we assume a top-level struct DType (which is true for a DataFusion/Vortex - // integration), then we need some way to ask the layouts to fetch and compute the stats. - - let row_count = file_layout.row_count(); - - let layout_deserializer = - LayoutDeserializer::new(self.context.clone(), LayoutContext::default().into()); - - let root_layout = layout_deserializer.read_layout( - LayoutPath::default(), - file_layout.fb_layout(), - Scan::empty(), - file_layout.lazy_dtype().into(), - )?; - - let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); - let io = IoDispatcher::default(); - let mut stats = Statistics::new_unknown(&table_schema); - stats.num_rows = Precision::Exact(row_count as usize); - - let msgs = LayoutMessageCache::default(); - - if let Some(metadata_table) = - fetch_metadata(os_read_at, io.into(), root_layout, msgs).await? - { - let mut column_statistics = Vec::with_capacity(table_schema.fields().len()); - let mut total_size = 0_u64; - - for col_stats in metadata_table.into_iter() { - let col_stats = match col_stats { - Some(array) => { - let col_metadata_array = StructArray::try_from(array)?; - let col_stats = array_to_col_statistics(&col_metadata_array)?; - - total_size += - uncompressed_col_size(&col_metadata_array)?.unwrap_or_default(); - col_stats - } - None => ColumnStatistics::new_unknown(), - }; - column_statistics.push(col_stats); - } - stats.column_statistics = column_statistics; - stats.total_byte_size = Precision::Inexact(total_size as usize); - } - - Ok(stats) + Ok(Statistics::new_unknown(&table_schema)) + // let file_layout = self + // .file_layout_cache + // .try_get(object, store.clone()) + // .await?; + // + // // Re-open the vortex file using the cached file layout + // let vxf = OpenOptions::new(self.context.clone()) + // .with_file_layout(file_layout) + // .open(ObjectStoreReadAt::new( + // store.clone(), + // object.location.clone(), + // )) + // .await?; + // + // // Now we have to compute the column statistics for the table. + // // If we assume a top-level struct DType (which is true for a DataFusion/Vortex + // // integration), then we need some way to ask the layouts to fetch and compute the stats. + // + // let row_count = file_layout.row_count(); + // + // let layout_deserializer = + // LayoutDeserializer::new(self.context.clone(), LayoutContext::default().into()); + // + // let root_layout = layout_deserializer.read_layout( + // LayoutPath::default(), + // file_layout.fb_layout(), + // Scan::empty(), + // file_layout.lazy_dtype().into(), + // )?; + // + // let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); + // let io = IoDispatcher::default(); + // let mut stats = Statistics::new_unknown(&table_schema); + // stats.num_rows = Precision::Exact(row_count as usize); + // + // let msgs = LayoutMessageCache::default(); + // + // if let Some(metadata_table) = + // fetch_metadata(os_read_at, io.into(), root_layout, msgs).await? + // { + // let mut column_statistics = Vec::with_capacity(table_schema.fields().len()); + // let mut total_size = 0_u64; + // + // for col_stats in metadata_table.into_iter() { + // let col_stats = match col_stats { + // Some(array) => { + // let col_metadata_array = StructArray::try_from(array)?; + // let col_stats = array_to_col_statistics(&col_metadata_array)?; + // + // total_size += + // uncompressed_col_size(&col_metadata_array)?.unwrap_or_default(); + // col_stats + // } + // None => ColumnStatistics::new_unknown(), + // }; + // column_statistics.push(col_stats); + // } + // stats.column_statistics = column_statistics; + // stats.total_byte_size = Precision::Inexact(total_size as usize); + // } + // + // Ok(stats) } async fn create_physical_plan( diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 8cf286ef76..bf3ed79eb8 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,24 +1,21 @@ -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use datafusion_common::Result as DFResult; -use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; +use datafusion_physical_expr::PhysicalExpr; use futures::{FutureExt as _, StreamExt, TryStreamExt}; use object_store::ObjectStore; use vortex_array::ContextRef; use vortex_expr::datafusion::convert_expr_to_vortex; -use vortex_expr::RowFilter; -use vortex_file::{LayoutContext, LayoutDeserializer, Projection, VortexReadBuilder}; -use vortex_io::{IoDispatcher, ObjectStoreReadAt}; +use vortex_expr::Identity; +use vortex_file::v2::OpenOptions; +use vortex_io::ObjectStoreReadAt; +use vortex_layout::scanner::Scan; use super::cache::FileLayoutCache; -/// Share an IO dispatcher across all DataFusion instances. -static IO_DISPATCHER: LazyLock> = - LazyLock::new(|| Arc::new(IoDispatcher::default())); - #[derive(Clone)] pub struct VortexFileOpener { pub ctx: ContextRef, @@ -26,61 +23,45 @@ pub struct VortexFileOpener { pub projection: Option>, pub predicate: Option>, pub arrow_schema: SchemaRef, - pub(crate) initial_read_cache: FileLayoutCache, + pub(crate) file_layout_cache: FileLayoutCache, } impl FileOpener for VortexFileOpener { fn open(&self, file_meta: FileMeta) -> DFResult { let this = self.clone(); - let f = async move { - let read_at = - ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); - let initial_read = this - .initial_read_cache - .try_get(&file_meta.object_meta, this.object_store.clone()) - .await?; - - let mut builder = VortexReadBuilder::new( - read_at, - LayoutDeserializer::new(this.ctx.clone(), Arc::new(LayoutContext::default())), - ) - .with_io_dispatcher(IO_DISPATCHER.clone()) - .with_file_size(file_meta.object_meta.size as u64) - .with_initial_read(initial_read); - // We split the predicate and filter out the conjunction members that we can't push down - let row_filter = this + // TODO(ngates): figure out how to map the column index projection into a projection expression. + let projection = Identity::new_expr(); + let scan = Scan { + projection, + filter: self .predicate .as_ref() - .map(|filter_expr| { - split_conjunction(filter_expr) - .into_iter() - .filter_map(|e| convert_expr_to_vortex(e.clone()).ok()) - .collect::>() - }) - .filter(|conjunction| !conjunction.is_empty()) - .map(RowFilter::from_conjunction); + .map(|expr| convert_expr_to_vortex(expr.clone())) + .transpose()?, + }; - if let Some(row_filter) = row_filter { - builder = builder.with_row_filter(row_filter); - } + Ok(async move { + let read_at = + ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); - if let Some(projection) = this.projection.as_ref() { - builder = builder.with_projection(Projection::new(projection)); - } + let vxf = OpenOptions::new(this.ctx.clone()) + .with_file_size(file_meta.object_meta.size as u64) + .with_file_layout( + this.file_layout_cache + .try_get(&file_meta.object_meta, this.object_store.clone()) + .await?, + ) + .open(read_at) + .await?; - Ok(Box::pin( - builder - .build() - .await? - .into_stream() - .map_ok(RecordBatch::try_from) - .map(|r| r.and_then(|inner| inner)) - .map_err(|e| e.into()), - ) as _) + Ok(vxf + .scan(scan)? + .map_ok(RecordBatch::try_from) + .map(|r| r.and_then(|inner| inner)) + .map_err(|e| e.into()) + .boxed()) } - .boxed(); - - Ok(f) + .boxed()) } } diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 3ddce8011f..7ab7a6e59d 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -46,7 +46,7 @@ impl VortexFile { /// Async implementation of Vortex File. impl VortexFile { /// Performs a scan operation over the file. - pub fn scan(&self, scan: Scan) -> VortexResult { + pub fn scan(self, scan: Scan) -> VortexResult { let layout_scan = self.layout.new_scan(scan, self.ctx.clone())?; let scan_dtype = layout_scan.dtype().clone(); @@ -57,17 +57,19 @@ impl VortexFile { let mut scanner = layout_scan.create_scanner(row_mask)?; let stream = stream::once(async move { + let segment_cache = self.segment_cache.clone(); + let segments = self.segments.clone(); loop { - match scanner.poll(&self.segment_cache)? { + match scanner.poll(&segment_cache)? { Poll::Some(array) => return Ok(array), Poll::NeedMore(segment_ids) => { for segment_id in segment_ids { - let segment = &self.segments[*segment_id as usize]; + let segment = &segments[*segment_id as usize]; let bytes = self .read .read_byte_range(segment.offset, segment.length as u64) .await?; - self.segment_cache.set(segment_id, bytes); + segment_cache.set(segment_id, bytes); } } } diff --git a/vortex-file/src/v2/open.rs b/vortex-file/src/v2/open.rs index e821fcac6f..351dca1957 100644 --- a/vortex-file/src/v2/open.rs +++ b/vortex-file/src/v2/open.rs @@ -24,6 +24,8 @@ pub struct OpenOptions { ctx: ContextRef, /// The Vortex Layout encoding context. layout_ctx: LayoutContextRef, + /// An optional, externally provided, file size. + file_size: Option, /// An optional, externally provided, file layout. file_layout: Option, /// An optional, externally provided, dtype. @@ -39,6 +41,7 @@ impl OpenOptions { Self { ctx, layout_ctx: LayoutContextRef::default(), + file_size: None, file_layout: None, dtype: None, initial_read_size: INITIAL_READ_SIZE, @@ -54,6 +57,12 @@ impl OpenOptions { Ok(self) } + /// Configure a known file size for the Vortex file. + pub fn with_file_size(mut self, file_size: u64) -> Self { + self.file_size = Some(file_size); + self + } + /// Configure a pre-existing file layout for the Vortex file. pub fn with_file_layout(mut self, file_layout: FileLayout) -> Self { self.file_layout = Some(file_layout); @@ -81,7 +90,10 @@ impl OpenOptions { } // Fetch the file size and perform the initial read. - let file_size = read.size().await?; + let file_size = match self.file_size { + None => read.size().await?, + Some(file_size) => file_size, + }; let initial_read_size = self.initial_read_size.min(file_size); let initial_offset = file_size - initial_read_size; let initial_read: ByteBuffer = read diff --git a/vortex-file/src/v2/tests.rs b/vortex-file/src/v2/tests.rs index 4718e33251..70d301644b 100644 --- a/vortex-file/src/v2/tests.rs +++ b/vortex-file/src/v2/tests.rs @@ -15,14 +15,15 @@ async fn write_read() { ]) .into_array(); - let written = WriteOptions::default() + let written: Bytes = WriteOptions::default() .write_async(vec![], arr.into_array_stream()) .await - .unwrap(); + .unwrap() + // TODO(ngates): no need to wrap Vec in Bytes if VortexReadAt doesn't require clone. + .into(); - // TODO(ngates): no need to wrap Vec in Bytes if VortexReadAt doesn't require clone. let vxf = OpenOptions::new(ContextRef::default()) - .open(Bytes::from(written)) + .open(written) .await .unwrap(); diff --git a/vortex-layout/src/segments/mod.rs b/vortex-layout/src/segments/mod.rs index 8b205e2632..276abe7886 100644 --- a/vortex-layout/src/segments/mod.rs +++ b/vortex-layout/src/segments/mod.rs @@ -27,6 +27,7 @@ pub trait SegmentReader { /// Attempt to get the data associated with a given segment ID. /// /// If the segment ID is not found, `None` is returned. + // TODO(ngates): we should probably take Alignment and return ByteBuffer here. fn get(&self, id: SegmentId) -> Option; } From 3c9f28594ce4648c90ccd3acb7254a430494f58a Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Jan 2025 08:49:00 +0000 Subject: [PATCH 03/61] DataFusion Layouts --- bench-vortex/src/clickbench.rs | 23 ++++---- bench-vortex/src/reader.rs | 63 ++++++++-------------- bench-vortex/src/tpch/mod.rs | 9 ++-- vortex-datafusion/src/persistent/cache.rs | 4 +- vortex-datafusion/src/persistent/format.rs | 10 ++++ vortex-datafusion/src/persistent/opener.rs | 22 ++++++-- vortex-file/src/v2/file.rs | 28 +++++++++- vortex-file/src/v2/mod.rs | 2 + vortex-file/src/v2/open.rs | 6 +-- vortex-file/src/v2/tests.rs | 8 +-- vortex-file/src/v2/writer.rs | 10 ++-- 11 files changed, 111 insertions(+), 74 deletions(-) diff --git a/bench-vortex/src/clickbench.rs b/bench-vortex/src/clickbench.rs index ad73f0056e..e1a51c8076 100644 --- a/bench-vortex/src/clickbench.rs +++ b/bench-vortex/src/clickbench.rs @@ -7,13 +7,16 @@ use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; use datafusion::prelude::{ParquetReadOptions, SessionContext}; -use futures::{stream, StreamExt, TryStreamExt}; +use futures::executor::block_on; +use itertools::Itertools; +use rayon::prelude::*; use tokio::fs::{create_dir_all, OpenOptions}; use vortex::aliases::hash_map::HashMap; use vortex::array::{ChunkedArray, StructArray}; use vortex::dtype::DType; use vortex::error::vortex_err; -use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION}; +use vortex::file::v2::VortexWriteOptions; +use vortex::file::VORTEX_FILE_EXTENSION; use vortex::sampling_compressor::SamplingCompressor; use vortex::variants::StructArrayTrait; use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; @@ -149,7 +152,9 @@ pub async fn register_vortex_files( let vortex_dir = input_path.join("vortex"); create_dir_all(&vortex_dir).await?; - stream::iter(0..100) + (0..100) + .collect_vec() + .par_iter() .map(|idx| { let parquet_file_path = input_path .join("parquet") @@ -158,7 +163,7 @@ pub async fn register_vortex_files( let session = session.clone(); let schema = schema.clone(); - tokio::spawn(async move { + block_on(async move { let output_path = output_path.clone(); idempotent_async(&output_path, move |vtx_file| async move { eprintln!("Processing file {idx}"); @@ -219,9 +224,9 @@ pub async fn register_vortex_files( .open(&vtx_file) .await?; - let mut writer = VortexFileWriter::new(f); - writer = writer.write_array_columns(data).await?; - writer.finalize().await?; + VortexWriteOptions::default() + .write(f, data.into_array_stream()) + .await?; anyhow::Ok(()) }) @@ -229,9 +234,7 @@ pub async fn register_vortex_files( .expect("Failed to write Vortex file") }) }) - .buffered(16) - .try_collect::>() - .await?; + .collect::>(); let format = Arc::new(VortexFormat::new(CTX.clone())); let table_path = vortex_dir diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 9fc1e4441c..d213fc35e5 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::ops::Range; use std::path::{Path, PathBuf}; use std::process::Command; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use arrow_array::types::Int64Type; use arrow_array::{ @@ -24,18 +24,15 @@ use stream::StreamExt; use vortex::aliases::hash_map::HashMap; use vortex::array::ChunkedArray; use vortex::arrow::FromArrowType; -use vortex::buffer::Buffer; use vortex::compress::CompressionStrategy; use vortex::dtype::DType; use vortex::error::VortexResult; -use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder}; -use vortex::io::{IoDispatcher, ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite}; +use vortex::file::v2::{Scan, VortexOpenOptions, VortexWriteOptions}; +use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite}; use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT}; +use vortex::stream::ArrayStreamExt; use vortex::{ArrayData, IntoArrayData, IntoCanonical}; -static DISPATCHER: LazyLock> = - LazyLock::new(|| Arc::new(IoDispatcher::default())); - pub const BATCH_SIZE: usize = 65_536; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -48,19 +45,12 @@ pub struct VortexFooter { pub async fn open_vortex(path: &Path) -> VortexResult { let file = TokioFile::open(path).unwrap(); - VortexReadBuilder::new( - file, - LayoutDeserializer::new( - ALL_ENCODINGS_CONTEXT.clone(), - LayoutContext::default().into(), - ), - ) - .with_io_dispatcher(DISPATCHER.clone()) - .build() - .await? - .into_stream() - .read_all() - .await + VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone()) + .open(file) + .await? + .scan(Scan::all())? + .into_array_data() + .await } pub async fn rewrite_parquet_as_vortex( @@ -69,11 +59,10 @@ pub async fn rewrite_parquet_as_vortex( ) -> VortexResult<()> { let chunked = compress_parquet_to_vortex(parquet_path.as_path())?; - VortexFileWriter::new(write) - .write_array_columns(chunked) - .await? - .finalize() + VortexWriteOptions::default() + .write(write, chunked.into_array_stream()) .await?; + Ok(()) } @@ -118,23 +107,15 @@ async fn take_vortex( reader: T, indices: &[u64], ) -> VortexResult { - VortexReadBuilder::new( - reader, - LayoutDeserializer::new( - ALL_ENCODINGS_CONTEXT.clone(), - LayoutContext::default().into(), - ), - ) - .with_io_dispatcher(DISPATCHER.clone()) - .with_indices(Buffer::copy_from(indices).into_array()) - .build() - .await? - .into_stream() - .read_all() - .await - // For equivalence.... we decompress to make sure we're not cheating too much. - .and_then(IntoCanonical::into_canonical) - .map(ArrayData::from) + VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone()) + .open(reader) + .await? + .scan_rows(Scan::all(), indices.iter().copied())? + .into_array_data() + .await? + // For equivalence.... we decompress to make sure we're not cheating too much. + .into_canonical() + .map(ArrayData::from) } pub async fn take_vortex_object_store( diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index 3dabb7bb97..bbd4b4d5e7 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -16,7 +16,7 @@ use vortex::aliases::hash_map::HashMap; use vortex::array::{ChunkedArray, StructArray}; use vortex::arrow::FromArrowArray; use vortex::dtype::DType; -use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION}; +use vortex::file::VORTEX_FILE_EXTENSION; use vortex::sampling_compressor::SamplingCompressor; use vortex::variants::StructArrayTrait; use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; @@ -31,6 +31,7 @@ mod execute; pub mod schema; pub use execute::*; +use vortex::file::v2::VortexWriteOptions; pub const EXPECTED_ROW_COUNTS: [usize; 23] = [ 0, 4, 460, 11620, 5, 5, 1, 4, 2, 175, 37967, 1048, 2, 42, 1, 1, 18314, 1, 57, 1, 186, 411, 7, @@ -275,9 +276,9 @@ async fn register_vortex_file( .open(&vtx_file) .await?; - let mut writer = VortexFileWriter::new(f); - writer = writer.write_array_columns(data).await?; - writer.finalize().await?; + VortexWriteOptions::default() + .write(f, data.into_array_stream()) + .await?; anyhow::Ok(()) }) diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index 7b5a497ce2..4057afd9dd 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -7,7 +7,7 @@ use object_store::{ObjectMeta, ObjectStore}; use vortex_array::ContextRef; use vortex_error::{vortex_err, VortexError, VortexResult}; use vortex_file::v2::footer::FileLayout; -use vortex_file::v2::OpenOptions; +use vortex_file::v2::VortexOpenOptions; use vortex_io::ObjectStoreReadAt; #[derive(Debug, Clone)] @@ -50,7 +50,7 @@ impl FileLayoutCache { self.inner .try_get_with(Key::from(object), async { let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); - let vxf = OpenOptions::new(ContextRef::default()) + let vxf = VortexOpenOptions::new(ContextRef::default()) .open(os_read_at) .await?; VortexResult::Ok(vxf.file_layout()) diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 1f06a22a08..bc312d1968 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -194,6 +194,8 @@ impl FileFormat for VortexFormat { file_scan_config: FileScanConfig, filters: Option<&Arc>, ) -> DFResult> { + println!("PHYSICAL PLAN FILTERS {:?}", filters); + let _filters = format!("{:?}", filters); let metrics = ExecutionPlanMetricsSet::new(); let exec = VortexExec::try_new( @@ -224,6 +226,14 @@ impl FileFormat for VortexFormat { table_schema: &Schema, filters: &[&Expr], ) -> DFResult { + for filter in filters { + println!( + "FILTER {} {}", + filter, + can_be_pushed_down(filter, table_schema) + ); + } + let is_pushdown = filters .iter() .all(|expr| can_be_pushed_down(expr, table_schema)); diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index bf3ed79eb8..6d5f204ee6 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -7,10 +7,11 @@ use datafusion_common::Result as DFResult; use datafusion_physical_expr::PhysicalExpr; use futures::{FutureExt as _, StreamExt, TryStreamExt}; use object_store::ObjectStore; -use vortex_array::ContextRef; +use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant}; +use vortex_dtype::field::Field; use vortex_expr::datafusion::convert_expr_to_vortex; use vortex_expr::Identity; -use vortex_file::v2::OpenOptions; +use vortex_file::v2::VortexOpenOptions; use vortex_io::ObjectStoreReadAt; use vortex_layout::scanner::Scan; @@ -30,7 +31,8 @@ impl FileOpener for VortexFileOpener { fn open(&self, file_meta: FileMeta) -> DFResult { let this = self.clone(); - // TODO(ngates): figure out how to map the column index projection into a projection expression. + // FIXME(ngates): figure out how to map the column index projection into a projection expression. + // For now, we select columns later. let projection = Identity::new_expr(); let scan = Scan { projection, @@ -45,7 +47,7 @@ impl FileOpener for VortexFileOpener { let read_at = ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); - let vxf = OpenOptions::new(this.ctx.clone()) + let vxf = VortexOpenOptions::new(this.ctx.clone()) .with_file_size(file_meta.object_meta.size as u64) .with_file_layout( this.file_layout_cache @@ -55,8 +57,20 @@ impl FileOpener for VortexFileOpener { .open(read_at) .await?; + let vortex_projection: Option> = this + .projection + .map(|p| p.iter().map(|idx| Field::Index(*idx)).collect()); + Ok(vxf .scan(scan)? + .map_ok(move |array| { + if let Some(projection) = &vortex_projection { + Ok(array.into_struct()?.project(&projection)?.into_array()) + } else { + Ok(array) + } + }) + .map(|r| r.and_then(|inner| inner)) .map_ok(RecordBatch::try_from) .map(|r| r.and_then(|inner| inner)) .map_err(|e| e.into()) diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 7ab7a6e59d..217513684f 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -1,6 +1,7 @@ use std::io::Read; use futures_util::stream; +use vortex_array::compute::FilterMask; use vortex_array::stream::{ArrayStream, ArrayStreamAdapter}; use vortex_array::ContextRef; use vortex_dtype::DType; @@ -47,13 +48,38 @@ impl VortexFile { impl VortexFile { /// Performs a scan operation over the file. pub fn scan(self, scan: Scan) -> VortexResult { + let row_count = self.row_count(); + self.scan_range(scan, RowMask::new_valid_between(0, row_count)) + } + + /// Performs a scan operation over the file. + pub fn scan_rows>( + self, + scan: Scan, + indices: I, + ) -> VortexResult { + let row_count = self.row_count(); + + // TODO(ngates): do we only support "take" over usize rows? + let filter_mask = FilterMask::from_indices( + usize::try_from(row_count).expect("row count is too large for usize"), + indices + .into_iter() + .map(|i| usize::try_from(i).expect("row index is too large for usize")), + ); + let row_mask = RowMask::try_new(filter_mask, 0, row_count)?; + + self.scan_range(scan, row_mask) + } + + /// Performs a scan operation over a [`RowMask`] of the file. + fn scan_range(self, scan: Scan, row_mask: RowMask) -> VortexResult { let layout_scan = self.layout.new_scan(scan, self.ctx.clone())?; let scan_dtype = layout_scan.dtype().clone(); // TODO(ngates): we could query the layout for splits and then process them in parallel. // For now, we just scan the entire layout with one mask. // Note that to implement this we would use stream::try_unfold - let row_mask = RowMask::new_valid_between(0, layout_scan.layout().row_count()); let mut scanner = layout_scan.create_scanner(row_mask)?; let stream = stream::once(async move { diff --git a/vortex-file/src/v2/mod.rs b/vortex-file/src/v2/mod.rs index 93f635d6b0..571191ddd7 100644 --- a/vortex-file/src/v2/mod.rs +++ b/vortex-file/src/v2/mod.rs @@ -9,4 +9,6 @@ mod writer; pub use file::*; pub use open::*; +// TODO(ngates): probably can separate these APIs? For now, re-export the Scan. +pub use vortex_layout::scanner::Scan; pub use writer::*; diff --git a/vortex-file/src/v2/open.rs b/vortex-file/src/v2/open.rs index 351dca1957..35baa00714 100644 --- a/vortex-file/src/v2/open.rs +++ b/vortex-file/src/v2/open.rs @@ -19,7 +19,7 @@ use crate::{EOF_SIZE, MAGIC_BYTES, VERSION}; const INITIAL_READ_SIZE: u64 = 1 << 20; // 1 MB /// Open options for a Vortex file reader. -pub struct OpenOptions { +pub struct VortexOpenOptions { /// The Vortex Array encoding context. ctx: ContextRef, /// The Vortex Layout encoding context. @@ -36,7 +36,7 @@ pub struct OpenOptions { initial_read_size: u64, } -impl OpenOptions { +impl VortexOpenOptions { pub fn new(ctx: ContextRef) -> Self { Self { ctx, @@ -70,7 +70,7 @@ impl OpenOptions { } } -impl OpenOptions { +impl VortexOpenOptions { /// Open the Vortex file using synchronous IO. pub fn open_sync(self, _read: R) -> VortexResult> { todo!() diff --git a/vortex-file/src/v2/tests.rs b/vortex-file/src/v2/tests.rs index 70d301644b..734e265a11 100644 --- a/vortex-file/src/v2/tests.rs +++ b/vortex-file/src/v2/tests.rs @@ -5,7 +5,7 @@ use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant}; use vortex_buffer::buffer; use vortex_layout::scanner::Scan; -use crate::v2::{OpenOptions, WriteOptions}; +use crate::v2::{VortexOpenOptions, VortexWriteOptions}; #[tokio::test] async fn write_read() { @@ -15,14 +15,14 @@ async fn write_read() { ]) .into_array(); - let written: Bytes = WriteOptions::default() - .write_async(vec![], arr.into_array_stream()) + let written: Bytes = VortexWriteOptions::default() + .write(vec![], arr.into_array_stream()) .await .unwrap() // TODO(ngates): no need to wrap Vec in Bytes if VortexReadAt doesn't require clone. .into(); - let vxf = OpenOptions::new(ContextRef::default()) + let vxf = VortexOpenOptions::new(ContextRef::default()) .open(written) .await .unwrap(); diff --git a/vortex-file/src/v2/writer.rs b/vortex-file/src/v2/writer.rs index b1706c7ebc..1ba763cf1d 100644 --- a/vortex-file/src/v2/writer.rs +++ b/vortex-file/src/v2/writer.rs @@ -13,11 +13,11 @@ use crate::v2::segments::BufferedSegmentWriter; use crate::v2::strategy::VortexLayoutStrategy; use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION}; -pub struct WriteOptions { +pub struct VortexWriteOptions { strategy: Box, } -impl Default for WriteOptions { +impl Default for VortexWriteOptions { fn default() -> Self { Self { strategy: Box::new(VortexLayoutStrategy), @@ -25,7 +25,7 @@ impl Default for WriteOptions { } } -impl WriteOptions { +impl VortexWriteOptions { /// Replace the default layout strategy with the provided one. pub fn with_strategy(mut self, strategy: Box) -> Self { self.strategy = strategy; @@ -33,14 +33,14 @@ impl WriteOptions { } } -impl WriteOptions { +impl VortexWriteOptions { /// Perform a blocking write of the provided iterator of `ArrayData`. pub fn write_sync(self, _write: W, _iter: I) -> VortexResult<()> { todo!() } /// Perform an async write of the provided stream of `ArrayData`. - pub async fn write_async( + pub async fn write( self, write: W, mut stream: S, From 6f4b711718539514c8093d61bd1236ee983a76e5 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Jan 2025 09:15:23 +0000 Subject: [PATCH 04/61] DataFusion Layouts --- vortex-file/src/v2/file.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 217513684f..138c45b8ed 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -69,6 +69,20 @@ impl VortexFile { ); let row_mask = RowMask::try_new(filter_mask, 0, row_count)?; + // TODO(ngates): support thread pool execution. + // The plan is to have OpenOptions configure a Rayon ThreadPool for reading. We would + // par_iter each of the row masks (based on configured split by size or row count), + // launching their `poll` operation onto the thread pool. If a task returns NeedMore, + // then the segment IDs are handed to the IO dispatcher and a synchronous latch is + // returned. The IO dispatcher has visibility into all requested segments and can perform + // coalescing over ranges. Once a coalesced read returns, the dispatcher updates the + // segment cache with all read segments (including those that were incidentally read by + // in-between the coalesced ranges). A map of segment IDs -> set then provides + // a way for the dispatcher to notify the waiting tasks that their data is ready. When + // finished, the tasks push their results in order onto a channel that acts as the + // ArrayStream. + // This keeps I/O on the current thread (using the caller's existing runtime), while still + // enabling a CPU pool for decompression and filtering. self.scan_range(scan, row_mask) } From f1e11d1b677dcbc7cf2a1df4d800ee46955d8ff0 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Jan 2025 09:20:40 +0000 Subject: [PATCH 05/61] DataFusion Layouts --- vortex-file/src/v2/file.rs | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 138c45b8ed..d75e7d2cfe 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -49,6 +49,22 @@ impl VortexFile { /// Performs a scan operation over the file. pub fn scan(self, scan: Scan) -> VortexResult { let row_count = self.row_count(); + + // TODO(ngates): support thread pool execution. + // The plan is to have OpenOptions configure a Rayon ThreadPool for reading. We would + // par_iter each of the row masks (based on configured split by size or row count), + // launching their `poll` operation onto the thread pool. If a task returns NeedMore, + // then the segment IDs are handed to the IO dispatcher and a synchronous latch is + // returned. The IO dispatcher has visibility into all requested segments and can perform + // coalescing over ranges. Once a coalesced read returns, the dispatcher updates the + // segment cache with all read segments (including those that were incidentally read by + // in-between the coalesced ranges). A map of segment IDs -> set then provides + // a way for the dispatcher to notify the waiting tasks that their data is ready. When + // finished, the tasks push their results in order onto a channel that acts as the + // ArrayStream. + // This keeps I/O on the current thread (using the caller's existing runtime), while still + // enabling a CPU pool for decompression and filtering. + self.scan_range(scan, RowMask::new_valid_between(0, row_count)) } @@ -69,20 +85,6 @@ impl VortexFile { ); let row_mask = RowMask::try_new(filter_mask, 0, row_count)?; - // TODO(ngates): support thread pool execution. - // The plan is to have OpenOptions configure a Rayon ThreadPool for reading. We would - // par_iter each of the row masks (based on configured split by size or row count), - // launching their `poll` operation onto the thread pool. If a task returns NeedMore, - // then the segment IDs are handed to the IO dispatcher and a synchronous latch is - // returned. The IO dispatcher has visibility into all requested segments and can perform - // coalescing over ranges. Once a coalesced read returns, the dispatcher updates the - // segment cache with all read segments (including those that were incidentally read by - // in-between the coalesced ranges). A map of segment IDs -> set then provides - // a way for the dispatcher to notify the waiting tasks that their data is ready. When - // finished, the tasks push their results in order onto a channel that acts as the - // ArrayStream. - // This keeps I/O on the current thread (using the caller's existing runtime), while still - // enabling a CPU pool for decompression and filtering. self.scan_range(scan, row_mask) } From 0d0b59eaae32ab86e832b52fefe08247338c09e1 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 9 Jan 2025 14:03:15 +0000 Subject: [PATCH 06/61] add hash to vortex-expr and therefore vortex-scalar --- vortex-buffer/src/alignment.rs | 2 +- vortex-buffer/src/buffer.rs | 2 +- vortex-buffer/src/string.rs | 2 +- vortex-expr/src/binary.rs | 3 ++- vortex-expr/src/column.rs | 1 + vortex-expr/src/get_item.rs | 3 ++- vortex-expr/src/identity.rs | 2 +- vortex-expr/src/lib.rs | 23 ++++++++++++++++++++++- vortex-expr/src/like.rs | 3 ++- vortex-expr/src/literal.rs | 2 +- vortex-expr/src/not.rs | 3 ++- vortex-expr/src/pack.rs | 11 ++--------- vortex-expr/src/row_filter.rs | 2 +- vortex-expr/src/select.rs | 4 ++-- vortex-scalar/src/lib.rs | 2 +- vortex-scalar/src/pvalue.rs | 27 ++++++++++++++++++++++++++- vortex-scalar/src/value.rs | 4 ++-- 17 files changed, 70 insertions(+), 26 deletions(-) diff --git a/vortex-buffer/src/alignment.rs b/vortex-buffer/src/alignment.rs index 267b536efb..7a1293762a 100644 --- a/vortex-buffer/src/alignment.rs +++ b/vortex-buffer/src/alignment.rs @@ -7,7 +7,7 @@ use vortex_error::VortexExpect; /// /// This type is a wrapper around `usize` that ensures the alignment is a power of 2 and fits into /// a `u16`. -#[derive(Clone, Debug, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Debug, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Alignment(usize); impl Alignment { diff --git a/vortex-buffer/src/buffer.rs b/vortex-buffer/src/buffer.rs index 31ec0391f3..d471263321 100644 --- a/vortex-buffer/src/buffer.rs +++ b/vortex-buffer/src/buffer.rs @@ -8,7 +8,7 @@ use vortex_error::{vortex_panic, VortexExpect}; use crate::{Alignment, BufferMut, ByteBuffer}; /// An immutable buffer of items of `T`. -#[derive(Clone, PartialEq, Eq, PartialOrd)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)] pub struct Buffer { pub(crate) bytes: Bytes, pub(crate) length: usize, diff --git a/vortex-buffer/src/string.rs b/vortex-buffer/src/string.rs index 7474e3a9ba..1c08de865d 100644 --- a/vortex-buffer/src/string.rs +++ b/vortex-buffer/src/string.rs @@ -5,7 +5,7 @@ use std::str::Utf8Error; use crate::ByteBuffer; /// A wrapper around a [`ByteBuffer`] that guarantees that the buffer contains valid UTF-8. -#[derive(Clone, PartialEq, Eq, PartialOrd)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)] pub struct BufferString(ByteBuffer); impl BufferString { diff --git a/vortex-expr/src/binary.rs b/vortex-expr/src/binary.rs index 07fbc8998d..5aef2c3652 100644 --- a/vortex-expr/src/binary.rs +++ b/vortex-expr/src/binary.rs @@ -1,5 +1,6 @@ use std::any::Any; use std::fmt::Display; +use std::hash::Hash; use std::sync::Arc; use vortex_array::compute::{and_kleene, compare, or_kleene, Operator as ArrayOperator}; @@ -8,7 +9,7 @@ use vortex_error::VortexResult; use crate::{ExprRef, Operator, VortexExpr}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash)] pub struct BinaryExpr { lhs: ExprRef, operator: Operator, diff --git a/vortex-expr/src/column.rs b/vortex-expr/src/column.rs index 1260ca9560..b5cc969850 100644 --- a/vortex-expr/src/column.rs +++ b/vortex-expr/src/column.rs @@ -1,5 +1,6 @@ use std::any::Any; use std::fmt::Display; +use std::hash::Hash; use std::sync::Arc; use vortex_array::{ArrayDType, ArrayData}; diff --git a/vortex-expr/src/get_item.rs b/vortex-expr/src/get_item.rs index b519ae0572..d44fe7ea19 100644 --- a/vortex-expr/src/get_item.rs +++ b/vortex-expr/src/get_item.rs @@ -1,5 +1,6 @@ use std::any::Any; use std::fmt::{Debug, Display, Formatter}; +use std::hash::Hash; use std::sync::Arc; use vortex_array::ArrayData; @@ -8,7 +9,7 @@ use vortex_error::{vortex_err, VortexResult}; use crate::{ExprRef, VortexExpr}; -#[derive(Debug, Clone, Eq)] +#[derive(Debug, Clone, Eq, Hash)] pub struct GetItem { field: Field, child: ExprRef, diff --git a/vortex-expr/src/identity.rs b/vortex-expr/src/identity.rs index 56b6a65dd0..72aa4bc32a 100644 --- a/vortex-expr/src/identity.rs +++ b/vortex-expr/src/identity.rs @@ -7,7 +7,7 @@ use vortex_error::VortexResult; use crate::{ExprRef, VortexExpr}; -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, PartialEq, Eq, Hash)] pub struct Identity; impl Identity { diff --git a/vortex-expr/src/lib.rs b/vortex-expr/src/lib.rs index 8998dd599e..6d49c94a11 100644 --- a/vortex-expr/src/lib.rs +++ b/vortex-expr/src/lib.rs @@ -1,5 +1,6 @@ use std::any::Any; use std::fmt::{Debug, Display}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; mod binary; @@ -42,7 +43,7 @@ use crate::traversal::{Node, ReferenceCollector}; pub type ExprRef = Arc; /// Represents logical operation on [`ArrayData`]s -pub trait VortexExpr: Debug + Send + Sync + DynEq + Display { +pub trait VortexExpr: Debug + Send + Sync + DynEq + DynHash + Display { /// Convert expression reference to reference of [`Any`] type fn as_any(&self) -> &dyn Any; @@ -108,6 +109,26 @@ impl PartialEq for dyn VortexExpr { impl Eq for dyn VortexExpr {} +/// [`PhysicalExpr`] can't be constrained by [`Hash`] directly because it must remain +/// object safe. To ease implementation blanket implementation is provided for [`Hash`] +/// types. +pub trait DynHash { + fn dyn_hash(&self, _state: &mut dyn Hasher); +} + +impl DynHash for T { + fn dyn_hash(&self, mut state: &mut dyn Hasher) { + self.type_id().hash(&mut state); + self.hash(&mut state) + } +} + +impl Hash for dyn VortexExpr { + fn hash(&self, state: &mut H) { + self.dyn_hash(state); + } +} + #[cfg(test)] mod tests { use vortex_dtype::{DType, Field, Nullability, PType, StructDType}; diff --git a/vortex-expr/src/like.rs b/vortex-expr/src/like.rs index 6050cfd560..43f0102fce 100644 --- a/vortex-expr/src/like.rs +++ b/vortex-expr/src/like.rs @@ -1,5 +1,6 @@ use std::any::Any; use std::fmt::Display; +use std::hash::Hash; use std::sync::Arc; use vortex_array::compute::{like, LikeOptions}; @@ -8,7 +9,7 @@ use vortex_error::VortexResult; use crate::{ExprRef, VortexExpr}; -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct Like { child: ExprRef, pattern: ExprRef, diff --git a/vortex-expr/src/literal.rs b/vortex-expr/src/literal.rs index e9e7f78a1b..c1b16911a8 100644 --- a/vortex-expr/src/literal.rs +++ b/vortex-expr/src/literal.rs @@ -9,7 +9,7 @@ use vortex_scalar::Scalar; use crate::{ExprRef, VortexExpr}; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Hash)] pub struct Literal { value: Scalar, } diff --git a/vortex-expr/src/not.rs b/vortex-expr/src/not.rs index adae94bc0e..17a0c0deb4 100644 --- a/vortex-expr/src/not.rs +++ b/vortex-expr/src/not.rs @@ -1,5 +1,6 @@ use std::any::Any; use std::fmt::Display; +use std::hash::Hash; use std::sync::Arc; use vortex_array::compute::invert; @@ -8,7 +9,7 @@ use vortex_error::VortexResult; use crate::{ExprRef, VortexExpr}; -#[derive(Debug)] +#[derive(Debug, Hash)] pub struct Not { child: ExprRef, } diff --git a/vortex-expr/src/pack.rs b/vortex-expr/src/pack.rs index f03da39dbd..6160e761ce 100644 --- a/vortex-expr/src/pack.rs +++ b/vortex-expr/src/pack.rs @@ -1,5 +1,6 @@ use std::any::Any; use std::fmt::Display; +use std::hash::Hash; use std::sync::Arc; use itertools::Itertools as _; @@ -37,7 +38,7 @@ use crate::{ExprRef, VortexExpr}; /// assert_eq!(scalar_at(&x_copy, 2).unwrap(), Scalar::from(200)); /// ``` /// -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Pack { names: FieldNames, values: Vec, @@ -102,14 +103,6 @@ impl VortexExpr for Pack { } } -impl PartialEq for Pack { - fn eq(&self, other: &Pack) -> bool { - self.names == other.names && self.values == other.values - } -} - -impl Eq for Pack {} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/vortex-expr/src/row_filter.rs b/vortex-expr/src/row_filter.rs index 0ceb5f9bf9..3bfbe4dece 100644 --- a/vortex-expr/src/row_filter.rs +++ b/vortex-expr/src/row_filter.rs @@ -12,7 +12,7 @@ use vortex_error::{VortexExpect, VortexResult}; use crate::{expr_project, split_conjunction, ExprRef, VortexExpr}; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RowFilter { pub(crate) conjunction: Vec, } diff --git a/vortex-expr/src/select.rs b/vortex-expr/src/select.rs index a5fbad050f..d3d3116ef3 100644 --- a/vortex-expr/src/select.rs +++ b/vortex-expr/src/select.rs @@ -10,13 +10,13 @@ use vortex_error::{vortex_err, VortexResult}; use crate::{ExprRef, VortexExpr}; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum SelectField { Include(Vec), Exclude(Vec), } -#[derive(Debug, Clone, Eq)] +#[derive(Debug, Clone, Eq, Hash)] pub struct Select { fields: SelectField, child: ExprRef, diff --git a/vortex-scalar/src/lib.rs b/vortex-scalar/src/lib.rs index 92860b474f..086b3e0f51 100644 --- a/vortex-scalar/src/lib.rs +++ b/vortex-scalar/src/lib.rs @@ -43,7 +43,7 @@ use vortex_error::{vortex_bail, VortexExpect, VortexResult}; /// /// Note: [`PartialEq`] and [`PartialOrd`] are implemented only for an exact match of the scalar's /// dtype, including nullability. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash)] #[cfg_attr(feature = "serde", derive(::serde::Serialize, ::serde::Deserialize))] pub struct Scalar { dtype: DType, diff --git a/vortex-scalar/src/pvalue.rs b/vortex-scalar/src/pvalue.rs index 88d2eb97b8..147a678549 100644 --- a/vortex-scalar/src/pvalue.rs +++ b/vortex-scalar/src/pvalue.rs @@ -1,11 +1,12 @@ use core::fmt::Display; use std::cmp::Ordering; +use std::hash::{Hash, Hasher}; use std::mem; use num_traits::NumCast; use paste::paste; use vortex_dtype::half::f16; -use vortex_dtype::{NativePType, PType}; +use vortex_dtype::{NativePType, PType, ToBytes}; use vortex_error::{vortex_err, VortexError, VortexExpect}; #[derive(Debug, Clone, Copy)] @@ -61,6 +62,30 @@ impl PartialOrd for PValue { } } +impl Hash for PValue { + fn hash(&self, state: &mut H) { + self.to_le_bytes().hash(state); + } +} + +impl ToBytes for PValue { + fn to_le_bytes(&self) -> &[u8] { + match self { + PValue::U8(v) => v.to_le_bytes(), + PValue::U16(v) => v.to_le_bytes(), + PValue::U32(v) => v.to_le_bytes(), + PValue::U64(v) => v.to_le_bytes(), + PValue::I8(v) => v.to_le_bytes(), + PValue::I16(v) => v.to_le_bytes(), + PValue::I32(v) => v.to_le_bytes(), + PValue::I64(v) => v.to_le_bytes(), + PValue::F16(v) => v.to_le_bytes(), + PValue::F32(v) => v.to_le_bytes(), + PValue::F64(v) => v.to_le_bytes(), + } + } +} + macro_rules! as_primitive { ($T:ty, $PT:tt) => { paste! { diff --git a/vortex-scalar/src/value.rs b/vortex-scalar/src/value.rs index b6c91ad866..57deabbcc7 100644 --- a/vortex-scalar/src/value.rs +++ b/vortex-scalar/src/value.rs @@ -14,10 +14,10 @@ use crate::pvalue::PValue; /// Note that these values can be deserialized from JSON or other formats. So a PValue may not /// have the correct width for what the DType expects. Primitive values should therefore be /// read using [crate::PrimitiveScalar] which will handle the conversion. -#[derive(Debug, Clone, PartialEq, PartialOrd)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Hash)] pub struct ScalarValue(pub(crate) InnerScalarValue); -#[derive(Debug, Clone, PartialEq, PartialOrd)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Hash)] pub(crate) enum InnerScalarValue { Bool(bool), Primitive(PValue), From b2453a43ca12d0ebf07780dc437baee119353422 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 9 Jan 2025 14:32:21 +0000 Subject: [PATCH 07/61] use dyn-hash --- Cargo.lock | 19 +++++++++++++------ Cargo.toml | 1 + vortex-expr/Cargo.toml | 1 + vortex-expr/src/lib.rs | 23 +++-------------------- 4 files changed, 18 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3583c5c5e3..15a2800c0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1653,6 +1653,12 @@ dependencies = [ "syn 2.0.95", ] +[[package]] +name = "dyn-hash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a650a461c6a8ff1ef205ed9a2ad56579309853fecefc2423f73dced342f92258" + [[package]] name = "either" version = "1.13.0" @@ -1707,7 +1713,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2494,7 +2500,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3606,7 +3612,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3861,7 +3867,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -4377,7 +4383,7 @@ dependencies = [ "getrandom", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -5052,6 +5058,7 @@ version = "0.21.1" dependencies = [ "datafusion-expr", "datafusion-physical-expr", + "dyn-hash", "itertools 0.14.0", "prost", "serde", @@ -5488,7 +5495,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9360b40be3..f62162a9dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ datafusion-expr = "44.0.0" datafusion-physical-expr = "44.0.0" datafusion-physical-plan = "44.0.0" divan = "0.1.14" +dyn-hash = "0.2.0" enum-iterator = "2.0.0" fastlanes = "0.1.5" flatbuffers = "24.3.25" diff --git a/vortex-expr/Cargo.toml b/vortex-expr/Cargo.toml index 09e7a295ce..bfee19fbbb 100644 --- a/vortex-expr/Cargo.toml +++ b/vortex-expr/Cargo.toml @@ -24,6 +24,7 @@ workspace = true [dependencies] datafusion-expr = { workspace = true, optional = true } datafusion-physical-expr = { workspace = true, optional = true } +dyn-hash = { workspace = true } itertools = { workspace = true } prost = { workspace = true, optional = true } serde = { workspace = true, optional = true, features = ["derive"] } diff --git a/vortex-expr/src/lib.rs b/vortex-expr/src/lib.rs index 6d49c94a11..eba6deeb20 100644 --- a/vortex-expr/src/lib.rs +++ b/vortex-expr/src/lib.rs @@ -1,8 +1,9 @@ use std::any::Any; use std::fmt::{Debug, Display}; -use std::hash::{Hash, Hasher}; use std::sync::Arc; +use dyn_hash::DynHash; + mod binary; mod column; pub mod datafusion; @@ -109,25 +110,7 @@ impl PartialEq for dyn VortexExpr { impl Eq for dyn VortexExpr {} -/// [`PhysicalExpr`] can't be constrained by [`Hash`] directly because it must remain -/// object safe. To ease implementation blanket implementation is provided for [`Hash`] -/// types. -pub trait DynHash { - fn dyn_hash(&self, _state: &mut dyn Hasher); -} - -impl DynHash for T { - fn dyn_hash(&self, mut state: &mut dyn Hasher) { - self.type_id().hash(&mut state); - self.hash(&mut state) - } -} - -impl Hash for dyn VortexExpr { - fn hash(&self, state: &mut H) { - self.dyn_hash(state); - } -} +dyn_hash::hash_trait_object!(VortexExpr); #[cfg(test)] mod tests { From e8b7d9474feffcdb61b43b2ccea1f2046e25cf19 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 9 Jan 2025 15:10:10 +0000 Subject: [PATCH 08/61] clippy --- vortex-scalar/src/lib.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/vortex-scalar/src/lib.rs b/vortex-scalar/src/lib.rs index 086b3e0f51..e8297cfaf1 100644 --- a/vortex-scalar/src/lib.rs +++ b/vortex-scalar/src/lib.rs @@ -1,4 +1,5 @@ use std::cmp::Ordering; +use std::hash::Hash; use std::mem::discriminant; use std::sync::Arc; @@ -43,7 +44,7 @@ use vortex_error::{vortex_bail, VortexExpect, VortexResult}; /// /// Note: [`PartialEq`] and [`PartialOrd`] are implemented only for an exact match of the scalar's /// dtype, including nullability. -#[derive(Debug, Clone, Hash)] +#[derive(Debug, Clone)] #[cfg_attr(feature = "serde", derive(::serde::Serialize, ::serde::Deserialize))] pub struct Scalar { dtype: DType, @@ -221,6 +222,13 @@ impl PartialOrd for Scalar { } } +impl Hash for Scalar { + fn hash(&self, state: &mut H) { + discriminant(self.dtype()).hash(state); + self.value.0.hash(state); + } +} + impl AsRef for Scalar { fn as_ref(&self) -> &Self { self From 4314b622305df0bab97ee1d68c338542c55e6fe0 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 9 Jan 2025 17:54:26 +0000 Subject: [PATCH 09/61] fix --- vortex-expr/src/binary.rs | 5 ++--- vortex-expr/src/column.rs | 3 +-- vortex-expr/src/get_item.rs | 11 +---------- vortex-expr/src/like.rs | 5 ++--- vortex-expr/src/not.rs | 6 +++--- vortex-expr/src/select.rs | 1 + 6 files changed, 10 insertions(+), 21 deletions(-) diff --git a/vortex-expr/src/binary.rs b/vortex-expr/src/binary.rs index 5aef2c3652..213e3c8d53 100644 --- a/vortex-expr/src/binary.rs +++ b/vortex-expr/src/binary.rs @@ -9,7 +9,8 @@ use vortex_error::VortexResult; use crate::{ExprRef, Operator, VortexExpr}; -#[derive(Debug, Clone, Hash)] +#[derive(Debug, Clone, Eq, Hash)] +#[allow(clippy::derived_hash_with_manual_eq)] pub struct BinaryExpr { lhs: ExprRef, operator: Operator, @@ -77,8 +78,6 @@ impl PartialEq for BinaryExpr { } } -impl Eq for BinaryExpr {} - /// Create a new `BinaryExpr` using the `Eq` operator. /// /// ## Example usage diff --git a/vortex-expr/src/column.rs b/vortex-expr/src/column.rs index b5cc969850..7d6d212a60 100644 --- a/vortex-expr/src/column.rs +++ b/vortex-expr/src/column.rs @@ -9,7 +9,7 @@ use vortex_error::{vortex_err, VortexResult}; use crate::{ExprRef, VortexExpr}; -#[derive(Debug, PartialEq, Hash, Clone, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Column { field: Field, } @@ -58,7 +58,6 @@ impl VortexExpr for Column { fn as_any(&self) -> &dyn Any { self } - fn evaluate(&self, batch: &ArrayData) -> VortexResult { batch .as_struct_array() diff --git a/vortex-expr/src/get_item.rs b/vortex-expr/src/get_item.rs index d44fe7ea19..2ee97a03d8 100644 --- a/vortex-expr/src/get_item.rs +++ b/vortex-expr/src/get_item.rs @@ -10,6 +10,7 @@ use vortex_error::{vortex_err, VortexResult}; use crate::{ExprRef, VortexExpr}; #[derive(Debug, Clone, Eq, Hash)] +#[allow(clippy::derived_hash_with_manual_eq)] pub struct GetItem { field: Field, child: ExprRef, @@ -36,15 +37,6 @@ pub fn get_item(field: impl Into, child: ExprRef) -> ExprRef { GetItem::new_expr(field, child) } -impl PartialEq for GetItem { - fn eq(&self, other: &dyn Any) -> bool { - other - .downcast_ref::() - .map(|item| self.field == item.field && self.child.eq(&item.child)) - .unwrap_or(false) - } -} - impl Display for GetItem { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}.{}", self.child, self.field) @@ -55,7 +47,6 @@ impl VortexExpr for GetItem { fn as_any(&self) -> &dyn Any { self } - fn evaluate(&self, batch: &ArrayData) -> VortexResult { let child = self.child.evaluate(batch)?; child diff --git a/vortex-expr/src/like.rs b/vortex-expr/src/like.rs index 43f0102fce..672d47a21e 100644 --- a/vortex-expr/src/like.rs +++ b/vortex-expr/src/like.rs @@ -9,7 +9,8 @@ use vortex_error::VortexResult; use crate::{ExprRef, VortexExpr}; -#[derive(Debug, Hash)] +#[derive(Debug, Eq, Hash)] +#[allow(clippy::derived_hash_with_manual_eq)] pub struct Like { child: ExprRef, pattern: ExprRef, @@ -97,8 +98,6 @@ impl PartialEq for Like { } } -impl Eq for Like {} - #[cfg(test)] mod tests { use vortex_array::array::BoolArray; diff --git a/vortex-expr/src/not.rs b/vortex-expr/src/not.rs index 17a0c0deb4..b6061277dd 100644 --- a/vortex-expr/src/not.rs +++ b/vortex-expr/src/not.rs @@ -9,7 +9,9 @@ use vortex_error::VortexResult; use crate::{ExprRef, VortexExpr}; -#[derive(Debug, Hash)] +#[derive(Debug, Eq, Hash)] +// We cannot auto derive PartialEq because ExprRef, since its a Arc<..> and derive doesn't work +#[allow(clippy::derived_hash_with_manual_eq)] pub struct Not { child: ExprRef, } @@ -57,8 +59,6 @@ impl PartialEq for Not { } } -impl Eq for Not {} - pub fn not(operand: ExprRef) -> ExprRef { Not::new_expr(operand) } diff --git a/vortex-expr/src/select.rs b/vortex-expr/src/select.rs index d3d3116ef3..c93ffd0086 100644 --- a/vortex-expr/src/select.rs +++ b/vortex-expr/src/select.rs @@ -17,6 +17,7 @@ pub enum SelectField { } #[derive(Debug, Clone, Eq, Hash)] +#[allow(clippy::derived_hash_with_manual_eq)] pub struct Select { fields: SelectField, child: ExprRef, From f12207661a76dd305610bb69c84daa5bedf39824 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 9 Jan 2025 17:05:14 +0000 Subject: [PATCH 10/61] derive hash impl PartialEq --- Cargo.lock | 7 +++++++ Cargo.toml | 1 + vortex-expr/Cargo.toml | 1 + vortex-expr/src/binary.rs | 5 ----- vortex-expr/src/column.rs | 1 + vortex-expr/src/get_item.rs | 1 + vortex-expr/src/identity.rs | 5 ----- vortex-expr/src/lib.rs | 27 ++------------------------- vortex-expr/src/like.rs | 5 ----- vortex-expr/src/literal.rs | 5 ----- vortex-expr/src/not.rs | 5 ----- vortex-expr/src/pack.rs | 4 ---- vortex-expr/src/row_filter.rs | 5 ----- vortex-expr/src/select.rs | 5 ----- 14 files changed, 13 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 15a2800c0c..8f01be821c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1653,6 +1653,12 @@ dependencies = [ "syn 2.0.95", ] +[[package]] +name = "dyn-eq" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c2d035d21af5cde1a6f5c7b444a5bf963520a9f142e5d06931178433d7d5388" + [[package]] name = "dyn-hash" version = "0.2.0" @@ -5058,6 +5064,7 @@ version = "0.21.1" dependencies = [ "datafusion-expr", "datafusion-physical-expr", + "dyn-eq", "dyn-hash", "itertools 0.14.0", "prost", diff --git a/Cargo.toml b/Cargo.toml index f62162a9dc..ed275d8fb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ datafusion-expr = "44.0.0" datafusion-physical-expr = "44.0.0" datafusion-physical-plan = "44.0.0" divan = "0.1.14" +dyn-eq = "0.1.3" dyn-hash = "0.2.0" enum-iterator = "2.0.0" fastlanes = "0.1.5" diff --git a/vortex-expr/Cargo.toml b/vortex-expr/Cargo.toml index bfee19fbbb..e0ce162351 100644 --- a/vortex-expr/Cargo.toml +++ b/vortex-expr/Cargo.toml @@ -24,6 +24,7 @@ workspace = true [dependencies] datafusion-expr = { workspace = true, optional = true } datafusion-physical-expr = { workspace = true, optional = true } +dyn-eq = { workspace = true, features = ["alloc"] } dyn-hash = { workspace = true } itertools = { workspace = true } prost = { workspace = true, optional = true } diff --git a/vortex-expr/src/binary.rs b/vortex-expr/src/binary.rs index 213e3c8d53..d2e3f3e7a2 100644 --- a/vortex-expr/src/binary.rs +++ b/vortex-expr/src/binary.rs @@ -1,4 +1,3 @@ -use std::any::Any; use std::fmt::Display; use std::hash::Hash; use std::sync::Arc; @@ -42,10 +41,6 @@ impl Display for BinaryExpr { } impl VortexExpr for BinaryExpr { - fn as_any(&self) -> &dyn Any { - self - } - fn evaluate(&self, batch: &ArrayData) -> VortexResult { let lhs = self.lhs.evaluate(batch)?; let rhs = self.rhs.evaluate(batch)?; diff --git a/vortex-expr/src/column.rs b/vortex-expr/src/column.rs index 7d6d212a60..a629a44393 100644 --- a/vortex-expr/src/column.rs +++ b/vortex-expr/src/column.rs @@ -58,6 +58,7 @@ impl VortexExpr for Column { fn as_any(&self) -> &dyn Any { self } + fn evaluate(&self, batch: &ArrayData) -> VortexResult { batch .as_struct_array() diff --git a/vortex-expr/src/get_item.rs b/vortex-expr/src/get_item.rs index 2ee97a03d8..e4c988cd85 100644 --- a/vortex-expr/src/get_item.rs +++ b/vortex-expr/src/get_item.rs @@ -47,6 +47,7 @@ impl VortexExpr for GetItem { fn as_any(&self) -> &dyn Any { self } + fn evaluate(&self, batch: &ArrayData) -> VortexResult { let child = self.child.evaluate(batch)?; child diff --git a/vortex-expr/src/identity.rs b/vortex-expr/src/identity.rs index 72aa4bc32a..46f7c86826 100644 --- a/vortex-expr/src/identity.rs +++ b/vortex-expr/src/identity.rs @@ -1,4 +1,3 @@ -use std::any::Any; use std::fmt::Display; use std::sync::Arc; @@ -23,10 +22,6 @@ impl Display for Identity { } impl VortexExpr for Identity { - fn as_any(&self) -> &dyn Any { - self - } - fn evaluate(&self, batch: &ArrayData) -> VortexResult { Ok(batch.clone()) } diff --git a/vortex-expr/src/lib.rs b/vortex-expr/src/lib.rs index eba6deeb20..2e61bb155c 100644 --- a/vortex-expr/src/lib.rs +++ b/vortex-expr/src/lib.rs @@ -1,7 +1,7 @@ -use std::any::Any; use std::fmt::{Debug, Display}; use std::sync::Arc; +use dyn_eq::DynEq; use dyn_hash::DynHash; mod binary; @@ -45,9 +45,6 @@ pub type ExprRef = Arc; /// Represents logical operation on [`ArrayData`]s pub trait VortexExpr: Debug + Send + Sync + DynEq + DynHash + Display { - /// Convert expression reference to reference of [`Any`] type - fn as_any(&self) -> &dyn Any; - /// Compute result of expression on given batch producing a new batch fn evaluate(&self, batch: &ArrayData) -> VortexResult; @@ -89,27 +86,7 @@ fn split_inner(expr: &ExprRef, exprs: &mut Vec) { } } -// Adapted from apache/datafusion https://github.com/apache/datafusion/blob/f31ca5b927c040ce03f6a3c8c8dc3d7f4ef5be34/datafusion/physical-expr-common/src/physical_expr.rs#L156 -/// [`VortexExpr`] can't be constrained by [`Eq`] directly because it must remain object -/// safe. To ease implementation blanket implementation is provided for [`Eq`] types. -pub trait DynEq { - fn dyn_eq(&self, other: &dyn Any) -> bool; -} - -impl DynEq for T { - fn dyn_eq(&self, other: &dyn Any) -> bool { - other.downcast_ref::() == Some(self) - } -} - -impl PartialEq for dyn VortexExpr { - fn eq(&self, other: &Self) -> bool { - self.dyn_eq(other.as_any()) - } -} - -impl Eq for dyn VortexExpr {} - +dyn_eq::eq_trait_object!(VortexExpr); dyn_hash::hash_trait_object!(VortexExpr); #[cfg(test)] diff --git a/vortex-expr/src/like.rs b/vortex-expr/src/like.rs index 672d47a21e..bd19285aac 100644 --- a/vortex-expr/src/like.rs +++ b/vortex-expr/src/like.rs @@ -1,4 +1,3 @@ -use std::any::Any; use std::fmt::Display; use std::hash::Hash; use std::sync::Arc; @@ -57,10 +56,6 @@ impl Display for Like { } impl VortexExpr for Like { - fn as_any(&self) -> &dyn Any { - self - } - fn evaluate(&self, batch: &ArrayData) -> VortexResult { let child = self.child().evaluate(batch)?; let pattern = self.pattern().evaluate(batch)?; diff --git a/vortex-expr/src/literal.rs b/vortex-expr/src/literal.rs index c1b16911a8..a5b516dd28 100644 --- a/vortex-expr/src/literal.rs +++ b/vortex-expr/src/literal.rs @@ -1,4 +1,3 @@ -use std::any::Any; use std::fmt::Display; use std::sync::Arc; @@ -33,10 +32,6 @@ impl Display for Literal { } impl VortexExpr for Literal { - fn as_any(&self) -> &dyn Any { - self - } - fn evaluate(&self, batch: &ArrayData) -> VortexResult { Ok(ConstantArray::new(self.value.clone(), batch.len()).into_array()) } diff --git a/vortex-expr/src/not.rs b/vortex-expr/src/not.rs index b6061277dd..78257f9341 100644 --- a/vortex-expr/src/not.rs +++ b/vortex-expr/src/not.rs @@ -1,4 +1,3 @@ -use std::any::Any; use std::fmt::Display; use std::hash::Hash; use std::sync::Arc; @@ -34,10 +33,6 @@ impl Display for Not { } impl VortexExpr for Not { - fn as_any(&self) -> &dyn Any { - self - } - fn evaluate(&self, batch: &ArrayData) -> VortexResult { let child_result = self.child.evaluate(batch)?; invert(&child_result) diff --git a/vortex-expr/src/pack.rs b/vortex-expr/src/pack.rs index 6160e761ce..b6445098c6 100644 --- a/vortex-expr/src/pack.rs +++ b/vortex-expr/src/pack.rs @@ -77,10 +77,6 @@ impl Display for Pack { } impl VortexExpr for Pack { - fn as_any(&self) -> &dyn Any { - self - } - fn evaluate(&self, batch: &ArrayData) -> VortexResult { let len = batch.len(); let value_arrays = self diff --git a/vortex-expr/src/row_filter.rs b/vortex-expr/src/row_filter.rs index 3bfbe4dece..2727ad508b 100644 --- a/vortex-expr/src/row_filter.rs +++ b/vortex-expr/src/row_filter.rs @@ -1,4 +1,3 @@ -use std::any::Any; use std::fmt::{Debug, Display}; use std::sync::Arc; @@ -60,10 +59,6 @@ impl Display for RowFilter { } impl VortexExpr for RowFilter { - fn as_any(&self) -> &dyn Any { - self - } - fn evaluate(&self, batch: &ArrayData) -> VortexResult { let mut filter_iter = self.conjunction.iter(); let mut mask = filter_iter diff --git a/vortex-expr/src/select.rs b/vortex-expr/src/select.rs index c93ffd0086..9055e66e0a 100644 --- a/vortex-expr/src/select.rs +++ b/vortex-expr/src/select.rs @@ -1,4 +1,3 @@ -use std::any::Any; use std::fmt::Display; use std::sync::Arc; @@ -78,10 +77,6 @@ impl Display for Select { } impl VortexExpr for Select { - fn as_any(&self) -> &dyn Any { - self - } - fn evaluate(&self, batch: &ArrayData) -> VortexResult { let batch = self.child.evaluate(batch)?; let st = batch From f91f062840e2a5c8f8b4101db7a387ffe5523722 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 9 Jan 2025 13:35:42 +0000 Subject: [PATCH 11/61] wip --- vortex-expr/src/forms/nnf.rs | 4 +- vortex-expr/src/lib.rs | 1 + vortex-expr/src/transform/mod.rs | 2 + vortex-expr/src/transform/project_expr.rs | 189 ++++++++++++++++++++++ vortex-expr/src/traversal/mod.rs | 81 +++++++++- 5 files changed, 272 insertions(+), 5 deletions(-) create mode 100644 vortex-expr/src/transform/mod.rs create mode 100644 vortex-expr/src/transform/project_expr.rs diff --git a/vortex-expr/src/forms/nnf.rs b/vortex-expr/src/forms/nnf.rs index 7ebe29f58e..4ecc314db7 100644 --- a/vortex-expr/src/forms/nnf.rs +++ b/vortex-expr/src/forms/nnf.rs @@ -1,6 +1,6 @@ use vortex_error::VortexResult; -use crate::traversal::{FoldChildren, FoldDown, FoldUp, Folder, Node as _}; +use crate::traversal::{FoldChildren, FoldDown, FoldUp, FolderMut, Node as _}; use crate::{not, BinaryExpr, ExprRef, Not, Operator}; /// Return an equivalent expression in Negative Normal Form (NNF). @@ -63,7 +63,7 @@ pub fn nnf(expr: ExprRef) -> VortexResult { #[derive(Default)] struct NNFVisitor {} -impl Folder for NNFVisitor { +impl FolderMut for NNFVisitor { type NodeTy = ExprRef; type Out = ExprRef; type Context = bool; diff --git a/vortex-expr/src/lib.rs b/vortex-expr/src/lib.rs index 2e61bb155c..66123c35e2 100644 --- a/vortex-expr/src/lib.rs +++ b/vortex-expr/src/lib.rs @@ -19,6 +19,7 @@ mod project; pub mod pruning; mod row_filter; mod select; +mod transform; #[allow(dead_code)] mod traversal; diff --git a/vortex-expr/src/transform/mod.rs b/vortex-expr/src/transform/mod.rs new file mode 100644 index 0000000000..d5e61a861b --- /dev/null +++ b/vortex-expr/src/transform/mod.rs @@ -0,0 +1,2 @@ +#[allow(dead_code)] +mod project_expr; diff --git a/vortex-expr/src/transform/project_expr.rs b/vortex-expr/src/transform/project_expr.rs new file mode 100644 index 0000000000..d368880482 --- /dev/null +++ b/vortex-expr/src/transform/project_expr.rs @@ -0,0 +1,189 @@ +use std::iter; + +use vortex_array::aliases::hash_map::HashMap; +use vortex_array::aliases::hash_set::HashSet; +use vortex_dtype::{Field, FieldName, FieldNames}; +use vortex_error::VortexResult; + +use crate::traversal::{FoldChildren, FoldDown, FoldUp, Folder}; +use crate::{ExprRef, GetItem, Identity}; + +/// Given an expression, an identity-type and a list of n fields return n optional expressions +/// ones containing only references to the corresponding field and an expression defined in terms of +/// the n expression which combines them back into a single expression. + +fn split_expression( + _expr: ExprRef, + _fields: &FieldNames, +) -> (ExprRef, HashMap) { + // st { + // a: + // b: { + // c: + // d: + // } + // } + + // f(id.a) /\ g(id) ==> f({a: id.a, b: id.b}.a) /\ g({a: id.a, b: id.b}) + + // e_1.1 /\ g({a: e_2.1, b: e_1.2}) where, e_1 = {1: f(id), 2: id} in a and e_2 = {1: id} in b + + // x > 5 and x < 10 and y > 5 + // let e1 = x > 5 /\ x < 10 in let e2 = y > 5 in e1 /\ e2 + + // x.a > 5 and y > 5 and x.b < 10 + // let e1 = pack(e11: x.a > 5, e12: x.b < 10) in let e2 = pack(e22: y > 5) in e1.e11 /\ e2.e22 /\ e1.e12 + todo!() +} + +struct ExprSplitter { + field: Field, + sub_expressions: Vec, +} + +impl ExprSplitter { + fn new(field: Field) -> Self { + Self { + field, + sub_expressions: vec![], + } + } +} + +// Hashmap from expr to [get_item(field, Identity)] + +// impl Folder for ExprSplitter { +// type NodeTy = ExprRef; +// type Out = ExprRef; +// type Context = Option; +// +// fn visit_down( +// &mut self, +// node: &Self::NodeTy, +// context: Self::Context, +// ) -> VortexResult> { +// node.references().contains(&self.field) +// } +// +// fn visit_up( +// &mut self, +// node: Self::NodeTy, +// context: Self::Context, +// children: FoldChildren, +// ) -> VortexResult> { +// todo!() +// } +// } + +#[derive(Clone, Debug)] +enum FieldAccesses { + Fields(HashSet), + AllFields, +} + +impl FieldAccesses { + fn union(self, other: &Self) -> Self { + match (self, other) { + (FieldAccesses::AllFields, _) => FieldAccesses::AllFields, + (_, FieldAccesses::AllFields) => FieldAccesses::AllFields, + (FieldAccesses::Fields(fields1), FieldAccesses::Fields(fields2)) => { + FieldAccesses::Fields(fields1.union(fields2).cloned().collect()) + } + } + } +} + +// For all subexpressions in an expression find the fields access directly on identity +struct ExprTopLevelRef<'a> { + sub_expressions: HashMap<&'a ExprRef, FieldNames>, +} + +impl<'a> ExprTopLevelRef<'a> { + fn new() -> Self { + Self { + sub_expressions: HashMap::new(), + } + } +} + +impl<'a> Folder<'a> for ExprTopLevelRef<'a> { + type NodeTy = ExprRef; + type Out = FieldAccesses; + type Context = Option; + + fn visit_down( + &mut self, + node: &'a ExprRef, + context: Option, + ) -> VortexResult> { + if let Some(item) = node.as_any().downcast_ref::() { + return Ok(FoldDown::Continue(Some(item.field().clone()))); + }; + + Ok(FoldDown::Continue(context)) + } + + fn visit_up( + &mut self, + node: &'a ExprRef, + context: Option, + children: FoldChildren, + ) -> VortexResult> { + let field_access = if node.as_any().downcast_ref::().is_some() { + assert!(children.is_empty()); + match context { + Some(field) => FieldAccesses::Fields(HashSet::from_iter(iter::once(field))), + None => FieldAccesses::AllFields, + } + } else { + children + .into_iter() + .fold(FieldAccesses::Fields(HashSet::new()), |acc, x| { + acc.union(&x) + }) + }; + + // self.sub_expressions.insert(&node, field_access.clone()); + + Ok(FoldUp::Continue(field_access)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::traversal::Node; + use crate::{get_item, ident}; + + #[test] + fn test_expr_top_level_ref() { + let expr = ident(); + let mut expr_top_level_ref = ExprTopLevelRef::new(); + let res = expr + .accept_with_context(&mut expr_top_level_ref, None) + .unwrap(); + + println!("{:?}", res); + + // let mut expected = HashMap::new(); + // expected.insert(&expr, FieldAccesses::AllFields); + + // assert_eq!(&expr_top_level_ref.sub_expressions, &expected); + } + + #[test] + fn test_expr_top_level_ref_get_item() { + let expr = get_item("a", ident()); + let mut expr_top_level_ref = ExprTopLevelRef::new(); + let res = expr + .accept_with_context(&mut expr_top_level_ref, None) + .unwrap(); + + println!("{:?}", res); + + // let mut expected = HashMap::new(); + // expected.insert(&expr, FieldAccesses::AllFields); + + // assert_eq!(&expr_top_level_ref.sub_expressions, &expected); + } +} diff --git a/vortex-expr/src/traversal/mod.rs b/vortex-expr/src/traversal/mod.rs index f13ee65540..0c6c3de2a2 100644 --- a/vortex-expr/src/traversal/mod.rs +++ b/vortex-expr/src/traversal/mod.rs @@ -87,6 +87,31 @@ pub enum FoldChildren { Children(Vec), } +impl IntoIterator for FoldChildren { + type Item = Out; + type IntoIter = as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + match self { + FoldChildren::Skipped => { + vec![] + } + FoldChildren::Children(children) => children, + } + .into_iter() + } +} + +impl FoldChildren { + pub fn is_empty(&self) -> bool { + match self { + FoldChildren::Skipped => true, + FoldChildren::Children(children) => children.is_empty(), + } + } +} + +#[derive(Debug)] pub enum FoldUp { Stop(Out), Continue(Out), @@ -101,7 +126,28 @@ impl FoldUp { } } -pub trait Folder { +pub trait Folder<'a> { + type NodeTy: Node; + type Out; + type Context: Clone; + + fn visit_down( + &mut self, + _node: &'a Self::NodeTy, + context: Self::Context, + ) -> VortexResult> { + Ok(FoldDown::Continue(context)) + } + + fn visit_up( + &mut self, + node: &'a Self::NodeTy, + context: Self::Context, + children: FoldChildren, + ) -> VortexResult>; +} + +pub trait FolderMut { type NodeTy: Node; type Out; type Context: Clone; @@ -128,12 +174,18 @@ pub trait Node: Sized { _visitor: &mut V, ) -> VortexResult; + fn accept_with_context<'a, V: Folder<'a, NodeTy = Self>>( + &'a self, + visitor: &mut V, + context: V::Context, + ) -> VortexResult>; + fn transform>( self, _visitor: &mut V, ) -> VortexResult>; - fn transform_with_context>( + fn transform_with_context>( self, _visitor: &mut V, _context: V::Context, @@ -165,6 +217,29 @@ impl Node for ExprRef { visitor.visit_up(self) } + fn accept_with_context<'a, V: Folder<'a, NodeTy = Self>>( + &'a self, + visitor: &mut V, + context: V::Context, + ) -> VortexResult> { + let children = match visitor.visit_down(self, context.clone())? { + FoldDown::Stop(out) => return Ok(FoldUp::Stop(out)), + FoldDown::SkipChildren => FoldChildren::Skipped, + FoldDown::Continue(child_context) => { + let mut new_children = Vec::with_capacity(self.children().len()); + for child in self.children() { + match child.accept_with_context(visitor, child_context.clone())? { + FoldUp::Stop(out) => return Ok(FoldUp::Stop(out)), + FoldUp::Continue(out) => new_children.push(out), + } + } + FoldChildren::Children(new_children) + } + }; + + visitor.visit_up(self, context, children) + } + // A pre-order transform, with an option to ignore sub-tress (using visit_down). fn transform>( self, @@ -217,7 +292,7 @@ impl Node for ExprRef { } } - fn transform_with_context>( + fn transform_with_context>( self, visitor: &mut V, context: V::Context, From ca5253ee2c2de0316f17d766101264678361e71f Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 9 Jan 2025 15:07:06 +0000 Subject: [PATCH 12/61] wip --- vortex-expr/src/transform/project_expr.rs | 131 +++++++++++----------- vortex-expr/src/traversal/mod.rs | 9 ++ 2 files changed, 77 insertions(+), 63 deletions(-) diff --git a/vortex-expr/src/transform/project_expr.rs b/vortex-expr/src/transform/project_expr.rs index d368880482..37b6163d89 100644 --- a/vortex-expr/src/transform/project_expr.rs +++ b/vortex-expr/src/transform/project_expr.rs @@ -1,11 +1,9 @@ -use std::iter; - use vortex_array::aliases::hash_map::HashMap; use vortex_array::aliases::hash_set::HashSet; use vortex_dtype::{Field, FieldName, FieldNames}; use vortex_error::VortexResult; -use crate::traversal::{FoldChildren, FoldDown, FoldUp, Folder}; +use crate::traversal::{FoldChildren, FoldUp, Folder}; use crate::{ExprRef, GetItem, Identity}; /// Given an expression, an identity-type and a list of n fields return n optional expressions @@ -75,75 +73,83 @@ impl ExprSplitter { // } // } -#[derive(Clone, Debug)] -enum FieldAccesses { - Fields(HashSet), - AllFields, -} - -impl FieldAccesses { - fn union(self, other: &Self) -> Self { - match (self, other) { - (FieldAccesses::AllFields, _) => FieldAccesses::AllFields, - (_, FieldAccesses::AllFields) => FieldAccesses::AllFields, - (FieldAccesses::Fields(fields1), FieldAccesses::Fields(fields2)) => { - FieldAccesses::Fields(fields1.union(fields2).cloned().collect()) - } - } - } -} +// #[derive(Clone, Debug, PartialEq, Eq)] +// enum FieldAccesses { +// Fields(HashSet), +// AllFields, +// } +// +// impl FieldAccesses { +// fn union(self, other: &Self) -> Self { +// match (self, other) { +// (FieldAccesses::AllFields, _) => FieldAccesses::AllFields, +// (_, FieldAccesses::AllFields) => FieldAccesses::AllFields, +// (FieldAccesses::Fields(fields1), FieldAccesses::Fields(fields2)) => { +// FieldAccesses::Fields(fields1.union(fields2).cloned().collect()) +// } +// } +// } +// } // For all subexpressions in an expression find the fields access directly on identity struct ExprTopLevelRef<'a> { - sub_expressions: HashMap<&'a ExprRef, FieldNames>, + sub_expressions: HashMap<&'a ExprRef, HashSet>, + identity: FieldNames, } impl<'a> ExprTopLevelRef<'a> { - fn new() -> Self { + fn new(fields: FieldNames) -> Self { Self { sub_expressions: HashMap::new(), + identity: fields, } } } impl<'a> Folder<'a> for ExprTopLevelRef<'a> { type NodeTy = ExprRef; - type Out = FieldAccesses; - type Context = Option; + type Out = HashSet; + type Context = (); - fn visit_down( + fn visit_up( &mut self, node: &'a ExprRef, - context: Option, - ) -> VortexResult> { - if let Some(item) = node.as_any().downcast_ref::() { - return Ok(FoldDown::Continue(Some(item.field().clone()))); - }; + _context: (), + children: FoldChildren>, + ) -> VortexResult>> { + if node.as_any().downcast_ref::().is_some() { + debug_assert!(children.is_empty()); + let field_names = HashSet::from_iter(self.identity.iter().cloned()); + return Ok(FoldUp::Continue(field_names)); + } - Ok(FoldDown::Continue(context)) - } + if let Some(item) = node.as_any().downcast_ref::() { + let field = item.field(); + let field_name = match field { + Field::Name(n) => n.clone(), + Field::Index(_) => todo!(), + }; + // let [child] = children.unwrap().as_slice(); + // let access = child.get(&field_name).cloned(); + let field_access = HashSet::from_iter(vec![field.clone()]); + self.sub_expressions.insert(&node, field_access.clone()); + return Ok(FoldUp::Continue(HashSet::from_iter(vec![field_name]))); + } - fn visit_up( - &mut self, - node: &'a ExprRef, - context: Option, - children: FoldChildren, - ) -> VortexResult> { - let field_access = if node.as_any().downcast_ref::().is_some() { - assert!(children.is_empty()); - match context { - Some(field) => FieldAccesses::Fields(HashSet::from_iter(iter::once(field))), - None => FieldAccesses::AllFields, - } - } else { - children - .into_iter() - .fold(FieldAccesses::Fields(HashSet::new()), |acc, x| { - acc.union(&x) - }) - }; - - // self.sub_expressions.insert(&node, field_access.clone()); + // else { + // }; + + let field_access = children.into_iter().fold(HashSet::new(), |acc, fields| { + acc.union(&fields).cloned().collect() + }); + self.sub_expressions.insert( + &node, + field_access + .clone() + .iter() + .map(|f| Field::Name(f.clone())) + .collect(), + ); Ok(FoldUp::Continue(field_access)) } @@ -158,28 +164,27 @@ mod tests { #[test] fn test_expr_top_level_ref() { let expr = ident(); - let mut expr_top_level_ref = ExprTopLevelRef::new(); + let mut expr_top_level_ref = + ExprTopLevelRef::new(FieldNames::from_iter(vec!["a".into(), "b".into()])); let res = expr - .accept_with_context(&mut expr_top_level_ref, None) + .accept_with_context(&mut expr_top_level_ref, ()) .unwrap(); println!("{:?}", res); - - // let mut expected = HashMap::new(); - // expected.insert(&expr, FieldAccesses::AllFields); - - // assert_eq!(&expr_top_level_ref.sub_expressions, &expected); + println!("{:?}", expr_top_level_ref.sub_expressions); } #[test] fn test_expr_top_level_ref_get_item() { - let expr = get_item("a", ident()); - let mut expr_top_level_ref = ExprTopLevelRef::new(); + let expr = get_item("b", get_item("a", ident())); + let mut expr_top_level_ref = + ExprTopLevelRef::new(FieldNames::from_iter(vec!["a".into(), "b".into()])); let res = expr - .accept_with_context(&mut expr_top_level_ref, None) + .accept_with_context(&mut expr_top_level_ref, ()) .unwrap(); println!("{:?}", res); + println!("{:?}", expr_top_level_ref.sub_expressions); // let mut expected = HashMap::new(); // expected.insert(&expr, FieldAccesses::AllFields); diff --git a/vortex-expr/src/traversal/mod.rs b/vortex-expr/src/traversal/mod.rs index 0c6c3de2a2..194c0887f3 100644 --- a/vortex-expr/src/traversal/mod.rs +++ b/vortex-expr/src/traversal/mod.rs @@ -102,6 +102,15 @@ impl IntoIterator for FoldChildren { } } +impl FoldChildren { + pub fn unwrap(self) -> Vec { + match self { + FoldChildren::Skipped => vec![], + FoldChildren::Children(children) => children, + } + } +} + impl FoldChildren { pub fn is_empty(&self) -> bool { match self { From aff1c14ab85e3d9d843f0e78edb9ac2aee4915db Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 9 Jan 2025 15:28:32 +0000 Subject: [PATCH 13/61] wip --- vortex-expr/src/transform/project_expr.rs | 71 ++++++++++++++--------- vortex-expr/src/traversal/mod.rs | 2 +- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/vortex-expr/src/transform/project_expr.rs b/vortex-expr/src/transform/project_expr.rs index 37b6163d89..c6d35b9914 100644 --- a/vortex-expr/src/transform/project_expr.rs +++ b/vortex-expr/src/transform/project_expr.rs @@ -1,7 +1,7 @@ use vortex_array::aliases::hash_map::HashMap; use vortex_array::aliases::hash_set::HashSet; use vortex_dtype::{Field, FieldName, FieldNames}; -use vortex_error::VortexResult; +use vortex_error::{vortex_bail, VortexResult}; use crate::traversal::{FoldChildren, FoldUp, Folder}; use crate::{ExprRef, GetItem, Identity}; @@ -73,23 +73,27 @@ impl ExprSplitter { // } // } -// #[derive(Clone, Debug, PartialEq, Eq)] -// enum FieldAccesses { -// Fields(HashSet), -// AllFields, -// } -// -// impl FieldAccesses { -// fn union(self, other: &Self) -> Self { -// match (self, other) { -// (FieldAccesses::AllFields, _) => FieldAccesses::AllFields, -// (_, FieldAccesses::AllFields) => FieldAccesses::AllFields, -// (FieldAccesses::Fields(fields1), FieldAccesses::Fields(fields2)) => { -// FieldAccesses::Fields(fields1.union(fields2).cloned().collect()) -// } -// } -// } -// } +#[derive(Clone, Debug, PartialEq, Eq)] +enum AccessibleFields { + Fields(HashSet), + UntrackedFields, +} + +impl AccessibleFields { + fn important_fields(&self) -> HashSet { + match self { + AccessibleFields::Fields(fields) => fields.clone(), + AccessibleFields::UntrackedFields => HashSet::new(), + } + } + + fn get_field(&self, field: &FieldName) -> Option { + match self { + AccessibleFields::Fields(fields) => fields.get(field).cloned(), + AccessibleFields::UntrackedFields => None, + } + } +} // For all subexpressions in an expression find the fields access directly on identity struct ExprTopLevelRef<'a> { @@ -108,18 +112,19 @@ impl<'a> ExprTopLevelRef<'a> { impl<'a> Folder<'a> for ExprTopLevelRef<'a> { type NodeTy = ExprRef; - type Out = HashSet; + type Out = AccessibleFields; type Context = (); fn visit_up( &mut self, node: &'a ExprRef, _context: (), - children: FoldChildren>, - ) -> VortexResult>> { + children: FoldChildren, + ) -> VortexResult> { if node.as_any().downcast_ref::().is_some() { debug_assert!(children.is_empty()); - let field_names = HashSet::from_iter(self.identity.iter().cloned()); + let field_names = + AccessibleFields::Fields(HashSet::from_iter(self.identity.iter().cloned())); return Ok(FoldUp::Continue(field_names)); } @@ -129,16 +134,26 @@ impl<'a> Folder<'a> for ExprTopLevelRef<'a> { Field::Name(n) => n.clone(), Field::Index(_) => todo!(), }; - // let [child] = children.unwrap().as_slice(); - // let access = child.get(&field_name).cloned(); - let field_access = HashSet::from_iter(vec![field.clone()]); - self.sub_expressions.insert(&node, field_access.clone()); - return Ok(FoldUp::Continue(HashSet::from_iter(vec![field_name]))); + let [child] = children.contained_children().as_slice() else { + vortex_bail!("GetItem must have exactly one child") + }; + if let Some(access) = child.get_field(&field_name) { + let field_access = HashSet::from_iter(vec![Field::Name(access)]); + self.sub_expressions.insert(&node, field_access.clone()); + } + return Ok(FoldUp::Continue(AccessibleFields::UntrackedFields)); } // else { // }; + // |id| = {a: U, ..., z: U} + // get_item(f, id) => |id|(f) + // pack({a: e1, .., z: en}) => {a: |e1|, ..., z: |en|} + // select({a, b}, id) => {a: |id|.a, b: |id|.b} + + // pack(a: id, b: id) + let field_access = children.into_iter().fold(HashSet::new(), |acc, fields| { acc.union(&fields).cloned().collect() }); @@ -151,7 +166,7 @@ impl<'a> Folder<'a> for ExprTopLevelRef<'a> { .collect(), ); - Ok(FoldUp::Continue(field_access)) + Ok(FoldUp::Continue()) } } diff --git a/vortex-expr/src/traversal/mod.rs b/vortex-expr/src/traversal/mod.rs index 194c0887f3..b0f7984465 100644 --- a/vortex-expr/src/traversal/mod.rs +++ b/vortex-expr/src/traversal/mod.rs @@ -103,7 +103,7 @@ impl IntoIterator for FoldChildren { } impl FoldChildren { - pub fn unwrap(self) -> Vec { + pub fn contained_children(self) -> Vec { match self { FoldChildren::Skipped => vec![], FoldChildren::Children(children) => children, From e1211bcb6756affc545e0d871d2adfe4455a30c1 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 9 Jan 2025 17:40:39 +0000 Subject: [PATCH 14/61] wip --- vortex-expr/src/transform/project_expr.rs | 62 +++++++++++++---------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/vortex-expr/src/transform/project_expr.rs b/vortex-expr/src/transform/project_expr.rs index c6d35b9914..f1df9c384f 100644 --- a/vortex-expr/src/transform/project_expr.rs +++ b/vortex-expr/src/transform/project_expr.rs @@ -1,10 +1,10 @@ use vortex_array::aliases::hash_map::HashMap; use vortex_array::aliases::hash_set::HashSet; -use vortex_dtype::{Field, FieldName, FieldNames}; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_dtype::{DType, Field, FieldName, FieldNames}; +use vortex_error::VortexResult; use crate::traversal::{FoldChildren, FoldUp, Folder}; -use crate::{ExprRef, GetItem, Identity}; +use crate::{ExprRef, Identity}; /// Given an expression, an identity-type and a list of n fields return n optional expressions /// ones containing only references to the corresponding field and an expression defined in terms of @@ -98,29 +98,32 @@ impl AccessibleFields { // For all subexpressions in an expression find the fields access directly on identity struct ExprTopLevelRef<'a> { sub_expressions: HashMap<&'a ExprRef, HashSet>, - identity: FieldNames, + identity: &'a DType, + tracked_dt: Vec, } impl<'a> ExprTopLevelRef<'a> { - fn new(fields: FieldNames) -> Self { + fn new(fields: &'a DType) -> Self { + let tracked_dt = vec![fields.clone()]; Self { sub_expressions: HashMap::new(), - identity: fields, + identity: tracked_dt.first().unwrap(), + tracked_dt, } } } impl<'a> Folder<'a> for ExprTopLevelRef<'a> { type NodeTy = ExprRef; - type Out = AccessibleFields; + type Out = (); type Context = (); fn visit_up( &mut self, node: &'a ExprRef, _context: (), - children: FoldChildren, - ) -> VortexResult> { + children: FoldChildren<()>, + ) -> VortexResult> { if node.as_any().downcast_ref::().is_some() { debug_assert!(children.is_empty()); let field_names = @@ -128,29 +131,30 @@ impl<'a> Folder<'a> for ExprTopLevelRef<'a> { return Ok(FoldUp::Continue(field_names)); } - if let Some(item) = node.as_any().downcast_ref::() { - let field = item.field(); - let field_name = match field { - Field::Name(n) => n.clone(), - Field::Index(_) => todo!(), - }; - let [child] = children.contained_children().as_slice() else { - vortex_bail!("GetItem must have exactly one child") - }; - if let Some(access) = child.get_field(&field_name) { - let field_access = HashSet::from_iter(vec![Field::Name(access)]); - self.sub_expressions.insert(&node, field_access.clone()); - } - return Ok(FoldUp::Continue(AccessibleFields::UntrackedFields)); - } + // if let Some(item) = node.as_any().downcast_ref::() { + // let field = item.field(); + // let field_name = match field { + // Field::Name(n) => n.clone(), + // Field::Index(_) => todo!(), + // }; + // let [child] = children.contained_children().as_slice() else { + // vortex_bail!("GetItem must have exactly one child") + // }; + // if let Some(access) = child.get_field(&field_name) { + // let field_access = HashSet::from_iter(vec![Field::Name(access)]); + // self.sub_expressions.insert(&node, field_access.clone()); + // } + // return Ok(FoldUp::Continue(AccessibleFields::UntrackedFields)); + // } // else { // }; - // |id| = {a: U, ..., z: U} - // get_item(f, id) => |id|(f) + // |id| = {a: t1, ..., z: tn} + // get_item(f, id) => |id|(f), read(e) = f if dt(id) in tracked and tracked := |id|(f) // pack({a: e1, .., z: en}) => {a: |e1|, ..., z: |en|} // select({a, b}, id) => {a: |id|.a, b: |id|.b} + // f(c1, ..., cn) = read(|c1|) if dt(c1) in tracked and tracked := |f(c1, ..., cn)| // pack(a: id, b: id) @@ -172,12 +176,18 @@ impl<'a> Folder<'a> for ExprTopLevelRef<'a> { #[cfg(test)] mod tests { + use vortex_dtype::Nullability::NonNullable; + use super::*; use crate::traversal::Node; use crate::{get_item, ident}; #[test] fn test_expr_top_level_ref() { + let dtyp = DType::Struct( + vec![Field::Name("a".into()), Field::Name("b".into())].into(), + NonNullable, + ); let expr = ident(); let mut expr_top_level_ref = ExprTopLevelRef::new(FieldNames::from_iter(vec!["a".into(), "b".into()])); From e917d7318898a6c1acb979074bd96c558a797f1e Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 9 Jan 2025 18:11:09 +0000 Subject: [PATCH 15/61] wip --- vortex-expr/src/binary.rs | 5 +++++ vortex-expr/src/lib.rs | 11 +++++++++-- vortex-expr/src/transform/project_expr.rs | 23 +++++++++++++---------- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/vortex-expr/src/binary.rs b/vortex-expr/src/binary.rs index d2e3f3e7a2..213e3c8d53 100644 --- a/vortex-expr/src/binary.rs +++ b/vortex-expr/src/binary.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::fmt::Display; use std::hash::Hash; use std::sync::Arc; @@ -41,6 +42,10 @@ impl Display for BinaryExpr { } impl VortexExpr for BinaryExpr { + fn as_any(&self) -> &dyn Any { + self + } + fn evaluate(&self, batch: &ArrayData) -> VortexResult { let lhs = self.lhs.evaluate(batch)?; let rhs = self.rhs.evaluate(batch)?; diff --git a/vortex-expr/src/lib.rs b/vortex-expr/src/lib.rs index 66123c35e2..3586824076 100644 --- a/vortex-expr/src/lib.rs +++ b/vortex-expr/src/lib.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::fmt::{Debug, Display}; use std::sync::Arc; @@ -36,8 +37,8 @@ pub use project::*; pub use row_filter::*; pub use select::*; use vortex_array::aliases::hash_set::HashSet; -use vortex_array::ArrayData; -use vortex_dtype::Field; +use vortex_array::{ArrayDType, ArrayData, Canonical, IntoArrayData}; +use vortex_dtype::{DType, Field}; use vortex_error::{VortexResult, VortexUnwrap}; use crate::traversal::{Node, ReferenceCollector}; @@ -46,12 +47,18 @@ pub type ExprRef = Arc; /// Represents logical operation on [`ArrayData`]s pub trait VortexExpr: Debug + Send + Sync + DynEq + DynHash + Display { + fn as_any(&self) -> &dyn Any; /// Compute result of expression on given batch producing a new batch fn evaluate(&self, batch: &ArrayData) -> VortexResult; fn children(&self) -> Vec<&ExprRef>; fn replacing_children(self: Arc, children: Vec) -> ExprRef; + + fn dtype(&self, input_dtype: &DType) -> VortexResult { + let empty = Canonical::empty(input_dtype)?.into_array(); + self.evaluate(&empty).map(|array| array.dtype().clone()) + } } pub trait VortexExprExt { diff --git a/vortex-expr/src/transform/project_expr.rs b/vortex-expr/src/transform/project_expr.rs index f1df9c384f..816697ca93 100644 --- a/vortex-expr/src/transform/project_expr.rs +++ b/vortex-expr/src/transform/project_expr.rs @@ -1,10 +1,11 @@ +use dyn_eq::DynEq; use vortex_array::aliases::hash_map::HashMap; use vortex_array::aliases::hash_set::HashSet; use vortex_dtype::{DType, Field, FieldName, FieldNames}; use vortex_error::VortexResult; use crate::traversal::{FoldChildren, FoldUp, Folder}; -use crate::{ExprRef, Identity}; +use crate::{ExprRef, GetItem}; /// Given an expression, an identity-type and a list of n fields return n optional expressions /// ones containing only references to the corresponding field and an expression defined in terms of @@ -98,16 +99,16 @@ impl AccessibleFields { // For all subexpressions in an expression find the fields access directly on identity struct ExprTopLevelRef<'a> { sub_expressions: HashMap<&'a ExprRef, HashSet>, - identity: &'a DType, + ident_dt: DType, tracked_dt: Vec, } impl<'a> ExprTopLevelRef<'a> { - fn new(fields: &'a DType) -> Self { - let tracked_dt = vec![fields.clone()]; + fn new(ident_dt: DType) -> Self { + let tracked_dt = vec![ident_dt.clone()]; Self { sub_expressions: HashMap::new(), - identity: tracked_dt.first().unwrap(), + ident_dt, tracked_dt, } } @@ -124,11 +125,13 @@ impl<'a> Folder<'a> for ExprTopLevelRef<'a> { _context: (), children: FoldChildren<()>, ) -> VortexResult> { - if node.as_any().downcast_ref::().is_some() { - debug_assert!(children.is_empty()); - let field_names = - AccessibleFields::Fields(HashSet::from_iter(self.identity.iter().cloned())); - return Ok(FoldUp::Continue(field_names)); + let dtype = node.dtype(&self.ident_dt)?; + + if let Some(get_item) = node.as_any().downcast_ref::() { + if self.tracked_dt.contains(&dtype) { + self.sub_expressions + .insert(&node, HashSet::from_iter(vec![get_item.field().clone()])); + } } // if let Some(item) = node.as_any().downcast_ref::() { From 0e54bb6a71cff4f0591cc5bc4b3da2cacddb4dae Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Fri, 10 Jan 2025 13:22:25 +0000 Subject: [PATCH 16/61] rescope exprs --- vortex-expr/src/binary.rs | 7 +- vortex-expr/src/forms/nnf.rs | 1 + vortex-expr/src/identity.rs | 5 + vortex-expr/src/like.rs | 5 + vortex-expr/src/literal.rs | 5 + vortex-expr/src/not.rs | 5 + vortex-expr/src/operators.rs | 5 +- vortex-expr/src/pack.rs | 35 +- vortex-expr/src/row_filter.rs | 5 + vortex-expr/src/select.rs | 13 + vortex-expr/src/transform/expr_simplify.rs | 37 ++ vortex-expr/src/transform/mod.rs | 1 + vortex-expr/src/transform/project_expr.rs | 494 ++++++++++++++++----- 13 files changed, 502 insertions(+), 116 deletions(-) create mode 100644 vortex-expr/src/transform/expr_simplify.rs diff --git a/vortex-expr/src/binary.rs b/vortex-expr/src/binary.rs index 213e3c8d53..cc81659537 100644 --- a/vortex-expr/src/binary.rs +++ b/vortex-expr/src/binary.rs @@ -4,7 +4,7 @@ use std::hash::Hash; use std::sync::Arc; use vortex_array::compute::{and_kleene, compare, or_kleene, Operator as ArrayOperator}; -use vortex_array::ArrayData; +use vortex_array::{compute, ArrayData}; use vortex_error::VortexResult; use crate::{ExprRef, Operator, VortexExpr}; @@ -59,6 +59,7 @@ impl VortexExpr for BinaryExpr { Operator::Gte => compare(lhs, rhs, ArrayOperator::Gte), Operator::And => and_kleene(lhs, rhs), Operator::Or => or_kleene(lhs, rhs), + Operator::Add => compute::add(lhs, rhs), } } @@ -257,3 +258,7 @@ pub fn or(lhs: ExprRef, rhs: ExprRef) -> ExprRef { pub fn and(lhs: ExprRef, rhs: ExprRef) -> ExprRef { BinaryExpr::new_expr(lhs, Operator::And, rhs) } + +pub fn add(lhs: ExprRef, rhs: ExprRef) -> ExprRef { + BinaryExpr::new_expr(lhs, Operator::Add, rhs) +} diff --git a/vortex-expr/src/forms/nnf.rs b/vortex-expr/src/forms/nnf.rs index 4ecc314db7..1365323203 100644 --- a/vortex-expr/src/forms/nnf.rs +++ b/vortex-expr/src/forms/nnf.rs @@ -115,6 +115,7 @@ impl FolderMut for NNFVisitor { Operator::Lte => Operator::Gt, Operator::And => Operator::Or, Operator::Or => Operator::And, + _ => panic!("Unexpected operator {:?}", binary_expr.op()), }; let (lhs, rhs) = match binary_expr.op() { Operator::Or | Operator::And => { diff --git a/vortex-expr/src/identity.rs b/vortex-expr/src/identity.rs index 46f7c86826..72aa4bc32a 100644 --- a/vortex-expr/src/identity.rs +++ b/vortex-expr/src/identity.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::fmt::Display; use std::sync::Arc; @@ -22,6 +23,10 @@ impl Display for Identity { } impl VortexExpr for Identity { + fn as_any(&self) -> &dyn Any { + self + } + fn evaluate(&self, batch: &ArrayData) -> VortexResult { Ok(batch.clone()) } diff --git a/vortex-expr/src/like.rs b/vortex-expr/src/like.rs index bd19285aac..672d47a21e 100644 --- a/vortex-expr/src/like.rs +++ b/vortex-expr/src/like.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::fmt::Display; use std::hash::Hash; use std::sync::Arc; @@ -56,6 +57,10 @@ impl Display for Like { } impl VortexExpr for Like { + fn as_any(&self) -> &dyn Any { + self + } + fn evaluate(&self, batch: &ArrayData) -> VortexResult { let child = self.child().evaluate(batch)?; let pattern = self.pattern().evaluate(batch)?; diff --git a/vortex-expr/src/literal.rs b/vortex-expr/src/literal.rs index a5b516dd28..c1b16911a8 100644 --- a/vortex-expr/src/literal.rs +++ b/vortex-expr/src/literal.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::fmt::Display; use std::sync::Arc; @@ -32,6 +33,10 @@ impl Display for Literal { } impl VortexExpr for Literal { + fn as_any(&self) -> &dyn Any { + self + } + fn evaluate(&self, batch: &ArrayData) -> VortexResult { Ok(ConstantArray::new(self.value.clone(), batch.len()).into_array()) } diff --git a/vortex-expr/src/not.rs b/vortex-expr/src/not.rs index 78257f9341..b6061277dd 100644 --- a/vortex-expr/src/not.rs +++ b/vortex-expr/src/not.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::fmt::Display; use std::hash::Hash; use std::sync::Arc; @@ -33,6 +34,10 @@ impl Display for Not { } impl VortexExpr for Not { + fn as_any(&self) -> &dyn Any { + self + } + fn evaluate(&self, batch: &ArrayData) -> VortexResult { let child_result = self.child.evaluate(batch)?; invert(&child_result) diff --git a/vortex-expr/src/operators.rs b/vortex-expr/src/operators.rs index a475a09fb2..64cc2edf88 100644 --- a/vortex-expr/src/operators.rs +++ b/vortex-expr/src/operators.rs @@ -14,6 +14,7 @@ pub enum Operator { // boolean algebra And, Or, + Add, } impl Display for Operator { @@ -27,6 +28,7 @@ impl Display for Operator { Operator::Lte => "<=", Operator::And => "and", Operator::Or => "or", + Operator::Add => "+", }; Display::fmt(display, f) } @@ -41,7 +43,7 @@ impl Operator { Operator::Gte => Some(Operator::Lt), Operator::Lt => Some(Operator::Gte), Operator::Lte => Some(Operator::Gt), - Operator::And | Operator::Or => None, + Operator::And | Operator::Or | Operator::Add => None, } } @@ -56,6 +58,7 @@ impl Operator { Operator::Lte => Operator::Gte, Operator::And => Operator::And, Operator::Or => Operator::Or, + Operator::Add => Operator::Add, } } } diff --git a/vortex-expr/src/pack.rs b/vortex-expr/src/pack.rs index b6445098c6..bfed4d7306 100644 --- a/vortex-expr/src/pack.rs +++ b/vortex-expr/src/pack.rs @@ -7,8 +7,8 @@ use itertools::Itertools as _; use vortex_array::array::StructArray; use vortex_array::validity::Validity; use vortex_array::{ArrayData, IntoArrayData}; -use vortex_dtype::FieldNames; -use vortex_error::{vortex_bail, VortexExpect as _, VortexResult}; +use vortex_dtype::{Field, FieldNames}; +use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult}; use crate::{ExprRef, VortexExpr}; @@ -51,6 +51,33 @@ impl Pack { } Ok(Arc::new(Pack { names, values })) } + + pub fn names(&self) -> &FieldNames { + &self.names + } + + pub fn field(&self, f: &Field) -> VortexResult { + let idx = match f { + Field::Name(n) => self + .names + .iter() + .position(|name| name == n) + .ok_or_else(|| { + vortex_err!("Cannot find field {} in pack fields {:?}", n, self.names) + })?, + Field::Index(idx) => *idx, + }; + + self.values + .get(idx) + .cloned() + .ok_or_else(|| vortex_err!("field index out of bounds: {}", idx)) + } +} + +pub fn pack(names: impl Into, values: Vec) -> ExprRef { + Pack::try_new_expr(names.into(), values) + .vortex_expect("pack names and values have the same length") } impl PartialEq for Pack { @@ -77,6 +104,10 @@ impl Display for Pack { } impl VortexExpr for Pack { + fn as_any(&self) -> &dyn Any { + self + } + fn evaluate(&self, batch: &ArrayData) -> VortexResult { let len = batch.len(); let value_arrays = self diff --git a/vortex-expr/src/row_filter.rs b/vortex-expr/src/row_filter.rs index 2727ad508b..3bfbe4dece 100644 --- a/vortex-expr/src/row_filter.rs +++ b/vortex-expr/src/row_filter.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::fmt::{Debug, Display}; use std::sync::Arc; @@ -59,6 +60,10 @@ impl Display for RowFilter { } impl VortexExpr for RowFilter { + fn as_any(&self) -> &dyn Any { + self + } + fn evaluate(&self, batch: &ArrayData) -> VortexResult { let mut filter_iter = self.conjunction.iter(); let mut mask = filter_iter diff --git a/vortex-expr/src/select.rs b/vortex-expr/src/select.rs index 9055e66e0a..c3efe2e64a 100644 --- a/vortex-expr/src/select.rs +++ b/vortex-expr/src/select.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::fmt::Display; use std::sync::Arc; @@ -22,6 +23,14 @@ pub struct Select { child: ExprRef, } +pub fn select(columns: Vec, child: ExprRef) -> ExprRef { + Select::include_expr(columns, child) +} + +pub fn select_exclude(columns: Vec, child: ExprRef) -> ExprRef { + Select::exclude_expr(columns, child) +} + impl Select { pub fn new_expr(fields: SelectField, child: ExprRef) -> ExprRef { Arc::new(Self { fields, child }) @@ -77,6 +86,10 @@ impl Display for Select { } impl VortexExpr for Select { + fn as_any(&self) -> &dyn Any { + self + } + fn evaluate(&self, batch: &ArrayData) -> VortexResult { let batch = self.child.evaluate(batch)?; let st = batch diff --git a/vortex-expr/src/transform/expr_simplify.rs b/vortex-expr/src/transform/expr_simplify.rs new file mode 100644 index 0000000000..01960156fe --- /dev/null +++ b/vortex-expr/src/transform/expr_simplify.rs @@ -0,0 +1,37 @@ +use vortex_error::VortexResult; + +use crate::traversal::{FoldChildren, FoldUp, FolderMut, Node}; +use crate::{ExprRef, GetItem, Pack}; + +pub struct ExprSimplify(); + +impl ExprSimplify { + pub fn simplify(e: ExprRef) -> VortexResult { + let mut folder = ExprSimplify(); + e.transform_with_context(&mut folder, ()) + .map(|e| e.result()) + } +} + +impl FolderMut for ExprSimplify { + type NodeTy = ExprRef; + type Out = ExprRef; + type Context = (); + + fn visit_up( + &mut self, + node: Self::NodeTy, + _context: Self::Context, + children: FoldChildren, + ) -> VortexResult> { + if let Some(get_item) = node.as_any().downcast_ref::() { + if let Some(pack) = get_item.child().as_any().downcast_ref::() { + let expr = pack.field(get_item.field())?; + return Ok(FoldUp::Continue(expr)); + } + } + Ok(FoldUp::Continue( + node.replacing_children(children.contained_children()), + )) + } +} diff --git a/vortex-expr/src/transform/mod.rs b/vortex-expr/src/transform/mod.rs index d5e61a861b..7e72ca025b 100644 --- a/vortex-expr/src/transform/mod.rs +++ b/vortex-expr/src/transform/mod.rs @@ -1,2 +1,3 @@ +mod expr_simplify; #[allow(dead_code)] mod project_expr; diff --git a/vortex-expr/src/transform/project_expr.rs b/vortex-expr/src/transform/project_expr.rs index 816697ca93..f2ce6af0cf 100644 --- a/vortex-expr/src/transform/project_expr.rs +++ b/vortex-expr/src/transform/project_expr.rs @@ -1,11 +1,12 @@ -use dyn_eq::DynEq; +use itertools::Itertools; use vortex_array::aliases::hash_map::HashMap; use vortex_array::aliases::hash_set::HashSet; -use vortex_dtype::{DType, Field, FieldName, FieldNames}; +use vortex_dtype::DType::Struct; +use vortex_dtype::{Field, FieldName, FieldNames, StructDType}; use vortex_error::VortexResult; -use crate::traversal::{FoldChildren, FoldUp, Folder}; -use crate::{ExprRef, GetItem}; +use crate::traversal::{FoldChildren, FoldDown, FoldUp, Folder, FolderMut, Node}; +use crate::{get_item, ident, pack, ExprRef, GetItem, Identity, Select, SelectField}; /// Given an expression, an identity-type and a list of n fields return n optional expressions /// ones containing only references to the corresponding field and an expression defined in terms of @@ -35,19 +36,19 @@ fn split_expression( todo!() } -struct ExprSplitter { - field: Field, - sub_expressions: Vec, -} - -impl ExprSplitter { - fn new(field: Field) -> Self { - Self { - field, - sub_expressions: vec![], - } - } -} +// struct ExprSplitter { +// field: Field, +// sub_expressions: Vec, +// } +// +// impl ExprSplitter { +// fn new(field: Field) -> Self { +// Self { +// field, +// sub_expressions: vec![], +// } +// } +// } // Hashmap from expr to [get_item(field, Identity)] @@ -74,149 +75,418 @@ impl ExprSplitter { // } // } -#[derive(Clone, Debug, PartialEq, Eq)] -enum AccessibleFields { - Fields(HashSet), - UntrackedFields, -} - -impl AccessibleFields { - fn important_fields(&self) -> HashSet { - match self { - AccessibleFields::Fields(fields) => fields.clone(), - AccessibleFields::UntrackedFields => HashSet::new(), - } - } +// #[derive(Clone, Debug, PartialEq, Eq)] +// enum AccessibleFields { +// Fields(HashSet), +// UntrackedFields, +// } +// +// impl AccessibleFields { +// fn important_fields(&self) -> HashSet { +// match self { +// AccessibleFields::Fields(fields) => fields.clone(), +// AccessibleFields::UntrackedFields => HashSet::new(), +// } +// } +// +// fn get_field(&self, field: &FieldName) -> Option { +// match self { +// AccessibleFields::Fields(fields) => fields.get(field).cloned(), +// AccessibleFields::UntrackedFields => None, +// } +// } +// } +// - fn get_field(&self, field: &FieldName) -> Option { - match self { - AccessibleFields::Fields(fields) => fields.get(field).cloned(), - AccessibleFields::UntrackedFields => None, - } - } -} +type FieldAccesses<'a> = HashMap<&'a ExprRef, HashSet>; // For all subexpressions in an expression find the fields access directly on identity struct ExprTopLevelRef<'a> { - sub_expressions: HashMap<&'a ExprRef, HashSet>, - ident_dt: DType, - tracked_dt: Vec, + sub_expressions: FieldAccesses<'a>, + ident_dt: StructDType, } impl<'a> ExprTopLevelRef<'a> { - fn new(ident_dt: DType) -> Self { - let tracked_dt = vec![ident_dt.clone()]; + fn new(ident_dt: StructDType) -> Self { Self { sub_expressions: HashMap::new(), ident_dt, - tracked_dt, } } } +// This is a very naive, but simple analysis to find the fields that are accessed directly on an +// identity node. This is combined to provide an over-approximation of the fields that are accessed +// by an expression. impl<'a> Folder<'a> for ExprTopLevelRef<'a> { type NodeTy = ExprRef; type Out = (); type Context = (); - fn visit_up( + fn visit_down( &mut self, - node: &'a ExprRef, + node: &'a Self::NodeTy, _context: (), - children: FoldChildren<()>, - ) -> VortexResult> { - let dtype = node.dtype(&self.ident_dt)?; - + ) -> VortexResult> { if let Some(get_item) = node.as_any().downcast_ref::() { - if self.tracked_dt.contains(&dtype) { + if get_item + .child() + .as_any() + .downcast_ref::() + .is_some() + { self.sub_expressions .insert(&node, HashSet::from_iter(vec![get_item.field().clone()])); + + return Ok(FoldDown::SkipChildren); } + } else if let Some(select) = node.as_any().downcast_ref::