Skip to content

Commit

Permalink
LayoutReader::read_selection uses immutable reference (#1295)
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 authored Nov 21, 2024
1 parent f052dd7 commit a231494
Show file tree
Hide file tree
Showing 29 changed files with 672 additions and 713 deletions.
13 changes: 0 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ members = [
"vortex-proto",
"vortex-sampling-compressor",
"vortex-scalar",
"vortex-schema",
"xtask",
]
resolver = "2"
Expand Down
9 changes: 2 additions & 7 deletions pyvortex/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use pyo3::types::{PyLong, PyString};
use vortex::arrow::infer_schema;
use vortex::dtype::field::Field;
use vortex::dtype::DType;
use vortex::error::{vortex_err, VortexResult};
use vortex::error::VortexResult;
use vortex::file::{
read_initial_bytes, LayoutContext, LayoutDeserializer, Projection, RowFilter,
VortexFileArrayStream, VortexReadBuilder, VortexRecordBatchReader,
Expand Down Expand Up @@ -62,12 +62,7 @@ pub async fn read_array_from_reader<T: VortexReadAt + Unpin + 'static>(

pub async fn read_dtype_from_reader<T: VortexReadAt + Unpin>(reader: T) -> VortexResult<DType> {
let initial_read = read_initial_bytes(&reader, reader.size().await).await?;
DType::try_from(
initial_read
.fb_schema()?
.dtype()
.ok_or_else(|| vortex_err!("Failed to fetch dtype from initial read"))?,
)
initial_read.lazy_dtype()?.value().cloned()
}

fn projection_from_python(columns: Option<Vec<Bound<PyAny>>>) -> PyResult<Projection> {
Expand Down
13 changes: 5 additions & 8 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ use datafusion_physical_plan::ExecutionPlan;
use object_store::{ObjectMeta, ObjectStore};
use vortex_array::arrow::infer_schema;
use vortex_array::Context;
use vortex_dtype::DType;
use vortex_file::VORTEX_FILE_EXTENSION;
use vortex_file::{read_initial_bytes, VORTEX_FILE_EXTENSION};
use vortex_io::ObjectStoreReadAt;

use super::execution::VortexExec;
Expand Down Expand Up @@ -68,11 +67,9 @@ impl FileFormat for VortexFormat {
let mut file_schemas = Vec::default();
for o in objects {
let os_read_at = ObjectStoreReadAt::new(store.clone(), o.location.clone());
let initial_read = vortex_file::read_initial_bytes(&os_read_at, o.size as u64).await?;
let dtype = DType::try_from(initial_read.fb_schema()?.dtype().ok_or_else(|| {
DataFusionError::External("Failed to fetch dtype from initial read".into())
})?)?;
let s = infer_schema(&dtype)?;
let initial_read = read_initial_bytes(&os_read_at, o.size as u64).await?;
let lazy_dtype = initial_read.lazy_dtype()?;
let s = infer_schema(lazy_dtype.value()?)?;
file_schemas.push(s);
}

Expand All @@ -87,7 +84,7 @@ impl FileFormat for VortexFormat {
object: &ObjectMeta,
) -> DFResult<Statistics> {
let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
let initial_read = vortex_file::read_initial_bytes(&os_read_at, object.size as u64).await?;
let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?;
let layout = initial_read.fb_layout()?;
let row_count = layout.row_count();

Expand Down
11 changes: 10 additions & 1 deletion vortex-dtype/src/serde/flatbuffers/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,17 @@ pub fn resolve_field<'a, 'b: 'a>(fb: fb::Struct_<'b>, field: &'a Field) -> Vorte
}
}

/// Deserialize single field out of a struct dtype and as a top level dtype
pub fn extract_field(fb_dtype: fb::DType<'_>, field: &Field) -> VortexResult<DType> {
let fb_struct = fb_dtype
.type__as_struct_()
.ok_or_else(|| vortex_err!("The top-level type should be a struct"))?;
let (_, dtype) = read_field(fb_struct, resolve_field(fb_struct, field)?)?;
Ok(dtype)
}

/// Deserialize flatbuffer schema selecting only columns defined by projection
pub fn deserialize_and_project(
pub fn project_and_deserialize(
fb_dtype: fb::DType<'_>,
projection: &[Field],
) -> VortexResult<DType> {
Expand Down
4 changes: 0 additions & 4 deletions vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,11 @@ vortex-flatbuffers = { workspace = true, features = ["file"] }
vortex-io = { workspace = true, features = ["tokio"] }
vortex-ipc = { workspace = true }
vortex-scalar = { workspace = true, features = ["flatbuffers"] }
vortex-schema = { workspace = true }

[dev-dependencies]
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
rstest = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["full"] }
vortex-sampling-compressor = { path = "../vortex-sampling-compressor" }

[lints]
workspace = true
Expand Down
6 changes: 3 additions & 3 deletions vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
//! A layout is a serialized array which is stored in some linear and contiguous block of
//! memory. Layouts are recursively defined in terms of one of three kinds:
//!
//! 1. The [flat layout][layouts::FlatLayoutSpec]. A contiguously serialized array using the [Vortex
//! 1. The [flat layout][layouts::FlatLayout]. A contiguously serialized array using the [Vortex
//! flatbuffer Batch message][vortex_flatbuffers::message].
//!
//! 2. The [columnar layout][layouts::ColumnarLayoutSpec]. Each column of a
//! 2. The [columnar layout][layouts::ColumnarLayout]. Each column of a
//! [StructArray][vortex_array::array::StructArray] is sequentially laid out at known
//! offsets. This permits reading a subset of columns in time linear in the number of kept
//! columns.
//!
//! 3. The [chunked layout][layouts::ChunkedLayoutSpec]. Each chunk of a
//! 3. The [chunked layout][layouts::ChunkedLayout]. Each chunk of a
//! [ChunkedArray][vortex_array::array::ChunkedArray] is sequentially laid out at known
//! offsets. This permits reading a subset of rows in time linear in the number of kept rows.
//!
Expand Down
86 changes: 0 additions & 86 deletions vortex-file/src/read/buffered.rs

This file was deleted.

16 changes: 4 additions & 12 deletions vortex-file/src/read/builder/initial_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use flatbuffers::{root, root_unchecked};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_flatbuffers::{footer, message};
use vortex_io::VortexReadAt;
use vortex_schema::projection::Projection;

use crate::{
LayoutDeserializer, LayoutReader, LazilyDeserializedDType, RelativeLayoutCache, Scan, EOF_SIZE,
LayoutDeserializer, LayoutReader, LazyDType, RelativeLayoutCache, Scan, EOF_SIZE,
INITIAL_READ_SIZE, MAGIC_BYTES, VERSION,
};

Expand Down Expand Up @@ -51,17 +50,11 @@ impl InitialRead {
Ok(schema_start..schema_end)
}

/// The `Schema` flatbuffer.
pub fn fb_schema(&self) -> VortexResult<message::Schema> {
Ok(unsafe { root_unchecked::<message::Schema>(&self.buf[self.fb_schema_byte_range()?]) })
}

pub fn lazy_dtype(&self) -> VortexResult<LazilyDeserializedDType> {
pub fn lazy_dtype(&self) -> VortexResult<LazyDType> {
// we validated the schema bytes at construction time
unsafe {
Ok(LazilyDeserializedDType::from_schema_bytes(
Ok(LazyDType::from_schema_bytes(
self.buf.slice(self.fb_schema_byte_range()?),
Projection::All,
))
}
}
Expand Down Expand Up @@ -176,8 +169,7 @@ pub async fn read_initial_bytes<R: VortexReadAt>(

#[cfg(test)]
mod tests {
use super::*;
use crate::MAX_FOOTER_SIZE;
use crate::{EOF_SIZE, INITIAL_READ_SIZE, MAX_FOOTER_SIZE};

#[test]
fn big_enough_initial_read() {
Expand Down
20 changes: 6 additions & 14 deletions vortex-file/src/read/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ use std::sync::{Arc, RwLock};

use initial_read::{read_initial_bytes, read_layout_from_initial};
use vortex_array::{ArrayDType, ArrayData};
use vortex_dtype::flatbuffers::deserialize_and_project;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_error::VortexResult;
use vortex_expr::Select;
use vortex_io::{IoDispatcher, VortexReadAt};
use vortex_schema::projection::Projection;

use crate::read::cache::{LayoutMessageCache, RelativeLayoutCache};
use crate::read::context::LayoutDeserializer;
use crate::read::filtering::RowFilter;
use crate::read::projection::Projection;
use crate::read::stream::VortexFileArrayStream;
use crate::read::{RowMask, Scan};

Expand Down Expand Up @@ -121,23 +119,17 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
let initial_read = read_initial_bytes(&self.read_at, self.size().await).await?;

let layout = initial_read.fb_layout()?;
let schema = initial_read.fb_schema()?;

let row_count = layout.row_count();
let read_projection = self.projection.unwrap_or_default();
let lazy_dtype = Arc::new(initial_read.lazy_dtype()?);

let projected_dtype = {
let fb_dtype = schema
.dtype()
.ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?;
match read_projection {
Projection::All => DType::try_from(fb_dtype)?,
Projection::Flat(ref projection) => deserialize_and_project(fb_dtype, projection)?,
}
let projected_dtype = match read_projection {
Projection::All => lazy_dtype.clone(),
Projection::Flat(ref fields) => lazy_dtype.project(fields)?,
};

let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
let lazy_dtype = Arc::new(initial_read.lazy_dtype()?);
let layout_reader = read_layout_from_initial(
&initial_read,
&self.layout_serde,
Expand Down
Loading

0 comments on commit a231494

Please sign in to comment.