feat: use buffer for VarBinView views #1121

Closed
4 changes: 4 additions & 0 deletions encodings/dict/Cargo.toml
@@ -36,3 +36,7 @@ simplelog = { workspace = true }
[[bench]]
name = "dict_compress"
harness = false

[[bench]]
name = "dict_canonical"
harness = false
34 changes: 34 additions & 0 deletions encodings/dict/benches/dict_canonical.rs
@@ -0,0 +1,34 @@
#![allow(clippy::unwrap_used)]

use criterion::{criterion_group, criterion_main, Criterion};
use vortex::array::VarBinArray;
use vortex::{IntoArray, IntoCanonical};
use vortex_dict::{dict_encode_varbin, DictArray};
use vortex_dtype::{DType, Nullability};

fn fixture(len: usize) -> DictArray {
let values = [
Some("inlined"),
None,
Some("not inlined but repeated often"),
];

let strings = VarBinArray::from_iter(
values.into_iter().cycle().take(len),
DType::Utf8(Nullability::Nullable),
);

let (codes, values) = dict_encode_varbin(&strings);
DictArray::try_new(codes.into_array(), values.into_array()).unwrap()
}

fn bench_canonical(c: &mut Criterion) {
let dict_array = fixture(1024).into_array();

c.bench_function("canonical", |b| {
b.iter(|| dict_array.clone().into_canonical().unwrap())
});
}

criterion_group!(bench_dict_canonical, bench_canonical);
criterion_main!(bench_dict_canonical);
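
The benchmark can then be run with Criterion's standard Cargo invocation; the package name below is assumed from the crate path (encodings/dict):

    cargo bench -p vortex-dict --bench dict_canonical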
56 changes: 49 additions & 7 deletions encodings/dict/src/array.rs
@@ -1,14 +1,14 @@
use std::fmt::{Debug, Display};

use arrow_buffer::BooleanBuffer;
use arrow_buffer::{BooleanBuffer, ScalarBuffer};
use serde::{Deserialize, Serialize};
use vortex::array::visitor::{AcceptArrayVisitor, ArrayVisitor};
use vortex::array::BoolArray;
use vortex::compute::take;
use vortex::compute::unary::scalar_at;
use vortex::array::{BoolArray, ConstantArray, VarBinViewArray};
use vortex::compute::unary::{scalar_at, try_cast};
use vortex::compute::{compare, take, Operator};
use vortex::encoding::ids;
use vortex::stats::StatsSet;
use vortex::validity::{ArrayValidity, LogicalValidity};
use vortex::validity::{ArrayValidity, LogicalValidity, Validity};
use vortex::{
impl_encoding, Array, ArrayDType, ArrayTrait, Canonical, IntoArray, IntoArrayVariant,
IntoCanonical,
@@ -41,6 +41,7 @@ impl DictArray {
DictMetadata {
codes_ptype: PType::try_from(codes.dtype())
.vortex_expect("codes dtype must be uint"),

values_len: values.len(),
},
[values, codes].into(),
@@ -67,11 +68,52 @@ impl ArrayTrait for DictArray {}

impl IntoCanonical for DictArray {
fn into_canonical(self) -> VortexResult<Canonical> {
let canonical_values: Array = self.values().into_canonical()?.into();
take(canonical_values, self.codes())?.into_canonical()
match self.dtype() {
DType::Utf8(_) | DType::Binary(_) => canonicalize_string(self),
_ => canonicalize_primitive(self),
}
}
}

/// Canonicalize a set of codes and values.
fn canonicalize_string(array: DictArray) -> VortexResult<Canonical> {
let values = array.values().into_varbinview()?;
Contributor:
I guess into_varbinview() runs a canonicalize internally? The name of this function doesn't quite convey how expensive it is, I think.

Member:
We made #507. Per https://rust-lang.github.io/api-guidelines/naming.html#ad-hoc-conversions-follow-as_-to_-into_-conventions-c-conv this should likely be to_varbinview(); but since this is an owned -> owned conversion the docs suggest into_, whereas to_ in Rust is meant to indicate expensive conversions.
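
As a side note on the guideline cited above, the as_/to_/into_ distinction in a minimal sketch (hypothetical type, not part of the Vortex API):

    struct Views(Vec<u128>);

    impl Views {
        // as_*: free, borrowed -> borrowed view.
        fn as_slice(&self) -> &[u128] {
            &self.0
        }

        // to_*: expensive, typically borrowed -> owned (here it clones the data).
        fn to_vec(&self) -> Vec<u128> {
            self.0.clone()
        }

        // into_*: consumes self, owned -> owned; cost varies.
        fn into_vec(self) -> Vec<u128> {
            self.0
        }
    }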

let codes = try_cast(array.codes(), PType::U64.into())?.into_primitive()?;

let value_views = ScalarBuffer::<u128>::from(values.views().clone().into_arrow());

// Gather the views from value_views into full_views using the dictionary codes.
let full_views: Vec<u128> = codes
.maybe_null_slice::<u64>()
.iter()
.map(|code| value_views[*code as usize])
.collect();

let validity = if array.dtype().is_nullable() {
// For nullable arrays, a code of 0 indicates null value.
Validity::Array(compare(
codes.as_ref(),
ConstantArray::new(0u64, codes.len()).as_ref(),
Operator::Eq,
)?)
} else {
Validity::NonNullable
};

VarBinViewArray::try_new(
full_views.into(),
values.buffers().collect(),
array.dtype().clone(),
validity,
)
.map(Canonical::VarBinView)
}

fn canonicalize_primitive(array: DictArray) -> VortexResult<Canonical> {
let canonical_values: Array = array.values().into_canonical()?.into();
take(canonical_values, array.codes())?.into_canonical()
}

impl ArrayValidity for DictArray {
fn is_valid(&self, index: usize) -> bool {
let values_index = scalar_at(self.codes(), index)
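The string path above avoids materializing decoded bytes: each dictionary code selects the corresponding 16-byte view (a u128) from the values array, the data buffers are reused as-is, and nullability is recovered separately from the codes. A self-contained sketch of the gather step, in plain Rust with no Vortex APIs (the view contents are stand-in values, not real view encodings):

    /// Decode a dictionary over view-encoded strings by gathering views:
    /// each code picks one 16-byte view from `value_views`; the backing
    /// data buffers are shared, not copied.
    fn gather_views(value_views: &[u128], codes: &[u64]) -> Vec<u128> {
        codes.iter().map(|&code| value_views[code as usize]).collect()
    }

    fn main() {
        // Three dictionary values, with their views represented as tags.
        let value_views = vec![0x01_u128, 0x02, 0x03];
        // Codes reference values by position; repeats cost only 16 bytes each.
        let codes = vec![0_u64, 2, 2, 1, 0];
        assert_eq!(gather_views(&value_views, &codes), vec![0x01, 0x03, 0x03, 0x02, 0x01]);
    }
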
3 changes: 0 additions & 3 deletions encodings/dict/src/compute.rs
@@ -42,9 +42,6 @@ impl ScalarAtFn for DictArray {

impl TakeFn for DictArray {
fn take(&self, indices: &Array) -> VortexResult<Array> {
// Dict
// codes: 0 0 1
// dict: a b c d e f g h
let codes = take(self.codes(), indices)?;
Self::try_new(codes, self.values()).map(|a| a.into_array())
}
12 changes: 4 additions & 8 deletions vortex-array/src/array/chunked/canonical.rs
@@ -1,4 +1,4 @@
use arrow_buffer::{BooleanBufferBuilder, Buffer, MutableBuffer, ScalarBuffer};
use arrow_buffer::{BooleanBufferBuilder, MutableBuffer, ScalarBuffer};
use vortex_dtype::{DType, PType, StructDType};
use vortex_error::{vortex_bail, vortex_err, ErrString, VortexResult};

@@ -179,11 +179,7 @@ fn pack_primitives(
buffer.extend_from_slice(chunk.buffer());
}

Ok(PrimitiveArray::new(
Buffer::from(buffer).into(),
ptype,
validity,
))
Ok(PrimitiveArray::new(buffer.into(), ptype, validity))
}

/// Builds a new [VarBinViewArray] by repacking the values from the chunks into a single
Expand Down Expand Up @@ -231,8 +227,8 @@ fn pack_views(
}
}

let views_buffer: Buffer = ScalarBuffer::<u128>::from(views).into_inner();
VarBinViewArray::try_new(Array::from(views_buffer), buffers, dtype.clone(), validity)
let views_buffer = ScalarBuffer::<u128>::from(views).into_inner();
VarBinViewArray::try_new(views_buffer.into(), buffers, dtype.clone(), validity)
}

#[cfg(test)]
12 changes: 5 additions & 7 deletions vortex-array/src/array/constant/canonical.rs
@@ -1,9 +1,9 @@
use arrow_array::builder::make_view;
use arrow_buffer::{BooleanBuffer, BufferBuilder};
use arrow_buffer::{BooleanBuffer, BufferBuilder, MutableBuffer};
use vortex_buffer::Buffer;
use vortex_dtype::{match_each_native_ptype, DType, Nullability, PType};
use vortex_error::{vortex_bail, VortexResult};
use vortex_scalar::{BinaryScalar, BoolScalar, Scalar, Utf8Scalar};
use vortex_scalar::{BinaryScalar, BoolScalar, Utf8Scalar};

use crate::array::constant::ConstantArray;
use crate::array::primitive::PrimitiveArray;
@@ -70,10 +70,10 @@ fn canonical_byte_view(
) -> VortexResult<VarBinViewArray> {
match scalar_bytes {
None => {
let views = ConstantArray::new(Scalar::null(dtype.clone()), len);
let views = MutableBuffer::from(Vec::<u128>::with_capacity(1));

VarBinViewArray::try_new(
views.into_array(),
views.into(),
Vec::new(),
dtype.clone(),
Validity::AllInvalid,
@@ -100,9 +100,7 @@
// add u128 PType, see https://github.com/spiraldb/vortex/issues/1110
let mut views = BufferBuilder::<u128>::new(len);
views.append_n(len, view);
let views =
PrimitiveArray::new(views.finish().into(), PType::U8, Validity::NonNullable)
.into_array();
let views = views.finish().into();

let validity = if dtype.nullability() == Nullability::NonNullable {
Validity::NonNullable
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/mod.rs
@@ -62,7 +62,7 @@ impl VarBinArray {
vortex_bail!(MismatchedTypes: "utf8 or binary", dtype);
}
if dtype.is_nullable() == (validity == Validity::NonNullable) {
vortex_bail!("incorrect validity {:?}", validity);
vortex_bail!("incorrect validity {:?} for {}", validity, dtype);
}

let length = offsets.len() - 1;
38 changes: 23 additions & 15 deletions vortex-array/src/array/varbinview/compute.rs
@@ -3,19 +3,21 @@ use std::sync::Arc;
use arrow_array::cast::AsArray;
use arrow_array::types::ByteViewType;
use arrow_array::{Datum, GenericByteViewArray};
use arrow_buffer::ScalarBuffer;
use arrow_ord::cmp;
use arrow_schema::DataType;
use vortex_buffer::Buffer;
use vortex_dtype::PType;
use vortex_error::{vortex_bail, VortexResult, VortexUnwrap};
use vortex_scalar::Scalar;

use crate::array::varbin::varbin_scalar;
use crate::array::varbinview::{VarBinViewArray, VIEW_SIZE_BYTES};
use crate::array::{varbinview_as_arrow, ConstantArray};
use crate::array::ConstantArray;
use crate::arrow::FromArrowArray;
use crate::compute::unary::ScalarAtFn;
use crate::compute::{slice, ArrayCompute, MaybeCompareFn, Operator, SliceFn, TakeFn};
use crate::{Array, ArrayDType, IntoArray, IntoCanonical};
use crate::compute::unary::{try_cast, ScalarAtFn};
use crate::compute::{ArrayCompute, MaybeCompareFn, Operator, SliceFn, TakeFn};
use crate::{Array, ArrayDType, IntoArray, IntoArrayVariant, IntoCanonical};

impl ArrayCompute for VarBinViewArray {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
@@ -45,11 +47,8 @@ impl ScalarAtFn for VarBinViewArray {
impl SliceFn for VarBinViewArray {
fn slice(&self, start: usize, stop: usize) -> VortexResult<Array> {
Ok(Self::try_new(
slice(
self.views(),
start * VIEW_SIZE_BYTES,
stop * VIEW_SIZE_BYTES,
)?,
self.views()
.slice(start * VIEW_SIZE_BYTES..stop * VIEW_SIZE_BYTES),
(0..self.metadata().buffer_lens.len())
.map(|i| self.buffer(i))
.collect::<Vec<_>>(),
Expand All @@ -63,12 +62,21 @@ impl SliceFn for VarBinViewArray {
/// Take involves creating a new array that references the old array, just with the given set of views.
impl TakeFn for VarBinViewArray {
fn take(&self, indices: &Array) -> VortexResult<Array> {
let array_ref = varbinview_as_arrow(self);
let indices_arrow = indices.clone().into_canonical()?.into_arrow()?;

let take_arrow = arrow_select::take::take(&array_ref, &indices_arrow, None)?;
let nullable = take_arrow.is_nullable();
Ok(Array::from_arrow(take_arrow, nullable))
let views = ScalarBuffer::<u128>::from(self.views().clone().into_arrow());

let taken: Vec<u128> = try_cast(indices, PType::U64.into())?
.into_primitive()?
.maybe_null_slice::<u64>()
.iter()
.map(|idx| views[*idx as usize])
.collect();
VarBinViewArray::try_new(
ScalarBuffer::from(taken).into_inner().into(),
self.buffers().collect(),
self.dtype().clone(),
self.validity().take(indices)?,
)
.map(|a| a.into_array())
}
}
