Skip to content

Commit

Permalink
feat: Use buffers for opaque data in VarBin and VarBinView (#1935)
Browse files (browse the repository at this point in the history)
  • Loading branch information
AdamGS authored Jan 14, 2025
1 parent 1c79fe0 commit fc6deac
Show file tree
Hide file tree
Showing 19 changed files with 123 additions and 254 deletions.
2 changes: 1 addition & 1 deletion bench-vortex/benches/bytes_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use vortex::{Context, IntoArrayData, IntoArrayVariant};
fn array_data_fixture() -> VarBinArray {
VarBinArray::try_new(
buffer![0i32, 5i32, 10i32, 15i32, 20i32].into_array(),
ByteBuffer::copy_from(b"helloworldhelloworld".as_bytes()).into_array(),
ByteBuffer::copy_from(b"helloworldhelloworld".as_bytes()),
DType::Utf8(Nullability::NonNullable),
Validity::NonNullable,
)
Expand Down
4 changes: 2 additions & 2 deletions docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Vortex array:
>>> parquet = pq.read_table("_static/example.parquet")
>>> vtx = vortex.array(parquet)
>>> vtx.nbytes
141070
141069

Compress
^^^^^^^^
Expand All @@ -46,7 +46,7 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the

>>> cvtx = vortex.compress(vtx)
>>> cvtx.nbytes
16605
16604
>>> cvtx.nbytes / vtx.nbytes
0.11...

Expand Down
9 changes: 2 additions & 7 deletions encodings/dict/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,8 @@ fn dict_encode_varbin_bytes<'a, I: Iterator<Item = Option<&'a [u8]>>>(
let values_validity = dict_values_validity(dtype.is_nullable(), offsets.len() - 1);
(
PrimitiveArray::new(codes, Validity::NonNullable),
VarBinArray::try_new(
offsets.into_array(),
bytes.into_array(),
dtype,
values_validity,
)
.vortex_expect("Failed to create VarBinArray dictionary during encoding"),
VarBinArray::try_new(offsets.into_array(), bytes.freeze(), dtype, values_validity)
.vortex_expect("Failed to create VarBinArray dictionary during encoding"),
)
}

Expand Down
12 changes: 4 additions & 8 deletions encodings/fsst/src/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use arrow_array::builder::make_view;
use vortex_array::array::{BinaryView, VarBinArray, VarBinViewArray};
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{
ArrayDType, ArrayLen, Canonical, IntoArrayData, IntoArrayVariant, IntoCanonical,
};
use vortex_array::{ArrayDType, ArrayLen, Canonical, IntoCanonical};
use vortex_buffer::{BufferMut, ByteBuffer};
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;
Expand All @@ -23,12 +21,10 @@ impl IntoCanonical for FSSTArray {
// call. We then turn our uncompressed_lengths into an offsets buffer
// necessary for a VarBinViewArray and construct the canonical array.

let compressed_bytes = VarBinArray::try_from(self.codes())?
.sliced_bytes()?
.into_primitive()?;
let bytes = VarBinArray::try_from(self.codes())?.sliced_bytes();

// Bulk-decompress the entire array.
let uncompressed_bytes = decompressor.decompress(compressed_bytes.as_slice::<u8>());
let uncompressed_bytes = decompressor.decompress(bytes.as_slice());

let uncompressed_lens_array = self
.uncompressed_lengths()
Expand All @@ -54,7 +50,7 @@ impl IntoCanonical for FSSTArray {
});

let views = views.freeze();
let uncompressed_bytes_array = ByteBuffer::from(uncompressed_bytes).into_array();
let uncompressed_bytes_array = ByteBuffer::from(uncompressed_bytes);

VarBinViewArray::try_new(
views,
Expand Down
10 changes: 3 additions & 7 deletions vortex-array/src/array/chunked/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,12 @@ fn pack_views(
// merged buffers list.
let buffers_offset = u32::try_from(buffers.len())?;
let canonical_chunk = chunk.clone().into_varbinview()?;
buffers.extend(canonical_chunk.buffers());

for buffer in canonical_chunk.buffers() {
let canonical_buffer = buffer.into_canonical()?.into_primitive()?.into_array();
buffers.push(canonical_buffer);
}

for view in canonical_chunk.binary_views()? {
for view in canonical_chunk.views().iter() {
if view.is_inlined() {
// Inlined views can be copied directly into the output
views.push(view);
views.push(*view);
} else {
// Referencing views must have their buffer_index adjusted with new offsets
let view_ref = view.as_view();
Expand Down
5 changes: 1 addition & 4 deletions vortex-array/src/array/constant/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ fn canonical_byte_view(
let view = BinaryView::from(make_view(scalar_bytes, 0, 0));
let mut buffers = Vec::new();
if scalar_bytes.len() >= BinaryView::MAX_INLINED_SIZE {
buffers.push(
PrimitiveArray::new(Buffer::copy_from(scalar_bytes), Validity::NonNullable)
.into_array(),
);
buffers.push(Buffer::copy_from(scalar_bytes));
}

// Clone our constant view `len` times.
Expand Down
7 changes: 4 additions & 3 deletions vortex-array/src/array/varbin/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ impl ArrayAccessor<[u8]> for VarBinArray {
where
F: for<'a> FnOnce(&mut (dyn Iterator<Item = Option<&'a [u8]>>)) -> R,
{
// TODO(ngates): what happens if bytes is much larger than sliced_bytes?
let primitive = self.bytes().into_primitive()?;
let offsets = self.offsets().into_primitive()?;
let validity = self.logical_validity().to_null_buffer()?;

// TODO(ngates): what happens if bytes is much larger than sliced_bytes?
let bytes = self.bytes();
let bytes = bytes.as_slice();

match_each_integer_ptype!(offsets.ptype(), |$T| {
let offsets = offsets.as_slice::<$T>();
let bytes = primitive.as_slice::<u8>();

match validity {
None => {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl ValidityVTable<VarBinArray> for VarBinEncoding {
impl VisitorVTable<VarBinArray> for VarBinEncoding {
fn accept(&self, array: &VarBinArray, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("offsets", &array.offsets())?;
visitor.visit_child("bytes", &array.bytes())?;
visitor.visit_buffer(&array.bytes())?;
visitor.visit_validity(&array.validity())
}
}
17 changes: 5 additions & 12 deletions vortex-array/src/array/varbin/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,22 @@ pub(crate) fn varbin_to_arrow(varbin_array: &VarBinArray) -> VortexResult<ArrayR
.to_null_buffer()
.map_err(|err| err.with_context("Failed to get null buffer from logical validity"))?;

let data = varbin_array
.bytes()
.into_primitive()
.map_err(|err| err.with_context("Failed to canonicalize bytes"))?;
if data.dtype() != &DType::BYTES {
vortex_bail!("Expected bytes to be of type U8, got {}", data.ptype());
}
let data = data.byte_buffer();
let data = varbin_array.bytes();

// Switch on Arrow DType.
Ok(match varbin_array.dtype() {
DType::Binary(_) => match offsets.ptype() {
PType::I32 => Arc::new(unsafe {
BinaryArray::new_unchecked(
offsets.buffer::<i32>().into_arrow_offset_buffer(),
data.clone().into_arrow_buffer(),
data.into_arrow_buffer(),
nulls,
)
}),
PType::I64 => Arc::new(unsafe {
LargeBinaryArray::new_unchecked(
offsets.buffer::<i64>().into_arrow_offset_buffer(),
data.clone().into_arrow_buffer(),
data.into_arrow_buffer(),
nulls,
)
}),
Expand All @@ -63,14 +56,14 @@ pub(crate) fn varbin_to_arrow(varbin_array: &VarBinArray) -> VortexResult<ArrayR
PType::I32 => Arc::new(unsafe {
StringArray::new_unchecked(
offsets.buffer::<i32>().into_arrow_offset_buffer(),
data.clone().into_arrow_buffer(),
data.into_arrow_buffer(),
nulls,
)
}),
PType::I64 => Arc::new(unsafe {
LargeStringArray::new_unchecked(
offsets.buffer::<i64>().into_arrow_offset_buffer(),
data.clone().into_arrow_buffer(),
data.into_arrow_buffer(),
nulls,
)
}),
Expand Down
3 changes: 1 addition & 2 deletions vortex-array/src/array/varbin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ impl<O: NativePType + PrimInt> VarBinBuilder<O> {

pub fn finish(mut self, dtype: DType) -> VarBinArray {
let offsets = PrimitiveArray::new(self.offsets.freeze(), Validity::NonNullable);
let data = PrimitiveArray::new(self.data.freeze(), Validity::NonNullable);
let nulls = self.validity.finish();

let validity = if dtype.is_nullable() {
Expand All @@ -90,7 +89,7 @@ impl<O: NativePType + PrimInt> VarBinBuilder<O> {
Validity::NonNullable
};

VarBinArray::try_new(offsets.into_array(), data.into_array(), dtype, validity)
VarBinArray::try_new(offsets.into_array(), self.data.freeze(), dtype, validity)
.vortex_expect("Unexpected error while building VarBinArray")
}
}
Expand Down
9 changes: 4 additions & 5 deletions vortex-array/src/array/varbin/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn filter_select_var_bin_by_slice(
filter_select_var_bin_by_slice_primitive_offset(
values.dtype().clone(),
offsets.as_slice::<$O>(),
values.bytes().into_primitive()?.as_slice::<u8>(),
values.bytes().as_slice(),
mask,
values.validity(),
selection_count
Expand Down Expand Up @@ -136,7 +136,7 @@ fn filter_select_var_bin_by_index(
filter_select_var_bin_by_index_primitive_offset(
values.dtype().clone(),
offsets.as_slice::<$O>(),
values.bytes().into_primitive()?.as_slice::<u8>(),
values.bytes().as_slice(),
mask,
values.validity(),
selection_count
Expand Down Expand Up @@ -187,7 +187,7 @@ mod test {
use crate::array::BoolArray;
use crate::compute::{scalar_at, FilterMask};
use crate::validity::Validity;
use crate::{IntoArrayData, ToArrayData};
use crate::ToArrayData;

fn nullable_scalar_str(s: &str) -> Scalar {
Scalar::utf8(s.to_owned(), Nullable)
Expand Down Expand Up @@ -240,7 +240,7 @@ mod test {

#[test]
fn filter_var_bin_slice_null_test() {
let x = [
let bytes = [
b"one".as_slice(),
b"two".as_slice(),
b"three".as_slice(),
Expand All @@ -252,7 +252,6 @@ mod test {
.flat_map(|x| x.iter().cloned())
.collect::<ByteBuffer>();

let bytes = x.into_array();
let offsets = PrimitiveArray::from_iter([0, 3, 6, 11, 15, 19, 22]).to_array();
let validity =
Validity::Array(BoolArray::from_iter([true, false, true, true, true, true]).to_array());
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/varbin/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use crate::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
impl TakeFn<VarBinArray> for VarBinEncoding {
fn take(&self, array: &VarBinArray, indices: &ArrayData) -> VortexResult<ArrayData> {
let offsets = array.offsets().into_primitive()?;
let data = array.bytes().into_primitive()?;
let data = array.bytes();
let indices = indices.clone().into_primitive()?;
match_each_integer_ptype!(offsets.ptype(), |$O| {
match_each_integer_ptype!(indices.ptype(), |$I| {
Ok(take(
array.dtype().clone(),
offsets.as_slice::<$O>(),
data.as_slice::<u8>(),
data.as_slice(),
indices.as_slice::<$I>(),
array.validity(),
)?.into_array())
Expand Down
Loading

0 comments on commit fc6deac

Please sign in to comment.