Skip to content

Commit

Permalink
Pipeline (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
hesampakdaman authored May 19, 2024
1 parent 23fb4cc commit 244006c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 61 deletions.
18 changes: 10 additions & 8 deletions README.org
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@ then you can build and run the project with the following command.
This section provides a high-level overview of the system's architecture: the main components and their interactions.

*** Main Components
- =main.rs= :: The entry point of the application. Handles command-line arguments and initiates the processing workflow.
- =main.rs= :: The entry point of the application. Handles command-line arguments.
- =pipeline.rs= :: Orchestrating the processing workflow by calling other modules.
- =pre_processing.rs= :: Manages the initial parsing and preparation of data, utilizing memory-mapped files for efficient access.
- =compute.rs= :: Contains the core logic for processing the temperature data, including calculations for min, mean, and max temperatures.
- =weather.rs= :: Defines data structures and utility functions that are used throughout the application.
- =aggregate.rs= :: Responsible for aggregating the results of the temperature data processing.
- =weather.rs= :: Defines data structures and utility functions that are used throughout the application.

*** Workflow
1. *Initialization*: The application starts in =main.rs=, where it parses command-line arguments to get the path of the input file.
2. *Data Loading*: =pre_processing.rs= handles the loading of the input data file using memory-mapped files to efficiently manage large data volumes.
3. *Data Processing*: =compute.rs= processes the loaded data, calculating the required statistics (min, mean, max) for each weather station.
4. *Aggregation*: =aggregate.rs= aggregates the computed results for final output.
5. *Output*: Results are then output in the format specified by the challenge requirements.
2. *Orchestration*: =pipeline.rs= sets up the workflow by calling other modules.
3. *Data Loading*: =pre_processing.rs= handles the loading of the input data file using memory-mapped files to efficiently manage large data volumes.
4. *Data Processing*: =compute.rs= processes the loaded data, calculating the required statistics (min, mean, max) for each weather station.
5. *Aggregation*: =aggregate.rs= aggregates the computed results for final output.
6. *Output*: Results are then output in the format specified by the challenge requirements.

*** Boundaries
- Module Boundaries: Clear separation between data loading (=pre_processing.rs=), data processing (=compute.rs=), aggregation (=aggregate.rs=), and utility functions (=weather.rs=).
- Separating I/O: The application logic off-loads to business logic functions to keep them I/O independent. This separation ensures that the core algorithms remain focused on computation without being coupled to input/output operations enhancing testability.
- Module Boundaries: Clear separation between orchestration (=pipeline.rs=), data loading (=pre_processing.rs=), data processing (=compute.rs=), aggregation (=aggregate.rs=), and utility functions (=weather.rs=).
- Separating I/O: The application logic is off-loaded to business logic functions within =pipeline.rs=, ensuring that core algorithms remain focused on computation without being coupled to input/output operations.
61 changes: 8 additions & 53 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,25 @@
mod aggregate;
mod compute;
mod pipeline;
mod pre_processing;
mod weather;

use memmap2::MmapOptions;
use std::fs::File;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

fn main() {
fn main() -> Result<(), std::io::Error> {
let args = std::env::args().collect::<Vec<_>>();
if args.len() < 2 {
eprintln!("Usage: {} </path/to/measurements.txt>", args[0]);
eprintln!("Usage: {} </path/to/measurements.txt>", &args[0]);
std::process::exit(1);
}
let path = PathBuf::from(&args[1]);
match run(path) {
Ok(res) => print_results(&res),
Err(RunErr::IO(e)) => eprintln!("{}", e),
}
}

fn run(path: PathBuf) -> Result<Vec<weather::Station>, RunErr> {
let file = File::open(path).unwrap();
let mmap = Arc::new(unsafe { MmapOptions::new().map(&file).map_err(RunErr::IO)? });
let (tx, rx) = mpsc::channel();
pre_processing::Partition::try_from(&*mmap as &[u8])
.map_err(RunErr::IO)?
.chunks
.into_iter()
.for_each(|chunk| {
let tx = tx.clone();
let mmap = Arc::clone(&mmap);
thread::spawn(move || compute::stats(&mmap[chunk], tx));
});
drop(tx);
Ok(aggregate::reduce(rx))
}

#[derive(Debug)]
enum RunErr {
IO(std::io::Error),
let file = File::open(PathBuf::from(&args[1]))?;
let res = pipeline::run(&file)?;
print_formatted(&res);
Ok(())
}

fn print_results(v: &[weather::Station]) {
fn print_formatted(v: &[weather::Station]) {
print!("{{");
for (i, record) in v.iter().enumerate() {
if i < v.len() - 1 {
Expand All @@ -56,22 +30,3 @@ fn print_results(v: &[weather::Station]) {
}
println!("}}")
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn integration() {
let path = PathBuf::from("./data/measurements-test.txt");
let actual = run(path).unwrap();
let expected = vec![
weather::Station::new("London", 85, 95, 180, 2),
weather::Station::new("New York", 35, 150, 185, 2),
weather::Station::new("Oslo", -100, 102, 2, 2),
weather::Station::new("Paris", 130, 130, 130, 1),
weather::Station::new("Stockholm", -5, 200, 210, 3),
];
assert_eq!(actual, expected);
}
}
48 changes: 48 additions & 0 deletions src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use crate::aggregate;
use crate::compute;
use crate::pre_processing;
use crate::weather;
use memmap2::Mmap;
use std::fs::File;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

type Result = std::result::Result<Vec<weather::Station>, std::io::Error>;

pub fn run(file: &File) -> Result {
let mmap = Arc::new(unsafe { Mmap::map(file)? });
let (tx, rx) = mpsc::channel();
pre_processing::Partition::try_from(&*mmap as &[u8])?
.chunks
.into_iter()
.for_each(|chunk| {
let tx = tx.clone();
let mmap = Arc::clone(&mmap);
thread::spawn(move || compute::stats(&mmap[chunk], tx));
});
drop(tx);
Ok(aggregate::reduce(rx))
}

#[cfg(test)]
mod test {
use super::*;
use std::fs::File;
use std::path::PathBuf;

#[test]
fn test_run() {
let file = File::open(PathBuf::from("./data/measurements-test.txt"))
.expect("Test file {path} not found");
let actual = run(&file).unwrap();
let expected = vec![
weather::Station::new("London", 85, 95, 180, 2),
weather::Station::new("New York", 35, 150, 185, 2),
weather::Station::new("Oslo", -100, 102, 2, 2),
weather::Station::new("Paris", 130, 130, 130, 1),
weather::Station::new("Stockholm", -5, 200, 210, 3),
];
assert_eq!(actual, expected);
}
}

0 comments on commit 244006c

Please sign in to comment.