diff --git a/Cargo.lock b/Cargo.lock index 6330d3840..fc6baf533 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4981,6 +4981,7 @@ dependencies = [ "vortex-expr", "vortex-file", "vortex-io", + "vortex-scan", ] [[package]] diff --git a/bench-vortex/src/clickbench.rs b/bench-vortex/src/clickbench.rs index f8f6772ff..f35a093c2 100644 --- a/bench-vortex/src/clickbench.rs +++ b/bench-vortex/src/clickbench.rs @@ -7,13 +7,16 @@ use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; use datafusion::prelude::{ParquetReadOptions, SessionContext}; -use futures::{stream, StreamExt, TryStreamExt}; +use futures::executor::block_on; +use itertools::Itertools; +use rayon::prelude::*; use tokio::fs::{create_dir_all, OpenOptions}; use vortex::aliases::hash_map::HashMap; use vortex::array::{ChunkedArray, StructArray}; use vortex::dtype::DType; use vortex::error::vortex_err; -use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION}; +use vortex::file::v2::VortexWriteOptions; +use vortex::file::VORTEX_FILE_EXTENSION; use vortex::sampling_compressor::SamplingCompressor; use vortex::variants::StructArrayTrait; use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; @@ -149,7 +152,9 @@ pub async fn register_vortex_files( let vortex_dir = input_path.join("vortex"); create_dir_all(&vortex_dir).await?; - stream::iter(0..100) + (0..100) + .collect_vec() + .par_iter() .map(|idx| { let parquet_file_path = input_path .join("parquet") @@ -158,7 +163,7 @@ pub async fn register_vortex_files( let session = session.clone(); let schema = schema.clone(); - tokio::spawn(async move { + block_on(async move { let output_path = output_path.clone(); idempotent_async(&output_path, move |vtx_file| async move { eprintln!("Processing file {idx}"); @@ -219,9 +224,9 @@ pub async fn register_vortex_files( .open(&vtx_file) .await?; - let mut writer = VortexFileWriter::new(f); - writer = writer.write_array_columns(data).await?; - writer.finalize().await?; + VortexWriteOptions::default() + .write(f, data.into_array_stream()) + .await?; anyhow::Ok(()) }) @@ -229,9 +234,7 @@ pub async fn register_vortex_files( .expect("Failed to write Vortex file") }) }) - .buffered(16) - .try_collect::>() - .await?; + .collect::>(); let format = Arc::new(VortexFormat::new(CTX.clone())); let table_path = vortex_dir diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 9fc1e4441..f9e701230 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::ops::Range; use std::path::{Path, PathBuf}; use std::process::Command; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use arrow_array::types::Int64Type; use arrow_array::{ @@ -24,18 +24,16 @@ use stream::StreamExt; use vortex::aliases::hash_map::HashMap; use vortex::array::ChunkedArray; use vortex::arrow::FromArrowType; -use vortex::buffer::Buffer; use vortex::compress::CompressionStrategy; use vortex::dtype::DType; use vortex::error::VortexResult; -use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder}; -use vortex::io::{IoDispatcher, ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite}; +use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions}; +use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite}; use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT}; +use vortex::scan::Scan; +use vortex::stream::ArrayStreamExt; use vortex::{ArrayData, IntoArrayData, IntoCanonical}; -static DISPATCHER: LazyLock> = - LazyLock::new(|| Arc::new(IoDispatcher::default())); - pub const BATCH_SIZE: usize = 65_536; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -48,19 +46,12 @@ pub struct VortexFooter { pub async fn open_vortex(path: &Path) -> VortexResult { let file = TokioFile::open(path).unwrap(); - VortexReadBuilder::new( - file, - LayoutDeserializer::new( - ALL_ENCODINGS_CONTEXT.clone(), - LayoutContext::default().into(), - ), - ) - .with_io_dispatcher(DISPATCHER.clone()) - .build() - .await? - .into_stream() - .read_all() - .await + VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone()) + .open(file) + .await? + .scan(Scan::all())? + .into_array_data() + .await } pub async fn rewrite_parquet_as_vortex( @@ -69,11 +60,10 @@ pub async fn rewrite_parquet_as_vortex( ) -> VortexResult<()> { let chunked = compress_parquet_to_vortex(parquet_path.as_path())?; - VortexFileWriter::new(write) - .write_array_columns(chunked) - .await? - .finalize() + VortexWriteOptions::default() + .write(write, chunked.into_array_stream()) .await?; + Ok(()) } @@ -116,25 +106,19 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu async fn take_vortex( reader: T, - indices: &[u64], + _indices: &[u64], ) -> VortexResult { - VortexReadBuilder::new( - reader, - LayoutDeserializer::new( - ALL_ENCODINGS_CONTEXT.clone(), - LayoutContext::default().into(), - ), - ) - .with_io_dispatcher(DISPATCHER.clone()) - .with_indices(Buffer::copy_from(indices).into_array()) - .build() - .await? - .into_stream() - .read_all() - .await - // For equivalence.... we decompress to make sure we're not cheating too much. - .and_then(IntoCanonical::into_canonical) - .map(ArrayData::from) + VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone()) + .open(reader) + .await? + // FIXME(ngates): support row indices + // .scan_rows(Scan::all(), indices.iter().copied())? + .scan(Scan::all())? + .into_array_data() + .await? + // For equivalence.... we decompress to make sure we're not cheating too much. + .into_canonical() + .map(ArrayData::from) } pub async fn take_vortex_object_store( diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index 0a96ee5c5..07d6ec389 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -16,7 +16,7 @@ use vortex::aliases::hash_map::HashMap; use vortex::array::{ChunkedArray, StructArray}; use vortex::arrow::FromArrowArray; use vortex::dtype::DType; -use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION}; +use vortex::file::VORTEX_FILE_EXTENSION; use vortex::sampling_compressor::SamplingCompressor; use vortex::variants::StructArrayTrait; use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; @@ -31,6 +31,7 @@ mod execute; pub mod schema; pub use execute::*; +use vortex::file::v2::VortexWriteOptions; pub const EXPECTED_ROW_COUNTS: [usize; 23] = [ 0, 4, 460, 11620, 5, 5, 1, 4, 2, 175, 37967, 1048, 2, 42, 1, 1, 18314, 1, 57, 1, 186, 411, 7, @@ -275,9 +276,9 @@ async fn register_vortex_file( .open(&vtx_file) .await?; - let mut writer = VortexFileWriter::new(f); - writer = writer.write_array_columns(data).await?; - writer.finalize().await?; + VortexWriteOptions::default() + .write(f, data.into_array_stream()) + .await?; anyhow::Ok(()) }) diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index d803cf368..3a27e475e 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -6,7 +6,7 @@ use std::fmt::{Debug, Display}; use futures_util::stream; use serde::{Deserialize, Serialize}; -use vortex_buffer::Buffer; +use vortex_buffer::BufferMut; use vortex_dtype::{DType, Nullability, PType}; use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult, VortexUnwrap}; @@ -51,7 +51,8 @@ impl ChunkedArray { } } - let chunk_offsets = Buffer::from_iter( + let mut chunk_offsets = BufferMut::::with_capacity(chunks.len() + 1); + chunk_offsets.extend( [0u64] .into_iter() .chain(chunks.iter().map(|c| c.len() as u64)) diff --git a/vortex-array/src/array/varbinview/compute/mod.rs b/vortex-array/src/array/varbinview/compute/mod.rs index 114ee9c81..6336d5491 100644 --- a/vortex-array/src/array/varbinview/compute/mod.rs +++ b/vortex-array/src/array/varbinview/compute/mod.rs @@ -57,7 +57,6 @@ impl TakeFn for VarBinViewEncoding { fn take(&self, array: &VarBinViewArray, indices: &ArrayData) -> VortexResult { // Compute the new validity let validity = array.validity().take(indices)?; - let indices = indices.clone().into_primitive()?; let views_buffer = match_each_integer_ptype!(indices.ptype(), |$I| { diff --git a/vortex-buffer/src/buffer_mut.rs b/vortex-buffer/src/buffer_mut.rs index 23864bcc7..6ca2fb81c 100644 --- a/vortex-buffer/src/buffer_mut.rs +++ b/vortex-buffer/src/buffer_mut.rs @@ -258,7 +258,19 @@ impl BufferMut { T: Copy, { self.reserve(n); + unsafe { self.push_n_unchecked(item, n) } + } + /// Appends n scalars to the buffer. + /// + /// ## Safety + /// + /// The caller must ensure there is sufficient capacity in the array. + #[inline] + pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize) + where + T: Copy, + { let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast(); // SAFETY: we checked the capacity in the reserve call unsafe { diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 29e26bc1b..7347474a1 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -42,6 +42,7 @@ vortex-error = { workspace = true, features = ["datafusion"] } vortex-expr = { workspace = true, features = ["datafusion"] } vortex-file = { workspace = true, features = ["object_store"] } vortex-io = { workspace = true, features = ["object_store", "tokio"] } +vortex-scan = { workspace = true } [dev-dependencies] anyhow = { workspace = true } diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index 7a6c174f2..7f8ef9aea 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -4,13 +4,14 @@ use chrono::{DateTime, Utc}; use moka::future::Cache; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; +use vortex_array::ContextRef; use vortex_error::{vortex_err, VortexError, VortexResult}; -use vortex_file::{read_initial_bytes, InitialRead}; +use vortex_file::v2::{FileLayout, VortexOpenOptions}; use vortex_io::ObjectStoreReadAt; #[derive(Debug, Clone)] -pub struct InitialReadCache { - inner: Cache, +pub struct FileLayoutCache { + inner: Cache, } #[derive(Hash, Eq, PartialEq, Debug)] @@ -28,28 +29,31 @@ impl From<&ObjectMeta> for Key { } } -impl InitialReadCache { +impl FileLayoutCache { pub fn new(size_mb: usize) -> Self { let inner = Cache::builder() - .weigher(|k: &Key, v: &InitialRead| (k.location.as_ref().len() + v.buf.len()) as u32) .max_capacity(size_mb as u64 * (2 << 20)) - .eviction_listener(|k, _v, cause| { + .eviction_listener(|k: Arc, _v, cause| { log::trace!("Removed {} due to {:?}", k.location, cause); }) .build(); Self { inner } } + pub async fn try_get( &self, object: &ObjectMeta, store: Arc, - ) -> VortexResult { + ) -> VortexResult { self.inner .try_get_with(Key::from(object), async { let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); - let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?; - VortexResult::Ok(initial_read) + let vxf = VortexOpenOptions::new(ContextRef::default()) + .with_file_size(object.size as u64) + .open(os_read_at) + .await?; + VortexResult::Ok(vxf.file_layout().clone()) }) .await .map_err(|e: Arc| match Arc::try_unwrap(e) { diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 4ef551eb5..c7451c940 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use vortex_array::ContextRef; use vortex_dtype::FieldName; -use super::cache::InitialReadCache; +use super::cache::FileLayoutCache; use crate::persistent::opener::VortexFileOpener; #[derive(Debug, Clone)] @@ -25,7 +25,7 @@ pub struct VortexExec { plan_properties: PlanProperties, projected_statistics: Statistics, ctx: ContextRef, - initial_read_cache: InitialReadCache, + initial_read_cache: FileLayoutCache, } impl VortexExec { @@ -34,7 +34,7 @@ impl VortexExec { metrics: ExecutionPlanMetricsSet, predicate: Option>, ctx: ContextRef, - initial_read_cache: InitialReadCache, + initial_read_cache: FileLayoutCache, ) -> DFResult { let projected_schema = project_schema( &file_scan_config.file_schema, @@ -122,17 +122,18 @@ impl ExecutionPlan for VortexExec { projection .iter() .map(|i| FieldName::from(arrow_schema.fields[*i].name().clone())) - .collect_vec() + .collect() }); - let opener = VortexFileOpener { - ctx: self.ctx.clone(), + // TODO(joe): apply the predicate/filter mapping to vortex-expr once. + let opener = VortexFileOpener::new( + self.ctx.clone(), object_store, projection, - predicate: self.predicate.clone(), - initial_read_cache: self.initial_read_cache.clone(), + self.predicate.clone(), arrow_schema, - }; + self.initial_read_cache.clone(), + )?; let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?; Ok(Box::pin(stream)) diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 3aef3ae75..c3e16b0be 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -8,35 +8,26 @@ use datafusion::datasource::file_format::{FileFormat, FilePushdownSupport}; use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use datafusion::execution::SessionState; use datafusion_common::parsers::CompressionTypeVariant; -use datafusion_common::stats::Precision; -use datafusion_common::{ - not_impl_err, ColumnStatistics, DataFusionError, Result as DFResult, Statistics, -}; +use datafusion_common::{not_impl_err, DataFusionError, Result as DFResult, Statistics}; use datafusion_expr::Expr; use datafusion_physical_expr::{LexRequirement, PhysicalExpr}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::ExecutionPlan; use futures::{stream, StreamExt as _, TryStreamExt as _}; use object_store::{ObjectMeta, ObjectStore}; -use vortex_array::array::StructArray; use vortex_array::arrow::infer_schema; use vortex_array::ContextRef; use vortex_error::VortexResult; -use vortex_file::metadata::fetch_metadata; -use vortex_file::{ - LayoutContext, LayoutDeserializer, LayoutMessageCache, LayoutPath, Scan, VORTEX_FILE_EXTENSION, -}; -use vortex_io::{IoDispatcher, ObjectStoreReadAt}; +use vortex_file::VORTEX_FILE_EXTENSION; -use super::cache::InitialReadCache; +use super::cache::FileLayoutCache; use super::execution::VortexExec; -use super::statistics::{array_to_col_statistics, uncompressed_col_size}; use crate::can_be_pushed_down; #[derive(Debug)] pub struct VortexFormat { context: ContextRef, - initial_read_cache: InitialReadCache, + file_layout_cache: FileLayoutCache, opts: VortexFormatOptions, } @@ -61,7 +52,7 @@ impl Default for VortexFormat { Self { context: Default::default(), - initial_read_cache: InitialReadCache::new(opts.cache_size_mb), + file_layout_cache: FileLayoutCache::new(opts.cache_size_mb), opts, } } @@ -72,7 +63,7 @@ impl VortexFormat { let opts = VortexFormatOptions::default(); Self { context, - initial_read_cache: InitialReadCache::new(opts.cache_size_mb), + file_layout_cache: FileLayoutCache::new(opts.cache_size_mb), opts, } } @@ -109,12 +100,10 @@ impl FileFormat for VortexFormat { let file_schemas = stream::iter(objects.iter().cloned()) .map(|o| { let store = store.clone(); - let cache = self.initial_read_cache.clone(); + let cache = self.file_layout_cache.clone(); async move { - let initial_read = cache.try_get(&o, store).await?; - let top_level_dtype = initial_read.dtype(); - let s = infer_schema(&top_level_dtype)?; - + let file_layout = cache.try_get(&o, store).await?; + let s = infer_schema(file_layout.dtype())?; VortexResult::Ok(s) } }) @@ -130,60 +119,14 @@ impl FileFormat for VortexFormat { async fn infer_stats( &self, _state: &SessionState, - store: &Arc, + _store: &Arc, table_schema: SchemaRef, - object: &ObjectMeta, + _object: &ObjectMeta, ) -> DFResult { - let initial_read = self - .initial_read_cache - .try_get(object, store.clone()) - .await?; - - let layout = initial_read.fb_layout(); - let row_count = layout.row_count(); - - let layout_deserializer = - LayoutDeserializer::new(self.context.clone(), LayoutContext::default().into()); - - let root_layout = layout_deserializer.read_layout( - LayoutPath::default(), - initial_read.fb_layout(), - Scan::empty(), - initial_read.dtype().into(), - )?; - - let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); - let io = IoDispatcher::default(); - let mut stats = Statistics::new_unknown(&table_schema); - stats.num_rows = Precision::Exact(row_count as usize); - - let msgs = LayoutMessageCache::default(); - - if let Some(metadata_table) = - fetch_metadata(os_read_at, io.into(), root_layout, msgs).await? - { - let mut column_statistics = Vec::with_capacity(table_schema.fields().len()); - let mut total_size = 0_u64; - - for col_stats in metadata_table.into_iter() { - let col_stats = match col_stats { - Some(array) => { - let col_metadata_array = StructArray::try_from(array)?; - let col_stats = array_to_col_statistics(&col_metadata_array)?; - - total_size += - uncompressed_col_size(&col_metadata_array)?.unwrap_or_default(); - col_stats - } - None => ColumnStatistics::new_unknown(), - }; - column_statistics.push(col_stats); - } - stats.column_statistics = column_statistics; - stats.total_byte_size = Precision::Inexact(total_size as usize); - } - - Ok(stats) + // TODO(ngates): we should decide if it's worth returning file statistics. Since this + // call doesn't have projection information, I think it's better to wait until we can + // return per-partition statistics from VortexExpr ExecutionPlan node. + Ok(Statistics::new_unknown(table_schema.as_ref())) } async fn create_physical_plan( @@ -199,7 +142,7 @@ impl FileFormat for VortexFormat { metrics, filters.cloned(), self.context.clone(), - self.initial_read_cache.clone(), + self.file_layout_cache.clone(), )? .into_arc(); diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 086c42dd1..d175ee0e8 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; @@ -7,81 +7,109 @@ use datafusion_common::Result as DFResult; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; use futures::{FutureExt as _, StreamExt, TryStreamExt}; use object_store::ObjectStore; +use tokio::runtime::Handle; +use vortex_array::arrow::FromArrowType; use vortex_array::ContextRef; -use vortex_dtype::FieldName; +use vortex_dtype::{DType, FieldNames}; +use vortex_error::VortexResult; use vortex_expr::datafusion::convert_expr_to_vortex; -use vortex_expr::RowFilter; -use vortex_file::{LayoutContext, LayoutDeserializer, Projection, VortexReadBuilder}; -use vortex_io::{IoDispatcher, ObjectStoreReadAt}; +use vortex_expr::transform::simplify_typed::simplify_typed; +use vortex_expr::{and, get_item, ident, lit, pack, ExprRef, Identity}; +use vortex_file::v2::{ExecutionMode, VortexOpenOptions}; +use vortex_io::ObjectStoreReadAt; +use vortex_scan::Scan; -use super::cache::InitialReadCache; - -/// Share an IO dispatcher across all DataFusion instances. -static IO_DISPATCHER: LazyLock> = - LazyLock::new(|| Arc::new(IoDispatcher::default())); +use super::cache::FileLayoutCache; #[derive(Clone)] pub struct VortexFileOpener { pub ctx: ContextRef, pub object_store: Arc, - pub projection: Option>, - pub predicate: Option>, - pub arrow_schema: SchemaRef, - pub(crate) initial_read_cache: InitialReadCache, + pub projection: ExprRef, + pub filter: Option, + pub(crate) file_layout_cache: FileLayoutCache, +} + +impl VortexFileOpener { + pub fn new( + ctx: ContextRef, + object_store: Arc, + projection: Option, + predicate: Option>, + arrow_schema: SchemaRef, + file_layout_cache: FileLayoutCache, + ) -> VortexResult { + let dtype = DType::from_arrow(arrow_schema); + let filter = predicate + .as_ref() + // If we cannot convert an expr to a vortex expr, we run no filter, since datafusion + // will rerun the filter expression anyway. + .map(|expr| { + // This splits expressions into conjunctions and converts them to vortex expressions. + // Any inconvertible expressions are dropped since true /\ a == a. + let expr = split_conjunction(expr) + .into_iter() + .filter_map(|e| convert_expr_to_vortex(e.clone()).ok()) + .fold(lit(true), and); + + simplify_typed(expr, dtype) + }) + .transpose()?; + + let projection = projection + .as_ref() + .map(|fields| { + pack( + fields.clone(), + fields + .iter() + .map(|f| get_item(f.clone(), ident())) + .collect(), + ) + }) + .unwrap_or_else(|| Identity::new_expr()); + + Ok(Self { + ctx, + object_store, + projection, + filter, + // arrow_schema, + file_layout_cache, + }) + } } impl FileOpener for VortexFileOpener { fn open(&self, file_meta: FileMeta) -> DFResult { let this = self.clone(); - let f = async move { - let read_at = - ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); - let initial_read = this - .initial_read_cache - .try_get(&file_meta.object_meta, this.object_store.clone()) - .await?; - let mut builder = VortexReadBuilder::new( - read_at, - LayoutDeserializer::new(this.ctx.clone(), Arc::new(LayoutContext::default())), - ) - .with_io_dispatcher(IO_DISPATCHER.clone()) - .with_file_size(file_meta.object_meta.size as u64) - .with_initial_read(initial_read); + // Construct the projection expression based on the DataFusion projection mask. + // Each index in the mask corresponds to the field position of the root DType. - // We split the predicate and filter out the conjunction members that we can't push down - let row_filter = this - .predicate - .as_ref() - .map(|filter_expr| { - split_conjunction(filter_expr) - .into_iter() - .filter_map(|e| convert_expr_to_vortex(e.clone()).ok()) - .collect::>() - }) - .filter(|conjunction| !conjunction.is_empty()) - .map(RowFilter::from_conjunction); + let scan = Scan::new(self.projection.clone(), self.filter.clone()).into_arc(); - if let Some(row_filter) = row_filter { - builder = builder.with_row_filter(row_filter); - } + let read_at = + ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); - if let Some(projection) = this.projection.as_ref() { - builder = builder.with_projection(Projection::new(projection.clone())); - } + Ok(async move { + let vxf = VortexOpenOptions::new(this.ctx.clone()) + .with_file_layout( + this.file_layout_cache + .try_get(&file_meta.object_meta, this.object_store.clone()) + .await?, + ) + .with_execution_mode(ExecutionMode::TokioRuntime(Handle::current())) + .open(read_at) + .await?; - Ok(Box::pin( - builder - .build() - .await? - .into_stream() - .map_ok(RecordBatch::try_from) - .map(|r| r.and_then(|inner| inner)) - .map_err(|e| e.into()), - ) as _) + Ok(vxf + .scan(scan)? + .map_ok(RecordBatch::try_from) + .map(|r| r.and_then(|inner| inner)) + .map_err(|e| e.into()) + .boxed()) } - .boxed(); - - Ok(f) + .boxed()) } } diff --git a/vortex-expr/src/datafusion.rs b/vortex-expr/src/datafusion.rs index 2b89cca01..219e69f72 100644 --- a/vortex-expr/src/datafusion.rs +++ b/vortex-expr/src/datafusion.rs @@ -7,8 +7,10 @@ use datafusion_physical_expr::{expressions, PhysicalExpr}; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; use vortex_scalar::Scalar; -use crate::{lit, BinaryExpr, Column, ExprRef, Like, Operator}; +use crate::{get_item, ident, lit, BinaryExpr, ExprRef, Like, Operator}; +// TODO(joe): Don't return an error when we have an unsupported node, bubble up "TRUE" as in keep +// for that node, up to any `and` or `or` node. pub fn convert_expr_to_vortex(physical_expr: Arc) -> VortexResult { if let Some(binary_expr) = physical_expr .as_any() @@ -22,9 +24,7 @@ pub fn convert_expr_to_vortex(physical_expr: Arc) -> VortexRes } if let Some(col_expr) = physical_expr.as_any().downcast_ref::() { - let expr = Column::from(col_expr.name().to_owned()); - - return Ok(Arc::new(expr) as _); + return Ok(get_item(col_expr.name().to_owned(), ident())); } if let Some(like) = physical_expr diff --git a/vortex-file/src/v2/exec/tokio.rs b/vortex-file/src/v2/exec/tokio.rs index 4259edfb3..9ebf557e9 100644 --- a/vortex-file/src/v2/exec/tokio.rs +++ b/vortex-file/src/v2/exec/tokio.rs @@ -8,29 +8,23 @@ use vortex_error::{vortex_err, VortexResult}; use crate::v2::exec::ExecDriver; /// An execution driver that spawns the futures onto a Tokio runtime. -pub struct TokioDriver { - handle: Handle, - concurrency: usize, -} - -impl TokioDriver { - pub fn new(handle: Handle, concurrency: usize) -> Self { - Self { - handle, - concurrency, - } - } -} +pub struct TokioDriver(pub Handle); impl ExecDriver for TokioDriver { fn drive( &self, stream: BoxStream<'static, BoxFuture<'static, VortexResult>>, ) -> BoxStream<'static, VortexResult> { - let handle = self.handle.clone(); + let handle = self.0.clone(); + + // This is how many file splits to make progress on at once. While I/O is resolving for + // the first, we may as well find out the segments required by the next. + // TODO(ngates): I picked this number somewhat arbitrarily :) + let concurrency = 2 * handle.metrics().num_workers(); + stream .map(move |future| handle.spawn(future)) - .buffered(self.concurrency) + .buffered(concurrency) .map(|result| match result { Ok(result) => result, Err(e) => Err(vortex_err!("Failed to join Tokio result {}", e)), diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 7e39e2e07..455a4f77f 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -62,6 +62,7 @@ impl VortexFile { .reader(segment_channel.reader(), self.ctx.clone())?; // Now we give one end of the channel to the layout reader... + log::info!("Starting scan with {} splits", self.splits.len()); let exec_stream = stream::iter(ArcIter::new(self.splits.clone())) .map(move |row_range| scan.clone().range_scan(row_range)) .map(move |range_scan| match range_scan { diff --git a/vortex-file/src/v2/io/file.rs b/vortex-file/src/v2/io/file.rs index 0fd3fa78d..8cf438dcc 100644 --- a/vortex-file/src/v2/io/file.rs +++ b/vortex-file/src/v2/io/file.rs @@ -1,16 +1,19 @@ use std::future::Future; use std::ops::Range; +use std::sync::Arc; use futures::channel::oneshot; use futures::Stream; +use futures_util::future::try_join_all; use futures_util::{stream, StreamExt}; use vortex_buffer::{ByteBuffer, ByteBufferMut}; use vortex_error::{vortex_err, VortexExpect, VortexResult}; use vortex_io::VortexReadAt; +use vortex_layout::segments::SegmentId; use crate::v2::footer::{FileLayout, Segment}; use crate::v2::io::IoDriver; -use crate::v2::segments::SegmentRequest; +use crate::v2::segments::{SegmentCache, SegmentRequest}; // TODO(ngates): use this sort of trait for I/O? #[allow(dead_code)] @@ -33,14 +36,27 @@ pub struct FileIoDriver { pub file_layout: FileLayout, /// The number of concurrent I/O requests to submit. pub concurrency: usize, + /// A segment cache to store segments in memory. + pub segment_cache: Arc, } #[derive(Debug)] struct FileSegmentRequest { + /// The segment ID. + pub(crate) id: SegmentId, /// The segment location. pub(crate) location: Segment, /// The callback channel - pub(crate) callback: oneshot::Sender>, + callback: oneshot::Sender>, +} + +impl FileSegmentRequest { + fn resolve(self, buffer: VortexResult) { + self.callback + .send(buffer) + .map_err(|_| vortex_err!("send failed")) + .vortex_expect("send failed"); + } } #[derive(Debug)] @@ -51,21 +67,6 @@ struct CoalescedSegmentRequest { pub(crate) requests: Vec, } -impl CoalescedSegmentRequest { - /// Resolve the coalesced segment request. - fn resolve(self, buffer: ByteBuffer) -> VortexResult<()> { - for req in self.requests { - let offset = usize::try_from(req.location.offset - self.byte_range.start)?; - req.callback - .send(Ok(buffer - .slice(offset..offset + req.location.length as usize) - .aligned(req.location.alignment))) - .map_err(|_| vortex_err!("send failed"))?; - } - Ok(()) - } -} - impl IoDriver for FileIoDriver { fn drive( &self, @@ -73,9 +74,11 @@ impl IoDriver for FileIoDriver { ) -> impl Stream> + 'static { let segment_map = self.file_layout.segments.clone(); let read = self.read.clone(); + let segment_cache1 = self.segment_cache.clone(); + let segment_cache2 = self.segment_cache.clone(); - // First, we need to map the segment requests to their respective locations within the file. stream + // We map the segment requests to their respective locations within the file. .filter_map(move |request| { let segment_map = segment_map.clone(); async move { @@ -88,11 +91,43 @@ impl IoDriver for FileIoDriver { return None; }; Some(FileSegmentRequest { + id: request.id, location: location.clone(), callback: request.callback, }) } }) + // We support zero-length segments (so layouts don't have to store this information) + // If we encounter a zero-length segment, we can just resolve it now. + .filter_map(move |request| async move { + if request.location.length == 0 { + let alignment = request.location.alignment; + request.resolve(Ok(ByteBuffer::empty_aligned(alignment))); + None + } else { + Some(request) + } + }) + // Check if the segment exists in the cache + .filter_map(move |request| { + let segment_cache = segment_cache1.clone(); + async move { + match segment_cache + .get(request.id, request.location.alignment) + .await + { + Ok(None) => Some(request), + Ok(Some(buffer)) => { + request.resolve(Ok(buffer)); + None + } + Err(e) => { + request.resolve(Err(e)); + None + } + } + } + }) // Grab all available segment requests from the I/O queue so we get maximal visibility into // the requests for coalescing. // Note that we can provide a somewhat arbitrarily high capacity here since we're going to @@ -103,41 +138,114 @@ impl IoDriver for FileIoDriver { .map(coalesce) .flat_map(stream::iter) // Submit the coalesced requests to the I/O. - .map(move |request| { - let read = read.clone(); - evaluate(read, request) - }) + .map(move |request| evaluate(read.clone(), request, segment_cache2.clone())) // Buffer some number of concurrent I/O operations. .buffer_unordered(self.concurrency) } } -async fn evaluate(read: R, request: CoalescedSegmentRequest) -> VortexResult<()> { - log::trace!( +async fn evaluate( + read: R, + request: CoalescedSegmentRequest, + segment_cache: Arc, +) -> VortexResult<()> { + log::debug!( "Reading byte range: {:?} {}", request.byte_range, request.byte_range.end - request.byte_range.start ); - let bytes = read + let buffer: ByteBuffer = read .read_byte_range( request.byte_range.start, request.byte_range.end - request.byte_range.start, ) .await - .map_err(|e| vortex_err!("Failed to read coalesced segment: {:?} {:?}", request, e))?; + .map_err(|e| vortex_err!("Failed to read coalesced segment: {:?} {:?}", request, e))? + .into(); // TODO(ngates): traverse the segment map to find un-requested segments that happen to // fall within the range of the request. Then we can populate those in the cache. - request.resolve(ByteBuffer::from(bytes)) + let mut cache_futures = Vec::with_capacity(request.requests.len()); + for req in request.requests { + let offset = usize::try_from(req.location.offset - request.byte_range.start)?; + let buf = buffer + .slice(offset..offset + req.location.length as usize) + .aligned(req.location.alignment); + + // Send the callback + req.callback + .send(Ok(buf.clone())) + .map_err(|_| vortex_err!("send failed"))?; + + cache_futures.push(segment_cache.put(req.id, buf)); + } + + // Populate the cache + try_join_all(cache_futures).await?; + + Ok(()) } /// TODO(ngates): outsource coalescing to a trait fn coalesce(requests: Vec) -> Vec { - requests - .into_iter() - .map(|req| CoalescedSegmentRequest { - byte_range: req.location.offset..req.location.offset + req.location.length as u64, - requests: vec![req], + const COALESCE: u64 = 1024 * 1024; // 1MB + let fetch_ranges = merge_ranges( + requests + .iter() + .map(|r| r.location.offset..r.location.offset + r.location.length as u64), + COALESCE, + ); + let mut coalesced = fetch_ranges + .iter() + .map(|range| CoalescedSegmentRequest { + byte_range: range.clone(), + requests: vec![], }) - .collect() + .collect::>(); + + for req in requests { + let idx = fetch_ranges.partition_point(|v| v.start <= req.location.offset) - 1; + coalesced.as_mut_slice()[idx].requests.push(req); + } + + coalesced +} + +/// Returns a sorted list of ranges that cover `ranges` +/// +/// From arrow-rs. +fn merge_ranges(ranges: R, coalesce: u64) -> Vec> +where + R: IntoIterator>, +{ + let mut ranges: Vec> = ranges.into_iter().collect(); + ranges.sort_unstable_by_key(|range| range.start); + + let mut ret = Vec::with_capacity(ranges.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != ranges.len() { + let mut range_end = ranges[start_idx].end; + + while end_idx != ranges.len() + && ranges[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + { + range_end = range_end.max(ranges[end_idx].end); + end_idx += 1; + } + + let start = ranges[start_idx].start; + let end = range_end; + ret.push(start..end); + + start_idx = end_idx; + end_idx += 1; + } + + ret } diff --git a/vortex-file/src/v2/open/exec.rs b/vortex-file/src/v2/open/exec.rs index c09bc0171..c32dfdc99 100644 --- a/vortex-file/src/v2/open/exec.rs +++ b/vortex-file/src/v2/open/exec.rs @@ -15,12 +15,7 @@ pub enum ExecutionMode { RayonThreadPool(Arc), /// Spawns the tasks onto a provided Tokio runtime. // TODO(ngates): feature-flag this dependency. - TokioRuntime(TokioRuntime), -} - -pub struct TokioRuntime { - pub handle: tokio::runtime::Handle, - pub concurrency: usize, + TokioRuntime(tokio::runtime::Handle), } impl ExecutionMode { @@ -30,10 +25,7 @@ impl ExecutionMode { ExecutionMode::RayonThreadPool(_) => { todo!() } - ExecutionMode::TokioRuntime(TokioRuntime { - handle, - concurrency, - }) => Arc::new(TokioDriver::new(handle, concurrency)), + ExecutionMode::TokioRuntime(handle) => Arc::new(TokioDriver(handle)), } } } diff --git a/vortex-file/src/v2/open/mod.rs b/vortex-file/src/v2/open/mod.rs index 1be454602..290208a19 100644 --- a/vortex-file/src/v2/open/mod.rs +++ b/vortex-file/src/v2/open/mod.rs @@ -30,10 +30,10 @@ pub struct VortexOpenOptions { ctx: ContextRef, /// The Vortex Layout encoding context. layout_ctx: LayoutContextRef, - /// An optional, externally provided, file layout. - file_layout: Option, /// An optional, externally provided, file size. file_size: Option, + /// An optional, externally provided, file layout. + file_layout: Option, // TODO(ngates): also support a messages_middleware that can wrap a message cache to provide // additional caching, metrics, or other intercepts, etc. It should support synchronous // read + write of Map or similar. @@ -50,12 +50,13 @@ impl VortexOpenOptions { Self { ctx, layout_ctx: LayoutContextRef::default(), - file_layout: None, file_size: None, + file_layout: None, initial_read_size: INITIAL_READ_SIZE, split_by: SplitBy::Layout, segment_cache: None, execution_mode: None, + // TODO(ngates): pick some numbers... io_concurrency: 16, } } @@ -105,6 +106,12 @@ impl VortexOpenOptions { pub fn without_segment_cache(self) -> Self { self.with_segment_cache(Arc::new(NoOpSegmentCache)) } + + /// Configure the execution mode + pub fn with_execution_mode(mut self, execution_mode: ExecutionMode) -> Self { + self.execution_mode = Some(execution_mode); + self + } } impl VortexOpenOptions { @@ -131,6 +138,7 @@ impl VortexOpenOptions { read, file_layout: file_layout.clone(), concurrency: self.io_concurrency, + segment_cache, }; // Set up the execution driver. diff --git a/vortex-file/src/v2/segments/cache.rs b/vortex-file/src/v2/segments/cache.rs index f9628f0a5..a135785d9 100644 --- a/vortex-file/src/v2/segments/cache.rs +++ b/vortex-file/src/v2/segments/cache.rs @@ -36,13 +36,14 @@ pub(crate) struct InMemorySegmentCache(RwLock>); #[async_trait] impl SegmentCache for InMemorySegmentCache { - async fn get(&self, id: SegmentId, _alignment: Alignment) -> VortexResult> { + async fn get(&self, id: SegmentId, alignment: Alignment) -> VortexResult> { Ok(self .0 .read() .map_err(|_| vortex_err!("poisoned"))? .get(&id) - .cloned()) + .cloned() + .map(|b| b.aligned(alignment))) } async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> { diff --git a/vortex-file/src/v2/strategy.rs b/vortex-file/src/v2/strategy.rs index 858fd73d5..5debd4073 100644 --- a/vortex-file/src/v2/strategy.rs +++ b/vortex-file/src/v2/strategy.rs @@ -3,6 +3,7 @@ use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_layout::layouts::chunked::writer::{ChunkedLayoutOptions, ChunkedLayoutWriter}; +use vortex_layout::layouts::struct_::writer::StructLayoutWriter; use vortex_layout::strategies::{LayoutStrategy, LayoutWriter, LayoutWriterExt}; /// The default Vortex file layout strategy. @@ -12,6 +13,10 @@ pub struct VortexLayoutStrategy; impl LayoutStrategy for VortexLayoutStrategy { fn new_writer(&self, dtype: &DType) -> VortexResult> { - Ok(ChunkedLayoutWriter::new(dtype, ChunkedLayoutOptions::default()).boxed()) + if dtype.is_struct() { + StructLayoutWriter::try_new_with_factory(dtype, VortexLayoutStrategy).map(|w| w.boxed()) + } else { + Ok(ChunkedLayoutWriter::new(dtype, ChunkedLayoutOptions::default()).boxed()) + } } } diff --git a/vortex-layout/src/layouts/struct_/eval_expr.rs b/vortex-layout/src/layouts/struct_/eval_expr.rs index bd86bf513..afe47e281 100644 --- a/vortex-layout/src/layouts/struct_/eval_expr.rs +++ b/vortex-layout/src/layouts/struct_/eval_expr.rs @@ -16,7 +16,7 @@ use crate::ExprEvaluator; impl ExprEvaluator for StructReader { async fn evaluate_expr(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult { // Partition the expression into expressions that can be evaluated over individual fields - let partitioned = partition(expr, self.struct_dtype())?; + let partitioned = partition(expr.clone(), self.struct_dtype())?; let field_readers: Vec<_> = partitioned .partitions .iter() @@ -49,7 +49,6 @@ impl ExprEvaluator for StructReader { )? .into_array(); - // Recombine the partitioned expressions into a single expression partitioned.root.evaluate(&root_scope) } } diff --git a/vortex-layout/src/layouts/struct_/writer.rs b/vortex-layout/src/layouts/struct_/writer.rs index dda5a3cf7..35249b92f 100644 --- a/vortex-layout/src/layouts/struct_/writer.rs +++ b/vortex-layout/src/layouts/struct_/writer.rs @@ -69,7 +69,10 @@ impl LayoutWriter for StructLayoutWriter { .vortex_expect("batch is a struct array") .maybe_null_field_by_idx(i) .vortex_expect("bounds already checked"); - self.column_strategies[i].push_chunk(segments, column)?; + + for column_chunk in column.into_array_iterator() { + self.column_strategies[i].push_chunk(segments, column_chunk?)?; + } } Ok(()) diff --git a/vortex-scan/src/range_scan.rs b/vortex-scan/src/range_scan.rs index 6261712df..55daf862b 100644 --- a/vortex-scan/src/range_scan.rs +++ b/vortex-scan/src/range_scan.rs @@ -83,6 +83,9 @@ impl RangeScan { /// Check for the next operation to perform. /// Returns `Poll::Ready` when the scan is complete. + /// + // FIXME(ngates): currently we have to clone the FilterMask to return it. Doing this + // forces the eager evaluation of the iterators. fn next(&self) -> NextOp { match &self.state { State::FilterEval((mask, expr)) => {