Skip to content

Commit

Permalink
Remove Field from vortex-expr, replace with FieldName (#1915)
Browse files Browse the repository at this point in the history
  • Loading branch information
joseph-isaacs authored Jan 13, 2025
1 parent d658e5f commit 98e2968
Show file tree
Hide file tree
Showing 40 changed files with 411 additions and 538 deletions.
4 changes: 2 additions & 2 deletions bench-vortex/benches/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use regex::Regex;
use simplelog::*;
use tokio::runtime::Runtime;
use vortex::array::{ChunkedArray, StructArray};
use vortex::dtype::Field;
use vortex::dtype::FieldName;
use vortex::error::VortexResult;
use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
Expand Down Expand Up @@ -384,7 +384,7 @@ fn tpc_h_l_comment(c: &mut Criterion) {
.map(|chunk| {
StructArray::try_from(chunk)
.unwrap()
.project(&[Field::from("l_comment")])
.project(&[FieldName::from("l_comment")])
.unwrap()
.into_array()
})
Expand Down
14 changes: 6 additions & 8 deletions pyvortex/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use arrow::datatypes::SchemaRef;
use arrow::pyarrow::{IntoPyArrow, ToPyArrow};
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::types::{PyLong, PyString};
use pyo3::types::PyString;
use vortex::arrow::infer_schema;
use vortex::dtype::{DType, Field};
use vortex::dtype::{DType, FieldName};
use vortex::error::VortexResult;
use vortex::expr::RowFilter;
use vortex::file::{
Expand Down Expand Up @@ -66,14 +66,12 @@ pub async fn read_dtype_from_reader<T: VortexReadAt + Unpin>(reader: T) -> Vorte
}

fn projection_from_python(columns: Option<Vec<Bound<PyAny>>>) -> PyResult<Projection> {
fn field_from_pyany(field: &Bound<PyAny>) -> PyResult<Field> {
fn field_from_pyany(field: &Bound<PyAny>) -> PyResult<FieldName> {
if field.clone().is_instance_of::<PyString>() {
Ok(Field::from(field.downcast::<PyString>()?.to_str()?))
} else if field.is_instance_of::<PyLong>() {
Ok(Field::Index(field.extract()?))
Ok(FieldName::from(field.downcast::<PyString>()?.to_str()?))
} else {
Err(PyTypeError::new_err(format!(
"projection: expected list of string, int, and None, but found: {}.",
"projection: expected list of strings or None, but found: {}.",
field,
)))
}
Expand All @@ -85,7 +83,7 @@ fn projection_from_python(columns: Option<Vec<Bound<PyAny>>>) -> PyResult<Projec
columns
.iter()
.map(field_from_pyany)
.collect::<PyResult<Vec<Field>>>()?,
.collect::<PyResult<Vec<FieldName>>>()?,
),
})
}
Expand Down
21 changes: 4 additions & 17 deletions pyvortex/src/expr.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::sync::Arc;

use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::*;
use vortex::dtype::half::f16;
use vortex::dtype::{DType, Field, Nullability, PType};
use vortex::dtype::{DType, Nullability, PType};
use vortex::expr::{col, lit, BinaryExpr, ExprRef, GetItem, Operator};
use vortex::scalar::Scalar;

Expand Down Expand Up @@ -225,8 +223,8 @@ impl PyExpr {
py_binary_opeartor(self_, Operator::Or, coerce_expr(right)?)
}

fn __getitem__(self_: PyRef<'_, Self>, field: PyObject) -> PyResult<PyExpr> {
get_item(self_.py(), field, self_.clone())
fn __getitem__(self_: PyRef<'_, Self>, field: String) -> PyResult<PyExpr> {
get_item(field, self_.clone())
}
}

Expand Down Expand Up @@ -311,18 +309,7 @@ pub fn scalar_helper(dtype: DType, value: &Bound<'_, PyAny>) -> PyResult<Scalar>
}
}

