Skip to content

Commit

Permalink
In external sort: add choice and path to save count map, add optional …
Browse files Browse the repository at this point in the history
…automatic clean after iteration.
  • Loading branch information
fxpineau committed Jan 14, 2025
1 parent 93dde4a commit 4dbd30c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 67 deletions.
2 changes: 1 addition & 1 deletion src/nested/sort/cindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ pub struct FitsMMappedCIndex<T: HCIndexValue> {
_phantom: PhantomData<T>,
}
impl<T: HCIndexValue> FitsMMappedCIndex<T> {
/// Private, only ment to be called from FITS reader.
/// Private, only meant to be called from FITS reader.
fn new(
fits_creation_date: Option<String>,
indexed_file_name: Option<String>,
Expand Down
121 changes: 55 additions & 66 deletions src/nested/sort/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
/*
Strategy:
* create 48 directories if 68 files each (=> 3072 files)
* create a 'cumul count' at a given order at a given depth (e.g. 8)
* spilt too large files in xx ranges computed from 'cumul counts'
* read sorting each file when reading it
*/

use std::{
cmp::Ordering,
error::Error,
Expand Down Expand Up @@ -95,6 +87,8 @@ pub struct SimpleExtSortInfo {
n_elems_per_chunk: u32,
/// Number of used threads.
n_threads: Option<usize>,
/// Remove temporary files (and possibly temporary dir) once the output iteration is over.
clean: bool,
/// Total number of rows.
n_tot: u64,
/// Healpix depth of the ranges in filenames.
Expand All @@ -106,13 +100,15 @@ impl SimpleExtSortInfo {
fn new(
n_elems_per_chunk: u32,
n_threads: Option<usize>,
clean: bool,
n_tot: u64,
depth: u8,
ordered_ranges_counts: Vec<(Range<u32>, u32)>,
) -> Self {
Self {
n_elems_per_chunk,
n_threads,
clean,
n_tot,
depth,
ordered_ranges_counts,
Expand All @@ -128,24 +124,30 @@ pub struct SimpleExtSortParams {
n_elems_per_chunk: u32,
/// Provide a given number of threads, else use all threads available.
n_threads: Option<usize>,
/// Remove temporary files (and possibly temporary dir) once the output iteration is over.
clean: bool,
}
impl Default for SimpleExtSortParams {
fn default() -> Self {
Self {
tmp_dir: PathBuf::from(".sort_tmp/"),
n_elems_per_chunk: 10_000_000,
n_threads: None,
}
Self::new(PathBuf::from(".sort_tmp/"), 10_000_000, None, true)
}
}
impl SimpleExtSortParams {
const ALL_FILENAME: &'static str = "hpxsort.all.unsorted.bin";
const PREFIX: &'static str = "hpxsort.";
const INFO_FILENAME: &'static str = "hpxsort.info.toml";
const COUNT_MAP_FILENAME: &'static str = "hpxsort.countmap.fits";
const SUCCESS_FILENAME: &'static str = "hpxsort.success";
const SUFFIX: &'static str = ".unsorted.bin";

/// Private constructor assembling the external sort parameters from their parts.
///
/// * `tmp_dir`: directory used for the temporary files of the sort.
/// * `n_elems_per_chunk`: stored as is in the field of the same name.
/// * `n_threads`: a given number of threads, else (`None`) use all threads available.
/// * `clean`: remove temporary files (and possibly temporary dir) once the output
///   iteration is over.
fn new(
    tmp_dir: PathBuf,
    n_elems_per_chunk: u32,
    n_threads: Option<usize>,
    clean: bool,
) -> Self {
    // Plain field-by-field move: no validation nor I/O is performed here.
    Self {
        tmp_dir,
        n_elems_per_chunk,
        n_threads,
        clean,
    }
}

fn clean<P: AsRef<Path>>(tmp_dir: P) -> Result<(), IoError> {
let mut path = PathBuf::from(tmp_dir.as_ref());
debug!("PATH: {:?}", &path);
Expand All @@ -154,10 +156,6 @@ impl SimpleExtSortParams {
path.push(Self::SUCCESS_FILENAME);
debug!("Remove file {:?} if exists.", &path);
fs::exists(&path).and_then(|exists| if exists { remove_file(&path) } else { Ok(()) })?;
// Remove count map file
path.set_file_name(Self::COUNT_MAP_FILENAME);
debug!("Remove file {:?} if exists.", &path);
fs::exists(&path).and_then(|exists| if exists { remove_file(&path) } else { Ok(()) })?;
// Remove info file
path.set_file_name(Self::INFO_FILENAME);
debug!("Remove file {:?} if exists.", &path);
Expand Down Expand Up @@ -219,6 +217,7 @@ impl SimpleExtSortParams {
tmp_dir,
n_elems_per_chunk: info.n_elems_per_chunk,
n_threads: info.n_threads.clone(),
clean: info.clean,
}
}

Expand Down Expand Up @@ -287,6 +286,7 @@ impl SimpleExtSortParams {
let info = SimpleExtSortInfo::new(
self.n_elems_per_chunk,
self.n_threads,
self.clean,
n_tot,
depth,
ordered_ranges_counts,
Expand All @@ -311,12 +311,6 @@ impl SimpleExtSortParams {
.and_then(|content| toml::from_str(&content).map_err(|e| e.into()))
}

fn create_countmap_file_path(&self) -> PathBuf {
let mut path = self.tmp_dir.clone();
path.push(Self::COUNT_MAP_FILENAME);
path
}

// Read/Write SUCCESS file

fn create_ok_file_path(&self) -> PathBuf {
Expand Down Expand Up @@ -462,7 +456,6 @@ where
let twice_dd = dd << 1;
// Config bincode
let bincode = get_bincode();
// .allow_trailing_bytes();
// Create the temporary directory
params.create_tmp_dir()?;
// Init thread pool
Expand Down Expand Up @@ -522,32 +515,18 @@ where
// * unwrap() is ok on binary_search since ranges cover the full hash range.
let first_h = (hpx29(entries_view.first().unwrap()) >> twice_dd) as u32;
let last_h = (hpx29(entries_view.last().unwrap()) >> twice_dd) as u32;
//debug!("first_h: {}; last_h: {}", first_h, last_h);
let rstart = ranges_counts
.binary_search_by(get_range_binsearch(first_h))
.unwrap();
let rend = ranges_counts
.binary_search_by(get_range_binsearch(last_h))
.unwrap();
/*debug!(
"rstart: {}; rend: {}. Tot size: {}",
rstart,
rend,
ranges_counts.len()
);*/

for (range, count) in &mut ranges_counts[rstart..=rend] {
/*debug!(
"Range [{}, {}]; count before: {}",
range.start, range.end, count
);*/
let to = entries_view.partition_point(|row| {
let h = (hpx29(row) >> twice_dd) as u32;
//debug!("h: {}, range: {:?}", h, &range);
//debug!("Before contains range: {:?}", h, &range);
range.contains(&h)
});
// debug!("to: {}; tot: {}", to, entries_view.len());
if to > 0 {
let (to_be_writen, remaining) = entries_view.split_at_mut(to);
entries_view = remaining;
Expand All @@ -557,16 +536,6 @@ where
for row in to_be_writen {
bincode.serialize_into(&mut bufw, row)?;
}
/*debug!(
"Range file [{}, {}), {} row written in {} ms.",
range.start,
range.end,
to,
SystemTime::now()
.duration_since(tstart)
.unwrap_or_default()
.as_millis()
);*/

let to = to as u32;
if to > *count {
Expand Down Expand Up @@ -715,12 +684,16 @@ where
thread_pool: ThreadPool,
/// Method to extract/compute the HEALPix order29 index from a row.
hpx29: FF,
/// Iterates on the files and ragen info: ((PathBuf, Depth, Range), (Range, NRows)).
/// Iterates on the files and range info: ((PathBuf, Depth, Range), (Range, NRows)).
ordered_files_counts_it: Zip<IntoIter<(PathBuf, u8, Range<u32>)>, IntoIter<(Range<u32>, u32)>>,
/// Iterates on the sorted rows of a file.
rows_it: IntoIter<TT>,
/// The next chunk of rows to be sorted before iterating over
next_rows: Option<Vec<TT>>,
/// The temporary dir, used only if `clean == true`.
tmp_dir: PathBuf,
/// Clean temporary file once the iteration is over.
clean: bool,
}
impl<TT: ExtSortable, FF: Fn(&TT) -> u64 + Sync> Iterator for GlobalIt<TT, FF> {
type Item = Result<TT, Box<dyn Error>>;
Expand Down Expand Up @@ -757,7 +730,14 @@ where
Err(e) => Some(Err(e.into())),
}
}
None => None,
None => {
if self.clean {
if let Err(e) = SimpleExtSortParams::clean(&self.tmp_dir) {
error!("Error cleaning external sort temporary files: {}", e);
}
}
None
}
},
}
}
Expand All @@ -769,6 +749,8 @@ where
ordered_files_counts_it,
rows_it: rows_to_be_sorted.into_iter(),
next_rows: next_file_content.transpose()?,
tmp_dir: params.tmp_dir,
clean: params.clean,
})
}

Expand Down Expand Up @@ -838,11 +820,12 @@ where
/// For simple cases, why not use a `Clonable IntoIter`?
/// I just found [this post](https://orxfun.github.io/orxfun-notes/#/missing-iterable-traits-2024-12-13)
/// that seems to tackle a similar problem here (iterating several time over a same Collection).
pub fn hpx_external_sort<T, E, I, J, F>(
pub fn hpx_external_sort<T, E, I, J, F, P: AsRef<Path>>(
iterator: I,
iterable: J,
hpx29: F,
depth: u8,
save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
sort_params: Option<SimpleExtSortParams>,
) -> Result<impl Iterator<Item = Result<T, Box<dyn Error>>>, Box<dyn Error>>
where
Expand Down Expand Up @@ -879,7 +862,9 @@ where
);
let tstart = SystemTime::now();
params.create_tmp_dir()?;
count_map.to_fits_file(params.create_countmap_file_path())?;
if let Some(countmap_path) = save_countmap_in_file {
count_map.to_fits_file(countmap_path)?;
}
debug!(
"Count map of writen in {} ms.",
SystemTime::now()
Expand All @@ -890,10 +875,11 @@ where
hpx_external_sort_with_knowledge(iterable.into_iter(), &count_map, hpx29, Some(params))
}

pub fn hpx_external_sort_stream<T, E, I, F>(
pub fn hpx_external_sort_stream<T, E, I, F, P: AsRef<Path>>(
stream: I,
hpx29: F,
depth: u8,
save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
sort_params: Option<SimpleExtSortParams>,
) -> Result<impl Iterator<Item = Result<T, Box<dyn Error>>>, Box<dyn Error>>
where
Expand Down Expand Up @@ -935,6 +921,9 @@ where
};
// Get an iterator over all written rows and apply the second part of the algorithm.
let n_rows = count_map.values().map(|count| *count as u64).sum();
if let Some(countmap_path) = save_countmap_in_file {
count_map.to_fits_file(countmap_path)?;
}
let mut bufr = params.open_file_all().map(BufReader::new)?;
hpx_external_sort_with_knowledge(
(0..n_rows).map(move |_| bincode.deserialize_from(&mut bufr)),
Expand All @@ -961,7 +950,7 @@ where
/// # Note
/// For small files, we advise reading the full content of the file in memory and using the
/// [hpx_internal_sort](#hpx_internal_sort) method.
pub fn hpx_external_sort_csv_file<IN: AsRef<Path>, OUT: AsRef<Path>>(
pub fn hpx_external_sort_csv_file<IN: AsRef<Path>, OUT: AsRef<Path>, P: AsRef<Path>>(
input_path: IN,
output_path: OUT,
output_overwrite: bool,
Expand All @@ -970,8 +959,8 @@ pub fn hpx_external_sort_csv_file<IN: AsRef<Path>, OUT: AsRef<Path>>(
has_header: bool,
separator: Option<char>,
depth: u8,
save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
sort_params: Option<SimpleExtSortParams>,
clean: bool,
) -> Result<(), Box<dyn Error>> {
// Declare variables/functions
let separator = separator.unwrap_or(',');
Expand Down Expand Up @@ -1039,7 +1028,9 @@ pub fn hpx_external_sort_csv_file<IN: AsRef<Path>, OUT: AsRef<Path>>(
);
let tstart = SystemTime::now();
sort_params.create_tmp_dir()?;
count_map.to_fits_file(sort_params.create_countmap_file_path())?;
if let Some(countmap_path) = save_countmap_in_file {
count_map.to_fits_file(countmap_path)?;
}
debug!(
"Count map of writen in {} ms.",
SystemTime::now()
Expand All @@ -1065,7 +1056,6 @@ pub fn hpx_external_sort_csv_file<IN: AsRef<Path>, OUT: AsRef<Path>>(
line_res_it.next();
}
// Get the sorted iterator
let tmp_path = sort_params.tmp_dir.clone();
let sorted_it =
hpx_external_sort_with_knowledge(line_res_it, &count_map, hpx29, Some(sort_params))?;

Expand All @@ -1082,11 +1072,7 @@ pub fn hpx_external_sort_csv_file<IN: AsRef<Path>, OUT: AsRef<Path>>(
.unwrap_or_default()
.as_millis()
);
if clean {
SimpleExtSortParams::clean(tmp_path).map_err(|e| e.into())
} else {
Ok(())
}
Ok(())
}

/// Create an HEALPix ordered CSV file containing one row per HEALPix hash values at the given
Expand Down Expand Up @@ -1169,12 +1155,13 @@ mod tests {
false,
Some(','),
depth_sort,
None::<&'static str>, // Some(PathBuf::from("./local_resources/sort_tmp/hpxsort.countmap.fits"))
Some(SimpleExtSortParams {
tmp_dir: PathBuf::from("./local_resources/sort_tmp/"),
n_elems_per_chunk: 1000, // 1_000_000
n_threads: Some(4),
clean: true,
}),
true,
)
.unwrap();
let out = Command::new("bash")
Expand Down Expand Up @@ -1219,12 +1206,13 @@ mod tests {
false,
Some(','),
depth_sort,
None::<&'static str>,
Some(SimpleExtSortParams {
tmp_dir: PathBuf::from("/data/pineau/sandbox/sort_tmp/"),
n_elems_per_chunk: 10_000_000,
n_threads: Some(4),
clean: false
}),
false,
)
.unwrap();
let out = Command::new("bash")
Expand Down Expand Up @@ -1269,12 +1257,13 @@ mod tests {
false,
Some(','),
depth_sort,
None::<&'static str>,
Some(SimpleExtSortParams {
tmp_dir: PathBuf::from("./local_resources/sort_tmp/"),
n_elems_per_chunk: 5_000_000,
n_threads: Some(4),
clean: false
}),
false,
)
.unwrap();
let out = Command::new("bash")
Expand Down

0 comments on commit 4dbd30c

Please sign in to comment.