Skip to content

Commit

Permalink
Arc InnerArrayData and add a WeakArrayData (#1930)
Browse files Browse the repository at this point in the history
Closes #1518

WeakArrayData will be used for weak caching during scans.

This also implements Statistics directly on ArrayData so we don't have to
clone the array in order to compute its statistics. It also means we can
remove Clone from InnerArrayData and remove a couple more Arcs.
gatesn authored Jan 14, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent b492d1d commit 28f4403
Showing 8 changed files with 238 additions and 240 deletions.
3 changes: 1 addition & 2 deletions encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fmt::{Debug, Display};
use std::sync::Arc;

use fsst::{Decompressor, Symbol};
use serde::{Deserialize, Serialize};
@@ -89,7 +88,7 @@ impl FSSTArray {
let len = codes.len();
let uncompressed_lengths_ptype = PType::try_from(uncompressed_lengths.dtype())?;
let codes_nullability = codes.dtype().nullability();
let children = Arc::new([symbols, symbol_lengths, codes, uncompressed_lengths]);
let children = [symbols, symbol_lengths, codes, uncompressed_lengths].into();

Self::try_from_parts(
dtype,
102 changes: 59 additions & 43 deletions vortex-array/src/data/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, RwLock, Weak};

use itertools::Itertools;
use owned::OwnedArrayData;
@@ -22,20 +22,21 @@ use crate::stats::{ArrayStatistics, Stat, Statistics, StatsSet};
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::validity::{ArrayValidity, LogicalValidity, ValidityVTable};
use crate::{
ArrayChildrenIterator, ArrayDType, ArrayLen, ArrayMetadata, ContextRef, NamedChildrenCollector,
TryDeserializeArrayMetadata,
ArrayChildrenIterator, ArrayDType, ArrayLen, ArrayMetadata, ChildrenCollector, ContextRef,
NamedChildrenCollector, TryDeserializeArrayMetadata,
};

mod owned;
mod statistics;
mod viewed;

/// A central type for all Vortex arrays, which are known length sequences of typed and possibly compressed data.
///
/// This is the main entrypoint for working with in-memory Vortex data, and dispatches work over the underlying encoding or memory representations.
#[derive(Debug, Clone)]
pub struct ArrayData(InnerArrayData);
pub struct ArrayData(Arc<InnerArrayData>);

#[derive(Debug, Clone)]
#[derive(Debug)]
enum InnerArrayData {
/// Owned [`ArrayData`] with serialized metadata, backed by heap-allocated memory.
Owned(OwnedArrayData),
@@ -45,13 +46,13 @@ enum InnerArrayData {

impl From<OwnedArrayData> for ArrayData {
fn from(data: OwnedArrayData) -> Self {
ArrayData(InnerArrayData::Owned(data))
ArrayData(Arc::new(InnerArrayData::Owned(data)))
}
}

impl From<ViewedArrayData> for ArrayData {
fn from(data: ViewedArrayData) -> Self {
ArrayData(InnerArrayData::Viewed(data))
ArrayData(Arc::new(InnerArrayData::Viewed(data)))
}
}

@@ -61,8 +62,8 @@ impl ArrayData {
dtype: DType,
len: usize,
metadata: Arc<dyn ArrayMetadata>,
buffers: Arc<[ByteBuffer]>,
children: Arc<[ArrayData]>,
buffers: Box<[ByteBuffer]>,
children: Box<[ArrayData]>,
statistics: StatsSet,
) -> VortexResult<Self> {
Self::try_new(InnerArrayData::Owned(OwnedArrayData {
@@ -72,9 +73,9 @@ impl ArrayData {
metadata,
buffers,
children,
stats_set: Arc::new(RwLock::new(statistics)),
stats_set: RwLock::new(statistics),
#[cfg(feature = "canonical_counter")]
canonical_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
canonical_counter: std::sync::atomic::AtomicUsize::new(0),
}))
}

@@ -113,15 +114,15 @@ impl ArrayData {
buffers: buffers.into(),
ctx,
#[cfg(feature = "canonical_counter")]
canonical_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
canonical_counter: std::sync::atomic::AtomicUsize::new(0),
};

Self::try_new(InnerArrayData::Viewed(view))
}

/// Shared constructor that performs common array validation.
fn try_new(inner: InnerArrayData) -> VortexResult<Self> {
let array = ArrayData(inner);
let array = ArrayData(Arc::new(inner));

// Sanity check that the encoding implements the correct array trait
debug_assert!(
@@ -148,7 +149,7 @@ impl ArrayData {

/// Return the array's encoding
pub fn encoding(&self) -> EncodingRef {
match &self.0 {
match self.0.as_ref() {
InnerArrayData::Owned(d) => d.encoding,
InnerArrayData::Viewed(v) => v.encoding,
}
@@ -157,7 +158,7 @@ impl ArrayData {
/// Returns the number of logical elements in the array.
#[allow(clippy::same_name_method)]
pub fn len(&self) -> usize {
match &self.0 {
match self.0.as_ref() {
InnerArrayData::Owned(d) => d.len,
InnerArrayData::Viewed(v) => v.len,
}
@@ -203,19 +204,26 @@ impl ArrayData {
}

pub fn child<'a>(&'a self, idx: usize, dtype: &'a DType, len: usize) -> VortexResult<Self> {
match &self.0 {
match self.0.as_ref() {
InnerArrayData::Owned(d) => d.child(idx, dtype, len).cloned(),
InnerArrayData::Viewed(v) => v
.child(idx, dtype, len)
.map(|view| ArrayData(InnerArrayData::Viewed(view))),
.map(|view| ArrayData(Arc::new(InnerArrayData::Viewed(view)))),
}
}

/// Returns a Vec of Arrays with all the array's child arrays.
// TODO(ngates): deprecate this function and return impl Iterator
pub fn children(&self) -> Vec<ArrayData> {
match &self.0 {
match self.0.as_ref() {
InnerArrayData::Owned(d) => d.children().to_vec(),
InnerArrayData::Viewed(v) => v.children(),
InnerArrayData::Viewed(_) => {
let mut collector = ChildrenCollector::default();
self.encoding()
.accept(self, &mut collector)
.vortex_expect("Failed to get children");
collector.children()
}
}
}

@@ -230,7 +238,7 @@ impl ArrayData {

/// Returns the number of child arrays
pub fn nchildren(&self) -> usize {
match &self.0 {
match self.0.as_ref() {
InnerArrayData::Owned(d) => d.nchildren(),
InnerArrayData::Viewed(v) => v.nchildren(),
}
@@ -270,7 +278,7 @@ impl ArrayData {
}

pub fn array_metadata(&self) -> &dyn ArrayMetadata {
match &self.0 {
match self.0.as_ref() {
InnerArrayData::Owned(d) => &*d.metadata,
InnerArrayData::Viewed(v) => &*v.metadata,
}
@@ -279,7 +287,7 @@ impl ArrayData {
pub fn metadata<M: ArrayMetadata + Clone + for<'m> TryDeserializeArrayMetadata<'m>>(
&self,
) -> VortexResult<&M> {
match &self.0 {
match self.0.as_ref() {
InnerArrayData::Owned(d) => &d.metadata,
InnerArrayData::Viewed(v) => &v.metadata,
}
@@ -298,7 +306,7 @@ impl ArrayData {
/// View arrays will return a reference to their bytes, while heap-backed arrays
/// must first serialize their metadata, returning an owned byte array to the caller.
pub fn metadata_bytes(&self) -> VortexResult<Cow<[u8]>> {
match &self.0 {
match self.0.as_ref() {
InnerArrayData::Owned(array_data) => {
// Heap-backed arrays must first try and serialize the metadata.
let owned_meta: Vec<u8> = array_data
@@ -320,14 +328,14 @@ impl ArrayData {
}

pub fn nbuffers(&self) -> usize {
match &self.0 {
match self.0.as_ref() {
InnerArrayData::Owned(o) => o.buffers.len(),
InnerArrayData::Viewed(v) => v.nbuffers(),
}
}

pub fn byte_buffer(&self, index: usize) -> Option<&ByteBuffer> {
match &self.0 {
match self.0.as_ref() {
InnerArrayData::Owned(d) => d.byte_buffer(index),
InnerArrayData::Viewed(v) => v.buffer(index),
}
@@ -340,8 +348,11 @@ impl ArrayData {
}

pub fn into_byte_buffer(self, index: usize) -> Option<ByteBuffer> {
match self.0 {
InnerArrayData::Owned(d) => d.into_byte_buffer(index),
// NOTE(ngates): we can't really into_inner an Arc, so instead we clone the buffer out,
// but we still consume self by value such that the ref-count drops at the end of this
// function.
match self.0.as_ref() {
InnerArrayData::Owned(d) => d.byte_buffer(index).cloned(),
InnerArrayData::Viewed(v) => v.buffer(index).cloned(),
}
}
@@ -368,7 +379,7 @@ impl ArrayData {

#[cfg(feature = "canonical_counter")]
pub(crate) fn inc_canonical_counter(&self) {
let prev = match &self.0 {
let prev = match self.0.as_ref() {
InnerArrayData::Owned(o) => o
.canonical_counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
@@ -404,7 +415,7 @@ impl ArrayData {

impl Display for ArrayData {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let prefix = match &self.0 {
let prefix = match self.0.as_ref() {
InnerArrayData::Owned(_) => "",
InnerArrayData::Viewed(_) => "$",
};
@@ -421,22 +432,13 @@ impl Display for ArrayData {

impl<T: AsRef<ArrayData>> ArrayDType for T {
fn dtype(&self) -> &DType {
match &self.as_ref().0 {
match self.as_ref().0.as_ref() {
InnerArrayData::Owned(d) => &d.dtype,
InnerArrayData::Viewed(v) => &v.dtype,
}
}
}

impl ArrayData {
pub fn into_dtype(self) -> DType {
match self.0 {
InnerArrayData::Owned(d) => d.dtype,
InnerArrayData::Viewed(v) => v.dtype,
}
}
}

impl<T: AsRef<ArrayData>> ArrayLen for T {
fn len(&self) -> usize {
self.as_ref().len()
@@ -461,10 +463,7 @@ impl<A: AsRef<ArrayData>> ArrayValidity for A {

impl<T: AsRef<ArrayData>> ArrayStatistics for T {
fn statistics(&self) -> &(dyn Statistics + '_) {
match &self.as_ref().0 {
InnerArrayData::Owned(d) => d,
InnerArrayData::Viewed(v) => v,
}
self.as_ref()
}

// FIXME(ngates): this is really slow...
@@ -498,3 +497,20 @@ impl Iterator for ArrayDataIterator {
}
}
}

/// A non-owning handle to the internals of an [`ArrayData`].
///
/// The handle wraps a [`Weak`] pointer to the same `InnerArrayData` allocation that an
/// [`ArrayData`] holds through an `Arc`, so keeping a `WeakArrayData` around does not keep
/// the array alive. Use [`WeakArrayData::upgrade`] to get an [`ArrayData`] back; that
/// returns `None` once every strong handle to the array has been dropped.
///
/// Cloning the handle only adds another weak reference to the same allocation.
#[derive(Debug, Clone)]
pub struct WeakArrayData(Weak<InnerArrayData>);

impl WeakArrayData {
/// Attempts to upgrade the weak reference to a strong reference.
pub fn upgrade(&self) -> Option<ArrayData> {
self.0.upgrade().map(ArrayData)
}
}

impl ArrayData {
/// Downgrades the array data to a weak reference.
pub fn downgrade(self) -> WeakArrayData {
WeakArrayData(Arc::downgrade(&self.0))
}
}
Loading

0 comments on commit 28f4403

Please sign in to comment.