Skip to content

Commit

Permalink
Add select_biased! macro
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Nov 25, 2023
1 parent 18afbb6 commit d37a73a
Show file tree
Hide file tree
Showing 2 changed files with 997 additions and 17 deletions.
39 changes: 24 additions & 15 deletions crossbeam-channel/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ enum Timeout {
fn run_select(
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
timeout: Timeout,
is_biased: bool,
) -> Option<(Token, usize, *const u8)> {
if handles.is_empty() {
// Wait until the timeout and return.
Expand All @@ -192,8 +193,10 @@ fn run_select(
}
}

// Shuffle the operations for fairness.
utils::shuffle(handles);
if !is_biased {
// Shuffle the operations for fairness.
utils::shuffle(handles);
}

// Create a token, which serves as a temporary variable that gets initialized in this function
// and is later used by a call to `channel::read()` or `channel::write()` that completes the
Expand Down Expand Up @@ -324,6 +327,7 @@ fn run_select(
fn run_ready(
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
timeout: Timeout,
is_biased: bool,
) -> Option<usize> {
if handles.is_empty() {
// Wait until the timeout and return.
Expand All @@ -340,8 +344,10 @@ fn run_ready(
}
}

// Shuffle the operations for fairness.
utils::shuffle(handles);
if !is_biased {
// Shuffle the operations for fairness.
utils::shuffle(handles);
}

loop {
let backoff = Backoff::new();
Expand Down Expand Up @@ -450,7 +456,7 @@ fn run_ready(
pub fn try_select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
) -> Result<SelectedOperation<'a>, TrySelectError> {
match run_select(handles, Timeout::Now) {
match run_select(handles, Timeout::Now, false) {
None => Err(TrySelectError),
Some((token, index, ptr)) => Ok(SelectedOperation {
token,
Expand All @@ -466,12 +472,13 @@ pub fn try_select<'a>(
#[inline]
pub fn select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
is_biased: bool,
) -> SelectedOperation<'a> {
if handles.is_empty() {
panic!("no operations have been added to `Select`");
}

let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
SelectedOperation {
token,
index,
Expand All @@ -486,10 +493,11 @@ pub fn select<'a>(
pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
timeout: Duration,
is_biased: bool,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
match Instant::now().checked_add(timeout) {
Some(deadline) => select_deadline(handles, deadline),
None => Ok(select(handles)),
Some(deadline) => select_deadline(handles, deadline, is_biased),
None => Ok(select(handles, is_biased)),
}
}

Expand All @@ -498,8 +506,9 @@ pub fn select_timeout<'a>(
pub(crate) fn select_deadline<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
deadline: Instant,
is_biased: bool,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
match run_select(handles, Timeout::At(deadline)) {
match run_select(handles, Timeout::At(deadline), is_biased) {
None => Err(SelectTimeoutError),
Some((token, index, ptr)) => Ok(SelectedOperation {
token,
Expand Down Expand Up @@ -814,7 +823,7 @@ impl<'a> Select<'a> {
/// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371
/// ```
pub fn select(&mut self) -> SelectedOperation<'a> {
select(&mut self.handles)
select(&mut self.handles, false)
}

/// Blocks for a limited time until one of the operations becomes ready and selects it.
Expand Down Expand Up @@ -868,7 +877,7 @@ impl<'a> Select<'a> {
&mut self,
timeout: Duration,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
select_timeout(&mut self.handles, timeout)
select_timeout(&mut self.handles, timeout, false)
}

/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
Expand Down Expand Up @@ -924,7 +933,7 @@ impl<'a> Select<'a> {
&mut self,
deadline: Instant,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
select_deadline(&mut self.handles, deadline)
select_deadline(&mut self.handles, deadline, false)
}

/// Attempts to find a ready operation without blocking.
Expand Down Expand Up @@ -963,7 +972,7 @@ impl<'a> Select<'a> {
/// }
/// ```
pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
match run_ready(&mut self.handles, Timeout::Now) {
match run_ready(&mut self.handles, Timeout::Now, false) {
None => Err(TryReadyError),
Some(index) => Ok(index),
}
Expand Down Expand Up @@ -1020,7 +1029,7 @@ impl<'a> Select<'a> {
panic!("no operations have been added to `Select`");
}

run_ready(&mut self.handles, Timeout::Never).unwrap()
run_ready(&mut self.handles, Timeout::Never, false).unwrap()
}

/// Blocks for a limited time until one of the operations becomes ready.
Expand Down Expand Up @@ -1121,7 +1130,7 @@ impl<'a> Select<'a> {
/// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371
/// ```
pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
match run_ready(&mut self.handles, Timeout::At(deadline)) {
match run_ready(&mut self.handles, Timeout::At(deadline), false) {
None => Err(ReadyTimeoutError),
Some(index) => Ok(index),
}
Expand Down
Loading

0 comments on commit d37a73a

Please sign in to comment.