From 18986c29e63160ebfb4765aa6bcdb311d0fe1da4 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Mon, 18 Nov 2024 13:55:24 -0500 Subject: [PATCH] feat: add stat for uncompressed size in bytes (#1315) fixes #1237 --- .github/workflows/ci.yml | 2 +- Cargo.lock | 1 + Cargo.toml | 4 ++++ vortex-array/src/array/bool/stats.rs | 6 ++++- vortex-array/src/array/chunked/mod.rs | 20 ++++++++++++---- vortex-array/src/array/chunked/stats.rs | 2 ++ vortex-array/src/array/null/mod.rs | 6 ++++- vortex-array/src/array/primitive/stats.rs | 10 +++++--- vortex-array/src/array/struct_/mod.rs | 18 +++++++++++++-- vortex-array/src/array/varbin/stats.rs | 8 +++++-- vortex-array/src/array/varbinview/stats.rs | 9 ++++++-- vortex-array/src/compress.rs | 16 ++++++++++--- vortex-array/src/stats/flatbuffers.rs | 1 + vortex-array/src/stats/mod.rs | 14 +++++++++-- vortex-array/src/stats/statsset.rs | 11 ++++++++- vortex-array/src/view.rs | 5 ++++ .../flatbuffers/vortex-array/array.fbs | 1 + vortex-flatbuffers/src/generated/array.rs | 17 ++++++++++++++ vortex-flatbuffers/src/generated/message.rs | 2 +- vortex-sampling-compressor/Cargo.toml | 1 + .../src/compressors/chunked.rs | 13 ++++++----- .../src/compressors/mod.rs | 3 +++ .../src/compressors/struct_.rs | 8 +++---- vortex-sampling-compressor/src/constants.rs | 4 ++-- vortex-sampling-compressor/tests/smoketest.rs | 23 +++++++++++++++++++ 25 files changed, 169 insertions(+), 36 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 83d0074c57..0100ecaff6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -162,7 +162,7 @@ jobs: with: version: v1.0.0 - name: Rust Bench as test - run: cargo bench --bench '*[!noci]' -- --test + run: cargo bench --bench '*[!noci]' --profile benchtest -- --test generated-files: name: "Check generated proto/fbs files are up to date" diff --git a/Cargo.lock b/Cargo.lock index d782d7491d..9baaa4962a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4975,6 +4975,7 @@ dependencies = [ "vortex-roaring", "vortex-runend", "vortex-runend-bool", + "vortex-scalar", "vortex-zigzag", ] diff --git a/Cargo.toml b/Cargo.toml index 4b6422368f..b345c9fbff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -222,3 +222,7 @@ lto = "thin" # attempts to perform optimizations across all crates within t codegen-units = 16 # default for "release", which "bench" inherits lto = false # default debug = true + +[profile.benchtest] +inherits = "bench" +debug-assertions = true diff --git a/vortex-array/src/array/bool/stats.rs b/vortex-array/src/array/bool/stats.rs index 10f1a015d3..a092dfc171 100644 --- a/vortex-array/src/array/bool/stats.rs +++ b/vortex-array/src/array/bool/stats.rs @@ -8,10 +8,14 @@ use vortex_error::VortexResult; use crate::array::BoolArray; use crate::stats::{ArrayStatisticsCompute, Stat, StatsSet}; use crate::validity::{ArrayValidity, LogicalValidity}; -use crate::{ArrayDType, IntoArrayVariant}; +use crate::{ArrayDType, ArrayTrait as _, IntoArrayVariant}; impl ArrayStatisticsCompute for BoolArray { fn compute_statistics(&self, stat: Stat) -> VortexResult { + if stat == Stat::UncompressedSizeInBytes { + return Ok(StatsSet::of(stat, self.nbytes())); + } + if self.is_empty() { return Ok(StatsSet::from_iter([ (Stat::TrueCount, 0.into()), diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index 655e4f79b5..bda4619c8d 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -17,7 +17,7 @@ use crate::compute::unary::{scalar_at, scalar_at_unchecked, subtract_scalar, Sub use crate::compute::{search_sorted, SearchSortedSide}; use crate::encoding::ids; use crate::iter::{ArrayIterator, ArrayIteratorAdapter}; -use crate::stats::StatsSet; +use crate::stats::ArrayStatistics; use crate::stream::{ArrayStream, ArrayStreamAdapter}; use crate::validity::Validity::NonNullable; use crate::validity::{ArrayValidity, LogicalValidity, Validity}; @@ -61,9 +61,19 @@ impl ChunkedArray { .collect_vec(); let nchunks = chunk_offsets.len() - 1; - let length = *chunk_offsets.last().unwrap_or_else(|| { - unreachable!("Chunk ends is guaranteed to have at least one element") - }) as usize; + let length = *chunk_offsets + .last() + .vortex_expect("Chunk ends is guaranteed to have at least one element") + as usize; + + let stats = chunks + .iter() + .map(|chunk| chunk.statistics().to_set()) + .reduce(|mut acc, stats| { + acc.merge_ordered(&stats); + acc + }) + .unwrap_or_default(); let mut children = Vec::with_capacity(chunks.len() + 1); children.push(PrimitiveArray::from_vec(chunk_offsets, NonNullable).into_array()); @@ -74,7 +84,7 @@ impl ChunkedArray { length, ChunkedMetadata { nchunks }, children.into(), - StatsSet::default(), + stats, ) } diff --git a/vortex-array/src/array/chunked/stats.rs b/vortex-array/src/array/chunked/stats.rs index 02c659ff73..a605c4b210 100644 --- a/vortex-array/src/array/chunked/stats.rs +++ b/vortex-array/src/array/chunked/stats.rs @@ -5,6 +5,8 @@ use crate::stats::{ArrayStatistics, ArrayStatisticsCompute, Stat, StatsSet}; impl ArrayStatisticsCompute for ChunkedArray { fn compute_statistics(&self, stat: Stat) -> VortexResult { + // for UncompressedSizeInBytes, we end up with sum of chunk uncompressed sizes + // this ignores the `chunk_offsets` array child, so it won't exactly match self.nbytes() Ok(self .chunks() .map(|c| { diff --git a/vortex-array/src/array/null/mod.rs b/vortex-array/src/array/null/mod.rs index d8ed71dbdf..48c03145f5 100644 --- a/vortex-array/src/array/null/mod.rs +++ b/vortex-array/src/array/null/mod.rs @@ -54,7 +54,11 @@ impl ArrayValidity for NullArray { } impl ArrayStatisticsCompute for NullArray { - fn compute_statistics(&self, _stat: Stat) -> VortexResult { + fn compute_statistics(&self, stat: Stat) -> VortexResult { + if stat == Stat::UncompressedSizeInBytes { + return Ok(StatsSet::of(stat, self.nbytes())); + } + Ok(StatsSet::nulls(self.len(), &DType::Null)) } } diff --git a/vortex-array/src/array/primitive/stats.rs b/vortex-array/src/array/primitive/stats.rs index b2d23be667..98c1494f89 100644 --- a/vortex-array/src/array/primitive/stats.rs +++ b/vortex-array/src/array/primitive/stats.rs @@ -14,7 +14,7 @@ use crate::array::primitive::PrimitiveArray; use crate::stats::{ArrayStatisticsCompute, Stat, StatsSet}; use crate::validity::{ArrayValidity, LogicalValidity}; use crate::variants::PrimitiveArrayTrait; -use crate::{ArrayDType, IntoArrayVariant}; +use crate::{ArrayDType, ArrayTrait as _, IntoArrayVariant}; trait PStatsType: NativePType + Into + BitWidth {} @@ -22,6 +22,10 @@ impl + BitWidth> PStatsType for T {} impl ArrayStatisticsCompute for PrimitiveArray { fn compute_statistics(&self, stat: Stat) -> VortexResult { + if stat == Stat::UncompressedSizeInBytes { + return Ok(StatsSet::of(stat, self.nbytes())); + } + let mut stats = match_each_native_ptype!(self.ptype(), |$P| { match self.logical_validity() { LogicalValidity::AllValid(_) => self.maybe_null_slice::<$P>().compute_statistics(stat), @@ -77,7 +81,7 @@ impl ArrayStatisticsCompute for &[T] { self.iter().skip(1).for_each(|next| stats.next(*next)); stats.finish() } - Stat::TrueCount => StatsSet::default(), + Stat::TrueCount | Stat::UncompressedSizeInBytes => StatsSet::default(), }) } } @@ -87,7 +91,7 @@ struct NullableValues<'a, T: PStatsType>(&'a [T], &'a BooleanBuffer); impl ArrayStatisticsCompute for NullableValues<'_, T> { fn compute_statistics(&self, stat: Stat) -> VortexResult { let values = self.0; - if values.is_empty() || stat == Stat::TrueCount { + if values.is_empty() || stat == Stat::TrueCount || stat == Stat::UncompressedSizeInBytes { return Ok(StatsSet::default()); } diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index 9ced68e71c..b31ccc00b1 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -7,7 +7,7 @@ use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect as _, Vor use crate::array::visitor::{AcceptArrayVisitor, ArrayVisitor}; use crate::encoding::ids; -use crate::stats::{ArrayStatisticsCompute, StatsSet}; +use crate::stats::{ArrayStatistics, ArrayStatisticsCompute, Stat, StatsSet}; use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata}; use crate::variants::{ArrayVariants, StructArrayTrait}; use crate::{ @@ -191,7 +191,21 @@ impl AcceptArrayVisitor for StructArray { } } -impl ArrayStatisticsCompute for StructArray {} +impl ArrayStatisticsCompute for StructArray { + fn compute_statistics(&self, stat: Stat) -> VortexResult { + Ok(match stat { + Stat::UncompressedSizeInBytes => self + .children() + .map(|f| f.statistics().compute_uncompressed_size_in_bytes()) + .reduce(|acc, field_size| acc.zip(field_size).map(|(a, b)| a + b)) + .flatten() + .map(|size| StatsSet::of(stat, size)) + .unwrap_or_default(), + Stat::NullCount => StatsSet::of(stat, self.validity().null_count(self.len())?), + _ => StatsSet::default(), + }) + } +} #[cfg(test)] mod test { diff --git a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs index 953989da0b..8c4eb2b43f 100644 --- a/vortex-array/src/array/varbin/stats.rs +++ b/vortex-array/src/array/varbin/stats.rs @@ -7,10 +7,14 @@ use vortex_error::VortexResult; use crate::accessor::ArrayAccessor; use crate::array::varbin::{varbin_scalar, VarBinArray}; use crate::stats::{ArrayStatisticsCompute, Stat, StatsSet}; -use crate::ArrayDType; +use crate::{ArrayDType, ArrayTrait as _}; impl ArrayStatisticsCompute for VarBinArray { - fn compute_statistics(&self, _stat: Stat) -> VortexResult { + fn compute_statistics(&self, stat: Stat) -> VortexResult { + if stat == Stat::UncompressedSizeInBytes { + return Ok(StatsSet::of(stat, self.nbytes())); + } + if self.is_empty() { return Ok(StatsSet::default()); } diff --git a/vortex-array/src/array/varbinview/stats.rs b/vortex-array/src/array/varbinview/stats.rs index 1f008131ce..bab7b647dc 100644 --- a/vortex-array/src/array/varbinview/stats.rs +++ b/vortex-array/src/array/varbinview/stats.rs @@ -4,13 +4,18 @@ use crate::accessor::ArrayAccessor; use crate::array::varbin::compute_stats; use crate::array::varbinview::VarBinViewArray; use crate::stats::{ArrayStatisticsCompute, Stat, StatsSet}; -use crate::ArrayDType; +use crate::{ArrayDType, ArrayTrait as _}; impl ArrayStatisticsCompute for VarBinViewArray { - fn compute_statistics(&self, _stat: Stat) -> VortexResult { + fn compute_statistics(&self, stat: Stat) -> VortexResult { + if stat == Stat::UncompressedSizeInBytes { + return Ok(StatsSet::of(stat, self.nbytes())); + } + if self.is_empty() { return Ok(StatsSet::default()); } + self.with_iterator(|iter| compute_stats(iter, self.dtype())) } } diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index f8f6d16861..afcb64edfc 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -53,7 +53,15 @@ pub fn check_statistics_unchanged(arr: &ArrayData, compressed: &ArrayData) { let _ = compressed; #[cfg(debug_assertions)] { - for (stat, value) in arr.statistics().to_set().into_iter() { + use crate::stats::Stat; + + // Run count merge_ordered assumes that the run is "broken" on each chunk, which is a useful estimate but not guaranteed to be correct. + for (stat, value) in arr + .statistics() + .to_set() + .into_iter() + .filter(|(stat, _)| *stat != Stat::RunCount) + { debug_assert_eq!( compressed.statistics().get(stat), Some(value.clone()), @@ -68,7 +76,9 @@ pub fn check_statistics_unchanged(arr: &ArrayData, compressed: &ArrayData) { } } -/// Compute pruning stats for an array. -pub fn compute_pruning_stats(arr: &ArrayData) -> VortexResult<()> { +/// Eagerly compute certain statistics (i.e., pruning stats plus UncompressedSizeInBytes) for an array. +/// This function is intended to be called in compressors, immediately before compression occurs. +pub fn compute_precompression_stats(arr: &ArrayData) -> VortexResult<()> { + arr.statistics().compute_uncompressed_size_in_bytes(); arr.statistics().compute_all(PRUNING_STATS).map(|_| ()) } diff --git a/vortex-array/src/stats/flatbuffers.rs b/vortex-array/src/stats/flatbuffers.rs index bc2b57a0bf..ec12c01ddf 100644 --- a/vortex-array/src/stats/flatbuffers.rs +++ b/vortex-array/src/stats/flatbuffers.rs @@ -40,6 +40,7 @@ impl WriteFlatBuffer for &dyn Statistics { null_count: self.get_as_cast::(Stat::NullCount), bit_width_freq, trailing_zero_freq, + uncompressed_size_in_bytes: self.get_as_cast::(Stat::UncompressedSizeInBytes), }; crate::flatbuffers::ArrayStats::create(fbb, stat_args) diff --git a/vortex-array/src/stats/mod.rs b/vortex-array/src/stats/mod.rs index 2761bb451e..e0e0dfe216 100644 --- a/vortex-array/src/stats/mod.rs +++ b/vortex-array/src/stats/mod.rs @@ -44,6 +44,7 @@ pub enum Stat { TrueCount, /// The number of null values in the array NullCount, + UncompressedSizeInBytes, } impl Stat { @@ -59,6 +60,7 @@ impl Stat { | Stat::Min | Stat::TrueCount | Stat::NullCount + | Stat::UncompressedSizeInBytes ) } @@ -81,6 +83,7 @@ impl Display for Stat { Self::RunCount => write!(f, "run_count"), Self::TrueCount => write!(f, "true_count"), Self::NullCount => write!(f, "null_count"), + Self::UncompressedSizeInBytes => write!(f, "uncompressed_size_in_bytes"), } } } @@ -100,10 +103,13 @@ pub trait Statistics { /// Compute all of the requested statistics (if not already present) /// Returns a StatsSet with the requested stats and any additional available stats fn compute_all(&self, stats: &[Stat]) -> VortexResult { + let mut stats_set = self.to_set(); for stat in stats { - let _ = self.compute(*stat); + if let Some(s) = self.compute(*stat) { + stats_set.set(*stat, s) + } } - Ok(self.to_set()) + Ok(stats_set) } } @@ -222,6 +228,10 @@ impl dyn Statistics + '_ { pub fn compute_trailing_zero_freq(&self) -> Option> { self.compute_as::>(Stat::TrailingZeroFreq) } + + pub fn compute_uncompressed_size_in_bytes(&self) -> Option { + self.compute_as(Stat::UncompressedSizeInBytes) + } } pub fn trailing_zeros(array: &ArrayData) -> u8 { diff --git a/vortex-array/src/stats/statsset.rs b/vortex-array/src/stats/statsset.rs index eb02c1361b..621e99f4d7 100644 --- a/vortex-array/src/stats/statsset.rs +++ b/vortex-array/src/stats/statsset.rs @@ -100,7 +100,10 @@ impl StatsSet { self.values[stat].as_ref() } - fn get_as TryFrom<&'a Scalar, Error = VortexError>>(&self, stat: Stat) -> Option { + pub fn get_as TryFrom<&'a Scalar, Error = VortexError>>( + &self, + stat: Stat, + ) -> Option { self.get(stat).map(|v| { T::try_from(v).unwrap_or_else(|err| { vortex_panic!( @@ -138,6 +141,7 @@ impl StatsSet { Stat::RunCount => self.merge_run_count(other), Stat::TrueCount => self.merge_true_count(other), Stat::NullCount => self.merge_null_count(other), + Stat::UncompressedSizeInBytes => self.merge_uncompressed_size_in_bytes(other), } } @@ -161,6 +165,7 @@ impl StatsSet { Stat::Min => self.merge_min(other), Stat::TrueCount => self.merge_true_count(other), Stat::NullCount => self.merge_null_count(other), + Stat::UncompressedSizeInBytes => self.merge_uncompressed_size_in_bytes(other), _ => vortex_panic!("Unrecognized commutative stat {}", s), } } @@ -241,6 +246,10 @@ impl StatsSet { self.merge_sum_stat(other, Stat::NullCount) } + fn merge_uncompressed_size_in_bytes(&mut self, other: &Self) { + self.merge_sum_stat(other, Stat::UncompressedSizeInBytes) + } + fn merge_sum_stat(&mut self, other: &Self, stat: Stat) { match (self.get_as::(stat), other.get_as::(stat)) { (Some(nc1), Some(nc2)) => { diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index 74aee433c8..599d97da7a 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -227,6 +227,11 @@ impl Statistics for ViewedArrayData { .trailing_zero_freq() .map(|v| v.iter().collect_vec()) .map(|v| v.into()), + Stat::UncompressedSizeInBytes => self + .flatbuffer() + .stats()? + .uncompressed_size_in_bytes() + .map(u64::into), } } diff --git a/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs b/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs index fc32282f24..133b00774d 100644 --- a/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs +++ b/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs @@ -24,6 +24,7 @@ table ArrayStats { null_count: uint64 = null; bit_width_freq: [uint64]; trailing_zero_freq: [uint64]; + uncompressed_size_in_bytes: uint64 = null; } diff --git a/vortex-flatbuffers/src/generated/array.rs b/vortex-flatbuffers/src/generated/array.rs index 58cad0616d..18c00caed1 100644 --- a/vortex-flatbuffers/src/generated/array.rs +++ b/vortex-flatbuffers/src/generated/array.rs @@ -300,6 +300,7 @@ impl<'a> ArrayStats<'a> { pub const VT_NULL_COUNT: flatbuffers::VOffsetT = 18; pub const VT_BIT_WIDTH_FREQ: flatbuffers::VOffsetT = 20; pub const VT_TRAILING_ZERO_FREQ: flatbuffers::VOffsetT = 22; + pub const VT_UNCOMPRESSED_SIZE_IN_BYTES: flatbuffers::VOffsetT = 24; #[inline] pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { @@ -311,6 +312,7 @@ impl<'a> ArrayStats<'a> { args: &'args ArrayStatsArgs<'args> ) -> flatbuffers::WIPOffset> { let mut builder = ArrayStatsBuilder::new(_fbb); + if let Some(x) = args.uncompressed_size_in_bytes { builder.add_uncompressed_size_in_bytes(x); } if let Some(x) = args.null_count { builder.add_null_count(x); } if let Some(x) = args.true_count { builder.add_true_count(x); } if let Some(x) = args.run_count { builder.add_run_count(x); } @@ -395,6 +397,13 @@ impl<'a> ArrayStats<'a> { // which contains a valid value in this slot unsafe { self._tab.get::>>(ArrayStats::VT_TRAILING_ZERO_FREQ, None)} } + #[inline] + pub fn uncompressed_size_in_bytes(&self) -> Option { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::(ArrayStats::VT_UNCOMPRESSED_SIZE_IN_BYTES, None)} + } } impl flatbuffers::Verifiable for ArrayStats<'_> { @@ -414,6 +423,7 @@ impl flatbuffers::Verifiable for ArrayStats<'_> { .visit_field::("null_count", Self::VT_NULL_COUNT, false)? .visit_field::>>("bit_width_freq", Self::VT_BIT_WIDTH_FREQ, false)? .visit_field::>>("trailing_zero_freq", Self::VT_TRAILING_ZERO_FREQ, false)? + .visit_field::("uncompressed_size_in_bytes", Self::VT_UNCOMPRESSED_SIZE_IN_BYTES, false)? .finish(); Ok(()) } @@ -429,6 +439,7 @@ pub struct ArrayStatsArgs<'a> { pub null_count: Option, pub bit_width_freq: Option>>, pub trailing_zero_freq: Option>>, + pub uncompressed_size_in_bytes: Option, } impl<'a> Default for ArrayStatsArgs<'a> { #[inline] @@ -444,6 +455,7 @@ impl<'a> Default for ArrayStatsArgs<'a> { null_count: None, bit_width_freq: None, trailing_zero_freq: None, + uncompressed_size_in_bytes: None, } } } @@ -494,6 +506,10 @@ impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> ArrayStatsBuilder<'a, 'b, A> { self.fbb_.push_slot_always::>(ArrayStats::VT_TRAILING_ZERO_FREQ, trailing_zero_freq); } #[inline] + pub fn add_uncompressed_size_in_bytes(&mut self, uncompressed_size_in_bytes: u64) { + self.fbb_.push_slot_always::(ArrayStats::VT_UNCOMPRESSED_SIZE_IN_BYTES, uncompressed_size_in_bytes); + } + #[inline] pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> ArrayStatsBuilder<'a, 'b, A> { let start = _fbb.start_table(); ArrayStatsBuilder { @@ -521,6 +537,7 @@ impl core::fmt::Debug for ArrayStats<'_> { ds.field("null_count", &self.null_count()); ds.field("bit_width_freq", &self.bit_width_freq()); ds.field("trailing_zero_freq", &self.trailing_zero_freq()); + ds.field("uncompressed_size_in_bytes", &self.uncompressed_size_in_bytes()); ds.finish() } } diff --git a/vortex-flatbuffers/src/generated/message.rs b/vortex-flatbuffers/src/generated/message.rs index e9a123c1b8..6aaaf09199 100644 --- a/vortex-flatbuffers/src/generated/message.rs +++ b/vortex-flatbuffers/src/generated/message.rs @@ -4,8 +4,8 @@ // @generated use crate::dtype::*; -use crate::scalar::*; use crate::array::*; +use crate::scalar::*; use core::mem; use core::cmp::Ordering; diff --git a/vortex-sampling-compressor/Cargo.toml b/vortex-sampling-compressor/Cargo.toml index ad6b22cc6e..177d8421e5 100644 --- a/vortex-sampling-compressor/Cargo.toml +++ b/vortex-sampling-compressor/Cargo.toml @@ -36,6 +36,7 @@ vortex-zigzag = { workspace = true } [dev-dependencies] chrono = { workspace = true } +vortex-scalar = { workspace = true } [lints] workspace = true diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs index acfc6cfc34..478a2f0831 100644 --- a/vortex-sampling-compressor/src/compressors/chunked.rs +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ -1,10 +1,10 @@ use std::any::Any; use std::sync::Arc; -use log::warn; +use log::info; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{Chunked, ChunkedArray}; -use vortex_array::compress::compute_pruning_stats; +use vortex_array::compress::compute_precompression_stats; use vortex_array::encoding::EncodingRef; use vortex_array::stats::ArrayStatistics as _; use vortex_array::{ArrayDType, ArrayData, ArrayDef, IntoArrayData}; @@ -12,7 +12,7 @@ use vortex_error::{vortex_bail, VortexResult}; use super::EncoderMetadata; use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; -use crate::SamplingCompressor; +use crate::{constants, SamplingCompressor}; #[derive(Debug)] pub struct ChunkedCompressor { @@ -37,7 +37,7 @@ impl EncodingCompressor for ChunkedCompressor { } fn cost(&self) -> u8 { - 0 + constants::CHUNKED_COST } fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { @@ -115,13 +115,14 @@ impl ChunkedCompressor { ctx.options().target_block_bytesize, ctx.options().target_block_size, )?; + let mut compressed_chunks = Vec::with_capacity(less_chunked.nchunks()); for (index, chunk) in less_chunked.chunks().enumerate() { // these are extremely valuable when reading/writing, but are potentially much more expensive // to compute post-compression. That's because not all encodings implement stats, so we would // potentially have to canonicalize during writes just to get stats, which would be silly. // Also, we only really require them for column chunks, not for every array. - compute_pruning_stats(&chunk)?; + compute_precompression_stats(&chunk)?; let like = previous.as_ref().map(|(like, _)| like); let (compressed_chunk, tree) = ctx @@ -136,7 +137,7 @@ impl ChunkedCompressor { .unwrap_or(false); if ratio > 1.0 || exceeded_target_ratio { - warn!("unsatisfactory ratio {} {:?}", ratio, previous); + info!("unsatisfactory ratio {}, previous: {:?}", ratio, previous); let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts(); let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32); previous = tree.map(|tree| (tree, new_ratio)); diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs index a2b83fc185..ecec7ea21d 100644 --- a/vortex-sampling-compressor/src/compressors/mod.rs +++ b/vortex-sampling-compressor/src/compressors/mod.rs @@ -195,6 +195,9 @@ impl<'a> CompressedArray<'a> { stats_to_inherit: Option<&dyn Statistics>, ) -> Self { if let Some(stats) = stats_to_inherit { + // eagerly compute uncompressed size in bytes at compression time, since it's + // too expensive to compute after compression + let _ = stats.compute_uncompressed_size_in_bytes(); array.inherit_statistics(stats); } Self { array, path } diff --git a/vortex-sampling-compressor/src/compressors/struct_.rs b/vortex-sampling-compressor/src/compressors/struct_.rs index 1a556e0683..cd948cbc4b 100644 --- a/vortex-sampling-compressor/src/compressors/struct_.rs +++ b/vortex-sampling-compressor/src/compressors/struct_.rs @@ -1,7 +1,7 @@ use itertools::Itertools; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{Struct, StructArray}; -use vortex_array::compress::compute_pruning_stats; +use vortex_array::compress::compute_precompression_stats; use vortex_array::encoding::EncodingRef; use vortex_array::stats::ArrayStatistics as _; use vortex_array::variants::StructArrayTrait; @@ -9,7 +9,7 @@ use vortex_array::{ArrayData, ArrayDef, IntoArrayData}; use vortex_error::VortexResult; use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; -use crate::SamplingCompressor; +use crate::{constants, SamplingCompressor}; #[derive(Debug)] pub struct StructCompressor; @@ -20,7 +20,7 @@ impl EncodingCompressor for StructCompressor { } fn cost(&self) -> u8 { - 0 + constants::STRUCT_COST } fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { @@ -51,7 +51,7 @@ impl EncodingCompressor for StructCompressor { // to compute post-compression. That's because not all encodings implement stats, so we would // potentially have to canonicalize during writes just to get stats, which would be silly. // Also, we only really require them for column chunks, not for every array. - compute_pruning_stats(&array)?; + compute_precompression_stats(&array)?; ctx.compress(&array, like.as_ref()) }) .process_results(|iter| iter.map(|x| (x.array, x.path)).unzip())?; diff --git a/vortex-sampling-compressor/src/constants.rs b/vortex-sampling-compressor/src/constants.rs index 45f60ccf95..c92b13b74c 100644 --- a/vortex-sampling-compressor/src/constants.rs +++ b/vortex-sampling-compressor/src/constants.rs @@ -2,8 +2,8 @@ // structural pass-throughs have no cost pub const SPARSE_COST: u8 = 0; -// TODO: struct -// TODO: chunked +pub const CHUNKED_COST: u8 = 0; +pub const STRUCT_COST: u8 = 0; // so fast that we can ignore the cost pub const BITPACKED_NO_PATCHES_COST: u8 = 0; diff --git a/vortex-sampling-compressor/tests/smoketest.rs b/vortex-sampling-compressor/tests/smoketest.rs index b0807d96c4..b48ab609b1 100644 --- a/vortex-sampling-compressor/tests/smoketest.rs +++ b/vortex-sampling-compressor/tests/smoketest.rs @@ -22,6 +22,7 @@ use vortex_sampling_compressor::{CompressConfig, SamplingCompressor}; #[cfg(test)] mod tests { use vortex_array::array::{Bool, BooleanBuffer, ChunkedArray, VarBin}; + use vortex_array::stats::{ArrayStatistics, Stat}; use vortex_array::variants::{ArrayVariants, StructArrayTrait}; use vortex_array::ArrayDef; use vortex_datetime_dtype::TimeUnit; @@ -33,6 +34,7 @@ mod tests { use vortex_sampling_compressor::compressors::bitpacked::BITPACK_WITH_PATCHES; use vortex_sampling_compressor::compressors::delta::DeltaCompressor; use vortex_sampling_compressor::compressors::fsst::FSSTCompressor; + use vortex_scalar::Scalar; use super::*; @@ -146,8 +148,13 @@ mod tests { .unwrap() .try_into() .unwrap(); + println!("prim_col num chunks: {}", prim_col.nchunks()); for chunk in prim_col.chunks() { assert_eq!(chunk.encoding().id(), FoR::ID); + assert_eq!( + chunk.statistics().get(Stat::UncompressedSizeInBytes), + Some(Scalar::from((chunk.len() * 8) as u64)) + ); } let bool_col: ChunkedArray = struct_array @@ -157,6 +164,10 @@ mod tests { .unwrap(); for chunk in bool_col.chunks() { assert_eq!(chunk.encoding().id(), Bool::ID); + assert_eq!( + chunk.statistics().get(Stat::UncompressedSizeInBytes), + Some(Scalar::from(chunk.len().div_ceil(8) as u64)) + ); } let varbin_col: ChunkedArray = struct_array @@ -166,6 +177,10 @@ mod tests { .unwrap(); for chunk in varbin_col.chunks() { assert!(chunk.encoding().id() == Dict::ID || chunk.encoding().id() == FSST::ID); + assert_eq!( + chunk.statistics().get(Stat::UncompressedSizeInBytes), + Some(Scalar::from(1392640u64)) + ); } let binary_col: ChunkedArray = struct_array @@ -175,6 +190,10 @@ mod tests { .unwrap(); for chunk in binary_col.chunks() { assert_eq!(chunk.encoding().id(), VarBin::ID); + assert_eq!( + chunk.statistics().get(Stat::UncompressedSizeInBytes), + Some(Scalar::from(134357000u64)) + ); } let timestamp_col: ChunkedArray = struct_array @@ -184,6 +203,10 @@ mod tests { .unwrap(); for chunk in timestamp_col.chunks() { assert_eq!(chunk.encoding().id(), DateTimeParts::ID); + assert_eq!( + chunk.statistics().get(Stat::UncompressedSizeInBytes), + Some((chunk.len() * 8).into()) + ) } }