pub fn get_item(py: Python, field: PyObject, child: PyExpr) -> PyResult<PyExpr> {
let field = if let Ok(value) = field.downcast_bound::<PyLong>(py) {
Field::Index(value.extract()?)
} else if let Ok(value) = field.downcast_bound::<PyString>(py) {
Field::Name(Arc::from(value.extract::<String>()?.as_str()))
} else {
return Err(PyValueError::new_err(format!(
"expected int, or str but found: {}",
field
)));
};

pub fn get_item(field: String, child: PyExpr) -> PyResult<PyExpr> {
Ok(PyExpr {
inner: GetItem::new_expr(field, child.inner),
})
Expand Down
15 changes: 0 additions & 15 deletions pyvortex/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,6 @@ use crate::{PyArray, TOKIO_RUNTIME};
/// null
/// ]
///
/// Read just the name column, by its index:
///
/// >>> d = vortex.io.read_path("a.vortex", projection = [1])
/// >>> d.to_arrow_array()
/// <pyarrow.lib.StructArray object at ...>
/// -- is_valid: all not null
/// -- child 0 type: string_view
/// [
/// "Joseph",
/// null,
/// "Angela",
/// "Mikhail",
/// null
/// ]
///
///
/// Keep rows with an age above 35. This will read O(N_KEPT) rows, when the file format allows.
///
Expand Down
13 changes: 10 additions & 3 deletions vortex-array/src/array/chunked/variants.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use vortex_dtype::{DType, Field};
use itertools::Itertools;
use vortex_dtype::{DType, Field, FieldName};
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};

use crate::array::chunked::ChunkedArray;
Expand Down Expand Up @@ -84,7 +85,7 @@ impl StructArrayTrait for ChunkedArray {
Some(chunked)
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
let mut chunks = Vec::with_capacity(self.nchunks());
for chunk in self.chunks() {
chunks.push(
Expand All @@ -99,7 +100,13 @@ impl StructArrayTrait for ChunkedArray {
.dtype()
.as_struct()
.ok_or_else(|| vortex_err!("Not a struct dtype"))?
.project(projection)?;
.project(
projection
.iter()
.map(|f| Field::Name(f.clone()))
.collect_vec()
.as_slice(),
)?;
ChunkedArray::try_new(
chunks,
DType::Struct(projected_dtype, self.dtype().nullability()),
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/constant/variants.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use vortex_dtype::Field;
use vortex_dtype::FieldName;
use vortex_error::{VortexError, VortexExpect as _, VortexResult};
use vortex_scalar::Scalar;

Expand Down Expand Up @@ -97,7 +97,7 @@ impl StructArrayTrait for ConstantArray {
.map(|scalar| ConstantArray::new(scalar, self.len()).into_array())
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
Ok(
ConstantArray::new(self.scalar().as_struct().project(projection)?, self.len())
.into_array(),
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/sparse/variants.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use vortex_dtype::Field;
use vortex_dtype::FieldName;
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_scalar::StructScalar;

Expand Down Expand Up @@ -87,7 +87,7 @@ impl StructArrayTrait for SparseArray {
)
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
let new_patches = self.patches().map_values(|values| {
values
.as_struct_array()
Expand Down
23 changes: 10 additions & 13 deletions vortex-array/src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,16 @@ impl StructArray {
/// perform column re-ordering, deletion, or duplication at a logical level, without any data
/// copying.
#[allow(clippy::same_name_method)]
pub fn project(&self, projection: &[Field]) -> VortexResult<Self> {
pub fn project(&self, projection: &[FieldName]) -> VortexResult<Self> {
let mut children = Vec::with_capacity(projection.len());
let mut names = Vec::with_capacity(projection.len());

for field in projection.iter() {
let idx = match field {
Field::Name(n) => self
.names()
.iter()
.position(|name| name == n)
.ok_or_else(|| vortex_err!("Unknown field {n}"))?,
Field::Index(i) => *i,
};
for f_name in projection.iter() {
let idx = self
.names()
.iter()
.position(|name| name == f_name)
.ok_or_else(|| vortex_err!("Unknown field {f_name}"))?;

names.push(self.names()[idx].clone());
children.push(
Expand Down Expand Up @@ -170,7 +167,7 @@ impl StructArrayTrait for StructArray {
)
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
self.project(projection).map(|a| a.into_array())
}
}
Expand Down Expand Up @@ -223,7 +220,7 @@ impl StatisticsVTable<StructArray> for StructEncoding {
#[cfg(test)]
mod test {
use vortex_buffer::buffer;
use vortex_dtype::{DType, Field, FieldName, FieldNames, Nullability};
use vortex_dtype::{DType, FieldName, FieldNames, Nullability};

use crate::array::primitive::PrimitiveArray;
use crate::array::struct_::StructArray;
Expand Down Expand Up @@ -251,7 +248,7 @@ mod test {
.unwrap();

let struct_b = struct_a
.project(&[Field::from(2usize), Field::from(0)])
.project(&[FieldName::from("zs"), FieldName::from("xs")])
.unwrap();
assert_eq!(
struct_b.names().as_ref(),
Expand Down
8 changes: 1 addition & 7 deletions vortex-array/src/arrow/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use arrow_array::{BinaryViewArray, GenericByteViewArray, GenericListArray, Strin
use arrow_buffer::buffer::{NullBuffer, OffsetBuffer};
use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, ScalarBuffer};
use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit};
use itertools::Itertools;
use vortex_buffer::{Alignment, ByteBuffer};
use vortex_datetime_dtype::TimeUnit;
use vortex_dtype::{DType, NativePType, Nullability, PType};
Expand Down Expand Up @@ -166,12 +165,7 @@ impl FromArrowArray<&ArrowBooleanArray> for ArrayData {
impl FromArrowArray<&ArrowStructArray> for ArrayData {
fn from_arrow(value: &ArrowStructArray, nullable: bool) -> Self {
StructArray::try_new(
value
.column_names()
.iter()
.map(|s| (*s).into())
.collect_vec()
.into(),
value.column_names().iter().map(|s| (*s).into()).collect(),
value
.columns()
.iter()
Expand Down
8 changes: 2 additions & 6 deletions vortex-array/src/arrow/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ impl FromArrowType<SchemaRef> for DType {
.fields()
.iter()
.map(|f| f.name().as_str().into())
.collect_vec()
.into(),
.collect(),
value
.fields()
.iter()
Expand Down Expand Up @@ -91,10 +90,7 @@ impl FromArrowType<&Field> for DType {
}
DataType::Struct(f) => Struct(
StructDType::new(
f.iter()
.map(|f| f.name().as_str().into())
.collect_vec()
.into(),
f.iter().map(|f| f.name().as_str().into()).collect(),
f.iter().map(|f| Self::from_arrow(f.as_ref())).collect_vec(),
),
nullability,
Expand Down
4 changes: 1 addition & 3 deletions vortex-array/src/arrow/record_batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use arrow_array::cast::AsArray;
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Schema};
use itertools::Itertools;
use vortex_error::{vortex_err, VortexError, VortexResult};

use crate::array::StructArray;
Expand All @@ -19,8 +18,7 @@ impl TryFrom<RecordBatch> for ArrayData {
.fields()
.iter()
.map(|f| f.name().as_str().into())
.collect_vec()
.into(),
.collect(),
value
.columns()
.iter()
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/variants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::sync::Arc;

use vortex_dtype::{DType, ExtDType, Field, FieldInfo, FieldNames, PType};
use vortex_dtype::{DType, ExtDType, Field, FieldInfo, FieldName, FieldNames, PType};
use vortex_error::{vortex_panic, VortexError, VortexExpect as _, VortexResult};

use crate::encoding::Encoding;
Expand Down Expand Up @@ -228,7 +228,7 @@ pub trait StructArrayTrait: ArrayTrait {
}
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData>;
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData>;
}

pub trait ListArrayTrait: ArrayTrait {}
Expand Down
33 changes: 23 additions & 10 deletions vortex-datafusion/src/memory/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use vortex_array::array::ChunkedArray;
use vortex_array::{ArrayDType, ArrayLen};
use vortex_dtype::Field;
use vortex_error::VortexResult;
use vortex_dtype::{FieldName, FieldNames};
use vortex_error::{vortex_err, VortexResult};

use crate::memory::statistics::chunked_array_df_stats;
use crate::memory::stream::VortexRecordBatchStream;
Expand All @@ -17,7 +17,7 @@ use crate::memory::stream::VortexRecordBatchStream;
#[derive(Clone)]
pub struct VortexScanExec {
array: ChunkedArray,
scan_projection: Vec<usize>,
scan_projection: FieldNames,
plan_properties: PlanProperties,
statistics: Statistics,
}
Expand All @@ -28,7 +28,25 @@ impl VortexScanExec {
scan_projection: Vec<usize>,
plan_properties: PlanProperties,
) -> VortexResult<Self> {
let statistics = chunked_array_df_stats(&array, &scan_projection)?;
let dtype = array.dtype().as_struct().ok_or_else(|| {
vortex_err!(
"VortexScanExec: expected struct array, found {:?}",
array.dtype()
)
})?;
let scan_projection: FieldNames = scan_projection
.iter()
.map(|idx| {
dtype.names().get(*idx).cloned().ok_or_else(|| {
vortex_err!(
"VortexScanExec: invalid field index {idx} in dtype {:?}",
dtype.names()
)
})
})
.collect::<VortexResult<Vec<FieldName>>>()?
.into();
let statistics = chunked_array_df_stats(&array, scan_projection.clone())?;
Ok(Self {
array,
scan_projection,
Expand Down Expand Up @@ -91,12 +109,7 @@ impl ExecutionPlan for VortexScanExec {
idx: 0,
num_chunks: self.array.nchunks(),
chunks: self.array.clone(),
projection: self
.scan_projection
.iter()
.copied()
.map(Field::from)
.collect(),
projection: self.scan_projection.iter().cloned().collect(),
}))
}

Expand Down
Loading

0 comments on commit 98e2968

Please sign in to comment.