From 244006cc96a0778578129f3765eb31f443edd124 Mon Sep 17 00:00:00 2001 From: Hesam Pakdaman <14890379+hesampakdaman@users.noreply.github.com> Date: Sun, 19 May 2024 14:15:39 +0200 Subject: [PATCH] Pipeline (#17) --- README.org | 18 ++++++++------- src/main.rs | 61 +++++++------------------------------------------ src/pipeline.rs | 48 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 61 deletions(-) create mode 100644 src/pipeline.rs diff --git a/README.org b/README.org index a1122d6..2d30057 100644 --- a/README.org +++ b/README.org @@ -29,19 +29,21 @@ then you can build and run the project with the following command. This section provides a high-level overview of the system's architecture: the main components and their interactions. *** Main Components -- =main.rs= :: The entry point of the application. Handles command-line arguments and initiates the processing workflow. +- =main.rs= :: The entry point of the application. Handles command-line arguments. +- =pipeline.rs= :: Orchestrates the processing workflow by calling other modules. - =pre_processing.rs= :: Manages the initial parsing and preparation of data, utilizing memory-mapped files for efficient access. - =compute.rs= :: Contains the core logic for processing the temperature data, including calculations for min, mean, and max temperatures. -- =weather.rs= :: Defines data structures and utility functions that are used throughout the application. - =aggregate.rs= :: Responsible for aggregating the results of the temperature data processing. +- =weather.rs= :: Defines data structures and utility functions that are used throughout the application. *** Workflow 1. *Initialization*: The application starts in =main.rs=, where it parses command-line arguments to get the path of the input file. -2. *Data Loading*: =pre_processing.rs= handles the loading of the input data file using memory-mapped files to efficiently manage large data volumes. -3. 
*Data Processing*: =compute.rs= processes the loaded data, calculating the required statistics (min, mean, max) for each weather station. -4. *Aggregation*: =aggregate.rs= aggregates the computed results for final output. -5. *Output*: Results are then output in the format specified by the challenge requirements. +2. *Orchestration*: =pipeline.rs= sets up the workflow by calling other modules. +3. *Data Loading*: =pre_processing.rs= handles the loading of the input data file using memory-mapped files to efficiently manage large data volumes. +4. *Data Processing*: =compute.rs= processes the loaded data, calculating the required statistics (min, mean, max) for each weather station. +5. *Aggregation*: =aggregate.rs= aggregates the computed results for final output. +6. *Output*: Results are then output in the format specified by the challenge requirements. *** Boundaries -- Module Boundaries: Clear separation between data loading (=pre_processing.rs=), data processing (=compute.rs=), aggregation (=aggregate.rs=), and utility functions (=weather.rs=). -- Separating I/O: The application logic off-loads to business logic functions to keep them I/O independent. This separation ensures that the core algorithms remain focused on computation without being coupled to input/output operations enhancing testability. +- Module Boundaries: Clear separation between orchestration (=pipeline.rs=), data loading (=pre_processing.rs=), data processing (=compute.rs=), aggregation (=aggregate.rs=), and utility functions (=weather.rs=). +- Separating I/O: The application logic is off-loaded to business logic functions within =pipeline.rs=, ensuring that core algorithms remain focused on computation without being coupled to input/output operations. 
diff --git a/src/main.rs b/src/main.rs index 67a3532..64f560f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,51 +1,25 @@ mod aggregate; mod compute; +mod pipeline; mod pre_processing; mod weather; -use memmap2::MmapOptions; use std::fs::File; use std::path::PathBuf; -use std::sync::mpsc; -use std::sync::Arc; -use std::thread; -fn main() { +fn main() -> Result<(), std::io::Error> { let args = std::env::args().collect::<Vec<String>>(); if args.len() < 2 { - eprintln!("Usage: {} ", args[0]); + eprintln!("Usage: {} ", &args[0]); std::process::exit(1); } - let path = PathBuf::from(&args[1]); - match run(path) { - Ok(res) => print_results(&res), - Err(RunErr::IO(e)) => eprintln!("{}", e), - } -} - -fn run(path: PathBuf) -> Result<Vec<weather::Station>, RunErr> { - let file = File::open(path).unwrap(); - let mmap = Arc::new(unsafe { MmapOptions::new().map(&file).map_err(RunErr::IO)? }); - let (tx, rx) = mpsc::channel(); - pre_processing::Partition::try_from(&*mmap as &[u8]) - .map_err(RunErr::IO)? - .chunks - .into_iter() - .for_each(|chunk| { - let tx = tx.clone(); - let mmap = Arc::clone(&mmap); - thread::spawn(move || compute::stats(&mmap[chunk], tx)); - }); - drop(tx); - Ok(aggregate::reduce(rx)) -} - -#[derive(Debug)] -enum RunErr { - IO(std::io::Error), + let file = File::open(PathBuf::from(&args[1]))?; + let res = pipeline::run(&file)?; + print_formatted(&res); + Ok(()) } -fn print_results(v: &[weather::Station]) { +fn print_formatted(v: &[weather::Station]) { print!("{{"); for (i, record) in v.iter().enumerate() { if i < v.len() - 1 { @@ -56,22 +30,3 @@ fn print_results(v: &[weather::Station]) { } println!("}}") } - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn integration() { - let path = PathBuf::from("./data/measurements-test.txt"); - let actual = run(path).unwrap(); - let expected = vec![ - weather::Station::new("London", 85, 95, 180, 2), - weather::Station::new("New York", 35, 150, 185, 2), - weather::Station::new("Oslo", -100, 102, 2, 2), - weather::Station::new("Paris", 130, 
130, 130, 1), - weather::Station::new("Stockholm", -5, 200, 210, 3), - ]; - assert_eq!(actual, expected); - } -} diff --git a/src/pipeline.rs b/src/pipeline.rs new file mode 100644 index 0000000..ff210c8 --- /dev/null +++ b/src/pipeline.rs @@ -0,0 +1,48 @@ +use crate::aggregate; +use crate::compute; +use crate::pre_processing; +use crate::weather; +use memmap2::Mmap; +use std::fs::File; +use std::sync::mpsc; +use std::sync::Arc; +use std::thread; + +type Result = std::result::Result, std::io::Error>; + +pub fn run(file: &File) -> Result { + let mmap = Arc::new(unsafe { Mmap::map(file)? }); + let (tx, rx) = mpsc::channel(); + pre_processing::Partition::try_from(&*mmap as &[u8])? + .chunks + .into_iter() + .for_each(|chunk| { + let tx = tx.clone(); + let mmap = Arc::clone(&mmap); + thread::spawn(move || compute::stats(&mmap[chunk], tx)); + }); + drop(tx); + Ok(aggregate::reduce(rx)) +} + +#[cfg(test)] +mod test { + use super::*; + use std::fs::File; + use std::path::PathBuf; + + #[test] + fn test_run() { + let file = File::open(PathBuf::from("./data/measurements-test.txt")) + .expect("Test file {path} not found"); + let actual = run(&file).unwrap(); + let expected = vec![ + weather::Station::new("London", 85, 95, 180, 2), + weather::Station::new("New York", 35, 150, 185, 2), + weather::Station::new("Oslo", -100, 102, 2, 2), + weather::Station::new("Paris", 130, 130, 130, 1), + weather::Station::new("Stockholm", -5, 200, 210, 3), + ]; + assert_eq!(actual, expected); + } +}