diff --git a/Cargo.lock b/Cargo.lock index 38552b430e..d782d7491d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4800,7 +4800,6 @@ dependencies = [ "arrow-select", "bytes", "compio", - "croaring", "flatbuffers", "flume", "futures", diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 63e79aa81f..36a155eb3a 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -18,7 +18,6 @@ arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-schema = { workspace = true } bytes = { workspace = true } -croaring = { workspace = true } compio = { workspace = true, features = ["bytes", "macros"], optional = true } flatbuffers = { workspace = true } flume = { workspace = true } diff --git a/vortex-file/src/read/buffered.rs b/vortex-file/src/read/buffered.rs index f7836d9c25..a5e8499aa4 100644 --- a/vortex-file/src/read/buffered.rs +++ b/vortex-file/src/read/buffered.rs @@ -28,9 +28,7 @@ impl BufferedLayoutReader { // TODO(robert): Support out of order reads fn buffer_read(&mut self, mask: &RowMask) -> VortexResult>> { while let Some(((begin, end), layout)) = self.layouts.pop_front() { - if mask.begin() <= begin && begin < mask.end() - || mask.begin() < end && end <= mask.end() - { + if mask.end() > begin && mask.begin() <= end { self.layouts.push_front(((begin, end), layout)); break; } diff --git a/vortex-file/src/read/layouts/chunked.rs b/vortex-file/src/read/layouts/chunked.rs index 375f753287..6ee363a6e6 100644 --- a/vortex-file/src/read/layouts/chunked.rs +++ b/vortex-file/src/read/layouts/chunked.rs @@ -150,13 +150,13 @@ mod tests { use std::iter; use std::sync::{Arc, RwLock}; + use arrow_buffer::BooleanBufferBuilder; use bytes::Bytes; - use croaring::Bitmap; use flatbuffers::{root_unchecked, FlatBufferBuilder}; use futures_util::TryStreamExt; - use vortex_array::array::{ChunkedArray, PrimitiveArray}; + use vortex_array::array::{BoolArray, ChunkedArray, PrimitiveArray}; use vortex_array::{ArrayDType, IntoArrayData, IntoArrayVariant}; - use vortex_dtype::PType; + use vortex_dtype::{Nullability, PType}; use vortex_expr::{BinaryExpr, Identity, Literal, Operator}; use vortex_flatbuffers::{footer, WriteFlatBuffer}; use vortex_ipc::messages::writer::MessageWriter; @@ -292,7 +292,7 @@ mod tests { &mut projection_layout, cache, &buf, - &RowMask::try_new(Bitmap::from_range(0..500), 0, 500).unwrap(), + &RowMask::new_valid_between(0, 500), ); assert!(arr.is_some()); @@ -309,10 +309,29 @@ mod tests { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let (_, mut projection_layout, buf, _) = layout_and_bytes(cache.clone(), Scan::new(None)).await; + + let mut first_range = BooleanBufferBuilder::new(200); + first_range.append_n(150, true); + first_range.append_n(50, false); + + let mut snd_range = BooleanBufferBuilder::new(200); + snd_range.append_n(50, false); + snd_range.append_n(100, true); + snd_range.append_n(50, false); let mut arr = [ - RowMask::try_new(Bitmap::from_range(0..150), 0, 200).unwrap(), - RowMask::try_new(Bitmap::from_range(50..150), 200, 400).unwrap(), - RowMask::try_new(Bitmap::from_range(0..100), 400, 500).unwrap(), + RowMask::try_new( + BoolArray::new(first_range.finish(), Nullability::NonNullable).into_array(), + 0, + 200, + ) + .unwrap(), + RowMask::try_new( + BoolArray::new(snd_range.finish(), Nullability::NonNullable).into_array(), + 200, + 400, + ) + .unwrap(), + RowMask::new_valid_between(400, 500), ] .into_iter() .flat_map(|s| read_layout_data(&mut projection_layout, cache.clone(), &buf, &s)) diff --git a/vortex-file/src/read/layouts/test_read.rs b/vortex-file/src/read/layouts/test_read.rs index 6edc27721c..5a288105d5 100644 --- a/vortex-file/src/read/layouts/test_read.rs +++ b/vortex-file/src/read/layouts/test_read.rs @@ -2,7 +2,6 @@ use std::collections::{BTreeSet, VecDeque}; use std::sync::{Arc, RwLock}; use bytes::Bytes; -use croaring::Bitmap; use itertools::Itertools; use vortex_array::ArrayData; use vortex_error::VortexUnwrap; @@ -17,9 +16,7 @@ pub fn layout_splits(layout: &mut dyn LayoutReader, length: usize) -> Vec() - .map(|(begin, end)| unsafe { - RowMask::new_unchecked(Bitmap::from_range(begin as u32..end as u32), 0, end) - }) + .map(|(begin, end)| RowMask::new_valid_between(begin, end)) .collect::>() } diff --git a/vortex-file/src/read/mask.rs b/vortex-file/src/read/mask.rs index 6b18c3b7cb..0b9a13374c 100644 --- a/vortex-file/src/read/mask.rs +++ b/vortex-file/src/read/mask.rs @@ -1,27 +1,45 @@ use std::cmp::{max, min}; use std::fmt::{Display, Formatter}; -use arrow_buffer::{BooleanBuffer, MutableBuffer}; -use croaring::Bitmap; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, MutableBuffer}; use vortex_array::array::{BoolArray, PrimitiveArray, SparseArray}; -use vortex_array::compute::{filter, slice, take, FilterMask, TakeOptions}; -use vortex_array::validity::{LogicalValidity, Validity}; -use vortex_array::{ - iterate_integer_array, ArrayData, IntoArrayData, IntoArrayVariant, IntoCanonical, -}; -use vortex_dtype::PType; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; - +use vortex_array::compute::{and, filter, slice, take, FilterMask, TakeOptions}; +use vortex_array::stats::ArrayStatistics; +use vortex_array::validity::LogicalValidity; +use vortex_array::{iterate_integer_array, ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; +use vortex_dtype::{DType, Nullability, PType}; +use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; const PREFER_TAKE_TO_FILTER_DENSITY: f64 = 1.0 / 1024.0; /// Bitmap of selected rows within given [begin, end) row range -#[derive(Debug, Clone, Default, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct RowMask { - values: Bitmap, + bitmask: ArrayData, begin: usize, end: usize, } +#[cfg(test)] +impl PartialEq for RowMask { + fn eq(&self, other: &Self) -> bool { + use vortex_error::VortexUnwrap; + self.begin == other.begin + && self.end == other.end + && self + .bitmask + .clone() + .into_bool() + .vortex_unwrap() + .boolean_buffer() + == other + .bitmask + .clone() + .into_bool() + .vortex_unwrap() + .boolean_buffer() + } +} + impl Display for RowMask { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "RowSelector [{}..{}]", self.begin, self.end) @@ -29,34 +47,61 @@ impl Display for RowMask { } impl RowMask { - pub fn try_new(values: Bitmap, begin: usize, end: usize) -> VortexResult { - if values - .maximum() - .map(|m| m > (end - begin) as u32) - .unwrap_or(false) - { - vortex_bail!("Values bitmap must be in 0..(end-begin) range") + pub fn try_new(bitmask: ArrayData, begin: usize, end: usize) -> VortexResult { + if bitmask.dtype() != &DType::Bool(Nullability::NonNullable) { + vortex_bail!( + "bitmask must be a nonnullable bool array {}", + bitmask.dtype() + ) + } + if bitmask.len() != (end - begin) { + vortex_bail!( + "Bitmask must be the same length {} as the given range {}..{}", + bitmask.len(), + begin, + end + ); } - Ok(Self { values, begin, end }) + Ok(Self { + bitmask, + begin, + end, + }) } /// Construct a RowMask which is valid in the given range. pub fn new_valid_between(begin: usize, end: usize) -> Self { - unsafe { RowMask::new_unchecked(Bitmap::from_range(0..(end - begin) as u32), begin, end) } + unsafe { + RowMask::new_unchecked( + BoolArray::from(BooleanBuffer::new_set(end - begin)).into_array(), + begin, + end, + ) + } } /// Construct a RowMask which is invalid everywhere in the given range. pub fn new_invalid_between(begin: usize, end: usize) -> Self { - unsafe { RowMask::new_unchecked(Bitmap::new(), begin, end) } + unsafe { + RowMask::new_unchecked( + BoolArray::from(BooleanBuffer::new_unset(end - begin)).into_array(), + begin, + end, + ) + } } - /// Construct a RowMask from given bitmap and begin. + /// Construct a RowMask from given bitmask, begin and end. /// /// # Safety /// - /// The maximum set index of the `values` must be no greater than `end - begin`. - pub unsafe fn new_unchecked(values: Bitmap, begin: usize, end: usize) -> Self { - Self { values, begin, end } + /// The bitmask must be of a nonnullable bool array and length of end - begin + pub unsafe fn new_unchecked(bitmask: ArrayData, begin: usize, end: usize) -> Self { + Self { + bitmask, + begin, + end, + } } /// Construct a RowMask from a Boolean typed array. @@ -64,35 +109,15 @@ impl RowMask { /// True-valued positions are kept by the returned mask. pub fn from_mask_array(array: &ArrayData, begin: usize, end: usize) -> VortexResult { match array.with_dyn(|a| a.logical_validity()) { - LogicalValidity::AllValid(_) => { - Self::from_mask_array_ignoring_validity(array, begin, end) - } + LogicalValidity::AllValid(_) => Self::try_new(array.clone(), begin, end), LogicalValidity::AllInvalid(_) => Ok(Self::new_invalid_between(begin, end)), LogicalValidity::Array(validity) => { - let mut bits = Self::from_mask_array_ignoring_validity(array, begin, end)?; - let validity = Self::from_mask_array_ignoring_validity(&validity, begin, end)?; - bits.and_inplace(&validity)?; - Ok(bits) + let bitmask = and(array.clone(), validity)?; + Self::try_new(bitmask, begin, end) } } } - fn from_mask_array_ignoring_validity( - array: &ArrayData, - begin: usize, - end: usize, - ) -> VortexResult { - let mut bitmap = Bitmap::new(); - array - .clone() - .into_canonical()? - .into_bool()? - .boolean_buffer() - .set_slices() - .for_each(|(start, stop)| bitmap.add_range(start as u32..stop as u32)); - Ok(unsafe { RowMask::new_unchecked(bitmap, begin, end) }) - } - /// Construct a RowMask from an integral array. /// /// The array values are interpreted as indices and those indices are kept by the returned mask. @@ -105,29 +130,36 @@ impl RowMask { return Err(err()); } - let mut bitmap = Bitmap::new(); + let len = end - begin; + let mut buffer_builder = BooleanBufferBuilder::new_from_buffer(MutableBuffer::from_len_zeroed(len.div_ceil(8)), len); iterate_integer_array!(array, |$P, $iterator| { for batch in $iterator { for index in batch.data() { - bitmap.add(u32::try_from(*index).map_err(|_| err())?); + buffer_builder.set_bit(*index as usize, true) } } }); - Ok(unsafe { RowMask::new_unchecked(bitmap, begin, end) }) + Ok(unsafe { RowMask::new_unchecked(BoolArray::new(buffer_builder.finish(), Nullability::NonNullable).into_array(), begin, end) }) }) } /// Combine the RowMask with bitmask values resulting in new RowMask containing only values true in the bitmask pub fn and_bitmask(self, bitmask: ArrayData) -> VortexResult { // If we are a dense all true bitmap just take the bitmask array - if self.len() as u64 == self.values.cardinality() { + if self.len() + == self + .bitmask + .statistics() + .compute_true_count() + .vortex_expect("Must have a true count") + { if bitmask.len() != self.len() { vortex_bail!( "Bitmask length {} does not match our length {}", bitmask.len(), - self.values.cardinality() + self.bitmask.len() ); } Self::from_mask_array(&bitmask, self.begin, self.end) @@ -142,7 +174,11 @@ impl RowMask { } pub fn is_empty(&self) -> bool { - self.values.is_empty() + self.bitmask + .statistics() + .compute_true_count() + .vortex_expect("Must have true count") + == 0 } pub fn begin(&self) -> usize { @@ -154,47 +190,41 @@ impl RowMask { } pub fn len(&self) -> usize { - self.end - self.begin + self.bitmask.len() } /// Limit mask to [begin..end) range pub fn slice(&self, begin: usize, end: usize) -> Self { let range_begin = max(self.begin, begin); let range_end = min(self.end, end); - let mask = - Bitmap::from_range((range_begin - self.begin) as u32..(range_end - self.begin) as u32); unsafe { RowMask::new_unchecked( - self.values - .and(&mask) - .add_offset(-((range_begin - self.begin) as i64)), + if range_begin == self.begin && range_end == self.end { + self.bitmask.clone() + } else { + slice( + &self.bitmask, + range_begin - self.begin, + range_end - self.begin, + ) + .vortex_expect("Must be a valid slice") + }, range_begin, range_end, ) } } - /// Unset, in place, any bits that are unset in `other`. - pub fn and_inplace(&mut self, other: &RowMask) -> VortexResult<()> { - if self.begin != other.begin || self.end != other.end { - vortex_bail!( - "begin and ends must match: {}-{} {}-{}", - self.begin, - self.end, - other.begin, - other.end - ); - } - self.values.and_inplace(&other.values); - Ok(()) - } - /// Filter array with this `RowMask`. /// /// This function assumes that Array is no longer than the mask length and that the mask starts on same offset as the array, /// i.e. the beginning of the array corresponds to the beginning of the mask with begin = 0 pub fn filter_array(&self, array: impl AsRef) -> VortexResult> { - let true_count = self.values.cardinality(); + let true_count = self + .bitmask + .statistics() + .compute_true_count() + .vortex_expect("Must have a true count"); if true_count == 0 { return Ok(None); } @@ -207,7 +237,7 @@ impl RowMask { &slice(array, self.begin, self.end)? }; - if true_count == sliced.len() as u64 { + if true_count == sliced.len() { return Ok(Some(sliced.clone())); } @@ -215,33 +245,22 @@ impl RowMask { let indices = self.to_indices_array()?; take(sliced, indices, TakeOptions::default()).map(Some) } else { - let mask = self.to_filter_mask()?; + let mask = FilterMask::try_from(self.bitmask.clone())?; filter(sliced, &mask).map(Some) } } pub fn to_indices_array(&self) -> VortexResult { - Ok(PrimitiveArray::from_vec(self.values.to_vec(), Validity::NonNullable).into_array()) - } - - pub fn to_filter_mask(&self) -> VortexResult { - let bitset = self - .values - .to_bitset() - .ok_or_else(|| vortex_err!("Couldn't create bitset for RowSelection"))?; - - let byte_length = self.len().div_ceil(8); - let mut buffer = MutableBuffer::with_capacity(byte_length); - buffer.extend_from_slice(bitset.as_slice()); - if byte_length > bitset.size_in_bytes() { - buffer.extend_zeros(byte_length - bitset.size_in_bytes()); - } - - Ok(FilterMask::from(BoolArray::from(BooleanBuffer::new( - buffer.into(), - 0, - self.len(), - )))) + Ok(PrimitiveArray::from( + self.bitmask + .clone() + .into_bool()? + .boolean_buffer() + .set_indices() + .map(|i| i as u64) + .collect::>(), + ) + .into_array()) } pub fn shift(self, offset: usize) -> VortexResult { @@ -252,38 +271,41 @@ impl RowMask { self.begin ) } - Ok(unsafe { RowMask::new_unchecked(self.values, self.begin - offset, self.end - offset) }) + Ok(unsafe { RowMask::new_unchecked(self.bitmask, self.begin - offset, self.end - offset) }) } } #[cfg(test)] mod tests { - use croaring::Bitmap; + use arrow_buffer::BooleanBuffer; use rstest::rstest; - use vortex_array::array::PrimitiveArray; + use vortex_array::array::{BoolArray, PrimitiveArray}; use vortex_array::{IntoArrayData, IntoArrayVariant}; + use vortex_dtype::Nullability; use crate::read::mask::RowMask; #[rstest] #[case( - RowMask::try_new((0..2).chain(9..10).collect(), 0, 10).unwrap(), (0, 1), - RowMask::try_new((0..1).collect(), 0, 1).unwrap())] + RowMask::try_new(BoolArray::from_iter([true, true, true, false, false, false, false, false, true, true]).into_array(), 0, 10).unwrap(), (0, 1), + RowMask::try_new(BoolArray::from_iter([true]).into_array(), 0, 1).unwrap())] #[case( - RowMask::try_new((5..8).chain(9..10).collect(), 0, 10).unwrap(), (2, 5), - RowMask::try_new(Bitmap::new(), 2, 5).unwrap())] + RowMask::try_new(BoolArray::from_iter([false, false, false, false, false, true, true, true, true, true]).into_array(), 0, 10).unwrap(), (2, 5), + RowMask::try_new(BoolArray::from_iter([false, false, false]).into_array(), 2, 5).unwrap() + )] #[case( - RowMask::try_new((0..4).collect(), 0, 10).unwrap(), (2, 5), - RowMask::try_new((0..2).collect(), 2, 5).unwrap())] + RowMask::try_new(BoolArray::from_iter([true, true, true, true, false, false, false, false, false, false]).into_array(), 0, 10).unwrap(), (2, 5), + RowMask::try_new(BoolArray::from_iter([true, true, false]).into_array(), 2, 5).unwrap() + )] #[case( - RowMask::try_new((0..3).chain(5..6).collect(), 0, 10).unwrap(), (2, 6), - RowMask::try_new((0..1).chain(3..4).collect(), 2, 6).unwrap())] + RowMask::try_new(BoolArray::from_iter([true, true, true, false, false, true, true, false, false, false]).into_array(), 0, 10).unwrap(), (2, 6), + RowMask::try_new(BoolArray::from_iter([true, false, false, true]).into_array(), 2, 6).unwrap())] #[case( - RowMask::try_new((5..10).collect(), 0, 10).unwrap(), (7, 11), - RowMask::try_new((0..3).collect(), 7, 10).unwrap())] + RowMask::try_new(BoolArray::from_iter([false, false, false, false, false, true, true, true, true, true]).into_array(), 0, 10).unwrap(), (7, 11), + RowMask::try_new(BoolArray::from_iter([true, true, true]).into_array(), 7, 10).unwrap())] #[case( - RowMask::try_new((1..6).collect(), 3, 9).unwrap(), (0, 5), - RowMask::try_new((1..2).collect(), 3, 5).unwrap())] + RowMask::try_new(BoolArray::from_iter([false, true, true, true, true, true]).into_array(), 3, 9).unwrap(), (0, 5), + RowMask::try_new(BoolArray::from_iter([false, true]).into_array(), 3, 5).unwrap())] #[cfg_attr(miri, ignore)] fn slice(#[case] first: RowMask, #[case] range: (usize, usize), #[case] expected: RowMask) { assert_eq!(first.slice(range.0, range.1), expected); @@ -293,35 +315,61 @@ mod tests { #[should_panic] #[cfg_attr(miri, ignore)] fn test_new() { - RowMask::try_new((5..10).collect(), 5, 10).unwrap(); + RowMask::try_new( + BoolArray::new(BooleanBuffer::new_unset(10), Nullability::NonNullable).into_array(), + 5, + 10, + ) + .unwrap(); } #[test] #[should_panic] #[cfg_attr(miri, ignore)] fn shift_invalid() { - RowMask::try_new((0..5).collect(), 5, 10) - .unwrap() - .shift(7) - .unwrap(); + RowMask::try_new( + BoolArray::from_iter([true, true, true, true, true]).into_array(), + 5, + 10, + ) + .unwrap() + .shift(7) + .unwrap(); } #[test] #[cfg_attr(miri, ignore)] fn shift() { assert_eq!( - RowMask::try_new((0..5).collect(), 5, 10) - .unwrap() - .shift(5) - .unwrap(), - RowMask::try_new((0..5).collect(), 0, 5).unwrap() + RowMask::try_new( + BoolArray::from_iter([true, true, true, true, true]).into_array(), + 5, + 10 + ) + .unwrap() + .shift(5) + .unwrap(), + RowMask::try_new( + BoolArray::from_iter([true, true, true, true, true]).into_array(), + 0, + 5 + ) + .unwrap() ); } #[test] #[cfg_attr(miri, ignore)] fn filter_array() { - let mask = RowMask::try_new((5..10).collect(), 0, 10).unwrap(); + let mask = RowMask::try_new( + BoolArray::from_iter([ + false, false, false, false, false, true, true, true, true, true, + ]) + .into_array(), + 0, + 10, + ) + .unwrap(); let array = PrimitiveArray::from((0..20).collect::>()).into_array(); let filtered = mask.filter_array(array).unwrap().unwrap(); assert_eq!( diff --git a/vortex-file/src/read/splits.rs b/vortex-file/src/read/splits.rs index 965371bf4f..0aa1351940 100644 --- a/vortex-file/src/read/splits.rs +++ b/vortex-file/src/read/splits.rs @@ -181,6 +181,8 @@ impl Iterator for FixedSplitIterator { mod tests { use std::collections::BTreeSet; + use vortex_array::array::BoolArray; + use vortex_array::IntoArrayData; use vortex_error::VortexResult; use crate::read::splits::{FixedSplitIterator, MaskIterator, SplitMask}; @@ -204,10 +206,22 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] fn filters_empty() { - let mut mask_iter = - FixedSplitIterator::new(10, Some(RowMask::try_new((4..6).collect(), 0, 10).unwrap())); + let mut mask_iter = FixedSplitIterator::new( + 10, + Some( + RowMask::try_new( + BoolArray::from_iter([ + false, false, false, false, true, true, false, false, false, false, + ]) + .into_array(), + 0, + 10, + ) + .unwrap(), + ), + ); mask_iter - .additional_splits(&mut BTreeSet::from([2, 4, 6, 8, 10])) + .additional_splits(&mut BTreeSet::from([0, 2, 4, 6, 8, 10])) .unwrap(); assert_eq!( mask_iter