Skip to content

Commit

Permalink
DF repartition by file sizes instead of number of files (#1572)
Browse files Browse the repository at this point in the history
AdamGS authored Dec 5, 2024

Verified

This commit was signed with the committer’s verified signature.
bearice Bearice Ren
1 parent ae58bb9 commit 9f254c4
Showing 1 changed file with 38 additions and 15 deletions.
53 changes: 38 additions & 15 deletions vortex-datafusion/src/persistent/execution.rs
Original file line number Diff line number Diff line change
@@ -136,7 +136,7 @@ impl ExecutionPlan for VortexExec {
) -> DFResult<Option<Arc<dyn ExecutionPlan>>> {
let file_groups = self.file_scan_config.file_groups.clone();

let repartitioned_file_groups = repartition_by_count(file_groups, target_partitions);
let repartitioned_file_groups = repartition_by_size(file_groups, target_partitions);

let mut new_plan = self.clone();

@@ -150,20 +150,43 @@ impl ExecutionPlan for VortexExec {
}
}

fn repartition_by_count(
fn repartition_by_size(
file_groups: Vec<Vec<PartitionedFile>>,
desired_partitions: usize,
) -> Vec<Vec<PartitionedFile>> {
let all_files = file_groups.into_iter().concat();
let total_file_count = all_files.len();
let total_size = all_files.iter().map(|f| f.object_meta.size).sum::<usize>();
let target_partition_size = total_size / (desired_partitions + 1);

let approx_files_per_partition = all_files.len().div_ceil(desired_partitions);
let mut repartitioned_file_groups = Vec::default();
let mut partitions = Vec::with_capacity(desired_partitions);

for chunk in &all_files.into_iter().chunks(approx_files_per_partition) {
repartitioned_file_groups.push(chunk.collect::<Vec<_>>());
let mut curr_partition_size = 0;
let mut curr_partition = Vec::default();

for file in all_files.into_iter() {
curr_partition_size += file.object_meta.size;
curr_partition.push(file);

if curr_partition_size > target_partition_size {
curr_partition_size = 0;
partitions.push(std::mem::take(&mut curr_partition));
}
}

repartitioned_file_groups
// if there's anything left, we shove it into existing partitions
for (idx, file) in curr_partition.into_iter().enumerate() {
let part_idx = idx % partitions.len();
partitions[part_idx].push(file);
}

assert_eq!(
partitions.len(),
usize::min(total_file_count, desired_partitions),
"The final number of partitions should be smallest between the total number of files and the desired partition count."
);

partitions
}

#[cfg(test)]
@@ -172,16 +195,16 @@ mod tests {

#[test]
fn basic_repartition_test() {
let input_file_groups = vec![vec![
PartitionedFile::new("a", 0),
PartitionedFile::new("b", 0),
PartitionedFile::new("c", 0),
PartitionedFile::new("d", 0),
PartitionedFile::new("e", 0),
let file_groups = vec![vec![
PartitionedFile::new("a", 100),
PartitionedFile::new("b", 25),
PartitionedFile::new("c", 25),
PartitionedFile::new("d", 25),
PartitionedFile::new("e", 50),
]];

let file_groups = repartition_by_count(input_file_groups, 2);
let output = repartition_by_size(file_groups, 2);

assert_eq!(file_groups.len(), 2);
assert_eq!(output.len(), 2);
}
}

0 comments on commit 9f254c4

Please sign in to comment.