From a23149459a9452c1ce1cb7e16c52b9146d7e0175 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 21 Nov 2024 20:32:18 +0000 Subject: [PATCH] LayoutReader::read_selection uses immutable reference (#1295) --- Cargo.lock | 13 - Cargo.toml | 1 - pyvortex/src/dataset.rs | 9 +- vortex-datafusion/src/persistent/format.rs | 13 +- vortex-dtype/src/serde/flatbuffers/project.rs | 11 +- vortex-file/Cargo.toml | 4 - vortex-file/src/lib.rs | 6 +- vortex-file/src/read/buffered.rs | 86 ------ vortex-file/src/read/builder/initial_read.rs | 16 +- vortex-file/src/read/builder/mod.rs | 20 +- vortex-file/src/read/cache.rs | 236 +++++++++++---- vortex-file/src/read/column_batch.rs | 119 -------- vortex-file/src/read/context.rs | 29 +- vortex-file/src/read/layouts/chunked.rs | 271 +++++++++++++----- vortex-file/src/read/layouts/columnar.rs | 227 +++++++++------ vortex-file/src/read/layouts/flat.rs | 32 +-- vortex-file/src/read/layouts/inline_dtype.rs | 118 +++----- vortex-file/src/read/layouts/mod.rs | 12 +- vortex-file/src/read/mod.rs | 8 +- .../src/read}/projection.rs | 13 + vortex-file/src/read/stream.rs | 18 +- vortex-file/src/write/layout.rs | 18 +- vortex-file/src/write/mod.rs | 2 +- vortex-file/src/write/writer.rs | 19 +- vortex-schema/Cargo.toml | 21 -- vortex-schema/src/lib.rs | 53 ---- vortex-serde/src/layouts/write/mod.rs | 7 + vortex/Cargo.toml | 1 - vortex/src/lib.rs | 2 +- 29 files changed, 672 insertions(+), 713 deletions(-) delete mode 100644 vortex-file/src/read/buffered.rs delete mode 100644 vortex-file/src/read/column_batch.rs rename {vortex-schema/src => vortex-file/src/read}/projection.rs (58%) delete mode 100644 vortex-schema/Cargo.toml delete mode 100644 vortex-schema/src/lib.rs create mode 100644 vortex-serde/src/layouts/write/mod.rs diff --git a/Cargo.lock b/Cargo.lock index c28dfccf83..a08230e71a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4659,7 +4659,6 @@ dependencies = [ "vortex-runend-bool", "vortex-sampling-compressor", "vortex-scalar", - "vortex-schema", "vortex-zigzag", ] @@ -4891,7 +4890,6 @@ dependencies = [ "arrow-array", "arrow-buffer", "arrow-schema", - "arrow-select", "bytes", "flatbuffers", "futures", @@ -4900,7 +4898,6 @@ dependencies = [ "itertools 0.13.0", "once_cell", "rstest", - "tempfile", "tokio", "tracing", "vortex-array", @@ -4911,9 +4908,7 @@ dependencies = [ "vortex-flatbuffers", "vortex-io", "vortex-ipc", - "vortex-sampling-compressor", "vortex-scalar", - "vortex-schema", ] [[package]] @@ -5098,14 +5093,6 @@ dependencies = [ "vortex-proto", ] -[[package]] -name = "vortex-schema" -version = "0.19.0" -dependencies = [ - "vortex-dtype", - "vortex-error", -] - [[package]] name = "vortex-zigzag" version = "0.19.0" diff --git a/Cargo.toml b/Cargo.toml index a35da4f32b..ca4b63b628 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ members = [ "vortex-proto", "vortex-sampling-compressor", "vortex-scalar", - "vortex-schema", "xtask", ] resolver = "2" diff --git a/pyvortex/src/dataset.rs b/pyvortex/src/dataset.rs index 1e866ddf7f..99549f54c5 100644 --- a/pyvortex/src/dataset.rs +++ b/pyvortex/src/dataset.rs @@ -9,7 +9,7 @@ use pyo3::types::{PyLong, PyString}; use vortex::arrow::infer_schema; use vortex::dtype::field::Field; use vortex::dtype::DType; -use vortex::error::{vortex_err, VortexResult}; +use vortex::error::VortexResult; use vortex::file::{ read_initial_bytes, LayoutContext, LayoutDeserializer, Projection, RowFilter, VortexFileArrayStream, VortexReadBuilder, VortexRecordBatchReader, @@ -62,12 +62,7 @@ pub async fn read_array_from_reader( pub async fn read_dtype_from_reader(reader: T) -> VortexResult { let initial_read = read_initial_bytes(&reader, reader.size().await).await?; - DType::try_from( - initial_read - .fb_schema()? - .dtype() - .ok_or_else(|| vortex_err!("Failed to fetch dtype from initial read"))?, - ) + initial_read.lazy_dtype()?.value().cloned() } fn projection_from_python(columns: Option>>) -> PyResult { diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 4000cde050..d408eb2d8d 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -17,8 +17,7 @@ use datafusion_physical_plan::ExecutionPlan; use object_store::{ObjectMeta, ObjectStore}; use vortex_array::arrow::infer_schema; use vortex_array::Context; -use vortex_dtype::DType; -use vortex_file::VORTEX_FILE_EXTENSION; +use vortex_file::{read_initial_bytes, VORTEX_FILE_EXTENSION}; use vortex_io::ObjectStoreReadAt; use super::execution::VortexExec; @@ -68,11 +67,9 @@ impl FileFormat for VortexFormat { let mut file_schemas = Vec::default(); for o in objects { let os_read_at = ObjectStoreReadAt::new(store.clone(), o.location.clone()); - let initial_read = vortex_file::read_initial_bytes(&os_read_at, o.size as u64).await?; - let dtype = DType::try_from(initial_read.fb_schema()?.dtype().ok_or_else(|| { - DataFusionError::External("Failed to fetch dtype from initial read".into()) - })?)?; - let s = infer_schema(&dtype)?; + let initial_read = read_initial_bytes(&os_read_at, o.size as u64).await?; + let lazy_dtype = initial_read.lazy_dtype()?; + let s = infer_schema(lazy_dtype.value()?)?; file_schemas.push(s); } @@ -87,7 +84,7 @@ impl FileFormat for VortexFormat { object: &ObjectMeta, ) -> DFResult { let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); - let initial_read = vortex_file::read_initial_bytes(&os_read_at, object.size as u64).await?; + let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?; let layout = initial_read.fb_layout()?; let row_count = layout.row_count(); diff --git a/vortex-dtype/src/serde/flatbuffers/project.rs b/vortex-dtype/src/serde/flatbuffers/project.rs index d57727153f..308ff8447c 100644 --- a/vortex-dtype/src/serde/flatbuffers/project.rs +++ b/vortex-dtype/src/serde/flatbuffers/project.rs @@ -23,8 +23,17 @@ pub fn resolve_field<'a, 'b: 'a>(fb: fb::Struct_<'b>, field: &'a Field) -> Vorte } } +/// Deserialize single field out of a struct dtype and as a top level dtype +pub fn extract_field(fb_dtype: fb::DType<'_>, field: &Field) -> VortexResult { + let fb_struct = fb_dtype + .type__as_struct_() + .ok_or_else(|| vortex_err!("The top-level type should be a struct"))?; + let (_, dtype) = read_field(fb_struct, resolve_field(fb_struct, field)?)?; + Ok(dtype) +} + /// Deserialize flatbuffer schema selecting only columns defined by projection -pub fn deserialize_and_project( +pub fn project_and_deserialize( fb_dtype: fb::DType<'_>, projection: &[Field], ) -> VortexResult { diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index c6096cea9c..1e49922e8e 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -39,15 +39,11 @@ vortex-flatbuffers = { workspace = true, features = ["file"] } vortex-io = { workspace = true, features = ["tokio"] } vortex-ipc = { workspace = true } vortex-scalar = { workspace = true, features = ["flatbuffers"] } -vortex-schema = { workspace = true } [dev-dependencies] arrow-schema = { workspace = true } -arrow-select = { workspace = true } rstest = { workspace = true } -tempfile = { workspace = true } tokio = { workspace = true, features = ["full"] } -vortex-sampling-compressor = { path = "../vortex-sampling-compressor" } [lints] workspace = true diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index 74f7d6ded3..156d36d854 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -4,15 +4,15 @@ //! A layout is a serialized array which is stored in some linear and contiguous block of //! memory. Layouts are recursively defined in terms of one of three kinds: //! -//! 1. The [flat layout][layouts::FlatLayoutSpec]. A contiguously serialized array using the [Vortex +//! 1. The [flat layout][layouts::FlatLayout]. A contiguously serialized array using the [Vortex //! flatbuffer Batch message][vortex_flatbuffers::message]. //! -//! 2. The [columnar layout][layouts::ColumnarLayoutSpec]. Each column of a +//! 2. The [columnar layout][layouts::ColumnarLayout]. Each column of a //! [StructArray][vortex_array::array::StructArray] is sequentially laid out at known //! offsets. This permits reading a subset of columns in time linear in the number of kept //! columns. //! -//! 3. The [chunked layout][layouts::ChunkedLayoutSpec]. Each chunk of a +//! 3. The [chunked layout][layouts::ChunkedLayout]. Each chunk of a //! [ChunkedArray][vortex_array::array::ChunkedArray] is sequentially laid out at known //! offsets. This permits reading a subset of rows in time linear in the number of kept rows. //! diff --git a/vortex-file/src/read/buffered.rs b/vortex-file/src/read/buffered.rs deleted file mode 100644 index e89687f97e..0000000000 --- a/vortex-file/src/read/buffered.rs +++ /dev/null @@ -1,86 +0,0 @@ -use std::collections::VecDeque; -use std::mem; - -use vortex_array::array::ChunkedArray; -use vortex_array::{ArrayDType, ArrayData, IntoArrayData}; -use vortex_error::VortexResult; - -use crate::read::mask::RowMask; -use crate::read::{BatchRead, LayoutReader, MessageLocator}; - -pub type RangedLayoutReader = ((usize, usize), Box); - -/// Layout reader that continues reading children until all rows referenced in the mask have been handled -#[derive(Debug)] -pub struct BufferedLayoutReader { - layouts: VecDeque, - arrays: Vec, -} - -impl BufferedLayoutReader { - pub fn new(layouts: VecDeque) -> Self { - Self { - layouts, - arrays: Vec::new(), - } - } - - // TODO(robert): Support out of order reads - fn buffer_read(&mut self, mask: &RowMask) -> VortexResult>> { - while let Some(((begin, end), layout)) = self.layouts.pop_front() { - // Since ends are exclusive there's only overlap if either end is larger/smaller by at least 1 - if mask.end() > begin && mask.begin() < end { - self.layouts.push_front(((begin, end), layout)); - break; - } - } - - while let Some(((begin, end), mut layout)) = self.layouts.pop_front() { - // This selection doesn't know about rows in this chunk, we should put it back and wait for another request with different range - if mask.end() <= begin || mask.begin() >= end { - self.layouts.push_front(((begin, end), layout)); - return Ok(None); - } - let layout_selection = mask.slice(begin, end).shift(begin)?; - if let Some(rr) = layout.read_selection(&layout_selection)? { - match rr { - BatchRead::ReadMore(m) => { - self.layouts.push_front(((begin, end), layout)); - return Ok(Some(m)); - } - BatchRead::Batch(a) => { - self.arrays.push(a); - if end > mask.end() { - self.layouts.push_front(((begin, end), layout)); - return Ok(None); - } - } - } - } else { - if end > mask.end() && begin < mask.end() { - self.layouts.push_front(((begin, end), layout)); - return Ok(None); - } - continue; - } - } - Ok(None) - } - - pub fn read_next(&mut self, mask: &RowMask) -> VortexResult> { - if let Some(bufs) = self.buffer_read(mask)? { - return Ok(Some(BatchRead::ReadMore(bufs))); - } - - let mut result = mem::take(&mut self.arrays); - match result.len() { - 0 | 1 => Ok(result.pop().map(BatchRead::Batch)), - _ => { - let dtype = result[0].dtype().clone(); - Ok(Some(BatchRead::Batch( - ChunkedArray::try_new(result, dtype)?.into_array(), - ))) - } - } - } -} diff --git a/vortex-file/src/read/builder/initial_read.rs b/vortex-file/src/read/builder/initial_read.rs index 38898d6bcf..4b0e8b6925 100644 --- a/vortex-file/src/read/builder/initial_read.rs +++ b/vortex-file/src/read/builder/initial_read.rs @@ -5,10 +5,9 @@ use flatbuffers::{root, root_unchecked}; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_flatbuffers::{footer, message}; use vortex_io::VortexReadAt; -use vortex_schema::projection::Projection; use crate::{ - LayoutDeserializer, LayoutReader, LazilyDeserializedDType, RelativeLayoutCache, Scan, EOF_SIZE, + LayoutDeserializer, LayoutReader, LazyDType, RelativeLayoutCache, Scan, EOF_SIZE, INITIAL_READ_SIZE, MAGIC_BYTES, VERSION, }; @@ -51,17 +50,11 @@ impl InitialRead { Ok(schema_start..schema_end) } - /// The `Schema` flatbuffer. - pub fn fb_schema(&self) -> VortexResult { - Ok(unsafe { root_unchecked::(&self.buf[self.fb_schema_byte_range()?]) }) - } - - pub fn lazy_dtype(&self) -> VortexResult { + pub fn lazy_dtype(&self) -> VortexResult { // we validated the schema bytes at construction time unsafe { - Ok(LazilyDeserializedDType::from_schema_bytes( + Ok(LazyDType::from_schema_bytes( self.buf.slice(self.fb_schema_byte_range()?), - Projection::All, )) } } @@ -176,8 +169,7 @@ pub async fn read_initial_bytes( #[cfg(test)] mod tests { - use super::*; - use crate::MAX_FOOTER_SIZE; + use crate::{EOF_SIZE, INITIAL_READ_SIZE, MAX_FOOTER_SIZE}; #[test] fn big_enough_initial_read() { diff --git a/vortex-file/src/read/builder/mod.rs b/vortex-file/src/read/builder/mod.rs index 8569804fd5..7f0c4be940 100644 --- a/vortex-file/src/read/builder/mod.rs +++ b/vortex-file/src/read/builder/mod.rs @@ -2,16 +2,14 @@ use std::sync::{Arc, RwLock}; use initial_read::{read_initial_bytes, read_layout_from_initial}; use vortex_array::{ArrayDType, ArrayData}; -use vortex_dtype::flatbuffers::deserialize_and_project; -use vortex_dtype::DType; -use vortex_error::{vortex_err, VortexResult}; +use vortex_error::VortexResult; use vortex_expr::Select; use vortex_io::{IoDispatcher, VortexReadAt}; -use vortex_schema::projection::Projection; use crate::read::cache::{LayoutMessageCache, RelativeLayoutCache}; use crate::read::context::LayoutDeserializer; use crate::read::filtering::RowFilter; +use crate::read::projection::Projection; use crate::read::stream::VortexFileArrayStream; use crate::read::{RowMask, Scan}; @@ -121,23 +119,17 @@ impl VortexReadBuilder { let initial_read = read_initial_bytes(&self.read_at, self.size().await).await?; let layout = initial_read.fb_layout()?; - let schema = initial_read.fb_schema()?; let row_count = layout.row_count(); let read_projection = self.projection.unwrap_or_default(); + let lazy_dtype = Arc::new(initial_read.lazy_dtype()?); - let projected_dtype = { - let fb_dtype = schema - .dtype() - .ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?; - match read_projection { - Projection::All => DType::try_from(fb_dtype)?, - Projection::Flat(ref projection) => deserialize_and_project(fb_dtype, projection)?, - } + let projected_dtype = match read_projection { + Projection::All => lazy_dtype.clone(), + Projection::Flat(ref fields) => lazy_dtype.project(fields)?, }; let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let lazy_dtype = Arc::new(initial_read.lazy_dtype()?); let layout_reader = read_layout_from_initial( &initial_read, &self.layout_serde, diff --git a/vortex-file/src/read/cache.rs b/vortex-file/src/read/cache.rs index f76592e598..22a41201e0 100644 --- a/vortex-file/src/read/cache.rs +++ b/vortex-file/src/read/cache.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::sync::{Arc, RwLock}; use bytes::Bytes; @@ -5,12 +6,13 @@ use flatbuffers::root_unchecked; use once_cell::sync::OnceCell; use vortex_array::aliases::hash_map::HashMap; use vortex_dtype::field::Field; -use vortex_dtype::flatbuffers::{deserialize_and_project, resolve_field}; -use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult}; +use vortex_dtype::flatbuffers::{extract_field, project_and_deserialize, resolve_field}; +use vortex_dtype::{DType, FieldNames}; +use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexResult}; +use vortex_flatbuffers::dtype::Struct_; use vortex_flatbuffers::message; -use vortex_schema::projection::Projection; +use crate::read::projection::Projection; use crate::read::{LayoutPartId, MessageId}; #[derive(Default, Debug)] @@ -38,83 +40,165 @@ impl LayoutMessageCache { } } +#[derive(Debug)] +enum SerializedDTypeField { + Projection(Projection), + Field(Field), +} + +impl SerializedDTypeField { + pub fn project(&self, fields: &[Field]) -> VortexResult { + match self { + SerializedDTypeField::Projection(p) => { + Ok(SerializedDTypeField::Projection(p.project(fields)?)) + } + SerializedDTypeField::Field(f) => { + if fields.len() > 1 && &fields[0] != f { + vortex_bail!("Can't project field {f} into {fields:?}") + } + Ok(SerializedDTypeField::Field(f.clone())) + } + } + } + + pub fn field(&self, field: &Field) -> VortexResult { + match self { + SerializedDTypeField::Projection(p) => { + match p { + Projection::All => {} + Projection::Flat(fields) => { + if !fields.iter().any(|pf| pf == field) { + vortex_bail!("Can't project {fields:?} into {field}") + } + } + } + Ok(SerializedDTypeField::Field(field.clone())) + } + SerializedDTypeField::Field(f) => { + if f != field { + vortex_bail!("Can't extract field from field") + } + Ok(SerializedDTypeField::Field(field.clone())) + } + } + } +} + #[derive(Debug)] enum LazyDTypeState { - Value(DType), - Serialized(Bytes, OnceCell, Projection), + DType(DType), + Serialized(Bytes, OnceCell, SerializedDTypeField), + Unknown, } #[derive(Debug)] -pub struct LazilyDeserializedDType { +pub struct LazyDType { inner: LazyDTypeState, } -impl LazilyDeserializedDType { +impl LazyDType { /// Create a LazilyDeserializedDType from a flatbuffer schema bytes /// i.e., these bytes need to be deserializable as message::Schema /// /// # Safety /// This function is unsafe because it trusts the caller to pass in a valid flatbuffer /// representing a message::Schema. - pub unsafe fn from_schema_bytes(schema_bytes: Bytes, projection: Projection) -> Self { + pub unsafe fn from_schema_bytes(dtype_bytes: Bytes) -> Self { Self { - inner: LazyDTypeState::Serialized(schema_bytes, OnceCell::new(), projection), + inner: LazyDTypeState::Serialized( + dtype_bytes, + OnceCell::new(), + SerializedDTypeField::Projection(Projection::All), + ), } } pub fn from_dtype(dtype: DType) -> Self { Self { - inner: LazyDTypeState::Value(dtype), + inner: LazyDTypeState::DType(dtype), + } + } + + pub fn unknown() -> Self { + Self { + inner: LazyDTypeState::Unknown, } } /// Restrict the underlying dtype to selected fields - pub fn project(&self, projection: &[Field]) -> VortexResult> { + pub fn project(&self, fields: &[Field]) -> VortexResult> { match &self.inner { - LazyDTypeState::Value(dtype) => { + LazyDTypeState::DType(dtype) => { let DType::Struct(sdt, n) = dtype else { vortex_bail!("Not a struct dtype") }; - Ok(Arc::new(LazilyDeserializedDType::from_dtype( - DType::Struct(sdt.project(projection)?, *n), + Ok(Arc::new(LazyDType::from_dtype(DType::Struct( + sdt.project(fields)?, + *n, + )))) + } + LazyDTypeState::Serialized(b, _, current_projection) => Ok(Arc::new(Self { + inner: LazyDTypeState::Serialized( + b.clone(), + OnceCell::new(), + current_projection.project(fields)?, + ), + })), + LazyDTypeState::Unknown => vortex_bail!("Unknown dtype"), + } + } + + /// Extract single field out of this dtype + pub fn field(&self, field: &Field) -> VortexResult> { + match &self.inner { + LazyDTypeState::DType(dtype) => { + let DType::Struct(sdt, _) = dtype else { + vortex_bail!("Not a struct dtype") + }; + Ok(Arc::new(LazyDType::from_dtype( + sdt.field_info(field)?.dtype.clone(), ))) } - LazyDTypeState::Serialized(b, _, proj) => { - let projection = match proj { - Projection::All => Projection::Flat(projection.to_vec()), - // TODO(robert): Respect existing projection list, only really an issue for nested structs - Projection::Flat(_) => vortex_bail!("Can't project already projected dtype"), + LazyDTypeState::Serialized(b, _, current_projection) => Ok(Arc::new(Self { + inner: LazyDTypeState::Serialized( + b.clone(), + OnceCell::new(), + current_projection.field(field)?, + ), + })), + LazyDTypeState::Unknown => vortex_bail!("Unknown dtype"), + } + } + + /// Extract field names from the underlying dtype if there are any + pub fn names(&self) -> VortexResult { + match &self.inner { + LazyDTypeState::DType(dtype) => { + let DType::Struct(sdt, _) = dtype else { + vortex_bail!("Not a struct dtype") }; - unsafe { - Ok(Arc::new(LazilyDeserializedDType::from_schema_bytes( - b.clone(), - projection, - ))) - } + Ok(sdt.names().clone()) } + LazyDTypeState::Serialized(b, _, proj) => field_names(b, proj), + LazyDTypeState::Unknown => vortex_bail!("Unknown dtype"), } } /// Get vortex dtype out of serialized bytes pub fn value(&self) -> VortexResult<&DType> { match &self.inner { - LazyDTypeState::Value(dtype) => Ok(dtype), - LazyDTypeState::Serialized(bytes, cache, proj) => cache.get_or_try_init(|| { - let fb_dtype = Self::fb_schema(bytes)? - .dtype() - .ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?; - match &proj { - Projection::All => DType::try_from(fb_dtype), - Projection::Flat(p) => deserialize_and_project(fb_dtype, p), - } - }), + LazyDTypeState::DType(dtype) => Ok(dtype), + LazyDTypeState::Serialized(bytes, cache, proj) => { + cache.get_or_try_init(|| project_dtype_bytes(bytes, proj)) + } + LazyDTypeState::Unknown => vortex_bail!("Unknown dtype"), } } /// Convert all name based references to index based to create globally addressable filter pub(crate) fn resolve_field(&self, field: &Field) -> VortexResult { match &self.inner { - LazyDTypeState::Value(dtype) => { + LazyDTypeState::DType(dtype) => { let DType::Struct(sdt, _) = dtype else { vortex_bail!("Trying to resolve fields in non struct dtype") }; @@ -127,55 +211,85 @@ impl LazilyDeserializedDType { Field::Index(i) => Ok(*i), } } - LazyDTypeState::Serialized(b, ..) => { - let fb_struct = Self::fb_schema(b.as_ref())? - .dtype() - .and_then(|d| d.type__as_struct_()) - .ok_or_else(|| vortex_err!("The top-level type should be a struct"))?; - resolve_field(fb_struct, field) - } + LazyDTypeState::Serialized(b, ..) => resolve_field(fb_struct(b.as_ref())?, field), + LazyDTypeState::Unknown => vortex_bail!("Unknown dtype"), } } +} - fn fb_schema(bytes: &[u8]) -> VortexResult { - Ok(unsafe { root_unchecked::(bytes) }) +fn field_names(bytes: &[u8], dtype_field: &SerializedDTypeField) -> VortexResult { + let struct_field = fb_struct(bytes)?; + let names = struct_field + .names() + .ok_or_else(|| vortex_err!("Not a struct dtype"))?; + match dtype_field { + SerializedDTypeField::Projection(projection) => match projection { + Projection::All => Ok(names.iter().map(Arc::from).collect()), + Projection::Flat(fields) => fields + .iter() + .map(|f| resolve_field(struct_field, f)) + .map(|idx| idx.map(|i| Arc::from(names.get(i)))) + .collect(), + }, + SerializedDTypeField::Field(f) => Ok(Arc::new([Arc::from( + names.get(resolve_field(struct_field, f)?), + )])), } } -#[derive(Debug)] +fn project_dtype_bytes(bytes: &[u8], dtype_field: &SerializedDTypeField) -> VortexResult { + let fb_dtype = fb_schema(bytes) + .dtype() + .ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?; + + match dtype_field { + SerializedDTypeField::Projection(projection) => match projection { + Projection::All => DType::try_from(fb_dtype), + Projection::Flat(p) => project_and_deserialize(fb_dtype, p), + }, + SerializedDTypeField::Field(f) => extract_field(fb_dtype, f), + } +} + +fn fb_struct(bytes: &[u8]) -> VortexResult { + fb_schema(bytes) + .dtype() + .and_then(|d| d.type__as_struct_()) + .ok_or_else(|| vortex_err!("The top-level type should be a struct")) +} + +fn fb_schema(bytes: &[u8]) -> message::Schema { + unsafe { root_unchecked::(bytes) } +} + +#[derive(Debug, Clone)] pub struct RelativeLayoutCache { root: Arc>, - dtype: Option>, + dtype: Arc, path: MessageId, } impl RelativeLayoutCache { - pub fn new(root: Arc>, dtype: Arc) -> Self { + pub fn new(root: Arc>, dtype: Arc) -> Self { Self { root, - dtype: Some(dtype), + dtype, path: Vec::new(), } } - pub fn relative(&self, id: LayoutPartId, dtype: Arc) -> Self { + pub fn relative(&self, id: LayoutPartId, dtype: Arc) -> Self { let mut new_path = self.path.clone(); new_path.push(id); Self { root: self.root.clone(), path: new_path, - dtype: Some(dtype), + dtype, } } pub fn unknown_dtype(&self, id: LayoutPartId) -> Self { - let mut new_path = self.path.clone(); - new_path.push(id); - Self { - root: self.root.clone(), - path: new_path, - dtype: None, - } + self.relative(id, Arc::new(LazyDType::unknown())) } pub fn get(&self, path: &[LayoutPartId]) -> Option { @@ -204,8 +318,8 @@ impl RelativeLayoutCache { .remove(&self.absolute_id(path)) } - pub fn dtype(&self) -> &Arc { - self.dtype.as_ref().vortex_expect("Must have dtype") + pub fn dtype(&self) -> &Arc { + &self.dtype } pub fn absolute_id(&self, path: &[LayoutPartId]) -> MessageId { diff --git a/vortex-file/src/read/column_batch.rs b/vortex-file/src/read/column_batch.rs deleted file mode 100644 index 7c3c3fc479..0000000000 --- a/vortex-file/src/read/column_batch.rs +++ /dev/null @@ -1,119 +0,0 @@ -use std::collections::BTreeSet; -use std::mem; - -use vortex_array::array::StructArray; -use vortex_array::stats::ArrayStatistics; -use vortex_array::validity::Validity; -use vortex_array::{ArrayData, IntoArrayData}; -use vortex_dtype::FieldNames; -use vortex_error::{vortex_err, VortexExpect, VortexResult}; -use vortex_expr::ExprRef; - -use crate::read::mask::RowMask; -use crate::read::{BatchRead, LayoutReader}; - -/// Read multiple layouts by combining them into one struct array -/// -/// Result can be optionally reduced with an expression, i.e. to produce a bitmask for other columns -#[derive(Debug)] -pub struct ColumnBatchReader { - names: FieldNames, - children: Vec>, - arrays: Vec>, - expr: Option, - // TODO(robert): This is a hack/optimization that tells us if we're reducing results with AND or not - shortcircuit_siblings: bool, -} - -impl ColumnBatchReader { - pub fn new( - names: FieldNames, - children: Vec>, - expr: Option, - shortcircuit_siblings: bool, - ) -> Self { - assert_eq!( - names.len(), - children.len(), - "Names and children must be of same length" - ); - let arrays = vec![None; children.len()]; - Self { - names, - children, - arrays, - expr, - shortcircuit_siblings, - } - } -} - -impl LayoutReader for ColumnBatchReader { - fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet) -> VortexResult<()> { - for child in &self.children { - child.add_splits(row_offset, splits)? - } - Ok(()) - } - - fn read_selection(&mut self, selection: &RowMask) -> VortexResult> { - let mut messages = Vec::new(); - for (i, child_array) in self - .arrays - .iter_mut() - .enumerate() - .filter(|(_, a)| a.is_none()) - { - match self.children[i].read_selection(selection)? { - Some(rr) => match rr { - BatchRead::ReadMore(message) => { - messages.extend(message); - } - BatchRead::Batch(arr) => { - if self.shortcircuit_siblings - && arr.statistics().compute_true_count().vortex_expect( - "must be a bool array if shortcircuit_siblings is set to true", - ) == 0 - { - // Reset local state when short circuiting, these arrays are no longer necessary - self.arrays = vec![None; self.children.len()]; - return Ok(None); - } - *child_array = Some(arr) - } - }, - None => { - debug_assert!( - self.arrays.iter().all(Option::is_none), - "Expected layout {}({i}) to produce an array but it was empty", - self.names[i] - ); - return Ok(None); - } - } - } - - if messages.is_empty() { - let child_arrays = mem::replace(&mut self.arrays, vec![None; self.children.len()]) - .into_iter() - .enumerate() - .map(|(i, a)| a.ok_or_else(|| vortex_err!("Missing child array at index {i}"))) - .collect::>>()?; - let len = child_arrays - .first() - .map(|l| l.len()) - .unwrap_or(selection.len()); - let array = - StructArray::try_new(self.names.clone(), child_arrays, len, Validity::NonNullable)? - .into_array(); - self.expr - .as_ref() - .map(|e| e.evaluate(&array)) - .unwrap_or_else(|| Ok(array)) - .map(BatchRead::Batch) - .map(Some) - } else { - Ok(Some(BatchRead::ReadMore(messages))) - } - } -} diff --git a/vortex-file/src/read/context.rs b/vortex-file/src/read/context.rs index 505ffc2590..e8c027c34f 100644 --- a/vortex-file/src/read/context.rs +++ b/vortex-file/src/read/context.rs @@ -7,11 +7,8 @@ use vortex_array::Context; use vortex_error::{vortex_err, VortexResult}; use vortex_flatbuffers::footer as fb; -use crate::read::cache::RelativeLayoutCache; -use crate::read::layouts::{ - ChunkedLayoutSpec, ColumnarLayoutSpec, FlatLayoutSpec, InlineDTypeLayoutSpec, -}; -use crate::read::{LayoutReader, Scan}; +use crate::layouts::{ChunkedLayout, ColumnarLayout, FlatLayout, InlineDTypeLayout}; +use crate::{LayoutReader, RelativeLayoutCache, Scan}; #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] pub struct LayoutId(pub u16); @@ -22,10 +19,10 @@ impl Display for LayoutId { } } -pub trait LayoutSpec: Debug + Send + Sync { +pub trait Layout: Debug + Send + Sync { fn id(&self) -> LayoutId; - fn layout_reader( + fn reader( &self, fb_bytes: Bytes, fb_loc: usize, @@ -35,19 +32,19 @@ pub trait LayoutSpec: Debug + Send + Sync { ) -> VortexResult>; } -pub type LayoutSpecRef = &'static dyn LayoutSpec; +pub type LayoutRef = &'static dyn Layout; #[derive(Debug, Clone)] pub struct LayoutContext { - layout_refs: HashMap, + layout_refs: HashMap, } impl LayoutContext { - pub fn new(layout_refs: HashMap) -> Self { + pub fn new(layout_refs: HashMap) -> Self { Self { layout_refs } } - pub fn lookup_layout(&self, id: &LayoutId) -> Option { + pub fn lookup_layout(&self, id: &LayoutId) -> Option { self.layout_refs.get(id).cloned() } } @@ -56,10 +53,10 @@ impl Default for LayoutContext { fn default() -> Self { Self::new( [ - &ColumnarLayoutSpec as LayoutSpecRef, - &ChunkedLayoutSpec, - &InlineDTypeLayoutSpec, - &FlatLayoutSpec, + &ColumnarLayout as LayoutRef, + &ChunkedLayout, + &InlineDTypeLayout, + &FlatLayout, ] .into_iter() .map(|l| (l.id(), l)) @@ -94,7 +91,7 @@ impl LayoutDeserializer { self.layout_ctx .lookup_layout(&layout_id) .ok_or_else(|| vortex_err!("Unknown layout definition {layout_id}"))? - .layout_reader(fb_bytes, fb_loc, scan, self.clone(), message_cache) + .reader(fb_bytes, fb_loc, scan, self.clone(), message_cache) } pub(crate) fn ctx(&self) -> Arc { diff --git a/vortex-file/src/read/layouts/chunked.rs b/vortex-file/src/read/layouts/chunked.rs index d5a5918332..21aa568bc2 100644 --- a/vortex-file/src/read/layouts/chunked.rs +++ b/vortex-file/src/read/layouts/chunked.rs @@ -1,26 +1,35 @@ -use std::collections::{BTreeSet, VecDeque}; +use std::collections::BTreeSet; +use std::sync::RwLock; use bytes::Bytes; use itertools::Itertools; -use vortex_error::{vortex_err, VortexResult}; +use vortex_array::aliases::hash_map::HashMap; +use vortex_array::array::ChunkedArray; +use vortex_array::{ArrayDType, ArrayData, IntoArrayData}; +use vortex_error::{vortex_err, vortex_panic, VortexResult}; use vortex_flatbuffers::footer; -use crate::read::buffered::{BufferedLayoutReader, RangedLayoutReader}; +use crate::layouts::RangedLayoutReader; use crate::read::cache::RelativeLayoutCache; use crate::read::mask::RowMask; use crate::{ - BatchRead, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, LayoutSpec, Scan, - CHUNKED_LAYOUT_ID, + BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, MessageLocator, + Scan, CHUNKED_LAYOUT_ID, }; + #[derive(Default, Debug)] -pub struct ChunkedLayoutSpec; +pub struct ChunkedLayout; -impl LayoutSpec for ChunkedLayoutSpec { +/// In memory representation of Chunked NestedLayout. +/// +/// First child in the list is the metadata table +/// Subsequent children are consecutive chunks of this layout +impl Layout for ChunkedLayout { fn id(&self) -> LayoutId { CHUNKED_LAYOUT_ID } - fn layout_reader( + fn reader( &self, fb_bytes: Bytes, fb_loc: usize, @@ -28,13 +37,16 @@ impl LayoutSpec for ChunkedLayoutSpec { layout_builder: LayoutDeserializer, message_cache: RelativeLayoutCache, ) -> VortexResult> { - Ok(Box::new(ChunkedLayout::new( - fb_bytes, - fb_loc, - scan, - layout_builder, - message_cache, - ))) + Ok(Box::new( + ChunkedLayoutBuilder { + fb_bytes, + fb_loc, + scan, + layout_builder, + message_cache, + } + .build()?, + )) } } @@ -44,34 +56,15 @@ const METADATA_LAYOUT_PART_ID: LayoutPartId = 0; /// /// First child in the list is the metadata table /// Subsequent children are consecutive chunks of this layout -#[derive(Debug)] -pub struct ChunkedLayout { +struct ChunkedLayoutBuilder { fb_bytes: Bytes, fb_loc: usize, scan: Scan, layout_builder: LayoutDeserializer, message_cache: RelativeLayoutCache, - chunk_reader: Option, } -impl ChunkedLayout { - pub fn new( - fb_bytes: Bytes, - fb_loc: usize, - scan: Scan, - layout_builder: LayoutDeserializer, - message_cache: RelativeLayoutCache, - ) -> Self { - Self { - fb_bytes, - fb_loc, - scan, - layout_builder, - message_cache, - chunk_reader: None, - } - } - +impl ChunkedLayoutBuilder { fn flatbuffer(&self) -> footer::Layout { unsafe { let tab = flatbuffers::Table::new(&self.fb_bytes, self.fb_loc); @@ -79,7 +72,6 @@ impl ChunkedLayout { } } - #[allow(dead_code)] fn metadata_layout(&self) -> VortexResult>> { self.has_metadata() .then(|| { @@ -91,6 +83,7 @@ impl ChunkedLayout { self.layout_builder.read_layout( self.fb_bytes.clone(), metadata_fb._tab.loc(), + // TODO(robert): Create stats projection Scan::new(None), self.message_cache.unknown_dtype(METADATA_LAYOUT_PART_ID), ) @@ -105,12 +98,6 @@ impl ChunkedLayout { .unwrap_or(false) } - #[allow(dead_code)] - fn n_chunks(&self) -> usize { - self.flatbuffer().children().unwrap_or_default().len() - - (if self.has_metadata() { 1 } else { 0 }) - } - fn children(&self) -> impl Iterator { self.flatbuffer() .children() @@ -120,7 +107,7 @@ impl ChunkedLayout { .skip(if self.has_metadata() { 1 } else { 0 }) } - fn child_ranges(&self) -> Vec<(usize, usize)> { + fn children_ranges(&self) -> Vec<(usize, usize)> { self.children() .map(|(_, c)| c.row_count()) .scan(0u64, |acc, row_count| { @@ -131,42 +118,163 @@ impl ChunkedLayout { .collect::>() } - fn child_layouts RelativeLayoutCache>( - &self, - cache: C, - ) -> VortexResult> { + fn children_layouts(&self) -> VortexResult> { self.children() - .zip_eq(self.child_ranges()) + .zip_eq(self.children_ranges()) .map(|((i, c), (begin, end))| { let layout = self.layout_builder.read_layout( self.fb_bytes.clone(), c._tab.loc(), self.scan.clone(), - cache(i as LayoutPartId), + self.message_cache + .relative(i as u16, self.message_cache.dtype().clone()), )?; Ok(((begin, end), layout)) }) - .collect::>>() + .collect::>>() + } + + pub fn build(&self) -> VortexResult { + Ok(ChunkedLayoutReader::new( + self.children_layouts()?, + self.metadata_layout()?, + )) + } +} + +#[derive(Debug, Default, Clone)] +enum ChildRead { + #[default] + NotStarted, + Finished(Option), +} + +impl ChildRead { + pub fn finished(&self) -> bool { + matches!(self, Self::Finished(_)) + } + + pub fn into_value(self) -> Option { + match self { + ChildRead::NotStarted => None, + ChildRead::Finished(v) => v, + } } } -impl LayoutReader for ChunkedLayout { +type InProgressLayoutRanges = RwLock, Vec)>>; + +#[allow(dead_code)] +#[derive(Debug)] +pub struct ChunkedLayoutReader { + layouts: Vec, + metadata_layout: Option>, + in_progress_ranges: InProgressLayoutRanges, +} + +impl ChunkedLayoutReader { + pub fn new( + layouts: Vec, + metadata_layout: Option>, + ) -> Self { + Self { + layouts, + metadata_layout, + in_progress_ranges: RwLock::new(HashMap::new()), + } + } + + fn buffer_read(&self, mask: &RowMask) -> VortexResult> { + let mut in_progress_guard = self + .in_progress_ranges + .write() + .unwrap_or_else(|poison| vortex_panic!("Failed to write to message cache: {poison}")); + let (layout_idxs, in_progress_range) = in_progress_guard + .entry((mask.begin(), mask.end())) + .or_insert_with(|| { + let layouts_in_range = self + .layouts + .iter() + .enumerate() + .filter_map(|(i, ((begin, end), _))| { + (mask.end() > *begin && mask.begin() < *end).then_some(i) + }) + .collect::>(); + let num_layouts = layouts_in_range.len(); + (layouts_in_range, vec![ChildRead::default(); num_layouts]) + }); + + let mut messages_to_fetch = Vec::new(); + for (((begin, end), layout), array_slot) in layout_idxs + .iter() + .map(|i| &self.layouts[*i]) + .zip(in_progress_range) + .filter(|(_, cr)| !cr.finished()) + { + let layout_selection = mask.slice(*begin, *end).shift(*begin)?; + if let Some(rr) = layout.read_selection(&layout_selection)? { + match rr { + BatchRead::ReadMore(m) => { + messages_to_fetch.extend(m); + } + BatchRead::Batch(a) => { + *array_slot = ChildRead::Finished(Some(a)); + } + } + } else { + *array_slot = ChildRead::Finished(None); + } + } + + Ok(messages_to_fetch) + } + + #[allow(dead_code)] + pub fn n_chunks(&self) -> usize { + self.layouts.len() + } + + #[allow(dead_code)] + pub fn metadata_layout(&self) -> Option<&dyn LayoutReader> { + self.metadata_layout.as_deref() + } +} + +impl LayoutReader for ChunkedLayoutReader { fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet) -> VortexResult<()> { - for ((begin, _), child) in self.child_layouts(|i| self.message_cache.unknown_dtype(i))? { + for ((begin, _), child) in &self.layouts { child.add_splits(row_offset + begin, splits)? } Ok(()) } - fn read_selection(&mut self, selector: &RowMask) -> VortexResult> { - if let Some(br) = &mut self.chunk_reader { - br.read_next(selector) + fn read_selection(&self, selector: &RowMask) -> VortexResult> { + let messages_to_fetch = self.buffer_read(selector)?; + if !messages_to_fetch.is_empty() { + return Ok(Some(BatchRead::ReadMore(messages_to_fetch))); + } + + if let Some((_, arrays_in_range)) = self + .in_progress_ranges + .write() + .unwrap_or_else(|poison| vortex_panic!("Failed to write to message cache: {poison}")) + .remove(&(selector.begin(), selector.end())) + { + let mut child_arrays = arrays_in_range + .into_iter() + .filter_map(ChildRead::into_value) + .collect::>(); + match child_arrays.len() { + 0 | 1 => Ok(child_arrays.pop().map(BatchRead::Batch)), + _ => { + let dtype = child_arrays[0].dtype().clone(); + Ok(Some(BatchRead::Batch( + ChunkedArray::try_new(child_arrays, dtype)?.into_array(), + ))) + } + } } else { - self.chunk_reader = Some(BufferedLayoutReader::new(self.child_layouts(|i| { - self.message_cache - .relative(i, self.message_cache.dtype().clone()) - })?)); - self.read_selection(selector) + Ok(None) } } } @@ -189,8 +297,8 @@ mod tests { use vortex_ipc::messages::writer::MessageWriter; use vortex_ipc::stream_writer::ByteRange; - use crate::read::cache::{LazilyDeserializedDType, RelativeLayoutCache}; - use crate::read::layouts::chunked::ChunkedLayout; + use crate::layouts::chunked::{ChunkedLayoutBuilder, ChunkedLayoutReader}; + use crate::read::cache::{LazyDType, RelativeLayoutCache}; use crate::read::layouts::test_read::{filter_read_layout, read_layout, read_layout_data}; use crate::read::mask::RowMask; use crate::{write, LayoutDeserializer, LayoutMessageCache, RowFilter, Scan}; @@ -198,7 +306,7 @@ mod tests { async fn layout_and_bytes( cache: Arc>, scan: Scan, - ) -> (ChunkedLayout, ChunkedLayout, Bytes, usize) { + ) -> (ChunkedLayoutReader, ChunkedLayoutReader, Bytes, usize) { let mut writer = MessageWriter::new(Vec::new()); let array = PrimitiveArray::from((0..100).collect::>()).into_array(); let array_dtype = array.dtype().clone(); @@ -225,7 +333,7 @@ mod tests { .zip(row_offsets.iter().skip(1)) .map(|(begin, end)| end - begin), ) - .map(|((begin, end), len)| write::Layout::flat(ByteRange::new(*begin, *end), len)) + .map(|((begin, end), len)| write::LayoutSpec::flat(ByteRange::new(*begin, *end), len)) .collect::>(); row_offsets.truncate(row_offsets.len() - 1); @@ -233,7 +341,7 @@ mod tests { let written = writer.into_inner(); let mut fb = FlatBufferBuilder::new(); - let chunked_layout = write::Layout::chunked(flat_layouts.into(), len as u64, false); + let chunked_layout = write::LayoutSpec::chunked(flat_layouts.into(), len as u64, false); let flat_buf = chunked_layout.write_flatbuffer(&mut fb); fb.finish_minimal(flat_buf); let fb_bytes = Bytes::copy_from_slice(fb.finished_data()); @@ -242,22 +350,27 @@ mod tests { ._tab .loc(); - let dtype = Arc::new(LazilyDeserializedDType::from_dtype(PType::I32.into())); + let dtype = Arc::new(LazyDType::from_dtype(PType::I32.into())); + let layout_builder = LayoutDeserializer::default(); ( - ChunkedLayout::new( - fb_bytes.clone(), + ChunkedLayoutBuilder { + fb_bytes: fb_bytes.clone(), fb_loc, scan, - LayoutDeserializer::default(), - RelativeLayoutCache::new(cache.clone(), dtype.clone()), - ), - ChunkedLayout::new( + layout_builder: layout_builder.clone(), + message_cache: RelativeLayoutCache::new(cache.clone(), dtype.clone()), + } + .build() + .unwrap(), + ChunkedLayoutBuilder { fb_bytes, fb_loc, - Scan::new(None), - LayoutDeserializer::default(), - RelativeLayoutCache::new(cache, dtype), - ), + scan: Scan::new(None), + layout_builder, + message_cache: RelativeLayoutCache::new(cache, dtype), + } + .build() + .unwrap(), Bytes::from(written), len, ) @@ -280,8 +393,8 @@ mod tests { assert_eq!(filter_layout.n_chunks(), 5); assert_eq!(projection_layout.n_chunks(), 5); - assert!(filter_layout.metadata_layout().unwrap().is_none()); - assert!(projection_layout.metadata_layout().unwrap().is_none()); + assert!(filter_layout.metadata_layout().is_none()); + assert!(projection_layout.metadata_layout().is_none()); let arr = filter_read_layout( &mut filter_layout, diff --git a/vortex-file/src/read/layouts/columnar.rs b/vortex-file/src/read/layouts/columnar.rs index 0d0354ed21..4ce305e99a 100644 --- a/vortex-file/src/read/layouts/columnar.rs +++ b/vortex-file/src/read/layouts/columnar.rs @@ -1,32 +1,36 @@ use std::collections::BTreeSet; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use bytes::Bytes; use itertools::Itertools; +use vortex_array::aliases::hash_map::HashMap; +use vortex_array::array::StructArray; +use vortex_array::stats::ArrayStatistics; +use vortex_array::validity::Validity; +use vortex_array::{ArrayData, IntoArrayData}; use vortex_dtype::field::Field; -use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexResult}; -use vortex_expr::{Column, Select}; +use vortex_dtype::FieldNames; +use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult}; +use vortex_expr::{Column, Select, VortexExpr}; use vortex_flatbuffers::footer; -use crate::read::cache::{LazilyDeserializedDType, RelativeLayoutCache}; -use crate::read::column_batch::ColumnBatchReader; +use crate::read::cache::{LazyDType, RelativeLayoutCache}; use crate::read::expr_project::expr_project; use crate::read::mask::RowMask; use crate::{ - BatchRead, LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, RowFilter, Scan, + BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, RowFilter, Scan, COLUMNAR_LAYOUT_ID, }; #[derive(Debug)] -pub struct ColumnarLayoutSpec; +pub struct ColumnarLayout; -impl LayoutSpec for ColumnarLayoutSpec { +impl Layout for ColumnarLayout { fn id(&self) -> LayoutId { COLUMNAR_LAYOUT_ID } - fn layout_reader( + fn reader( &self, fb_bytes: Bytes, fb_loc: usize, @@ -34,47 +38,28 @@ impl LayoutSpec for ColumnarLayoutSpec { layout_serde: LayoutDeserializer, message_cache: RelativeLayoutCache, ) -> VortexResult> { - Ok(Box::new(ColumnarLayout::new( - fb_bytes, - fb_loc, - scan, - layout_serde, - message_cache, - ))) + Ok(Box::new( + ColumnarLayoutBuilder { + fb_bytes, + fb_loc, + scan, + layout_serde, + message_cache, + } + .build()?, + )) } } -/// In memory representation of Columnar NestedLayout. -/// -/// Each child represents a column -#[derive(Debug)] -pub struct ColumnarLayout { +struct ColumnarLayoutBuilder { fb_bytes: Bytes, fb_loc: usize, scan: Scan, layout_serde: LayoutDeserializer, message_cache: RelativeLayoutCache, - reader: Option, } -impl ColumnarLayout { - pub fn new( - fb_bytes: Bytes, - fb_loc: usize, - scan: Scan, - layout_serde: LayoutDeserializer, - message_cache: RelativeLayoutCache, - ) -> Self { - Self { - fb_bytes, - fb_loc, - scan, - layout_serde, - message_cache, - reader: None, - } - } - +impl ColumnarLayoutBuilder { fn flatbuffer(&self) -> footer::Layout { unsafe { let tab = flatbuffers::Table::new(&self.fb_bytes, self.fb_loc); @@ -82,45 +67,18 @@ impl ColumnarLayout { } } - /// Perform minimal amount of work to construct children that can be queried for splits - fn children_for_splits(&self) -> VortexResult>> { - let (refs, lazy_dtype) = self.fields_with_dtypes()?; - let fb_children = self.flatbuffer().children().unwrap_or_default(); - - refs.into_iter() - .map(|field| { - let resolved_child = lazy_dtype.resolve_field(&field)?; - let child_loc = fb_children.get(resolved_child)._tab.loc(); - - self.layout_serde.read_layout( - self.fb_bytes.clone(), - child_loc, - Scan::new(None), - self.message_cache.unknown_dtype(resolved_child as u16), - ) - }) - .collect::>>() - } - - fn column_reader(&self) -> VortexResult { + fn build(&self) -> VortexResult { let (refs, lazy_dtype) = self.fields_with_dtypes()?; let fb_children = self.flatbuffer().children().unwrap_or_default(); - let filter_dtype = lazy_dtype.value()?; - let DType::Struct(s, ..) = filter_dtype else { - vortex_bail!("Column layout must have struct dtype") - }; - let mut unhandled_names = Vec::new(); let mut unhandled_children = Vec::new(); let mut handled_children = Vec::new(); let mut handled_names = Vec::new(); - for (field, (name, dtype)) in refs - .into_iter() - .zip_eq(s.names().iter().cloned().zip_eq(s.dtypes().iter().cloned())) - { + for (field, name) in refs.into_iter().zip_eq(lazy_dtype.names()?.iter()) { let resolved_child = lazy_dtype.resolve_field(&field)?; + let child_field = lazy_dtype.field(&field)?; let child_loc = fb_children.get(resolved_child)._tab.loc(); let projected_expr = self .scan @@ -135,18 +93,16 @@ impl ColumnarLayout { self.fb_bytes.clone(), child_loc, Scan::new(projected_expr), - self.message_cache.relative( - resolved_child as u16, - Arc::new(LazilyDeserializedDType::from_dtype(dtype)), - ), + self.message_cache + .relative(resolved_child as u16, child_field), )?; if handled { handled_children.push(child); - handled_names.push(name); + handled_names.push(name.clone()); } else { unhandled_children.push(child); - unhandled_names.push(name); + unhandled_names.push(name.clone()); } } @@ -172,7 +128,7 @@ impl ColumnarLayout { ) })?; - handled_children.push(Box::new(ColumnBatchReader::new( + handled_children.push(Box::new(ColumnarLayoutReader::new( unhandled_names.into(), unhandled_children, Some(prf), @@ -196,7 +152,7 @@ impl ColumnarLayout { )) as _ }); let shortcircuit_siblings = top_level_expr.is_some(); - Ok(ColumnBatchReader::new( + Ok(ColumnarLayoutReader::new( handled_names.into(), handled_children, top_level_expr, @@ -205,7 +161,7 @@ impl ColumnarLayout { } /// Get fields referenced by scan expression along with their dtype - fn fields_with_dtypes(&self) -> VortexResult<(Vec, Arc)> { + fn fields_with_dtypes(&self) -> VortexResult<(Vec, Arc)> { let fb_children = self.flatbuffer().children().unwrap_or_default(); let field_refs = self.scan_fields(); let lazy_dtype = field_refs @@ -234,20 +190,117 @@ impl ColumnarLayout { } } -impl LayoutReader for ColumnarLayout { +type InProgressRanges = RwLock>>>; + +/// In memory representation of Columnar NestedLayout. +/// +/// Each child represents a column +#[derive(Debug)] +pub struct ColumnarLayoutReader { + names: FieldNames, + children: Vec>, + in_progress_ranges: InProgressRanges, + expr: Option>, + // TODO(robert): This is a hack/optimization that tells us if we're reducing results with AND or not + shortcircuit_siblings: bool, +} + +impl ColumnarLayoutReader { + pub fn new( + names: FieldNames, + children: Vec>, + expr: Option>, + shortcircuit_siblings: bool, + ) -> Self { + assert_eq!( + names.len(), + children.len(), + "Names and children must be of same length" + ); + Self { + names, + children, + in_progress_ranges: RwLock::new(HashMap::new()), + expr, + shortcircuit_siblings, + } + } +} + +impl LayoutReader for ColumnarLayoutReader { fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet) -> VortexResult<()> { - for child in self.children_for_splits()? { + for child in &self.children { child.add_splits(row_offset, splits)? } Ok(()) } - fn read_selection(&mut self, selector: &RowMask) -> VortexResult> { - if let Some(r) = &mut self.reader { - r.read_selection(selector) + fn read_selection(&self, selection: &RowMask) -> VortexResult> { + let mut in_progress_guard = self + .in_progress_ranges + .write() + .unwrap_or_else(|poison| vortex_panic!("Failed to write to message cache: {poison}")); + let selection_range = (selection.begin(), selection.end()); + let in_progress_selection = in_progress_guard + .entry(selection_range) + .or_insert_with(|| vec![None; self.children.len()]); + let mut messages = Vec::new(); + for (i, child_array) in in_progress_selection + .iter_mut() + .enumerate() + .filter(|(_, a)| a.is_none()) + { + match self.children[i].read_selection(selection)? { + Some(rr) => match rr { + BatchRead::ReadMore(message) => { + messages.extend(message); + } + BatchRead::Batch(arr) => { + if self.shortcircuit_siblings + && arr.statistics().compute_true_count().vortex_expect( + "must be a bool array if shortcircuit_siblings is set to true", + ) == 0 + { + in_progress_guard.remove(&selection_range); + return Ok(None); + } + *child_array = Some(arr) + } + }, + None => { + debug_assert!( + in_progress_selection.iter().all(Option::is_none), + "Expected layout {}({i}) to produce an array but it was empty", + self.names[i] + ); + return Ok(None); + } + } + } + + if messages.is_empty() { + let child_arrays = in_progress_guard + .remove(&selection_range) + .ok_or_else(|| vortex_err!("There were no arrays and no messages"))? + .into_iter() + .enumerate() + .map(|(i, a)| a.ok_or_else(|| vortex_err!("Missing child array at index {i}"))) + .collect::>>()?; + let len = child_arrays + .first() + .map(|l| l.len()) + .unwrap_or(selection.len()); + let array = + StructArray::try_new(self.names.clone(), child_arrays, len, Validity::NonNullable)? + .into_array(); + self.expr + .as_ref() + .map(|e| e.evaluate(&array)) + .unwrap_or_else(|| Ok(array)) + .map(BatchRead::Batch) + .map(Some) } else { - self.reader = Some(self.column_reader()?); - self.read_selection(selector) + Ok(Some(BatchRead::ReadMore(messages))) } } } diff --git a/vortex-file/src/read/layouts/flat.rs b/vortex-file/src/read/layouts/flat.rs index 174fadca52..d1ef8e7c8e 100644 --- a/vortex-file/src/read/layouts/flat.rs +++ b/vortex-file/src/read/layouts/flat.rs @@ -11,19 +11,19 @@ use vortex_ipc::stream_writer::ByteRange; use crate::read::cache::RelativeLayoutCache; use crate::read::mask::RowMask; use crate::{ - BatchRead, LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, MessageLocator, Scan, + BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, MessageLocator, Scan, FLAT_LAYOUT_ID, }; #[derive(Debug)] -pub struct FlatLayoutSpec; +pub struct FlatLayout; -impl LayoutSpec for FlatLayoutSpec { +impl Layout for FlatLayout { fn id(&self) -> LayoutId { FLAT_LAYOUT_ID } - fn layout_reader( + fn reader( &self, fb_bytes: Bytes, fb_loc: usize, @@ -41,7 +41,7 @@ impl LayoutSpec for FlatLayoutSpec { } let buf = buffers.get(0); - Ok(Box::new(FlatLayout::new( + Ok(Box::new(FlatLayoutReader::new( ByteRange::new(buf.begin(), buf.end()), scan, layout_serde.ctx(), @@ -51,14 +51,14 @@ impl LayoutSpec for FlatLayoutSpec { } #[derive(Debug)] -pub struct FlatLayout { +pub struct FlatLayoutReader { range: ByteRange, scan: Scan, ctx: Arc, message_cache: RelativeLayoutCache, } -impl FlatLayout { +impl FlatLayoutReader { pub fn new( range: ByteRange, scan: Scan, @@ -90,13 +90,13 @@ impl FlatLayout { } } -impl LayoutReader for FlatLayout { +impl LayoutReader for FlatLayoutReader { fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet) -> VortexResult<()> { splits.insert(row_offset); Ok(()) } - fn read_selection(&mut self, selection: &RowMask) -> VortexResult> { + fn read_selection(&self, selection: &RowMask) -> VortexResult> { if let Some(buf) = self.message_cache.get(&[]) { let array = self.array_from_bytes(buf)?; selection @@ -130,14 +130,14 @@ mod tests { use vortex_ipc::messages::writer::MessageWriter; use vortex_ipc::stream_writer::ByteRange; - use crate::read::cache::{LazilyDeserializedDType, RelativeLayoutCache}; - use crate::read::layouts::flat::FlatLayout; + use crate::layouts::flat::FlatLayoutReader; + use crate::read::cache::{LazyDType, RelativeLayoutCache}; use crate::read::layouts::test_read::{filter_read_layout, read_layout}; use crate::{LayoutMessageCache, RowFilter, Scan}; async fn read_only_layout( cache: Arc>, - ) -> (FlatLayout, Bytes, usize, Arc) { + ) -> (FlatLayoutReader, Bytes, usize, Arc) { let mut writer = MessageWriter::new(Vec::new()); let array = PrimitiveArray::from((0..100).collect::>()).into_array(); let len = array.len(); @@ -145,10 +145,10 @@ mod tests { let written = writer.into_inner(); let projection_scan = Scan::new(None); - let dtype = Arc::new(LazilyDeserializedDType::from_dtype(PType::I32.into())); + let dtype = Arc::new(LazyDType::from_dtype(PType::I32.into())); ( - FlatLayout::new( + FlatLayoutReader::new( ByteRange::new(0, written.len() as u64), projection_scan, Arc::new(Context::default()), @@ -163,11 +163,11 @@ mod tests { async fn layout_and_bytes( cache: Arc>, scan: Scan, - ) -> (FlatLayout, FlatLayout, Bytes, usize) { + ) -> (FlatLayoutReader, FlatLayoutReader, Bytes, usize) { let (read_layout, bytes, len, dtype) = read_only_layout(cache.clone()).await; ( - FlatLayout::new( + FlatLayoutReader::new( ByteRange::new(0, bytes.len() as u64), scan, Arc::new(Context::default()), diff --git a/vortex-file/src/read/layouts/inline_dtype.rs b/vortex-file/src/read/layouts/inline_dtype.rs index 6c7e83abb5..5668d3f080 100644 --- a/vortex-file/src/read/layouts/inline_dtype.rs +++ b/vortex-file/src/read/layouts/inline_dtype.rs @@ -3,28 +3,28 @@ use std::sync::Arc; use bytes::Bytes; use flatbuffers::root; -use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; +use once_cell::sync::OnceCell; +use vortex_error::{vortex_bail, VortexResult}; use vortex_flatbuffers::{footer, message}; use vortex_ipc::messages::reader::MESSAGE_PREFIX_LENGTH; use vortex_ipc::stream_writer::ByteRange; -use crate::read::cache::{LazilyDeserializedDType, RelativeLayoutCache}; +use crate::read::cache::{LazyDType, RelativeLayoutCache}; use crate::read::mask::RowMask; use crate::{ - BatchRead, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, LayoutSpec, - MessageLocator, Scan, INLINE_SCHEMA_LAYOUT_ID, + BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, MessageLocator, + Scan, INLINE_SCHEMA_LAYOUT_ID, }; #[derive(Debug)] -pub struct InlineDTypeLayoutSpec; +pub struct InlineDTypeLayout; -impl LayoutSpec for InlineDTypeLayoutSpec { +impl Layout for InlineDTypeLayout { fn id(&self) -> LayoutId { INLINE_SCHEMA_LAYOUT_ID } - fn layout_reader( + fn reader( &self, fb_bytes: Bytes, fb_loc: usize, @@ -32,7 +32,7 @@ impl LayoutSpec for InlineDTypeLayoutSpec { layout_reader: LayoutDeserializer, message_cache: RelativeLayoutCache, ) -> VortexResult> { - Ok(Box::new(InlineDTypeLayout::new( + Ok(Box::new(InlineDTypeLayoutReader::new( fb_bytes, fb_loc, scan, @@ -44,29 +44,19 @@ impl LayoutSpec for InlineDTypeLayoutSpec { /// Layout that contains its own DType. #[derive(Debug)] -pub struct InlineDTypeLayout { +pub struct InlineDTypeLayoutReader { fb_bytes: Bytes, fb_loc: usize, scan: Scan, layout_builder: LayoutDeserializer, message_cache: RelativeLayoutCache, - child_layout: Option>, -} - -enum DTypeReadResult { - ReadMore(Vec), - DType(DType), -} - -enum ChildReaderResult { - ReadMore(Vec), - Reader(Box), + child_layout: OnceCell>, } const INLINE_DTYPE_BUFFER_IDX: LayoutPartId = 0; const INLINE_DTYPE_CHILD_IDX: LayoutPartId = 1; -impl InlineDTypeLayout { +impl InlineDTypeLayoutReader { pub fn new( fb_bytes: Bytes, fb_loc: usize, @@ -80,7 +70,7 @@ impl InlineDTypeLayout { scan, layout_builder, message_cache, - child_layout: None, + child_layout: OnceCell::new(), } } @@ -91,45 +81,35 @@ impl InlineDTypeLayout { } } - fn dtype(&self) -> VortexResult { - if let Some(dt_bytes) = self.message_cache.get(&[INLINE_DTYPE_BUFFER_IDX]) { - let msg = root::(&dt_bytes[MESSAGE_PREFIX_LENGTH..])? - .header_as_schema() - .ok_or_else(|| vortex_err!("Expected schema message"))?; + fn dtype_message(&self) -> VortexResult { + let buffers = self.flatbuffer().buffers().unwrap_or_default(); + if buffers.is_empty() { + vortex_bail!("Missing buffers for inline dtype layout") + } + let dtype_buf = buffers.get(0); + Ok(MessageLocator( + self.message_cache.absolute_id(&[INLINE_DTYPE_BUFFER_IDX]), + ByteRange::new(dtype_buf.begin(), dtype_buf.end()), + )) + } - Ok(DTypeReadResult::DType(DType::try_from( - msg.dtype() - .ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?, - )?)) + fn dtype(&self) -> VortexResult> { + if let Some(dt_bytes) = self.message_cache.get(&[INLINE_DTYPE_BUFFER_IDX]) { + root::(&dt_bytes[MESSAGE_PREFIX_LENGTH..])?; + Ok(Arc::new(unsafe { LazyDType::from_schema_bytes(dt_bytes) })) } else { - let buffers = self.flatbuffer().buffers().unwrap_or_default(); - if buffers.is_empty() { - vortex_bail!("Missing buffers for inline dtype layout") - } - let dtype_buf = buffers.get(0); - Ok(DTypeReadResult::ReadMore(vec![MessageLocator( - self.message_cache.absolute_id(&[INLINE_DTYPE_BUFFER_IDX]), - ByteRange::new(dtype_buf.begin(), dtype_buf.end()), - )])) + Ok(Arc::new(LazyDType::unknown())) } } - fn child_reader(&self) -> VortexResult { - match self.dtype()? { - DTypeReadResult::ReadMore(m) => Ok(ChildReaderResult::ReadMore(m)), - DTypeReadResult::DType(d) => { - let child_layout = self.layout_builder.read_layout( - self.fb_bytes.clone(), - self.child_layout()?._tab.loc(), - self.scan.clone(), - self.message_cache.relative( - INLINE_DTYPE_CHILD_IDX, - Arc::new(LazilyDeserializedDType::from_dtype(d)), - ), - )?; - Ok(ChildReaderResult::Reader(child_layout)) - } - } + fn child_reader(&self) -> VortexResult> { + self.layout_builder.read_layout( + self.fb_bytes.clone(), + self.child_layout()?._tab.loc(), + self.scan.clone(), + self.message_cache + .relative(INLINE_DTYPE_CHILD_IDX, self.dtype()?), + ) } fn child_layout(&self) -> VortexResult { @@ -141,28 +121,20 @@ impl InlineDTypeLayout { } } -impl LayoutReader for InlineDTypeLayout { +impl LayoutReader for InlineDTypeLayoutReader { fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet) -> VortexResult<()> { - let child_layout = self.layout_builder.read_layout( - self.fb_bytes.clone(), - self.child_layout()?._tab.loc(), - Scan::new(None), - self.message_cache.unknown_dtype(INLINE_DTYPE_CHILD_IDX), - )?; - child_layout.add_splits(row_offset, splits) + self.child_reader()?.add_splits(row_offset, splits) } - fn read_selection(&mut self, selector: &RowMask) -> VortexResult> { - if let Some(cr) = self.child_layout.as_mut() { + fn read_selection(&self, selector: &RowMask) -> VortexResult> { + if let Some(cr) = self.child_layout.get() { cr.read_selection(selector) } else { - match self.child_reader()? { - ChildReaderResult::ReadMore(rm) => Ok(Some(BatchRead::ReadMore(rm))), - ChildReaderResult::Reader(r) => { - self.child_layout = Some(r); - self.read_selection(selector) - } + if self.message_cache.get(&[INLINE_DTYPE_BUFFER_IDX]).is_some() { + self.child_layout.get_or_try_init(|| self.child_reader())?; + return self.read_selection(selector); } + Ok(Some(BatchRead::ReadMore(vec![self.dtype_message()?]))) } } } diff --git a/vortex-file/src/read/layouts/mod.rs b/vortex-file/src/read/layouts/mod.rs index fb0276f96b..b1925b7901 100644 --- a/vortex-file/src/read/layouts/mod.rs +++ b/vortex-file/src/read/layouts/mod.rs @@ -5,7 +5,11 @@ mod inline_dtype; #[cfg(test)] mod test_read; -pub use chunked::ChunkedLayoutSpec; -pub use columnar::ColumnarLayoutSpec; -pub use flat::FlatLayoutSpec; -pub use inline_dtype::InlineDTypeLayoutSpec; +pub use chunked::ChunkedLayout; +pub use columnar::ColumnarLayout; +pub use flat::FlatLayout; +pub use inline_dtype::InlineDTypeLayout; + +use crate::LayoutReader; + +type RangedLayoutReader = ((usize, usize), Box); diff --git a/vortex-file/src/read/mod.rs b/vortex-file/src/read/mod.rs index 8425a400d8..6125b5b945 100644 --- a/vortex-file/src/read/mod.rs +++ b/vortex-file/src/read/mod.rs @@ -4,15 +4,14 @@ use std::fmt::Debug; use vortex_array::ArrayData; use vortex_error::VortexResult; -mod buffered; pub mod builder; mod cache; -mod column_batch; mod context; mod expr_project; mod filtering; pub mod layouts; mod mask; +pub mod projection; mod recordbatchreader; mod splits; mod stream; @@ -22,12 +21,11 @@ pub use builder::VortexReadBuilder; pub use cache::*; pub use context::*; pub use filtering::RowFilter; +pub use projection::Projection; pub use recordbatchreader::{AsyncRuntime, VortexRecordBatchReader}; pub use stream::VortexFileArrayStream; use vortex_expr::ExprRef; use vortex_ipc::stream_writer::ByteRange; -pub use vortex_schema::projection::Projection; -pub use vortex_schema::Schema; pub use crate::read::mask::RowMask; @@ -84,5 +82,5 @@ pub trait LayoutReader: Debug + Send { /// creating the invoked instance of this trait and then call back into this function. /// /// The layout is finished producing data for selection when it returns None - fn read_selection(&mut self, selector: &RowMask) -> VortexResult>; + fn read_selection(&self, selector: &RowMask) -> VortexResult>; } diff --git a/vortex-schema/src/projection.rs b/vortex-file/src/read/projection.rs similarity index 58% rename from vortex-schema/src/projection.rs rename to vortex-file/src/read/projection.rs index 375da3d764..cc599995de 100644 --- a/vortex-schema/src/projection.rs +++ b/vortex-file/src/read/projection.rs @@ -1,4 +1,5 @@ use vortex_dtype::field::Field; +use vortex_error::{vortex_bail, VortexResult}; // TODO(robert): Add ability to project nested columns. // Until datafusion supports nested column pruning we should create a separate variant to implement it @@ -13,6 +14,18 @@ impl Projection { pub fn new(indices: impl AsRef<[usize]>) -> Self { Self::Flat(indices.as_ref().iter().copied().map(Field::from).collect()) } + + pub fn project(&self, fields: &[Field]) -> VortexResult { + Ok(match self { + Projection::All => Projection::Flat(fields.to_vec()), + Projection::Flat(own_projection) => { + if !fields.iter().all(|f| own_projection.contains(f)) { + vortex_bail!("Can't project {own_projection:?} into {fields:?}") + } + Projection::Flat(fields.to_vec()) + } + }) + } } impl From> for Projection { diff --git a/vortex-file/src/read/stream.rs b/vortex-file/src/read/stream.rs index 309e7b33b4..fbf3e78a86 100644 --- a/vortex-file/src/read/stream.rs +++ b/vortex-file/src/read/stream.rs @@ -11,14 +11,16 @@ use futures_util::{stream, FutureExt, StreamExt, TryStreamExt}; use vortex_array::array::ChunkedArray; use vortex_array::{ArrayData, IntoArrayData}; use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_panic, VortexError, VortexExpect, VortexResult}; +use vortex_error::{ + vortex_bail, vortex_panic, VortexError, VortexExpect, VortexResult, VortexUnwrap, +}; use vortex_io::{Dispatch, IoDispatcher, VortexReadAt}; -use vortex_schema::Schema; use crate::read::cache::LayoutMessageCache; use crate::read::mask::RowMask; use crate::read::splits::{FilteringRowSplitIterator, FixedSplitIterator, MaskIterator, SplitMask}; use crate::read::{BatchRead, LayoutReader, MessageId, MessageLocator}; +use crate::LazyDType; /// An asynchronous Vortex file that returns a [`Stream`] of [`ArrayData`]s. /// @@ -28,7 +30,7 @@ use crate::read::{BatchRead, LayoutReader, MessageId, MessageLocator}; /// Use [VortexReadBuilder][crate::read::builder::VortexReadBuilder] to build one /// from a reader. pub struct VortexFileArrayStream { - dtype: DType, + dtype: Arc, row_count: u64, layout_reader: Box, messages_cache: Arc>, @@ -44,7 +46,7 @@ impl VortexFileArrayStream { layout_reader: Box, filter_reader: Option>, messages_cache: Arc>, - dtype: DType, + dtype: Arc, row_count: u64, row_mask: Option, dispatcher: Arc, @@ -67,17 +69,13 @@ impl VortexFileArrayStream { } pub fn dtype(&self) -> &DType { - &self.dtype + self.dtype.value().vortex_unwrap() } pub fn row_count(&self) -> u64 { self.row_count } - pub fn schema(&self) -> Schema { - Schema::new(self.dtype.clone()) - } - fn store_messages(&self, messages: Vec) { let mut write_cache_guard = self .messages_cache @@ -175,7 +173,7 @@ fn finished() -> VortexResult { impl VortexFileArrayStream { fn step( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, current_state: StreamingState, ) -> VortexResult { diff --git a/vortex-file/src/write/layout.rs b/vortex-file/src/write/layout.rs index 7e973d24fa..7a62b00c64 100644 --- a/vortex-file/src/write/layout.rs +++ b/vortex-file/src/write/layout.rs @@ -8,15 +8,15 @@ use crate::{ }; #[derive(Debug, Clone)] -pub struct Layout { +pub struct LayoutSpec { id: LayoutId, buffers: Option>, - children: Option>, + children: Option>, row_count: u64, metadata: Option, } -impl Layout { +impl LayoutSpec { pub fn flat(buffer: ByteRange, row_count: u64) -> Self { Self { id: FLAT_LAYOUT_ID, @@ -30,7 +30,7 @@ impl Layout { /// Create a chunked layout with children. /// /// has_metadata indicates whether first child is a layout containing metadata about other children. - pub fn chunked(children: Vec, row_count: u64, has_metadata: bool) -> Self { + pub fn chunked(children: Vec, row_count: u64, has_metadata: bool) -> Self { Self { id: CHUNKED_LAYOUT_ID, buffers: None, @@ -40,7 +40,7 @@ impl Layout { } } - pub fn column(children: Vec, row_count: u64) -> Self { + pub fn column(children: Vec, row_count: u64) -> Self { Self { id: COLUMNAR_LAYOUT_ID, buffers: None, @@ -50,7 +50,11 @@ impl Layout { } } - pub fn inlined_schema(children: Vec, row_count: u64, dtype_buffer: ByteRange) -> Self { + pub fn inlined_schema( + children: Vec, + row_count: u64, + dtype_buffer: ByteRange, + ) -> Self { Self { id: INLINE_SCHEMA_LAYOUT_ID, buffers: Some(vec![dtype_buffer]), @@ -61,7 +65,7 @@ impl Layout { } } -impl WriteFlatBuffer for Layout { +impl WriteFlatBuffer for LayoutSpec { type Target<'a> = fb::Layout<'a>; fn write_flatbuffer<'fb>( diff --git a/vortex-file/src/write/mod.rs b/vortex-file/src/write/mod.rs index 6b71e11af6..601268fc4b 100644 --- a/vortex-file/src/write/mod.rs +++ b/vortex-file/src/write/mod.rs @@ -1,4 +1,4 @@ -pub use layout::Layout; +pub use layout::LayoutSpec; pub use writer::VortexFileWriter; mod layout; diff --git a/vortex-file/src/write/writer.rs b/vortex-file/src/write/writer.rs index b07a87e0cf..6124dc1a7d 100644 --- a/vortex-file/src/write/writer.rs +++ b/vortex-file/src/write/writer.rs @@ -15,10 +15,9 @@ use vortex_ipc::messages::writer::MessageWriter; use vortex_ipc::messages::IPCSchema; use vortex_ipc::stream_writer::ByteRange; -use crate::write::layout::Layout; use crate::write::metadata_accumulators::{new_metadata_accumulator, MetadataAccumulator}; use crate::write::postscript::Postscript; -use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION}; +use crate::{LayoutSpec, EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION}; const STATS_TO_WRITE: &[Stat] = &[ Stat::Min, @@ -120,7 +119,7 @@ impl VortexFileWriter { column_writer.write_chunks(stream, &mut self.msgs).await } - async fn write_metadata_arrays(&mut self) -> VortexResult { + async fn write_metadata_arrays(&mut self) -> VortexResult { let mut column_layouts = Vec::with_capacity(self.column_writers.len()); for column_writer in mem::take(&mut self.column_writers) { column_layouts.push( @@ -130,7 +129,7 @@ impl VortexFileWriter { ); } - Ok(Layout::column(column_layouts, self.row_count)) + Ok(LayoutSpec::column(column_layouts, self.row_count)) } pub async fn finalize(mut self) -> VortexResult { @@ -252,7 +251,7 @@ impl ColumnWriter { self, row_count: u64, msgs: &mut MessageWriter, - ) -> VortexResult { + ) -> VortexResult { let data_chunks = self .batch_byte_offsets .iter() @@ -268,7 +267,7 @@ impl ColumnWriter { .zip(row_offsets.iter().skip(1)) .map(|(begin, end)| end - begin), ) - .map(|(range, len)| Layout::flat(range, len)) + .map(|(range, len)| LayoutSpec::flat(range, len)) }); if let Some(metadata_array) = self.metadata.into_array()? { @@ -280,8 +279,8 @@ impl ColumnWriter { msgs.write_batch(metadata_array).await?; let metadata_array_end = msgs.tell(); - let layouts = [Layout::inlined_schema( - vec![Layout::flat( + let layouts = [LayoutSpec::inlined_schema( + vec![LayoutSpec::flat( ByteRange::new(dtype_end, metadata_array_end), expected_n_data_chunks as u64, )], @@ -299,9 +298,9 @@ impl ColumnWriter { layouts.len() ); } - Ok(Layout::chunked(layouts, row_count, true)) + Ok(LayoutSpec::chunked(layouts, row_count, true)) } else { - Ok(Layout::chunked(data_chunks.collect(), row_count, false)) + Ok(LayoutSpec::chunked(data_chunks.collect(), row_count, false)) } } } diff --git a/vortex-schema/Cargo.toml b/vortex-schema/Cargo.toml deleted file mode 100644 index 19a7927963..0000000000 --- a/vortex-schema/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "vortex-schema" -description = "Vortex file schema abstraction" -version = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -authors = { workspace = true } -license = { workspace = true } -keywords = { workspace = true } -include = { workspace = true } -edition = { workspace = true } -rust-version = { workspace = true } -categories = { workspace = true } -readme = { workspace = true } - -[dependencies] -vortex-dtype = { workspace = true } -vortex-error = { workspace = true } - -[lints] -workspace = true diff --git a/vortex-schema/src/lib.rs b/vortex-schema/src/lib.rs deleted file mode 100644 index efaf64b35f..0000000000 --- a/vortex-schema/src/lib.rs +++ /dev/null @@ -1,53 +0,0 @@ -use vortex_dtype::field::Field; -use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; - -use self::projection::Projection; - -pub mod projection; - -#[derive(Clone, Debug)] -pub struct Schema(pub(crate) DType); - -impl Schema { - pub fn new(schema_dtype: DType) -> Self { - Self(schema_dtype) - } - - pub fn project(&self, projection: Projection) -> VortexResult { - match projection { - Projection::All => Ok(self.clone()), - Projection::Flat(fields) => { - let DType::Struct(s, n) = &self.0 else { - vortex_bail!("Can't project non struct types") - }; - s.project(fields.as_ref()) - .map(|p| Self(DType::Struct(p, *n))) - } - } - } - - pub fn dtype(&self) -> &DType { - &self.0 - } - - pub fn field_type(&self, field: &Field) -> VortexResult { - let DType::Struct(s, _) = &self.0 else { - vortex_bail!("Can't project non struct types") - }; - - let idx = match field { - Field::Name(name) => s.find_name(name), - Field::Index(i) => Some(*i), - }; - - idx.and_then(|idx| s.dtypes().get(idx).cloned()) - .ok_or_else(|| vortex_err!("Couldn't find field {field}")) - } -} - -impl From for DType { - fn from(value: Schema) -> Self { - value.0 - } -} diff --git a/vortex-serde/src/layouts/write/mod.rs b/vortex-serde/src/layouts/write/mod.rs new file mode 100644 index 0000000000..5db6f3505d --- /dev/null +++ b/vortex-serde/src/layouts/write/mod.rs @@ -0,0 +1,7 @@ +pub use layouts::LayoutSpec; +pub use writer::LayoutWriter; + +mod footer; +mod layouts; +mod metadata_accumulators; +mod writer; diff --git a/vortex/Cargo.toml b/vortex/Cargo.toml index 841c0dddfa..c5d58a6eaf 100644 --- a/vortex/Cargo.toml +++ b/vortex/Cargo.toml @@ -44,7 +44,6 @@ vortex-runend = { workspace = true } vortex-runend-bool = { workspace = true } vortex-sampling-compressor = { workspace = true } vortex-scalar = { workspace = true, default-features = true } -vortex-schema = { workspace = true } vortex-zigzag = { workspace = true } [features] diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 506416f486..79d6973341 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -7,5 +7,5 @@ pub use { vortex_fsst as fsst, vortex_io as io, vortex_ipc as ipc, vortex_proto as proto, vortex_roaring as roaring, vortex_runend as runend, vortex_runend_bool as runend_bool, vortex_sampling_compressor as sampling_compressor, vortex_scalar as scalar, - vortex_schema as schema, vortex_zigzag as zigzag, + vortex_zigzag as zigzag, };