Skip to content

Commit

Permalink
feat(core/context): add Context::unbounded_send(_to)
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Jul 11, 2024
1 parent 1e710b3 commit aa2d483
Show file tree
Hide file tree
Showing 15 changed files with 437 additions and 128 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Specify MSRV as 1.76.
- logger: log truncation up to the `max_line_size` configuration parameter ([#128]).
- core: directly accept never returning functions in `ActorGroup::exec()` ([#127]).
- core/context: add `Context::unbounded_send(_to)` methods.
- errors: add `From<SendError> for TrySendError` and `SendError::{into_inner,map}` methods.

### Changed
- **BREAKING** macros: remove the `network` feature ([#127]).
Expand Down
46 changes: 26 additions & 20 deletions elfo-core/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,36 +223,40 @@ impl Actor {
self.send_status_to_subscribers(&self.control.read());
}

pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
match self.handle_system(envelope) {
Some(envelope) => self.mailbox.send(envelope).await,
None => Ok(()),
}
}

pub(crate) fn try_send(&self, envelope: Envelope) -> Result<(), TrySendError<Envelope>> {
msg!(match &envelope {
Terminate { closing } => {
if *closing || self.termination_policy.close_mailbox {
if self.close() {
return Ok(());
} else {
return Err(TrySendError::Closed(envelope));
}
}
}
});
match self.handle_system(envelope) {
Some(envelope) => self.mailbox.try_send(envelope),
None => Ok(()),
}
}

self.mailbox.try_send(envelope)
pub(crate) fn unbounded_send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
match self.handle_system(envelope) {
Some(envelope) => self.mailbox.unbounded_send(envelope),
None => Ok(()),
}
}

pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
#[inline(always)]
fn handle_system(&self, envelope: Envelope) -> Option<Envelope> {
msg!(match &envelope {
Terminate { closing } => {
if *closing || self.termination_policy.close_mailbox {
if self.close() {
return Ok(());
} else {
return Err(SendError(envelope));
}
if (*closing || self.termination_policy.close_mailbox) && self.close() {
// First closing `Terminate` is considered successful.
return None;
}
}
});

self.mailbox.send(envelope).await
// If the mailbox is closed, all following `*_send()` returns an error.
Some(envelope)
}

pub(crate) async fn recv(&self) -> RecvResult {
Expand Down Expand Up @@ -311,6 +315,8 @@ impl Actor {
// or use another actor to listen all statuses for this.
}

#[cold]
#[inline(never)]
pub(crate) fn close(&self) -> bool {
self.mailbox.close(scope::trace_id())
}
Expand Down
Loading

0 comments on commit aa2d483

Please sign in to comment.