Pipeline #17

Merged
merged 2 commits into from
May 19, 2024
18 changes: 10 additions & 8 deletions README.org
@@ -29,19 +29,21 @@ then you can build and run the project with the following command.
This section provides a high-level overview of the system's architecture: the main components and their interactions.

*** Main Components
- =main.rs= :: The entry point of the application. Handles command-line arguments and initiates the processing workflow.
- =main.rs= :: The entry point of the application. Handles command-line arguments.
- =pipeline.rs= :: Orchestrates the processing workflow by calling the other modules.
- =pre_processing.rs= :: Manages the initial parsing and preparation of data, utilizing memory-mapped files for efficient access.
- =compute.rs= :: Contains the core logic for processing the temperature data, including calculations for min, mean, and max temperatures.
- =weather.rs= :: Defines data structures and utility functions that are used throughout the application.
- =aggregate.rs= :: Responsible for aggregating the results of the temperature data processing.
- =weather.rs= :: Defines data structures and utility functions that are used throughout the application.

*** Workflow
1. *Initialization*: The application starts in =main.rs=, where it parses command-line arguments to get the path of the input file.
2. *Data Loading*: =pre_processing.rs= handles the loading of the input data file using memory-mapped files to efficiently manage large data volumes.
3. *Data Processing*: =compute.rs= processes the loaded data, calculating the required statistics (min, mean, max) for each weather station.
4. *Aggregation*: =aggregate.rs= aggregates the computed results for final output.
5. *Output*: Results are then output in the format specified by the challenge requirements.
2. *Orchestration*: =pipeline.rs= sets up the workflow by calling other modules.
3. *Data Loading*: =pre_processing.rs= handles the loading of the input data file using memory-mapped files to efficiently manage large data volumes.
4. *Data Processing*: =compute.rs= processes the loaded data, calculating the required statistics (min, mean, max) for each weather station.
5. *Aggregation*: =aggregate.rs= aggregates the computed results for final output.
6. *Output*: Results are then output in the format specified by the challenge requirements.

*** Boundaries
- Module Boundaries: Clear separation between data loading (=pre_processing.rs=), data processing (=compute.rs=), aggregation (=aggregate.rs=), and utility functions (=weather.rs=).
- Separating I/O: The application logic off-loads to business logic functions to keep them I/O independent. This separation ensures that the core algorithms remain focused on computation without being coupled to input/output operations enhancing testability.
- Module Boundaries: Clear separation between orchestration (=pipeline.rs=), data loading (=pre_processing.rs=), data processing (=compute.rs=), aggregation (=aggregate.rs=), and utility functions (=weather.rs=).
- Separating I/O: The application logic is off-loaded to business logic functions within =pipeline.rs=, ensuring that core algorithms remain focused on computation without being coupled to input/output operations.
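The I/O separation described in the Boundaries section can be illustrated with a small sketch. It is not code from this PR: the function name =hottest= and its return type are hypothetical, and it reads from any =BufRead= rather than from a memory-mapped file. The point is only that a core function with no knowledge of files can be exercised in a test with in-memory bytes.

```rust
use std::io::{self, BufRead};

/// Hypothetical core function: returns the station with the highest reading.
/// It accepts any buffered reader, so it never opens a file itself.
pub fn hottest<R: BufRead>(reader: R) -> io::Result<Option<(String, f64)>> {
    let mut best: Option<(String, f64)> = None;
    for line in reader.lines() {
        let line = line?;
        // Assumed record shape: `name;temperature`. Malformed lines are skipped.
        let Some((name, value)) = line.split_once(';') else { continue };
        let Ok(temp) = value.trim().parse::<f64>() else { continue };
        if best.as_ref().map_or(true, |(_, t)| temp > *t) {
            best = Some((name.to_string(), temp));
        }
    }
    Ok(best)
}
```

A caller that owns the I/O would wrap a =File= in =std::io::BufReader= and pass it in; a test can pass =std::io::Cursor::new("Oslo;-10.0\nParis;13.0\n")= instead.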
61 changes: 8 additions & 53 deletions src/main.rs
@@ -1,51 +1,25 @@
mod aggregate;
mod compute;
mod pipeline;
mod pre_processing;
mod weather;

use memmap2::MmapOptions;
use std::fs::File;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

fn main() {
fn main() -> Result<(), std::io::Error> {
    let args = std::env::args().collect::<Vec<_>>();
    if args.len() < 2 {
        eprintln!("Usage: {} </path/to/measurements.txt>", args[0]);
        eprintln!("Usage: {} </path/to/measurements.txt>", &args[0]);
        std::process::exit(1);
    }
    let path = PathBuf::from(&args[1]);
    match run(path) {
        Ok(res) => print_results(&res),
        Err(RunErr::IO(e)) => eprintln!("{}", e),
    }
}

fn run(path: PathBuf) -> Result<Vec<weather::Station>, RunErr> {
    let file = File::open(path).unwrap();
    let mmap = Arc::new(unsafe { MmapOptions::new().map(&file).map_err(RunErr::IO)? });
    let (tx, rx) = mpsc::channel();
    pre_processing::Partition::try_from(&*mmap as &[u8])
        .map_err(RunErr::IO)?
        .chunks
        .into_iter()
        .for_each(|chunk| {
            let tx = tx.clone();
            let mmap = Arc::clone(&mmap);
            thread::spawn(move || compute::stats(&mmap[chunk], tx));
        });
    drop(tx);
    Ok(aggregate::reduce(rx))
}

#[derive(Debug)]
enum RunErr {
    IO(std::io::Error),
    let file = File::open(PathBuf::from(&args[1]))?;
    let res = pipeline::run(&file)?;
    print_formatted(&res);
    Ok(())
}

fn print_results(v: &[weather::Station]) {
fn print_formatted(v: &[weather::Station]) {
    print!("{{");
    for (i, record) in v.iter().enumerate() {
        if i < v.len() - 1 {
@@ -56,22 +30,3 @@ fn print_results(v: &[weather::Station]) {
    }
    println!("}}")
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn integration() {
        let path = PathBuf::from("./data/measurements-test.txt");
        let actual = run(path).unwrap();
        let expected = vec![
            weather::Station::new("London", 85, 95, 180, 2),
            weather::Station::new("New York", 35, 150, 185, 2),
            weather::Station::new("Oslo", -100, 102, 2, 2),
            weather::Station::new("Paris", 130, 130, 130, 1),
            weather::Station::new("Stockholm", -5, 200, 210, 3),
        ];
        assert_eq!(actual, expected);
    }
}
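The main.rs change replaces =File::open(path).unwrap()= and the =RunErr= wrapper with plain =?= propagation of =std::io::Error=. A minimal sketch of that style follows; the helper =open_and_measure= is hypothetical and exists only to make the behaviour checkable without the project's data files.

```rust
use std::fs::File;
use std::io;
use std::path::Path;

/// Hypothetical helper in the style of the new `main`: every fallible step
/// uses `?`, so a missing file surfaces as an `Err` instead of a panic.
pub fn open_and_measure(path: &Path) -> io::Result<u64> {
    let file = File::open(path)?;
    // `metadata` can also fail; the same `?` forwards that error unchanged.
    Ok(file.metadata()?.len())
}
```

When =main= itself returns =Result<(), std::io::Error>=, an =Err= value is printed to stderr in its =Debug= form and the process exits with a failure status, so no explicit =match= is needed at the top level.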
48 changes: 48 additions & 0 deletions src/pipeline.rs
@@ -0,0 +1,48 @@
use crate::aggregate;
use crate::compute;
use crate::pre_processing;
use crate::weather;
use memmap2::Mmap;
use std::fs::File;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

type Result = std::result::Result<Vec<weather::Station>, std::io::Error>;

pub fn run(file: &File) -> Result {
    let mmap = Arc::new(unsafe { Mmap::map(file)? });
    let (tx, rx) = mpsc::channel();
    pre_processing::Partition::try_from(&*mmap as &[u8])?
        .chunks
        .into_iter()
        .for_each(|chunk| {
            let tx = tx.clone();
            let mmap = Arc::clone(&mmap);
            thread::spawn(move || compute::stats(&mmap[chunk], tx));
        });
    drop(tx);
    Ok(aggregate::reduce(rx))
}

#[cfg(test)]
mod test {
    use super::*;
    use std::fs::File;
    use std::path::PathBuf;

    #[test]
    fn test_run() {
        // `expect` takes a plain string and does not interpolate `{path}`,
        // so the path is spelled out in the message.
        let file = File::open(PathBuf::from("./data/measurements-test.txt"))
            .expect("Test file ./data/measurements-test.txt not found");
        let actual = run(&file).unwrap();
        let expected = vec![
            weather::Station::new("London", 85, 95, 180, 2),
            weather::Station::new("New York", 35, 150, 185, 2),
            weather::Station::new("Oslo", -100, 102, 2, 2),
            weather::Station::new("Paris", 130, 130, 130, 1),
            weather::Station::new("Stockholm", -5, 200, 210, 3),
        ];
        assert_eq!(actual, expected);
    }
}
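The fan-out/fan-in shape of =pipeline::run= (one thread per chunk, partial results sent over an =mpsc= channel, then reduced) can be tried with the standard library alone. The sketch below is an approximation under stated assumptions, not the project's code: an =Arc<Vec<u8>>= stands in for the memory map, and =Stats=, =partition=, =parse_tenths=, and the bodies of =stats= and =reduce= are guesses at what =pre_processing=, =compute=, and =aggregate= might do. Temperatures are assumed to carry exactly one fractional digit and are stored as integer tenths.

```rust
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::{mpsc, Arc};
use std::thread;

/// Assumed per-station accumulator, in tenths of a degree.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Stats { pub min: i64, pub max: i64, pub sum: i64, pub count: u64 }

type Partial = BTreeMap<String, Stats>;

/// Split `data` into roughly `n` ranges, each ending right after a newline.
pub fn partition(data: &[u8], n: usize) -> Vec<Range<usize>> {
    let step = (data.len() / n.max(1)).max(1);
    let (mut ranges, mut start) = (Vec::new(), 0);
    while start < data.len() {
        let mut end = (start + step).min(data.len());
        // Grow the chunk to the next newline so no record is cut in half.
        while end < data.len() && data[end - 1] != b'\n' {
            end += 1;
        }
        ranges.push(start..end);
        start = end;
    }
    ranges
}

/// Parse text such as "-10.2" into tenths (-102); assumes one fractional digit.
fn parse_tenths(s: &str) -> Option<i64> {
    let (neg, digits) = match s.strip_prefix('-') {
        Some(rest) => (true, rest),
        None => (false, s),
    };
    let (whole, frac) = digits.split_once('.')?;
    let v = whole.parse::<i64>().ok()? * 10 + frac.parse::<i64>().ok()?;
    Some(if neg { -v } else { v })
}

/// Worker: accumulate statistics for one chunk and send them to the reducer.
fn stats(chunk: &[u8], tx: mpsc::Sender<Partial>) {
    let mut local = Partial::new();
    for line in String::from_utf8_lossy(chunk).lines() {
        let Some((name, value)) = line.split_once(';') else { continue };
        let Some(t) = parse_tenths(value.trim()) else { continue };
        local
            .entry(name.to_string())
            .and_modify(|s| {
                s.min = s.min.min(t);
                s.max = s.max.max(t);
                s.sum += t;
                s.count += 1;
            })
            .or_insert(Stats { min: t, max: t, sum: t, count: 1 });
    }
    // A send error only means the receiver is gone; nothing left to do.
    let _ = tx.send(local);
}

/// Reducer: merge partial maps until every sender has been dropped.
fn reduce(rx: mpsc::Receiver<Partial>) -> Partial {
    let mut total = Partial::new();
    for partial in rx {
        for (name, p) in partial {
            total
                .entry(name)
                .and_modify(|s| {
                    s.min = s.min.min(p.min);
                    s.max = s.max.max(p.max);
                    s.sum += p.sum;
                    s.count += p.count;
                })
                .or_insert(p);
        }
    }
    total
}

/// Fan out one thread per chunk, then fan in through the channel.
pub fn run(data: Vec<u8>, workers: usize) -> Partial {
    let data = Arc::new(data);
    let (tx, rx) = mpsc::channel();
    for range in partition(&data, workers) {
        let tx = tx.clone();
        let data = Arc::clone(&data);
        thread::spawn(move || stats(&data[range], tx));
    }
    // Drop the original sender so the receiver's loop ends once workers finish.
    drop(tx);
    reduce(rx)
}
```

As in the PR, the =drop(tx)= before reducing matters: the receiver only stops iterating when every =Sender= clone is gone, so keeping the original alive would block the reducer indefinitely.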