From 665b7e5c6edf074c6db7aa2e28bd8fed2e083bd0 Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Fri, 9 Aug 2024 16:17:54 +0800
Subject: [PATCH] perf: merge small byte ranges for optimized fetching (#4520)

---
 src/mito2/src/sst/parquet/helper.rs | 94 ++++++-----------------
 1 file changed, 18 insertions(+), 76 deletions(-)

diff --git a/src/mito2/src/sst/parquet/helper.rs b/src/mito2/src/sst/parquet/helper.rs
index b3cc8f8279d3..e80f751af982 100644
--- a/src/mito2/src/sst/parquet/helper.rs
+++ b/src/mito2/src/sst/parquet/helper.rs
@@ -16,7 +16,7 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use bytes::Bytes;
-use object_store::{ErrorKind, ObjectStore};
+use object_store::ObjectStore;
 use parquet::basic::ColumnOrder;
 use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
 use parquet::format;
@@ -88,84 +88,26 @@ fn parse_column_orders(
     }
 }
 
-/// Fetches data from object store.
-/// If the object store supports blocking, use sequence blocking read.
-/// Otherwise, use concurrent read.
-pub async fn fetch_byte_ranges(
-    file_path: &str,
-    object_store: ObjectStore,
-    ranges: &[Range<u64>],
-) -> object_store::Result<Vec<Bytes>> {
-    if object_store.info().full_capability().blocking {
-        fetch_ranges_seq(file_path, object_store, ranges).await
-    } else {
-        fetch_ranges_concurrent(file_path, object_store, ranges).await
-    }
-}
-
-/// Fetches data from object store sequentially
-async fn fetch_ranges_seq(
-    file_path: &str,
-    object_store: ObjectStore,
-    ranges: &[Range<u64>],
-) -> object_store::Result<Vec<Bytes>> {
-    let block_object_store = object_store.blocking();
-    let file_path = file_path.to_string();
-    let ranges = ranges.to_vec();
-
-    let f = move || -> object_store::Result<Vec<Bytes>> {
-        ranges
-            .into_iter()
-            .map(|range| {
-                let data = block_object_store
-                    .read_with(&file_path)
-                    .range(range.start..range.end)
-                    .call()?;
-                Ok::<_, object_store::Error>(data.to_bytes())
-            })
-            .collect::<object_store::Result<Vec<_>>>()
-    };
-
-    maybe_spawn_blocking(f).await
-}
+const FETCH_PARALLELISM: usize = 8;
+const MERGE_GAP: usize = 512 * 1024;
 
-/// Fetches data from object store concurrently.
-async fn fetch_ranges_concurrent(
+/// Asynchronously fetches byte ranges from an object store.
+///
+/// * `FETCH_PARALLELISM` - The number of concurrent fetch operations.
+/// * `MERGE_GAP` - The maximum gap size (in bytes) to merge small byte ranges for optimized fetching.
+pub async fn fetch_byte_ranges(
     file_path: &str,
     object_store: ObjectStore,
     ranges: &[Range<u64>],
 ) -> object_store::Result<Vec<Bytes>> {
-    // TODO(QuenKar): may merge small ranges to a bigger range to optimize.
-    let mut handles = Vec::with_capacity(ranges.len());
-    for range in ranges {
-        let future_read = object_store.read_with(file_path);
-        handles.push(async move {
-            let data = future_read.range(range.start..range.end).await?;
-            Ok::<_, object_store::Error>(data.to_bytes())
-        });
-    }
-    let results = futures::future::try_join_all(handles).await?;
-    Ok(results)
-}
-
-// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
-/// Takes a function and spawns it to a tokio blocking pool if available
-async fn maybe_spawn_blocking<F, T>(f: F) -> object_store::Result<T>
-where
-    F: FnOnce() -> object_store::Result<T> + Send + 'static,
-    T: Send + 'static,
-{
-    match tokio::runtime::Handle::try_current() {
-        Ok(runtime) => runtime
-            .spawn_blocking(f)
-            .await
-            .map_err(new_task_join_error)?,
-        Err(_) => f(),
-    }
-}
-
-// https://github.com/apache/opendal/blob/v0.46.0/core/src/raw/tokio_util.rs#L21-L24
-/// Parse tokio error into opendal::Error.
-fn new_task_join_error(e: tokio::task::JoinError) -> object_store::Error {
-    object_store::Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e)
+    Ok(object_store
+        .reader_with(file_path)
+        .concurrent(FETCH_PARALLELISM)
+        .gap(MERGE_GAP)
+        .await?
+        .fetch(ranges.to_vec())
+        .await?
+        .into_iter()
+        .map(|buf| buf.to_bytes())
+        .collect::<Vec<_>>())
 }
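
The new implementation delegates both concerns to opendal's reader: `.concurrent(FETCH_PARALLELISM)` bounds the number of in-flight fetches, and `.gap(MERGE_GAP)` lets the reader coalesce requested ranges whose gaps are below 512 KiB, which is what resolves the old TODO in fetch_ranges_concurrent. As a rough sketch of the merging idea only, assuming sorted, non-overlapping input ranges and a hypothetical merge_ranges helper (opendal's actual merge logic may differ):

    use std::ops::Range;

    /// Illustration only: coalesce byte ranges whose gaps are smaller than
    /// `merge_gap`, turning several small reads into one larger request.
    /// The real merging happens inside opendal once `.gap()` is set on the reader.
    fn merge_ranges(mut ranges: Vec<Range<u64>>, merge_gap: u64) -> Vec<Range<u64>> {
        ranges.sort_by_key(|r| r.start);
        let mut merged: Vec<Range<u64>> = Vec::with_capacity(ranges.len());
        for range in ranges {
            if let Some(last) = merged.last_mut() {
                // Extend the previous range when the gap to this one is within the threshold.
                if range.start <= last.end + merge_gap {
                    last.end = last.end.max(range.end);
                    continue;
                }
            }
            merged.push(range);
        }
        merged
    }

    fn main() {
        // With a 512 KiB gap, two column chunks 100 KiB apart collapse into one
        // request, while a distant range still gets its own request.
        let ranges = vec![
            0..64 * 1024,
            164 * 1024..256 * 1024,
            10 * 1024 * 1024..11 * 1024 * 1024,
        ];
        let merged = merge_ranges(ranges, 512 * 1024);
        assert_eq!(merged, vec![0..256 * 1024, 10 * 1024 * 1024..11 * 1024 * 1024]);
    }

Trading a bounded amount of over-read (at most MERGE_GAP wasted bytes per merge) for fewer object-store requests is generally a win when reading Parquet column chunks from remote storage.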