From 28f44031a8c03e3552a6decb830a4e61213c47bc Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 14 Jan 2025 14:19:15 +0000 Subject: [PATCH] Arc InnerArrayData and add a WeakArrayData (#1930) Closes #1518 WeakArrayData will be used for weak caching during scans. This also impls Statistics directly on ArrayData so we don't have to clone in order to compute them. It also means we can remove Clone from InnerArrayData and remove a couple more Arcs. --- encodings/fsst/src/array.rs | 3 +- vortex-array/src/data/mod.rs | 102 ++++++++++--------- vortex-array/src/data/owned.rs | 93 ++---------------- vortex-array/src/data/statistics.rs | 145 ++++++++++++++++++++++++++++ vortex-array/src/data/viewed.rs | 112 ++------------------- vortex-array/src/macros.rs | 2 +- vortex-array/src/stats/statsset.rs | 19 +++- vortex-expr/src/lib.rs | 2 +- 8 files changed, 238 insertions(+), 240 deletions(-) create mode 100644 vortex-array/src/data/statistics.rs diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index b2e671f1d7..9382a2a7f0 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -1,5 +1,4 @@ use std::fmt::{Debug, Display}; -use std::sync::Arc; use fsst::{Decompressor, Symbol}; use serde::{Deserialize, Serialize}; @@ -89,7 +88,7 @@ impl FSSTArray { let len = codes.len(); let uncompressed_lengths_ptype = PType::try_from(uncompressed_lengths.dtype())?; let codes_nullability = codes.dtype().nullability(); - let children = Arc::new([symbols, symbol_lengths, codes, uncompressed_lengths]); + let children = [symbols, symbol_lengths, codes, uncompressed_lengths].into(); Self::try_from_parts( dtype, diff --git a/vortex-array/src/data/mod.rs b/vortex-array/src/data/mod.rs index 971d9c968c..abb4ee7108 100644 --- a/vortex-array/src/data/mod.rs +++ b/vortex-array/src/data/mod.rs @@ -1,6 +1,6 @@ use std::borrow::Cow; use std::fmt::{Display, Formatter}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, RwLock, Weak}; use itertools::Itertools; use owned::OwnedArrayData; @@ -22,20 +22,21 @@ use crate::stats::{ArrayStatistics, Stat, Statistics, StatsSet}; use crate::stream::{ArrayStream, ArrayStreamAdapter}; use crate::validity::{ArrayValidity, LogicalValidity, ValidityVTable}; use crate::{ - ArrayChildrenIterator, ArrayDType, ArrayLen, ArrayMetadata, ContextRef, NamedChildrenCollector, - TryDeserializeArrayMetadata, + ArrayChildrenIterator, ArrayDType, ArrayLen, ArrayMetadata, ChildrenCollector, ContextRef, + NamedChildrenCollector, TryDeserializeArrayMetadata, }; mod owned; +mod statistics; mod viewed; /// A central type for all Vortex arrays, which are known length sequences of typed and possibly compressed data. /// /// This is the main entrypoint for working with in-memory Vortex data, and dispatches work over the underlying encoding or memory representations. #[derive(Debug, Clone)] -pub struct ArrayData(InnerArrayData); +pub struct ArrayData(Arc); -#[derive(Debug, Clone)] +#[derive(Debug)] enum InnerArrayData { /// Owned [`ArrayData`] with serialized metadata, backed by heap-allocated memory. Owned(OwnedArrayData), @@ -45,13 +46,13 @@ enum InnerArrayData { impl From for ArrayData { fn from(data: OwnedArrayData) -> Self { - ArrayData(InnerArrayData::Owned(data)) + ArrayData(Arc::new(InnerArrayData::Owned(data))) } } impl From for ArrayData { fn from(data: ViewedArrayData) -> Self { - ArrayData(InnerArrayData::Viewed(data)) + ArrayData(Arc::new(InnerArrayData::Viewed(data))) } } @@ -61,8 +62,8 @@ impl ArrayData { dtype: DType, len: usize, metadata: Arc, - buffers: Arc<[ByteBuffer]>, - children: Arc<[ArrayData]>, + buffers: Box<[ByteBuffer]>, + children: Box<[ArrayData]>, statistics: StatsSet, ) -> VortexResult { Self::try_new(InnerArrayData::Owned(OwnedArrayData { @@ -72,9 +73,9 @@ impl ArrayData { metadata, buffers, children, - stats_set: Arc::new(RwLock::new(statistics)), + stats_set: RwLock::new(statistics), #[cfg(feature = "canonical_counter")] - canonical_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)), + canonical_counter: std::sync::atomic::AtomicUsize::new(0), })) } @@ -113,7 +114,7 @@ impl ArrayData { buffers: buffers.into(), ctx, #[cfg(feature = "canonical_counter")] - canonical_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)), + canonical_counter: std::sync::atomic::AtomicUsize::new(0), }; Self::try_new(InnerArrayData::Viewed(view)) @@ -121,7 +122,7 @@ impl ArrayData { /// Shared constructor that performs common array validation. fn try_new(inner: InnerArrayData) -> VortexResult { - let array = ArrayData(inner); + let array = ArrayData(Arc::new(inner)); // Sanity check that the encoding implements the correct array trait debug_assert!( @@ -148,7 +149,7 @@ impl ArrayData { /// Return the array's encoding pub fn encoding(&self) -> EncodingRef { - match &self.0 { + match self.0.as_ref() { InnerArrayData::Owned(d) => d.encoding, InnerArrayData::Viewed(v) => v.encoding, } @@ -157,7 +158,7 @@ impl ArrayData { /// Returns the number of logical elements in the array. #[allow(clippy::same_name_method)] pub fn len(&self) -> usize { - match &self.0 { + match self.0.as_ref() { InnerArrayData::Owned(d) => d.len, InnerArrayData::Viewed(v) => v.len, } @@ -203,19 +204,26 @@ impl ArrayData { } pub fn child<'a>(&'a self, idx: usize, dtype: &'a DType, len: usize) -> VortexResult { - match &self.0 { + match self.0.as_ref() { InnerArrayData::Owned(d) => d.child(idx, dtype, len).cloned(), InnerArrayData::Viewed(v) => v .child(idx, dtype, len) - .map(|view| ArrayData(InnerArrayData::Viewed(view))), + .map(|view| ArrayData(Arc::new(InnerArrayData::Viewed(view)))), } } /// Returns a Vec of Arrays with all the array's child arrays. + // TODO(ngates): deprecate this function and return impl Iterator pub fn children(&self) -> Vec { - match &self.0 { + match self.0.as_ref() { InnerArrayData::Owned(d) => d.children().to_vec(), - InnerArrayData::Viewed(v) => v.children(), + InnerArrayData::Viewed(_) => { + let mut collector = ChildrenCollector::default(); + self.encoding() + .accept(self, &mut collector) + .vortex_expect("Failed to get children"); + collector.children() + } } } @@ -230,7 +238,7 @@ impl ArrayData { /// Returns the number of child arrays pub fn nchildren(&self) -> usize { - match &self.0 { + match self.0.as_ref() { InnerArrayData::Owned(d) => d.nchildren(), InnerArrayData::Viewed(v) => v.nchildren(), } @@ -270,7 +278,7 @@ impl ArrayData { } pub fn array_metadata(&self) -> &dyn ArrayMetadata { - match &self.0 { + match self.0.as_ref() { InnerArrayData::Owned(d) => &*d.metadata, InnerArrayData::Viewed(v) => &*v.metadata, } @@ -279,7 +287,7 @@ impl ArrayData { pub fn metadata TryDeserializeArrayMetadata<'m>>( &self, ) -> VortexResult<&M> { - match &self.0 { + match self.0.as_ref() { InnerArrayData::Owned(d) => &d.metadata, InnerArrayData::Viewed(v) => &v.metadata, } @@ -298,7 +306,7 @@ impl ArrayData { /// View arrays will return a reference to their bytes, while heap-backed arrays /// must first serialize their metadata, returning an owned byte array to the caller. pub fn metadata_bytes(&self) -> VortexResult> { - match &self.0 { + match self.0.as_ref() { InnerArrayData::Owned(array_data) => { // Heap-backed arrays must first try and serialize the metadata. let owned_meta: Vec = array_data @@ -320,14 +328,14 @@ impl ArrayData { } pub fn nbuffers(&self) -> usize { - match &self.0 { + match self.0.as_ref() { InnerArrayData::Owned(o) => o.buffers.len(), InnerArrayData::Viewed(v) => v.nbuffers(), } } pub fn byte_buffer(&self, index: usize) -> Option<&ByteBuffer> { - match &self.0 { + match self.0.as_ref() { InnerArrayData::Owned(d) => d.byte_buffer(index), InnerArrayData::Viewed(v) => v.buffer(index), } @@ -340,8 +348,11 @@ impl ArrayData { } pub fn into_byte_buffer(self, index: usize) -> Option { - match self.0 { - InnerArrayData::Owned(d) => d.into_byte_buffer(index), + // NOTE(ngates): we can't really into_inner an Arc, so instead we clone the buffer out, + // but we still consume self by value such that the ref-count drops at the end of this + // function. + match self.0.as_ref() { + InnerArrayData::Owned(d) => d.byte_buffer(index).cloned(), InnerArrayData::Viewed(v) => v.buffer(index).cloned(), } } @@ -368,7 +379,7 @@ impl ArrayData { #[cfg(feature = "canonical_counter")] pub(crate) fn inc_canonical_counter(&self) { - let prev = match &self.0 { + let prev = match self.0.as_ref() { InnerArrayData::Owned(o) => o .canonical_counter .fetch_add(1, std::sync::atomic::Ordering::Relaxed), @@ -404,7 +415,7 @@ impl ArrayData { impl Display for ArrayData { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let prefix = match &self.0 { + let prefix = match self.0.as_ref() { InnerArrayData::Owned(_) => "", InnerArrayData::Viewed(_) => "$", }; @@ -421,22 +432,13 @@ impl Display for ArrayData { impl> ArrayDType for T { fn dtype(&self) -> &DType { - match &self.as_ref().0 { + match self.as_ref().0.as_ref() { InnerArrayData::Owned(d) => &d.dtype, InnerArrayData::Viewed(v) => &v.dtype, } } } -impl ArrayData { - pub fn into_dtype(self) -> DType { - match self.0 { - InnerArrayData::Owned(d) => d.dtype, - InnerArrayData::Viewed(v) => v.dtype, - } - } -} - impl> ArrayLen for T { fn len(&self) -> usize { self.as_ref().len() @@ -461,10 +463,7 @@ impl> ArrayValidity for A { impl> ArrayStatistics for T { fn statistics(&self) -> &(dyn Statistics + '_) { - match &self.as_ref().0 { - InnerArrayData::Owned(d) => d, - InnerArrayData::Viewed(v) => v, - } + self.as_ref() } // FIXME(ngates): this is really slow... @@ -498,3 +497,20 @@ impl Iterator for ArrayDataIterator { } } } + +/// An array data that holds a weak reference to its internals. +pub struct WeakArrayData(Weak); + +impl WeakArrayData { + /// Attempts to upgrade the weak reference to a strong reference. + pub fn upgrade(&self) -> Option { + self.0.upgrade().map(ArrayData) + } +} + +impl ArrayData { + /// Downgrades the array data to a weak reference. + pub fn downgrade(self) -> WeakArrayData { + WeakArrayData(Arc::downgrade(&self.0)) + } +} diff --git a/vortex-array/src/data/owned.rs b/vortex-array/src/data/owned.rs index 81ad7c7316..1052100c30 100644 --- a/vortex-array/src/data/owned.rs +++ b/vortex-array/src/data/owned.rs @@ -2,25 +2,24 @@ use std::sync::{Arc, RwLock}; use vortex_buffer::ByteBuffer; use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_panic, VortexResult}; -use vortex_scalar::Scalar; +use vortex_error::{vortex_bail, VortexResult}; use crate::encoding::EncodingRef; -use crate::stats::{Stat, Statistics, StatsSet}; +use crate::stats::StatsSet; use crate::{ArrayDType, ArrayData, ArrayMetadata}; /// Owned [`ArrayData`] with serialized metadata, backed by heap-allocated memory. -#[derive(Clone, Debug)] +#[derive(Debug)] pub(super) struct OwnedArrayData { pub(super) encoding: EncodingRef, - pub(super) dtype: DType, // FIXME(ngates): Arc? + pub(super) dtype: DType, pub(super) len: usize, pub(super) metadata: Arc, - pub(super) buffers: Arc<[ByteBuffer]>, - pub(super) children: Arc<[ArrayData]>, - pub(super) stats_set: Arc>, + pub(super) buffers: Box<[ByteBuffer]>, + pub(super) children: Box<[ArrayData]>, + pub(super) stats_set: RwLock, #[cfg(feature = "canonical_counter")] - pub(super) canonical_counter: Arc, + pub(super) canonical_counter: std::sync::atomic::AtomicUsize, } impl OwnedArrayData { @@ -32,12 +31,6 @@ impl OwnedArrayData { self.buffers.get(index) } - pub fn into_byte_buffer(self, index: usize) -> Option { - // While this does require a clone, we're still "into" because it makes sure the self - // reference is dropped. - self.buffers.get(index).cloned() - } - // We want to allow these panics because they are indicative of implementation error. #[allow(clippy::panic_in_result_fn)] pub fn child(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<&ArrayData> { @@ -68,75 +61,7 @@ impl OwnedArrayData { self.children.len() } - pub fn children(&self) -> &Arc<[ArrayData]> { + pub fn children(&self) -> &[ArrayData] { &self.children } } - -impl Statistics for OwnedArrayData { - fn get(&self, stat: Stat) -> Option { - self.stats_set - .read() - .unwrap_or_else(|_| { - vortex_panic!( - "Failed to acquire read lock on stats map while getting {}", - stat - ) - }) - .get(stat) - .cloned() - } - - fn to_set(&self) -> StatsSet { - self.stats_set - .read() - .unwrap_or_else(|_| vortex_panic!("Failed to acquire read lock on stats map")) - .clone() - } - - fn set(&self, stat: Stat, value: Scalar) { - self.stats_set - .write() - .unwrap_or_else(|_| { - vortex_panic!( - "Failed to acquire write lock on stats map while setting {} to {}", - stat, - value - ) - }) - .set(stat, value); - } - - fn clear(&self, stat: Stat) { - self.stats_set - .write() - .unwrap_or_else(|_| vortex_panic!("Failed to acquire write lock on stats map")) - .clear(stat); - } - - fn compute(&self, stat: Stat) -> Option { - if let Some(s) = self.get(stat) { - return Some(s); - } - - let computed = self - .encoding - .compute_statistics(&ArrayData::from(self.clone()), stat) - .ok()?; - - self.stats_set - .write() - .unwrap_or_else(|_| { - vortex_panic!("Failed to write to stats map while computing {}", stat) - }) - .extend(computed); - self.get(stat) - } - - fn retain_only(&self, stats: &[Stat]) { - self.stats_set - .write() - .unwrap_or_else(|_| vortex_panic!("Failed to acquire write lock on stats map")) - .retain_only(stats); - } -} diff --git a/vortex-array/src/data/statistics.rs b/vortex-array/src/data/statistics.rs new file mode 100644 index 0000000000..4e0a139756 --- /dev/null +++ b/vortex-array/src/data/statistics.rs @@ -0,0 +1,145 @@ +use std::sync::Arc; + +use enum_iterator::all; +use itertools::Itertools; +use vortex_dtype::{DType, Nullability, PType}; +use vortex_error::vortex_panic; +use vortex_scalar::{Scalar, ScalarValue}; + +use crate::data::InnerArrayData; +use crate::stats::{Stat, Statistics, StatsSet}; +use crate::{ArrayDType, ArrayData}; + +impl Statistics for ArrayData { + fn get(&self, stat: Stat) -> Option { + match self.0.as_ref() { + InnerArrayData::Owned(o) => o + .stats_set + .read() + .unwrap_or_else(|_| { + vortex_panic!( + "Failed to acquire read lock on stats map while getting {}", + stat + ) + }) + .get(stat) + .cloned(), + InnerArrayData::Viewed(v) => match stat { + Stat::Max => { + let max = v.flatbuffer().stats()?.max(); + max.and_then(|v| ScalarValue::try_from(v).ok()) + .map(|v| Scalar::new(self.dtype().clone(), v)) + } + Stat::Min => { + let min = v.flatbuffer().stats()?.min(); + min.and_then(|v| ScalarValue::try_from(v).ok()) + .map(|v| Scalar::new(self.dtype().clone(), v)) + } + Stat::IsConstant => v.flatbuffer().stats()?.is_constant().map(bool::into), + Stat::IsSorted => v.flatbuffer().stats()?.is_sorted().map(bool::into), + Stat::IsStrictSorted => v.flatbuffer().stats()?.is_strict_sorted().map(bool::into), + Stat::RunCount => v.flatbuffer().stats()?.run_count().map(u64::into), + Stat::TrueCount => v.flatbuffer().stats()?.true_count().map(u64::into), + Stat::NullCount => v.flatbuffer().stats()?.null_count().map(u64::into), + Stat::BitWidthFreq => { + let element_dtype = + Arc::new(DType::Primitive(PType::U64, Nullability::NonNullable)); + v.flatbuffer() + .stats()? + .bit_width_freq() + .map(|v| v.iter().map(Scalar::from).collect_vec()) + .map(|v| Scalar::list(element_dtype, v, Nullability::NonNullable)) + } + Stat::TrailingZeroFreq => v + .flatbuffer() + .stats()? + .trailing_zero_freq() + .map(|v| v.iter().collect_vec()) + .map(|v| v.into()), + Stat::UncompressedSizeInBytes => v + .flatbuffer() + .stats()? + .uncompressed_size_in_bytes() + .map(u64::into), + }, + } + } + + fn to_set(&self) -> StatsSet { + match self.0.as_ref() { + InnerArrayData::Owned(o) => o + .stats_set + .read() + .unwrap_or_else(|_| vortex_panic!("Failed to acquire read lock on stats map")) + .clone(), + InnerArrayData::Viewed(_) => StatsSet::from_iter( + all::().filter_map(|stat| self.get(stat).map(|v| (stat, v))), + ), + } + } + + fn set(&self, stat: Stat, value: Scalar) { + match self.0.as_ref() { + InnerArrayData::Owned(o) => o + .stats_set + .write() + .unwrap_or_else(|_| { + vortex_panic!( + "Failed to acquire write lock on stats map while setting {} to {}", + stat, + value + ) + }) + .set(stat, value), + InnerArrayData::Viewed(_) => { + // We cannot modify stats on a view + } + } + } + + fn clear(&self, stat: Stat) { + match self.0.as_ref() { + InnerArrayData::Owned(o) => { + o.stats_set + .write() + .unwrap_or_else(|_| vortex_panic!("Failed to acquire write lock on stats map")) + .clear(stat); + } + InnerArrayData::Viewed(_) => { + // We cannot modify stats on a view + } + } + } + + fn compute(&self, stat: Stat) -> Option { + if let Some(s) = self.get(stat) { + return Some(s); + } + let s = self + .encoding() + .compute_statistics(self, stat) + .ok()? + .get(stat) + .cloned(); + + if let Some(s) = &s { + self.set(stat, s.clone()); + } + + s + } + + fn retain_only(&self, stats: &[Stat]) { + match self.0.as_ref() { + InnerArrayData::Owned(o) => { + o.stats_set + .write() + .unwrap_or_else(|_| vortex_panic!("Failed to acquire write lock on stats map")) + .retain_only(stats); + } + InnerArrayData::Viewed(_) => { + // We cannot modify stats on a view + } + } + } +} diff --git a/vortex-array/src/data/viewed.rs b/vortex-array/src/data/viewed.rs index 5042bc6eb1..0d189eae4d 100644 --- a/vortex-array/src/data/viewed.rs +++ b/vortex-array/src/data/viewed.rs @@ -1,22 +1,17 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use enum_iterator::all; use flatbuffers::Follow; -use itertools::Itertools; use vortex_buffer::ByteBuffer; -use vortex_dtype::{DType, Nullability, PType}; -use vortex_error::{vortex_err, VortexExpect as _, VortexResult}; +use vortex_dtype::DType; +use vortex_error::{vortex_err, VortexResult}; use vortex_flatbuffers::FlatBuffer; -use vortex_scalar::{Scalar, ScalarValue}; use crate::encoding::opaque::OpaqueEncoding; use crate::encoding::EncodingRef; -use crate::stats::{Stat, Statistics, StatsSet}; -use crate::{flatbuffers as fb, ArrayData, ArrayMetadata, ChildrenCollector, ContextRef}; +use crate::{flatbuffers as fb, ArrayMetadata, ContextRef}; /// Zero-copy view over flatbuffer-encoded array data, created without eager serialization. -#[derive(Clone)] pub(super) struct ViewedArrayData { pub(super) encoding: EncodingRef, pub(super) dtype: DType, @@ -24,10 +19,10 @@ pub(super) struct ViewedArrayData { pub(super) metadata: Arc, pub(super) flatbuffer: FlatBuffer, pub(super) flatbuffer_loc: usize, - pub(super) buffers: Arc<[ByteBuffer]>, + pub(super) buffers: Box<[ByteBuffer]>, pub(super) ctx: ContextRef, #[cfg(feature = "canonical_counter")] - pub(super) canonical_counter: Arc, + pub(super) canonical_counter: std::sync::atomic::AtomicUsize, } impl Debug for ViewedArrayData { @@ -80,7 +75,7 @@ impl ViewedArrayData { buffers: self.buffers.clone(), ctx: self.ctx.clone(), #[cfg(feature = "canonical_counter")] - canonical_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)), + canonical_counter: std::sync::atomic::AtomicUsize::new(0), }) } @@ -93,14 +88,6 @@ impl ViewedArrayData { self.flatbuffer().children().map(|c| c.len()).unwrap_or(0) } - pub fn children(&self) -> Vec { - let mut collector = ChildrenCollector::default(); - self.encoding - .accept(&ArrayData::from(self.clone()), &mut collector) - .vortex_expect("Failed to get children"); - collector.children() - } - pub fn nbuffers(&self) -> usize { self.flatbuffer() .buffers() @@ -121,90 +108,3 @@ impl ViewedArrayData { .map(|idx| &self.buffers[idx]) } } - -impl Statistics for ViewedArrayData { - fn get(&self, stat: Stat) -> Option { - match stat { - Stat::Max => { - let max = self.flatbuffer().stats()?.max(); - max.and_then(|v| ScalarValue::try_from(v).ok()) - .map(|v| Scalar::new(self.dtype.clone(), v)) - } - Stat::Min => { - let min = self.flatbuffer().stats()?.min(); - min.and_then(|v| ScalarValue::try_from(v).ok()) - .map(|v| Scalar::new(self.dtype.clone(), v)) - } - Stat::IsConstant => self.flatbuffer().stats()?.is_constant().map(bool::into), - Stat::IsSorted => self.flatbuffer().stats()?.is_sorted().map(bool::into), - Stat::IsStrictSorted => self - .flatbuffer() - .stats()? - .is_strict_sorted() - .map(bool::into), - Stat::RunCount => self.flatbuffer().stats()?.run_count().map(u64::into), - Stat::TrueCount => self.flatbuffer().stats()?.true_count().map(u64::into), - Stat::NullCount => self.flatbuffer().stats()?.null_count().map(u64::into), - Stat::BitWidthFreq => { - let element_dtype = - Arc::new(DType::Primitive(PType::U64, Nullability::NonNullable)); - self.flatbuffer() - .stats()? - .bit_width_freq() - .map(|v| v.iter().map(Scalar::from).collect_vec()) - .map(|v| Scalar::list(element_dtype, v, Nullability::NonNullable)) - } - Stat::TrailingZeroFreq => self - .flatbuffer() - .stats()? - .trailing_zero_freq() - .map(|v| v.iter().collect_vec()) - .map(|v| v.into()), - Stat::UncompressedSizeInBytes => self - .flatbuffer() - .stats()? - .uncompressed_size_in_bytes() - .map(u64::into), - } - } - - /// NB: part of the contract for to_set is that it does not do any expensive computation. - /// In other implementations, this means returning the underlying stats map, but for the flatbuffer - /// implementation, we have 'precalculated' stats in the flatbuffer itself, so we need to - /// allocate a stats map and populate it with those fields. - fn to_set(&self) -> StatsSet { - let mut result = StatsSet::default(); - for stat in all::() { - if let Some(value) = self.get(stat) { - result.set(stat, value) - } - } - result - } - - /// We want to avoid any sort of allocation on instantiation of the ArrayView, so we - /// do not allocate a stats_set to cache values. - fn set(&self, _stat: Stat, _value: Scalar) { - // We cannot modify stats on a view - } - - fn clear(&self, _stat: Stat) { - // We cannot modify stats on a view - } - - fn retain_only(&self, _stats: &[Stat]) { - // We cannot modify stats on a view - } - - fn compute(&self, stat: Stat) -> Option { - if let Some(s) = self.get(stat) { - return Some(s); - } - - self.encoding - .compute_statistics(&ArrayData::from(self.clone()), stat) - .ok()? - .get(stat) - .cloned() - } -} diff --git a/vortex-array/src/macros.rs b/vortex-array/src/macros.rs index 4c0b5d5910..78c8564c87 100644 --- a/vortex-array/src/macros.rs +++ b/vortex-array/src/macros.rs @@ -46,7 +46,7 @@ macro_rules! impl_encoding { dtype: vortex_dtype::DType, len: usize, metadata: [<$Name Metadata>], - children: std::sync::Arc<[$crate::ArrayData]>, + children: Box<[$crate::ArrayData]>, stats: $crate::stats::StatsSet, ) -> VortexResult { Self::try_from($crate::ArrayData::try_new_owned( diff --git a/vortex-array/src/stats/statsset.rs b/vortex-array/src/stats/statsset.rs index 0398b0dacd..39b3b62530 100644 --- a/vortex-array/src/stats/statsset.rs +++ b/vortex-array/src/stats/statsset.rs @@ -188,12 +188,25 @@ impl IntoIterator for StatsSet { } } +impl FromIterator<(Stat, Scalar)> for StatsSet { + fn from_iter>(iter: T) -> Self { + let iter = iter.into_iter(); + let (lower_bound, _) = iter.size_hint(); + let mut this = Self { + values: Vec::with_capacity(lower_bound), + }; + this.extend(iter); + this + } +} + impl Extend<(Stat, Scalar)> for StatsSet { #[inline] fn extend>(&mut self, iter: T) { - iter.into_iter().for_each(|(stat, scalar)| { - self.set(stat, scalar); - }); + let iter = iter.into_iter(); + let (lower_bound, _) = iter.size_hint(); + self.values.reserve(lower_bound); + iter.for_each(|(stat, value)| self.set(stat, value)); } } diff --git a/vortex-expr/src/lib.rs b/vortex-expr/src/lib.rs index 645319f98f..9305327ed3 100644 --- a/vortex-expr/src/lib.rs +++ b/vortex-expr/src/lib.rs @@ -75,7 +75,7 @@ pub trait VortexExpr: Debug + Send + Sync + DynEq + DynHash + Display { fn return_dtype(&self, scope_dtype: &DType) -> VortexResult { let empty = Canonical::empty(scope_dtype)?.into_array(); self.unchecked_evaluate(&empty) - .map(|array| array.into_dtype()) + .map(|array| array.dtype().clone()) } }