Skip to content

Commit

Permalink
switch to crossbeam::scope, create task-runner scope
Browse files Browse the repository at this point in the history
  • Loading branch information
syphar committed Mar 5, 2023
1 parent cc239ed commit d6fb65d
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 23 deletions.
89 changes: 89 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ atty = "0.2.0"
camino = "1.0.4"
clap = { version = "2.33.0", features = ["wrap_help"] }
ctrlc = { version = "3.1.1", features = ["termination"] }
crossbeam = "0.8.2"
derivative = "2.0.0"
dotenvy = "0.15"
edit-distance = "2.0.0"
Expand Down
32 changes: 9 additions & 23 deletions src/justfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,17 +255,13 @@ impl<'src> Justfile<'src> {
};

// let mut ran = BTreeSet::new();
std::thread::scope(|scope| -> RunResult<'src, ()> {
let mut threads = Vec::new();
parallel::task_scope(|scope| {
for (recipe, arguments) in grouped {
threads.push(scope.spawn(|| {
scope.spawn(|| {
Self::run_recipe(
&context, recipe, arguments, &dotenv, search, /*&mut ran*/
)
}));
}
for thread in threads {
thread.join().unwrap()?;
});
}
Ok(())
})?;
Expand Down Expand Up @@ -317,15 +313,14 @@ impl<'src> Justfile<'src> {
let mut evaluator =
Evaluator::recipe_evaluator(context.config, dotenv, &scope, context.settings, search);

std::thread::scope(|scope| -> RunResult<'src, ()> {
let mut threads = Vec::new();
parallel::task_scope(|scope| {
for Dependency { recipe, arguments } in recipe.dependencies.iter().take(recipe.priors) {
let arguments = arguments
.iter()
.map(|argument| evaluator.evaluate_expression(argument))
.collect::<RunResult<Vec<String>>>()?;

threads.push(scope.spawn(move || {
scope.spawn(move || {
Self::run_recipe(
context,
recipe,
Expand All @@ -334,11 +329,7 @@ impl<'src> Justfile<'src> {
search,
// ran,
)
}));
}

for thread in threads {
thread.join().unwrap()?;
});
}
Ok(())
})?;
Expand All @@ -348,8 +339,7 @@ impl<'src> Justfile<'src> {
{
// let mut ran = BTreeSet::new();

std::thread::scope(|scope| -> RunResult<'src, ()> {
let mut threads = Vec::new();
parallel::task_scope(|scope| {
for Dependency { recipe, arguments } in recipe.dependencies.iter().skip(recipe.priors) {
let mut evaluated = Vec::new();

Expand All @@ -361,7 +351,7 @@ impl<'src> Justfile<'src> {
);
}

threads.push(scope.spawn(move || {
scope.spawn(move || {
Self::run_recipe(
context,
recipe,
Expand All @@ -370,11 +360,7 @@ impl<'src> Justfile<'src> {
search,
// &mut ran,
)
}));
}

for thread in threads {
thread.join().unwrap()?;
});
}
Ok(())
})?;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ mod name;
mod ordinal;
mod output;
mod output_error;
mod parallel;
mod parameter;
mod parameter_kind;
mod parser;
Expand Down
46 changes: 46 additions & 0 deletions src/parallel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::RunResult;
use crossbeam::thread;

type ScopeResult<'src> = RunResult<'src, ()>;

pub(crate) struct TaskScope<'env, 'src, 'inner_scope> {
inner: &'inner_scope thread::Scope<'env>,
join_handles: Vec<thread::ScopedJoinHandle<'inner_scope, ScopeResult<'src>>>,
}

impl<'env, 'src, 'inner_scope> TaskScope<'env, 'src, 'inner_scope> {
pub(crate) fn spawn<'scope, F>(&'scope mut self, f: F)
where
'src: 'env,
F: FnOnce() -> ScopeResult<'src>,
F: Send + 'env,
{
self.join_handles.push(self.inner.spawn(|_scope| f()));
}
}

/// task runner scope, based on `crossbeam::thread::scope`.
///
/// The `scope` object can be used to `.spawn` new tasks to be
/// run. The first error will be returned as result of this `task_scope`.
///
/// Only works for tasks with an `RunResult<'src, ()>` result type.
pub(crate) fn task_scope<'env, 'src, F>(f: F) -> ScopeResult<'src>
where
F: for<'inner_scope> FnOnce(&mut TaskScope<'env, 'src, 'inner_scope>) -> ScopeResult<'src>,
{
thread::scope(|scope| {
let mut task_scope = TaskScope {
inner: scope,
join_handles: Vec::new(),
};

f(&mut task_scope)?;

for handle in task_scope.join_handles {
handle.join().expect("could not join thread")?;
}
Ok(())
})
.expect("could not join thread")
}

0 comments on commit d6fb65d

Please sign in to comment.