Cutover to Vortex Layouts (#1899)
Given this branch now builds, it's worth keeping it that way while we
port functionality into the new layouts.

---------

Co-authored-by: Joe Isaacs <[email protected]>
gatesn and joseph-isaacs authored Jan 14, 2025
1 parent fc6deac commit dfdeaf0
Showing 23 changed files with 371 additions and 279 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

23 changes: 13 additions & 10 deletions bench-vortex/src/clickbench.rs
@@ -7,13 +7,16 @@ use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
-use futures::{stream, StreamExt, TryStreamExt};
+use futures::executor::block_on;
+use itertools::Itertools;
+use rayon::prelude::*;
 use tokio::fs::{create_dir_all, OpenOptions};
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::{ChunkedArray, StructArray};
 use vortex::dtype::DType;
 use vortex::error::vortex_err;
-use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION};
+use vortex::file::v2::VortexWriteOptions;
+use vortex::file::VORTEX_FILE_EXTENSION;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::variants::StructArrayTrait;
 use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
@@ -149,7 +152,9 @@ pub async fn register_vortex_files(
     let vortex_dir = input_path.join("vortex");
     create_dir_all(&vortex_dir).await?;
 
-    stream::iter(0..100)
+    (0..100)
+        .collect_vec()
+        .par_iter()
         .map(|idx| {
             let parquet_file_path = input_path
                 .join("parquet")
@@ -158,7 +163,7 @@
             let session = session.clone();
             let schema = schema.clone();
 
-            tokio::spawn(async move {
+            block_on(async move {
                 let output_path = output_path.clone();
                 idempotent_async(&output_path, move |vtx_file| async move {
                     eprintln!("Processing file {idx}");
@@ -219,19 +224,17 @@ pub async fn register_vortex_files(
                         .open(&vtx_file)
                         .await?;
 
-                    let mut writer = VortexFileWriter::new(f);
-                    writer = writer.write_array_columns(data).await?;
-                    writer.finalize().await?;
+                    VortexWriteOptions::default()
+                        .write(f, data.into_array_stream())
+                        .await?;
 
                     anyhow::Ok(())
                 })
                 .await
                 .expect("Failed to write Vortex file")
             })
         })
-        .buffered(16)
-        .try_collect::<Vec<_>>()
-        .await?;
+        .collect::<Vec<_>>();
 
     let format = Arc::new(VortexFormat::new(CTX.clone()));
     let table_path = vortex_dir
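
The clickbench hunks above make two independent moves: the conversion loop drops the buffered tokio stream (`stream::iter(..).buffered(16)`) in favor of rayon's thread pool with `futures::executor::block_on` driving each file's async body, and the writer swaps the per-column `VortexFileWriter` calls for the v2 `VortexWriteOptions::default().write(..)`, which consumes a stream of arrays. A minimal, self-contained sketch of the new fan-out pattern (rayon, itertools, and futures only; nothing vortex-specific), bearing in mind that `futures::executor::block_on` supplies no tokio reactor of its own:

```rust
use futures::executor::block_on;
use itertools::Itertools;
use rayon::prelude::*;

fn main() {
    // Same shape as the loop above: materialize the index range, fan out
    // across rayon workers, and drive one future to completion per item.
    let results = (0..100u64)
        .collect_vec()
        .par_iter()
        .map(|idx| {
            block_on(async move {
                // stand-in for "convert parquet file {idx} to vortex"
                idx * 2
            })
        })
        .collect::<Vec<_>>();

    assert_eq!(results[7], 14);
}
```
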
68 changes: 26 additions & 42 deletions bench-vortex/src/reader.rs
@@ -2,7 +2,7 @@ use std::fs::File;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::process::Command;
-use std::sync::{Arc, LazyLock};
+use std::sync::Arc;
 
 use arrow_array::types::Int64Type;
 use arrow_array::{
@@ -24,18 +24,16 @@ use stream::StreamExt;
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
-use vortex::buffer::Buffer;
 use vortex::compress::CompressionStrategy;
 use vortex::dtype::DType;
 use vortex::error::VortexResult;
-use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
-use vortex::io::{IoDispatcher, ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
+use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions};
+use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
 use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
+use vortex::scan::Scan;
+use vortex::stream::ArrayStreamExt;
 use vortex::{ArrayData, IntoArrayData, IntoCanonical};
 
-static DISPATCHER: LazyLock<Arc<IoDispatcher>> =
-    LazyLock::new(|| Arc::new(IoDispatcher::default()));
-
 pub const BATCH_SIZE: usize = 65_536;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -48,19 +46,12 @@ pub struct VortexFooter {
 pub async fn open_vortex(path: &Path) -> VortexResult<ArrayData> {
     let file = TokioFile::open(path).unwrap();
 
-    VortexReadBuilder::new(
-        file,
-        LayoutDeserializer::new(
-            ALL_ENCODINGS_CONTEXT.clone(),
-            LayoutContext::default().into(),
-        ),
-    )
-    .with_io_dispatcher(DISPATCHER.clone())
-    .build()
-    .await?
-    .into_stream()
-    .read_all()
-    .await
+    VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
+        .open(file)
+        .await?
+        .scan(Scan::all())?
+        .into_array_data()
+        .await
 }

@@ -69,11 +60,10 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
 ) -> VortexResult<()> {
     let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;
 
-    VortexFileWriter::new(write)
-        .write_array_columns(chunked)
-        .await?
-        .finalize()
+    VortexWriteOptions::default()
+        .write(write, chunked.into_array_stream())
         .await?;
 
     Ok(())
 }

@@ -116,25 +106,19 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu

 async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
     reader: T,
-    indices: &[u64],
+    _indices: &[u64],
 ) -> VortexResult<ArrayData> {
-    VortexReadBuilder::new(
-        reader,
-        LayoutDeserializer::new(
-            ALL_ENCODINGS_CONTEXT.clone(),
-            LayoutContext::default().into(),
-        ),
-    )
-    .with_io_dispatcher(DISPATCHER.clone())
-    .with_indices(Buffer::copy_from(indices).into_array())
-    .build()
-    .await?
-    .into_stream()
-    .read_all()
-    .await
-    // For equivalence.... we decompress to make sure we're not cheating too much.
-    .and_then(IntoCanonical::into_canonical)
-    .map(ArrayData::from)
+    VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
+        .open(reader)
+        .await?
+        // FIXME(ngates): support row indices
+        // .scan_rows(Scan::all(), indices.iter().copied())?
+        .scan(Scan::all())?
+        .into_array_data()
+        .await?
+        // For equivalence.... we decompress to make sure we're not cheating too much.
+        .into_canonical()
+        .map(ArrayData::from)
 }
 
 pub async fn take_vortex_object_store(
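
Assembled from the hunks above, the whole v2 read path now fits in a single chain. The following mirrors `open_vortex` as a sketch against the in-tree API exactly as this diff uses it (not a stable public interface):

```rust
use std::path::Path;

use vortex::error::VortexResult;
use vortex::file::v2::VortexOpenOptions;
use vortex::io::TokioFile;
use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
use vortex::scan::Scan;
use vortex::ArrayData;

async fn read_whole_file(path: &Path) -> VortexResult<ArrayData> {
    let file = TokioFile::open(path).unwrap();
    VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
        .open(file) // reads the footer and file layout
        .await?
        .scan(Scan::all())? // full scan; per-row indices remain a FIXME above
        .into_array_data() // drain the scan into a single ArrayData
        .await
}
```

Notice what disappeared relative to the old path: no `LayoutDeserializer`, no `LayoutContext`, and no explicit `IoDispatcher` (the `DISPATCHER` static is deleted outright).
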
9 changes: 5 additions & 4 deletions bench-vortex/src/tpch/mod.rs
@@ -16,7 +16,7 @@ use vortex::aliases::hash_map::HashMap;
 use vortex::array::{ChunkedArray, StructArray};
 use vortex::arrow::FromArrowArray;
 use vortex::dtype::DType;
-use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION};
+use vortex::file::VORTEX_FILE_EXTENSION;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::variants::StructArrayTrait;
 use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
@@ -31,6 +31,7 @@ mod execute;
 pub mod schema;
 
 pub use execute::*;
+use vortex::file::v2::VortexWriteOptions;
 
 pub const EXPECTED_ROW_COUNTS: [usize; 23] = [
     0, 4, 460, 11620, 5, 5, 1, 4, 2, 175, 37967, 1048, 2, 42, 1, 1, 18314, 1, 57, 1, 186, 411, 7,
@@ -275,9 +276,9 @@ async fn register_vortex_file(
                     .open(&vtx_file)
                     .await?;
 
-                let mut writer = VortexFileWriter::new(f);
-                writer = writer.write_array_columns(data).await?;
-                writer.finalize().await?;
+                VortexWriteOptions::default()
+                    .write(f, data.into_array_stream())
+                    .await?;
 
                 anyhow::Ok(())
             })
5 changes: 3 additions & 2 deletions vortex-array/src/array/chunked/mod.rs
@@ -6,7 +6,7 @@ use std::fmt::{Debug, Display};

 use futures_util::stream;
 use serde::{Deserialize, Serialize};
-use vortex_buffer::Buffer;
+use vortex_buffer::BufferMut;
 use vortex_dtype::{DType, Nullability, PType};
 use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult, VortexUnwrap};

@@ -51,7 +51,8 @@ impl ChunkedArray {
             }
         }
 
-        let chunk_offsets = Buffer::from_iter(
+        let mut chunk_offsets = BufferMut::<u64>::with_capacity(chunks.len() + 1);
+        chunk_offsets.extend(
             [0u64]
                 .into_iter()
                 .chain(chunks.iter().map(|c| c.len() as u64))
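
The `ChunkedArray` change builds the offsets buffer by preallocating and extending with a leading zero plus a running sum of chunk lengths, rather than collecting an iterator via `Buffer::from_iter`. The computation itself, as a sketch in plain Rust with `Vec` standing in for vortex's `BufferMut`:

```rust
// chunk_offsets[i] is the starting row of chunk i, and the final entry is the
// total row count, so `n` chunks need `n + 1` slots.
fn chunk_offsets(chunk_lens: &[u64]) -> Vec<u64> {
    let mut offsets = Vec::with_capacity(chunk_lens.len() + 1);
    let mut rows_seen = 0u64;
    offsets.push(rows_seen);
    for len in chunk_lens {
        rows_seen += len; // running total of rows across chunks
        offsets.push(rows_seen);
    }
    offsets
}

fn main() {
    assert_eq!(chunk_offsets(&[3, 5, 2]), vec![0, 3, 8, 10]);
}
```

Preallocating via `with_capacity` makes the single allocation explicit instead of relying on the chained iterator's size hint.
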
1 change: 0 additions & 1 deletion vortex-array/src/array/varbinview/compute/mod.rs
@@ -57,7 +57,6 @@ impl TakeFn<VarBinViewArray> for VarBinViewEncoding {
     fn take(&self, array: &VarBinViewArray, indices: &ArrayData) -> VortexResult<ArrayData> {
         // Compute the new validity
         let validity = array.validity().take(indices)?;
-
         let indices = indices.clone().into_primitive()?;
 
         let views_buffer = match_each_integer_ptype!(indices.ptype(), |$I| {
12 changes: 12 additions & 0 deletions vortex-buffer/src/buffer_mut.rs
@@ -258,7 +258,19 @@
         T: Copy,
     {
         self.reserve(n);
+        unsafe { self.push_n_unchecked(item, n) }
+    }
+
+    /// Appends n scalars to the buffer.
+    ///
+    /// ## Safety
+    ///
+    /// The caller must ensure there is sufficient capacity in the array.
+    #[inline]
+    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
+    where
+        T: Copy,
+    {
         let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
         // SAFETY: we checked the capacity in the reserve call
         unsafe {
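
`push_n` keeps its signature but now delegates the raw writes to a new public `push_n_unchecked`: the safe entry point pays for `reserve` once, and the unsafe routine assumes capacity as its documented contract. The same split on a plain `Vec` looks like this (an illustrative sketch, not vortex's implementation):

```rust
struct Buf<T> {
    vec: Vec<T>,
}

impl<T: Copy> Buf<T> {
    /// Appends `item` to the buffer `n` times.
    fn push_n(&mut self, item: T, n: usize) {
        self.vec.reserve(n);
        // SAFETY: the reserve call above guarantees room for `n` more elements.
        unsafe { self.push_n_unchecked(item, n) }
    }

    /// # Safety
    /// The caller must ensure at least `n` elements of spare capacity.
    unsafe fn push_n_unchecked(&mut self, item: T, n: usize) {
        let len = self.vec.len();
        // SAFETY: capacity was reserved by the caller, per this function's contract.
        unsafe {
            let dst = self.vec.as_mut_ptr().add(len);
            for i in 0..n {
                dst.add(i).write(item); // fill the spare capacity in place
            }
            self.vec.set_len(len + n); // commit the new length
        }
    }
}

fn main() {
    let mut b = Buf { vec: Vec::new() };
    b.push_n(7u32, 3);
    assert_eq!(b.vec, vec![7, 7, 7]);
}
```

One nit visible in the hunk itself: the `// SAFETY: we checked the capacity in the reserve call` comment now lives inside `push_n_unchecked`, while the reserve happens in the caller; the doc comment's capacity contract is what actually holds.
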
1 change: 1 addition & 0 deletions vortex-datafusion/Cargo.toml
@@ -42,6 +42,7 @@ vortex-error = { workspace = true, features = ["datafusion"] }
 vortex-expr = { workspace = true, features = ["datafusion"] }
 vortex-file = { workspace = true, features = ["object_store"] }
 vortex-io = { workspace = true, features = ["object_store", "tokio"] }
+vortex-scan = { workspace = true }
 
 [dev-dependencies]
 anyhow = { workspace = true }
22 changes: 13 additions & 9 deletions vortex-datafusion/src/persistent/cache.rs
@@ -4,13 +4,14 @@ use chrono::{DateTime, Utc};
 use moka::future::Cache;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
+use vortex_array::ContextRef;
 use vortex_error::{vortex_err, VortexError, VortexResult};
-use vortex_file::{read_initial_bytes, InitialRead};
+use vortex_file::v2::{FileLayout, VortexOpenOptions};
 use vortex_io::ObjectStoreReadAt;
 
 #[derive(Debug, Clone)]
-pub struct InitialReadCache {
-    inner: Cache<Key, InitialRead>,
+pub struct FileLayoutCache {
+    inner: Cache<Key, FileLayout>,
 }
 
 #[derive(Hash, Eq, PartialEq, Debug)]
@@ -28,28 +29,31 @@ impl From<&ObjectMeta> for Key {
     }
 }
 
-impl InitialReadCache {
+impl FileLayoutCache {
     pub fn new(size_mb: usize) -> Self {
         let inner = Cache::builder()
-            .weigher(|k: &Key, v: &InitialRead| (k.location.as_ref().len() + v.buf.len()) as u32)
             .max_capacity(size_mb as u64 * (2 << 20))
-            .eviction_listener(|k, _v, cause| {
+            .eviction_listener(|k: Arc<Key>, _v, cause| {
                 log::trace!("Removed {} due to {:?}", k.location, cause);
             })
             .build();
 
         Self { inner }
     }
 
     pub async fn try_get(
         &self,
         object: &ObjectMeta,
         store: Arc<dyn ObjectStore>,
-    ) -> VortexResult<InitialRead> {
+    ) -> VortexResult<FileLayout> {
         self.inner
             .try_get_with(Key::from(object), async {
                 let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
-                let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?;
-                VortexResult::Ok(initial_read)
+                let vxf = VortexOpenOptions::new(ContextRef::default())
+                    .with_file_size(object.size as u64)
+                    .open(os_read_at)
+                    .await?;
+                VortexResult::Ok(vxf.file_layout().clone())
             })
             .await
             .map_err(|e: Arc<VortexError>| match Arc::try_unwrap(e) {
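
The cache now stores a `FileLayout` per object instead of the old `InitialRead` bytes, which is also why the `weigher` based on `v.buf.len()` disappears. The miss path relies on moka's `try_get_with`, which runs the loader future only on a miss and deduplicates concurrent loads of the same key. A standalone sketch of just that moka pattern (hypothetical key/value types, not the vortex ones):

```rust
use std::sync::Arc;

use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<String, String> = Cache::builder()
        .max_capacity(64 * 1024 * 1024) // cap the weighted size at 64 MiB
        .weigher(|k: &String, v: &String| (k.len() + v.len()) as u32)
        .build();

    let value = cache
        .try_get_with("s3://bucket/file.vortex".to_string(), async {
            // stands in for opening the file and cloning its layout on a miss
            Ok::<_, std::io::Error>("layout".to_string())
        })
        .await
        // moka wraps the loader's error in an Arc, since many waiters may observe it
        .map_err(|e: Arc<std::io::Error>| e)
        .unwrap();

    assert_eq!(value, "layout");
}
```

The `Arc<VortexError>` unwrap dance in `try_get` above exists for the same reason: every concurrent caller that piggybacked on a failed load receives a shared copy of the error.
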
19 changes: 10 additions & 9 deletions vortex-datafusion/src/persistent/execution.rs
@@ -14,7 +14,7 @@ use itertools::Itertools;
 use vortex_array::ContextRef;
 use vortex_dtype::FieldName;
 
-use super::cache::InitialReadCache;
+use super::cache::FileLayoutCache;
 use crate::persistent::opener::VortexFileOpener;
 
 #[derive(Debug, Clone)]
@@ -25,7 +25,7 @@ pub struct VortexExec {
     plan_properties: PlanProperties,
     projected_statistics: Statistics,
     ctx: ContextRef,
-    initial_read_cache: InitialReadCache,
+    initial_read_cache: FileLayoutCache,
 }
 
 impl VortexExec {
@@ -34,7 +34,7 @@
         metrics: ExecutionPlanMetricsSet,
         predicate: Option<Arc<dyn PhysicalExpr>>,
         ctx: ContextRef,
-        initial_read_cache: InitialReadCache,
+        initial_read_cache: FileLayoutCache,
     ) -> DFResult<Self> {
         let projected_schema = project_schema(
             &file_scan_config.file_schema,
@@ -122,17 +122,18 @@ impl ExecutionPlan for VortexExec {
             projection
                 .iter()
                 .map(|i| FieldName::from(arrow_schema.fields[*i].name().clone()))
-                .collect_vec()
+                .collect()
         });
 
-        let opener = VortexFileOpener {
-            ctx: self.ctx.clone(),
+        // TODO(joe): apply the predicate/filter mapping to vortex-expr once.
+        let opener = VortexFileOpener::new(
+            self.ctx.clone(),
             object_store,
             projection,
-            predicate: self.predicate.clone(),
-            initial_read_cache: self.initial_read_cache.clone(),
+            self.predicate.clone(),
             arrow_schema,
-        };
+            self.initial_read_cache.clone(),
+        )?;
         let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?;
 
         Ok(Box::pin(stream))
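
Finally, `VortexFileOpener` moves from a public struct literal to a fallible `new(..)?` constructor, presumably so that work like the predicate/filter mapping flagged in the TODO can happen once up front. The projection handling itself just maps DataFusion's column ordinals to field names; reduced to arrow types only (with `String` standing in for vortex's `FieldName`):

```rust
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("price", DataType::Float64, true),
    ]);

    // A DataFusion projection is a list of column ordinals; the opener wants
    // the corresponding field names.
    let projection = [2usize, 0];
    let names: Vec<String> = projection
        .iter()
        .map(|i| schema.fields()[*i].name().clone())
        .collect();

    assert_eq!(names, vec!["price".to_string(), "id".to_string()]);
}
```
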
(Diffs for the remaining changed files were not loaded.)