Skip to content

Commit

Permalink
Merge pull request #127 from elfo-rs/feat/intrusive-mailbox
Browse files Browse the repository at this point in the history
feat(core): implement intrusive mailbox
  • Loading branch information
loyd authored Jul 6, 2024
2 parents 372eb34 + 3a790d5 commit 1e710b3
Show file tree
Hide file tree
Showing 38 changed files with 2,064 additions and 819 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,16 @@ jobs:
- run: rustup show active-toolchain -v
- run: cargo test
- run: cargo test --all-features

miri:
needs: build
runs-on: ubuntu-latest
env:
MIRIFLAGS: -Zmiri-strict-provenance
steps:
- uses: actions/checkout@v4
- run: rustup toolchain install nightly --component miri
- run: rustup override set nightly
- run: rustup show active-toolchain -v
- run: cargo miri setup
- run: cargo miri test -p elfo-core --all-features -- _miri
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Specify MSRV as 1.76.
- logger: log truncation up to the `max_line_size` configuration parameter ([#128]).
- core: directly accept never returning functions in `ActorGroup::exec()` ([#127]).

### Changed
- **BREAKING** macros: remove the `network` feature ([#127]).
- **BREAKING** core/message: remove `AnyMessage::upcast()` in favor of `AnyMessage::new()` ([#127]).
- **BREAKING** core/envelope: `Envelope::message()` returns `AnyMessageRef` ([#127]).
- core/mailbox: move to an intrusive MPSC queue greatly improving performance ([#52]).
- core/message: allow `AnyMessage` to be downcasted to `AnyMessage` ([#127]).
- core/message: stabilize `AnyMessage` and `AnyMessageRef` ([#127]).
- core/message: faster `AnyMessage` serialization ([#127]).
- tracing: improve performance of `TraceId::generate()`.
- dumping: remove unstable `Timestamp`.

### Fixed
- core/request: avoid all-request to multiple groups hanging in some cases ([#127]).

[#52]: https://github.com/elfo-rs/elfo/issues/52
[#127]: https://github.com/elfo-rs/elfo/pull/127
[#128]: https://github.com/elfo-rs/elfo/pull/128

## [0.2.0-alpha.15] - 2024-05-13
Expand Down
3 changes: 2 additions & 1 deletion elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true

[features]
test-util = ["tokio/test-util"]
network = ["rmp-serde", "elfo-macros/network"]
network = ["rmp-serde"]
unstable = []
unstable-stuck-detection = ["dep:thread_local"]

Expand All @@ -29,6 +29,7 @@ metrics.workspace = true
tokio = { version = "1.16", features = ["rt", "sync", "time", "signal", "macros"] }
idr-ebr = "0.2"
futures-intrusive = "0.5"
cordyceps = "0.3.2"
parking_lot = "0.12"
smallbox = "0.8.0"
# TODO: avoid the `rc` feature here?
Expand Down
61 changes: 24 additions & 37 deletions elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
coop,
demux::Demux,
dumping::{Direction, Dump, Dumper, INTERNAL_CLASS},
envelope::{AnyMessageBorrowed, AnyMessageOwned, Envelope, EnvelopeOwned, MessageKind},
envelope::{Envelope, MessageKind},
errors::{RequestError, SendError, TryRecvError, TrySendError},
mailbox::RecvResult,
message::{Message, Request},
Expand Down Expand Up @@ -179,14 +179,14 @@ impl<C, K> Context<C, K> {
sender: self.actor_addr,
};

self.stats.on_sent_message(&message);
self.stats.on_sent_message(&message); // TODO: only if successful?

trace!("> {:?}", message);
if let Some(permit) = DUMPER.acquire_m(&message) {
permit.record(Dump::message(&message, &kind, Direction::Out));
}

let envelope = Envelope::new(message, kind).upcast();
let envelope = Envelope::new(message, kind);
let addrs = self.demux.filter(&envelope);

if addrs.is_empty() {
Expand Down Expand Up @@ -214,15 +214,14 @@ impl<C, K> Context<C, K> {
Ok(()) => success = true,
Err(err) => {
has_full |= err.is_full();
forget_and_replace(&mut unused, Some(err.into_inner()));
unused = Some(err.into_inner());
}
},
None => forget_and_replace(&mut unused, Some(envelope)),
None => unused = Some(envelope),
};
}

if success {
forget_and_replace(&mut unused, None);
Ok(())
} else if has_full {
Err(TrySendError::Full(e2m(unused.unwrap())))
Expand Down Expand Up @@ -270,14 +269,14 @@ impl<C, K> Context<C, K> {
}

async fn do_send<M: Message>(&self, message: M, kind: MessageKind) -> Result<(), SendError<M>> {
self.stats.on_sent_message(&message);
self.stats.on_sent_message(&message); // TODO: only if successful?

trace!("> {:?}", message);
if let Some(permit) = DUMPER.acquire_m(&message) {
permit.record(Dump::message(&message, &kind, Direction::Out));
}

let envelope = Envelope::new(message, kind).upcast();
let envelope = Envelope::new(message, kind);
let addrs = self.demux.filter(&envelope);

if addrs.is_empty() {
Expand Down Expand Up @@ -305,7 +304,7 @@ impl<C, K> Context<C, K> {
let guard = EbrGuard::new();
let entry = self.book.get(recipient, &guard);
let object = ward!(entry, {
forget_and_replace(&mut unused, Some(envelope));
unused = Some(envelope);
continue;
});
Object::send(object, Addr::NULL, envelope)
Expand All @@ -314,14 +313,13 @@ impl<C, K> Context<C, K> {
.err()
.map(|err| err.0);

forget_and_replace(&mut unused, returned_envelope);
unused = returned_envelope;
if unused.is_none() {
success = true;
}
}

if success {
forget_and_replace(&mut unused, None);
Ok(())
} else {
Err(SendError(e2m(unused.unwrap())))
Expand Down Expand Up @@ -362,7 +360,7 @@ impl<C, K> Context<C, K> {
message: M,
kind: MessageKind,
) -> Result<(), SendError<M>> {
self.stats.on_sent_message(&message);
self.stats.on_sent_message(&message); // TODO: only if successful?

trace!(to = %recipient, "> {:?}", message);
if let Some(permit) = DUMPER.acquire_m(&message) {
Expand All @@ -374,7 +372,7 @@ impl<C, K> Context<C, K> {
let entry = self.book.get(recipient, &guard);
let object = ward!(entry, return Err(SendError(message)));
let envelope = Envelope::new(message, kind);
Object::send(object, recipient, envelope.upcast())
Object::send(object, recipient, envelope)
}
.await
.map_err(|err| SendError(e2m(err.0)))
Expand Down Expand Up @@ -402,7 +400,7 @@ impl<C, K> Context<C, K> {
recipient: Addr,
message: M,
) -> Result<(), TrySendError<M>> {
self.stats.on_sent_message(&message);
self.stats.on_sent_message(&message); // TODO: only if successful?

let kind = MessageKind::Regular {
sender: self.actor_addr,
Expand All @@ -419,7 +417,7 @@ impl<C, K> Context<C, K> {
let envelope = Envelope::new(message, kind);

object
.try_send(recipient, envelope.upcast())
.try_send(recipient, envelope)
.map_err(|err| err.map(e2m))
}

Expand All @@ -442,7 +440,7 @@ impl<C, K> Context<C, K> {
let token = token.into_untyped();
let recipient = token.sender();
let message = R::Wrapper::from(message);
self.stats.on_sent_message(&message);
self.stats.on_sent_message(&message); // TODO: only if successful?

let kind = MessageKind::Response {
sender: self.addr(),
Expand All @@ -454,7 +452,7 @@ impl<C, K> Context<C, K> {
permit.record(Dump::message(&message, &kind, Direction::Out));
}

let envelope = Envelope::new(message, kind).upcast();
let envelope = Envelope::new(message, kind);
let guard = EbrGuard::new();
let object = ward!(self.book.get(recipient, &guard));
object.respond(token, Ok(envelope));
Expand Down Expand Up @@ -696,7 +694,7 @@ impl<C, K> Context<C, K> {
let kind = MessageKind::Regular {
sender: self.actor_addr,
};
let envelope = Envelope::new(message, kind).upcast();
let envelope = Envelope::new(message, kind);
self.respond(token, Ok(()));
envelope
}
Expand All @@ -705,9 +703,9 @@ impl<C, K> Context<C, K> {

let message = envelope.message();
trace!("< {:?}", message);
if let Some(permit) = DUMPER.acquire_m(message) {
if let Some(permit) = DUMPER.acquire_m(&*message) {
let kind = envelope.message_kind();
permit.record(Dump::message(message, kind, Direction::In));
permit.record(Dump::message(&*message, kind, Direction::In));
}

// We should change the status after dumping the original message
Expand Down Expand Up @@ -826,11 +824,9 @@ impl<C, K> Context<C, K> {
}
}

#[cold]
fn e2m<M: Message>(envelope: Envelope) -> M {
envelope
.unpack_regular()
.downcast()
.expect("invalid message")
envelope.unpack().expect("invalid message").0
}

#[cold]
Expand Down Expand Up @@ -862,14 +858,6 @@ fn addrs_with_envelope(
})
}

fn forget_and_replace(dest: &mut Option<Envelope>, value: Option<Envelope>) {
if let Some(old_value) = dest.take() {
let (_, token) = old_value.unpack_request();
token.forget();
}
*dest = value;
}

impl Context {
pub(crate) fn new(book: AddressBook, demux: Demux) -> Self {
Self {
Expand Down Expand Up @@ -1019,14 +1007,13 @@ fn prepare_response<R: Request>(
response: Result<Envelope, RequestError>,
) -> Result<R::Response, RequestError> {
let envelope = response?;
let message = envelope.message().downcast2::<R::Wrapper>();
let (message, kind) = envelope.unpack::<R::Wrapper>().expect("invalid response");

// TODO: increase a counter.
trace!("< {:?}", message);
if let Some(permit) = DUMPER.acquire_m(message) {
let kind = envelope.message_kind();
permit.record(Dump::message(message, kind, Direction::In));
if let Some(permit) = DUMPER.acquire_m(&message) {
permit.record(Dump::message(&message, &kind, Direction::In));
}

Ok(envelope.unpack_regular().downcast2::<R::Wrapper>().into())
Ok(message.into())
}
Loading

0 comments on commit 1e710b3

Please sign in to comment.