Cutover to Vortex Layouts #1899

Merged: 74 commits into develop from ngates/datafusion-layouts, Jan 14, 2025.

Commits (74)
53599ef  DataFusion Layouts (gatesn, Jan 7, 2025)
e01cedc  DataFusion Layouts (gatesn, Jan 7, 2025)
3c9f285  DataFusion Layouts (gatesn, Jan 7, 2025)
6f4b711  DataFusion Layouts (gatesn, Jan 7, 2025)
f1e11d1  DataFusion Layouts (gatesn, Jan 7, 2025)
0d0b59e  add hash to vortex-expr and therefore vortex-scalar (joseph-isaacs, Jan 9, 2025)
b2453a4  use dyn-hash (joseph-isaacs, Jan 9, 2025)
e8b7d94  clippy (joseph-isaacs, Jan 9, 2025)
4314b62  fix (joseph-isaacs, Jan 9, 2025)
f122076  derive hash impl PartialEq (joseph-isaacs, Jan 9, 2025)
f91f062  wip (joseph-isaacs, Jan 9, 2025)
ca5253e  wip (joseph-isaacs, Jan 9, 2025)
aff1c14  wip (joseph-isaacs, Jan 9, 2025)
e1211bc  wip (joseph-isaacs, Jan 9, 2025)
e917d73  wip (joseph-isaacs, Jan 9, 2025)
9d47a02  Merge branch 'develop' into ji/rescope-exprs (joseph-isaacs, Jan 9, 2025)
0e54bb6  rescope exprs (joseph-isaacs, Jan 10, 2025)
912387e  Merge branch 'develop' into ji/rescope-exprs (joseph-isaacs, Jan 10, 2025)
c379aec  no dt (joseph-isaacs, Jan 10, 2025)
b4fa31e  update (joseph-isaacs, Jan 10, 2025)
fa0c6ac  update (joseph-isaacs, Jan 10, 2025)
2a0e675  update (joseph-isaacs, Jan 10, 2025)
2ce862a  update (joseph-isaacs, Jan 10, 2025)
7789787  update (joseph-isaacs, Jan 10, 2025)
abf15be  update (joseph-isaacs, Jan 10, 2025)
8ee6e0b  no add (joseph-isaacs, Jan 10, 2025)
21e1eea  merge (gatesn, Jan 10, 2025)
42c1373  merge (gatesn, Jan 10, 2025)
47e54ba  wip expr proj (joseph-isaacs, Jan 10, 2025)
116301e  wip (joseph-isaacs, Jan 10, 2025)
c4d7480  fix (joseph-isaacs, Jan 10, 2025)
12f11b5  fix (joseph-isaacs, Jan 10, 2025)
5853125  cleanup (joseph-isaacs, Jan 10, 2025)
29b6fec  todo (joseph-isaacs, Jan 10, 2025)
3bda2c3  I/O driver (gatesn, Jan 10, 2025)
71cc555  I/O driver (gatesn, Jan 10, 2025)
545108d  I/O driver (gatesn, Jan 10, 2025)
625aae7  Better buffer debug (gatesn, Jan 10, 2025)
fc71762  Better buffer debug (gatesn, Jan 10, 2025)
a6f274c  IntoArrow (gatesn, Jan 10, 2025)
ce6c023  Merge (gatesn, Jan 11, 2025)
eec354b  Fix conflicts (gatesn, Jan 11, 2025)
7af8780  Remove dtype index (gatesn, Jan 11, 2025)
d00a6ab  merge develop (gatesn, Jan 11, 2025)
fa0d5ce  Struct Layout (gatesn, Jan 11, 2025)
043a936  Merge branch 'develop' into ngates/datafusion-layouts (gatesn, Jan 11, 2025)
9c0ed0e  Struct Layout (gatesn, Jan 11, 2025)
b52817b  merge (gatesn, Jan 13, 2025)
97ca0bb  merge (gatesn, Jan 13, 2025)
84b276f  Fixes (gatesn, Jan 13, 2025)
48befca  Fixes (gatesn, Jan 13, 2025)
6d3ab6e  Cleanup Layouts (gatesn, Jan 13, 2025)
1f58672  Cleanup Layouts (gatesn, Jan 13, 2025)
49f79a4  Cleanup Layouts (gatesn, Jan 13, 2025)
e725890  Cleanup Layouts (gatesn, Jan 13, 2025)
619ffa8  Support opening Vortex files without I/O (gatesn, Jan 13, 2025)
442495c  Support opening Vortex files without I/O (gatesn, Jan 13, 2025)
52811f1  Dumb coalescing (gatesn, Jan 13, 2025)
bfcb2b1  Dumb coalescing (gatesn, Jan 13, 2025)
eb8b025  Dumb coalescing (gatesn, Jan 13, 2025)
c44ae3b  Dumb coalescing (gatesn, Jan 13, 2025)
588a4dc  Dumb coalescing (gatesn, Jan 13, 2025)
67e97f7  Dumb coalescing (gatesn, Jan 13, 2025)
7c336e0  Merge branch 'develop' into ngates/datafusion-layouts (joseph-isaacs, Jan 14, 2025)
94b38d6  ignore inconvertible df-exprs and run typed simplify (joseph-isaacs, Jan 14, 2025)
0e90cd4  Segment Cache (gatesn, Jan 14, 2025)
7f28796  Fix layouts (#1936) (joseph-isaacs, Jan 14, 2025)
9b40764  merge (gatesn, Jan 14, 2025)
e01cb19  Merge branch 'ngates/datafusion-layouts' of github.com:spiraldb/vorte… (gatesn, Jan 14, 2025)
abd97f7  merge (gatesn, Jan 14, 2025)
1ccc5ab  merge (gatesn, Jan 14, 2025)
e78905e  merge (gatesn, Jan 14, 2025)
ca8cb16  merge (gatesn, Jan 14, 2025)
ba2127e  merge (gatesn, Jan 14, 2025)
Files changed
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

23 changes: 13 additions & 10 deletions bench-vortex/src/clickbench.rs
@@ -7,13 +7,16 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use futures::{stream, StreamExt, TryStreamExt};
use futures::executor::block_on;
use itertools::Itertools;
use rayon::prelude::*;
use tokio::fs::{create_dir_all, OpenOptions};
use vortex::aliases::hash_map::HashMap;
use vortex::array::{ChunkedArray, StructArray};
use vortex::dtype::DType;
use vortex::error::vortex_err;
use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION};
use vortex::file::v2::VortexWriteOptions;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::sampling_compressor::SamplingCompressor;
use vortex::variants::StructArrayTrait;
use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
@@ -149,7 +152,9 @@ pub async fn register_vortex_files(
let vortex_dir = input_path.join("vortex");
create_dir_all(&vortex_dir).await?;

stream::iter(0..100)
(0..100)
.collect_vec()
.par_iter()
.map(|idx| {
let parquet_file_path = input_path
.join("parquet")
@@ -158,7 +163,7 @@ pub async fn register_vortex_files(
let session = session.clone();
let schema = schema.clone();

tokio::spawn(async move {
block_on(async move {
let output_path = output_path.clone();
idempotent_async(&output_path, move |vtx_file| async move {
eprintln!("Processing file {idx}");
@@ -219,19 +224,17 @@ pub async fn register_vortex_files(
.open(&vtx_file)
.await?;

let mut writer = VortexFileWriter::new(f);
writer = writer.write_array_columns(data).await?;
writer.finalize().await?;
VortexWriteOptions::default()
.write(f, data.into_array_stream())
.await?;

anyhow::Ok(())
})
.await
.expect("Failed to write Vortex file")
})
})
.buffered(16)
.try_collect::<Vec<_>>()
.await?;
.collect::<Vec<_>>();

let format = Arc::new(VortexFormat::new(CTX.clone()));
let table_path = vortex_dir
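A note on the concurrency change in this file: conversion moves from a buffered tokio task stream (stream::iter(...).buffered(16) with tokio::spawn) to rayon's thread pool, where each worker drives its own future with futures::executor::block_on. A minimal, self-contained sketch of that pattern follows; the convert body is a hypothetical stand-in for the actual Parquet-to-Vortex work.

use futures::executor::block_on;
use itertools::Itertools;
use rayon::prelude::*;

// Hypothetical stand-in for the per-file Parquet -> Vortex conversion.
async fn convert(idx: usize) -> anyhow::Result<()> {
    eprintln!("Processing file {idx}");
    Ok(())
}

fn main() {
    // Fan out across rayon's thread pool; each worker blocks on its own
    // future, so no shared tokio runtime or buffered task stream is needed.
    (0..100usize)
        .collect_vec()
        .par_iter()
        .map(|idx| block_on(convert(*idx)).expect("Failed to write Vortex file"))
        .collect::<Vec<_>>();
}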
68 changes: 26 additions & 42 deletions bench-vortex/src/reader.rs
@@ -2,7 +2,7 @@ use std::fs::File;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::{Arc, LazyLock};
use std::sync::Arc;

use arrow_array::types::Int64Type;
use arrow_array::{
@@ -24,18 +24,16 @@ use stream::StreamExt;
use vortex::aliases::hash_map::HashMap;
use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::buffer::Buffer;
use vortex::compress::CompressionStrategy;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
use vortex::io::{IoDispatcher, ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions};
use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
use vortex::scan::Scan;
use vortex::stream::ArrayStreamExt;
use vortex::{ArrayData, IntoArrayData, IntoCanonical};

static DISPATCHER: LazyLock<Arc<IoDispatcher>> =
LazyLock::new(|| Arc::new(IoDispatcher::default()));

pub const BATCH_SIZE: usize = 65_536;

#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -48,19 +46,12 @@ pub struct VortexFooter {
pub async fn open_vortex(path: &Path) -> VortexResult<ArrayData> {
let file = TokioFile::open(path).unwrap();

VortexReadBuilder::new(
file,
LayoutDeserializer::new(
ALL_ENCODINGS_CONTEXT.clone(),
LayoutContext::default().into(),
),
)
.with_io_dispatcher(DISPATCHER.clone())
.build()
.await?
.into_stream()
.read_all()
.await
VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
[Review comment from the PR author on the line above: "We should make this the default context in vortex-file"]

.open(file)
.await?
.scan(Scan::all())?
.into_array_data()
.await
}

@@ -69,11 +60,10 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
) -> VortexResult<()> {
) -> VortexResult<()> {
let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;

VortexFileWriter::new(write)
.write_array_columns(chunked)
.await?
.finalize()
VortexWriteOptions::default()
.write(write, chunked.into_array_stream())
.await?;

Ok(())
}

@@ -116,25 +106,19 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResult

async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
reader: T,
indices: &[u64],
_indices: &[u64],
) -> VortexResult<ArrayData> {
VortexReadBuilder::new(
reader,
LayoutDeserializer::new(
ALL_ENCODINGS_CONTEXT.clone(),
LayoutContext::default().into(),
),
)
.with_io_dispatcher(DISPATCHER.clone())
.with_indices(Buffer::copy_from(indices).into_array())
.build()
.await?
.into_stream()
.read_all()
.await
// For equivalence.... we decompress to make sure we're not cheating too much.
.and_then(IntoCanonical::into_canonical)
.map(ArrayData::from)
VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
.open(reader)
.await?
// FIXME(ngates): support row indices
// .scan_rows(Scan::all(), indices.iter().copied())?
.scan(Scan::all())?
.into_array_data()
.await?
// For equivalence.... we decompress to make sure we're not cheating too much.
.into_canonical()
.map(ArrayData::from)
}

pub async fn take_vortex_object_store(
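Taken together, the v2 surface used in this file reduces to an options-driven write plus an open/scan/collect read. Below is a consolidated round-trip sketch under the same imports as above. It assumes, as the hunks suggest, that the array type provides into_array_stream(); row-index reads remain a FIXME, so the read is a full scan.

use std::path::Path;

use vortex::error::VortexResult;
use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions};
use vortex::io::TokioFile;
use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
use vortex::scan::Scan;
use vortex::stream::ArrayStreamExt;
use vortex::ArrayData;

async fn round_trip(data: ArrayData, path: &Path) -> VortexResult<ArrayData> {
    // Write: the v2 writer consumes a stream of array chunks, replacing the
    // old write_array_columns/finalize two-step.
    let f = tokio::fs::File::create(path).await?;
    VortexWriteOptions::default()
        .write(f, data.into_array_stream())
        .await?;

    // Read: open with an encoding context, scan everything, collect.
    // (unwrap mirrors open_vortex above)
    let file = TokioFile::open(path).unwrap();
    VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
        .open(file)
        .await?
        .scan(Scan::all())?
        .into_array_data()
        .await
}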
9 changes: 5 additions & 4 deletions bench-vortex/src/tpch/mod.rs
@@ -16,7 +16,7 @@ use vortex::aliases::hash_map::HashMap;
use vortex::array::{ChunkedArray, StructArray};
use vortex::arrow::FromArrowArray;
use vortex::dtype::DType;
use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION};
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::sampling_compressor::SamplingCompressor;
use vortex::variants::StructArrayTrait;
use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
@@ -31,6 +31,7 @@ mod execute;
pub mod schema;

pub use execute::*;
use vortex::file::v2::VortexWriteOptions;

pub const EXPECTED_ROW_COUNTS: [usize; 23] = [
0, 4, 460, 11620, 5, 5, 1, 4, 2, 175, 37967, 1048, 2, 42, 1, 1, 18314, 1, 57, 1, 186, 411, 7,
@@ -275,9 +276,9 @@ async fn register_vortex_file(
.open(&vtx_file)
.await?;

let mut writer = VortexFileWriter::new(f);
writer = writer.write_array_columns(data).await?;
writer.finalize().await?;
VortexWriteOptions::default()
.write(f, data.into_array_stream())
.await?;

anyhow::Ok(())
})
5 changes: 3 additions & 2 deletions vortex-array/src/array/chunked/mod.rs
@@ -6,7 +6,7 @@ use std::fmt::{Debug, Display};

use futures_util::stream;
use serde::{Deserialize, Serialize};
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_dtype::{DType, Nullability, PType};
use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult, VortexUnwrap};

@@ -51,7 +51,8 @@ impl ChunkedArray {
}
}

let chunk_offsets = Buffer::from_iter(
let mut chunk_offsets = BufferMut::<u64>::with_capacity(chunks.len() + 1);
chunk_offsets.extend(
[0u64]
.into_iter()
.chain(chunks.iter().map(|c| c.len() as u64))
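For context on the hunk above: the offsets buffer holds a leading zero followed by a running sum of chunk lengths, and pre-sizing it with with_capacity avoids regrowth during the extend. A hypothetical standalone version of the pattern (the scan tail is elided by the diff view; this assumes the usual running-sum form):

use vortex_buffer::BufferMut;

// Build [0, len0, len0 + len1, ...] with a single up-front allocation.
fn chunk_offsets(chunk_lens: &[u64]) -> BufferMut<u64> {
    let mut offsets = BufferMut::<u64>::with_capacity(chunk_lens.len() + 1);
    offsets.extend(
        [0u64]
            .into_iter()
            .chain(chunk_lens.iter().copied())
            .scan(0u64, |acc, len| {
                *acc += len;
                Some(*acc)
            }),
    );
    offsets
}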
1 change: 0 additions & 1 deletion vortex-array/src/array/varbinview/compute/mod.rs
@@ -57,7 +57,6 @@ impl TakeFn<VarBinViewArray> for VarBinViewEncoding {
fn take(&self, array: &VarBinViewArray, indices: &ArrayData) -> VortexResult<ArrayData> {
// Compute the new validity
let validity = array.validity().take(indices)?;

let indices = indices.clone().into_primitive()?;

let views_buffer = match_each_integer_ptype!(indices.ptype(), |$I| {
12 changes: 12 additions & 0 deletions vortex-buffer/src/buffer_mut.rs
@@ -258,7 +258,19 @@ impl<T> BufferMut<T> {
T: Copy,
{
self.reserve(n);
unsafe { self.push_n_unchecked(item, n) }
}

/// Appends n scalars to the buffer.
///
/// ## Safety
///
/// The caller must ensure there is sufficient capacity in the buffer.
#[inline]
pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
where
T: Copy,
{
let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
// SAFETY: the caller guarantees there is capacity for `n` more items
unsafe {
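The split above gives callers a checked entry point (push_n reserves, then delegates) and an unchecked one for hot paths that have already guaranteed capacity. A hypothetical usage sketch:

use vortex_buffer::BufferMut;

fn demo() {
    // Checked path: push_n reserves internally, so any capacity works.
    let mut buf = BufferMut::<u64>::with_capacity(4);
    buf.push_n(1u64, 8);

    // Unchecked path: valid only because capacity was guaranteed up front.
    let mut fast = BufferMut::<u64>::with_capacity(16);
    // SAFETY: 16 slots were reserved and exactly 16 items are pushed.
    unsafe { fast.push_n_unchecked(7u64, 16) };
}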
1 change: 1 addition & 0 deletions vortex-datafusion/Cargo.toml
@@ -42,6 +42,7 @@ vortex-error = { workspace = true, features = ["datafusion"] }
vortex-expr = { workspace = true, features = ["datafusion"] }
vortex-file = { workspace = true, features = ["object_store"] }
vortex-io = { workspace = true, features = ["object_store", "tokio"] }
vortex-scan = { workspace = true }

[dev-dependencies]
anyhow = { workspace = true }
22 changes: 13 additions & 9 deletions vortex-datafusion/src/persistent/cache.rs
@@ -4,13 +4,14 @@ use chrono::{DateTime, Utc};
use moka::future::Cache;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use vortex_array::ContextRef;
use vortex_error::{vortex_err, VortexError, VortexResult};
use vortex_file::{read_initial_bytes, InitialRead};
use vortex_file::v2::{FileLayout, VortexOpenOptions};
use vortex_io::ObjectStoreReadAt;

#[derive(Debug, Clone)]
pub struct InitialReadCache {
inner: Cache<Key, InitialRead>,
pub struct FileLayoutCache {
inner: Cache<Key, FileLayout>,
}

#[derive(Hash, Eq, PartialEq, Debug)]
@@ -28,28 +29,31 @@ impl From<&ObjectMeta> for Key {
}
}

impl InitialReadCache {
impl FileLayoutCache {
pub fn new(size_mb: usize) -> Self {
let inner = Cache::builder()
.weigher(|k: &Key, v: &InitialRead| (k.location.as_ref().len() + v.buf.len()) as u32)
.max_capacity(size_mb as u64 * (2 << 20))
.eviction_listener(|k, _v, cause| {
.eviction_listener(|k: Arc<Key>, _v, cause| {
log::trace!("Removed {} due to {:?}", k.location, cause);
})
.build();

Self { inner }
}

pub async fn try_get(
&self,
object: &ObjectMeta,
store: Arc<dyn ObjectStore>,
) -> VortexResult<InitialRead> {
) -> VortexResult<FileLayout> {
self.inner
.try_get_with(Key::from(object), async {
let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?;
VortexResult::Ok(initial_read)
let vxf = VortexOpenOptions::new(ContextRef::default())
.with_file_size(object.size as u64)
.open(os_read_at)
.await?;
VortexResult::Ok(vxf.file_layout().clone())
})
.await
.map_err(|e: Arc<VortexError>| match Arc::try_unwrap(e) {
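Since the cache now stores the parsed FileLayout rather than the raw initial-read bytes, a warm entry skips the footer round-trip entirely. A hypothetical call site, with meta and store assumed to come from DataFusion's object-store plumbing:

use std::sync::Arc;

use object_store::{ObjectMeta, ObjectStore};
use vortex_error::VortexResult;
use vortex_file::v2::FileLayout;

async fn layout_for(
    cache: &FileLayoutCache,
    meta: &ObjectMeta,
    store: Arc<dyn ObjectStore>,
) -> VortexResult<FileLayout> {
    // First call opens the file and caches its layout; subsequent calls
    // for the same object are served straight from the moka cache.
    cache.try_get(meta, store).await
}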
19 changes: 10 additions & 9 deletions vortex-datafusion/src/persistent/execution.rs
@@ -14,7 +14,7 @@ use itertools::Itertools;
use vortex_array::ContextRef;
use vortex_dtype::FieldName;

use super::cache::InitialReadCache;
use super::cache::FileLayoutCache;
use crate::persistent::opener::VortexFileOpener;

#[derive(Debug, Clone)]
@@ -25,7 +25,7 @@ pub struct VortexExec {
plan_properties: PlanProperties,
projected_statistics: Statistics,
ctx: ContextRef,
initial_read_cache: InitialReadCache,
initial_read_cache: FileLayoutCache,
}

impl VortexExec {
@@ -34,7 +34,7 @@ impl VortexExec {
metrics: ExecutionPlanMetricsSet,
predicate: Option<Arc<dyn PhysicalExpr>>,
ctx: ContextRef,
initial_read_cache: InitialReadCache,
initial_read_cache: FileLayoutCache,
) -> DFResult<Self> {
let projected_schema = project_schema(
&file_scan_config.file_schema,
@@ -122,17 +122,18 @@ impl ExecutionPlan for VortexExec {
projection
.iter()
.map(|i| FieldName::from(arrow_schema.fields[*i].name().clone()))
.collect_vec()
.collect()
});

let opener = VortexFileOpener {
ctx: self.ctx.clone(),
// TODO(joe): apply the predicate/filter mapping to vortex-expr once.
let opener = VortexFileOpener::new(
self.ctx.clone(),
object_store,
projection,
predicate: self.predicate.clone(),
initial_read_cache: self.initial_read_cache.clone(),
self.predicate.clone(),
arrow_schema,
};
self.initial_read_cache.clone(),
)?;
let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?;

Ok(Box::pin(stream))