Skip to content

Commit

Permalink
perf: merge small byte ranges for optimized fetching (#4520)
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu authored Aug 9, 2024
1 parent 27d9aa0 commit 665b7e5
Showing 1 changed file with 18 additions and 76 deletions.
94 changes: 18 additions & 76 deletions src/mito2/src/sst/parquet/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use object_store::{ErrorKind, ObjectStore};
use object_store::ObjectStore;
use parquet::basic::ColumnOrder;
use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
use parquet::format;
Expand Down Expand Up @@ -88,84 +88,26 @@ fn parse_column_orders(
}
}

/// Reads the given byte `ranges` of `file_path` from the object store.
///
/// Prefers the sequential blocking read path when the store advertises
/// blocking capability; otherwise falls back to concurrent async reads.
pub async fn fetch_byte_ranges(
    file_path: &str,
    object_store: ObjectStore,
    ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
    // Capability probe decides which read path to take.
    let can_block = object_store.info().full_capability().blocking;
    if can_block {
        fetch_ranges_seq(file_path, object_store, ranges).await
    } else {
        fetch_ranges_concurrent(file_path, object_store, ranges).await
    }
}

/// Reads every requested range one after another through the store's
/// blocking reader, off-loading the work to a blocking thread when a
/// tokio runtime is available.
async fn fetch_ranges_seq(
    file_path: &str,
    object_store: ObjectStore,
    ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
    let blocking_store = object_store.blocking();
    // Own the inputs so the closure below can be `'static`.
    let path = file_path.to_string();
    let owned_ranges = ranges.to_vec();

    let read_all = move || -> object_store::Result<Vec<Bytes>> {
        let mut buffers = Vec::with_capacity(owned_ranges.len());
        for range in owned_ranges {
            let data = blocking_store
                .read_with(&path)
                .range(range.start..range.end)
                .call()?;
            buffers.push(data.to_bytes());
        }
        Ok(buffers)
    };

    maybe_spawn_blocking(read_all).await
}
const FETCH_PARALLELISM: usize = 8;
const MERGE_GAP: usize = 512 * 1024;

/// Fetches data from object store concurrently.
async fn fetch_ranges_concurrent(
/// Asynchronously fetches byte ranges from an object store.
///
/// * `FETCH_PARALLELISM` - The number of concurrent fetch operations.
/// * `MERGE_GAP` - The maximum gap size (in bytes) to merge small byte ranges for optimized fetching.
pub async fn fetch_byte_ranges(
file_path: &str,
object_store: ObjectStore,
ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
// TODO(QuenKar): may merge small ranges to a bigger range to optimize.
let mut handles = Vec::with_capacity(ranges.len());
for range in ranges {
let future_read = object_store.read_with(file_path);
handles.push(async move {
let data = future_read.range(range.start..range.end).await?;
Ok::<_, object_store::Error>(data.to_bytes())
});
}
let results = futures::future::try_join_all(handles).await?;
Ok(results)
}

// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
/// Takes a function and spawns it to a tokio blocking pool if available
async fn maybe_spawn_blocking<F, T>(f: F) -> object_store::Result<T>
where
F: FnOnce() -> object_store::Result<T> + Send + 'static,
T: Send + 'static,
{
match tokio::runtime::Handle::try_current() {
Ok(runtime) => runtime
.spawn_blocking(f)
.await
.map_err(new_task_join_error)?,
Err(_) => f(),
}
}

// https://github.com/apache/opendal/blob/v0.46.0/core/src/raw/tokio_util.rs#L21-L24
/// Parse tokio error into opendal::Error.
fn new_task_join_error(e: tokio::task::JoinError) -> object_store::Error {
object_store::Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e)
Ok(object_store
.reader_with(file_path)
.concurrent(FETCH_PARALLELISM)
.gap(MERGE_GAP)
.await?
.fetch(ranges.to_vec())
.await?
.into_iter()
.map(|buf| buf.to_bytes())
.collect::<Vec<_>>())
}

0 comments on commit 665b7e5

Please sign in to comment.