Skip to content

Commit

Permalink
perf(core/mailbox): use an intrusive list of envelopes
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed May 25, 2024
1 parent 3b1fbb2 commit 63fa14f
Show file tree
Hide file tree
Showing 25 changed files with 1,052 additions and 421 deletions.
1 change: 1 addition & 0 deletions elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ elfo-utils = { version = "0.2.5", path = "../elfo-utils" }
tokio = { version = "1.16", features = ["rt", "sync", "time", "signal", "macros"] }
idr-ebr = "0.2"
futures-intrusive = "0.5"
cordyceps = "0.3.2"
parking_lot = "0.12"
smallbox = "0.8.0"
# TODO: avoid the `rc` feature here?
Expand Down
62 changes: 29 additions & 33 deletions elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
coop,
demux::Demux,
dumping::{Direction, Dump, Dumper, INTERNAL_CLASS},
envelope::{AnyMessageBorrowed, AnyMessageOwned, Envelope, EnvelopeOwned, MessageKind},
envelope::{Envelope, MessageKind},
errors::{RequestError, SendError, TryRecvError, TrySendError},
mailbox::RecvResult,
message::{Message, Request},
Expand Down Expand Up @@ -179,14 +179,14 @@ impl<C, K> Context<C, K> {
sender: self.actor_addr,
};

self.stats.on_sent_message(&message);
self.stats.on_sent_message(&message); // TODO: only if successful?

trace!("> {:?}", message);
if let Some(permit) = DUMPER.acquire_m(&message) {
permit.record(Dump::message(&message, &kind, Direction::Out));
}

let envelope = Envelope::new(message, kind).upcast();
let envelope = Envelope::new(message, kind);
let addrs = self.demux.filter(&envelope);

if addrs.is_empty() {
Expand Down Expand Up @@ -214,15 +214,15 @@ impl<C, K> Context<C, K> {
Ok(()) => success = true,
Err(err) => {
has_full |= err.is_full();
forget_and_replace(&mut unused, Some(err.into_inner()));
replace_unused(&mut unused, Some(err.into_inner()));
}
},
None => forget_and_replace(&mut unused, Some(envelope)),
None => replace_unused(&mut unused, Some(envelope)),
};
}

if success {
forget_and_replace(&mut unused, None);
replace_unused(&mut unused, None);
Ok(())
} else if has_full {
Err(TrySendError::Full(e2m(unused.unwrap())))
Expand Down Expand Up @@ -270,14 +270,14 @@ impl<C, K> Context<C, K> {
}

async fn do_send<M: Message>(&self, message: M, kind: MessageKind) -> Result<(), SendError<M>> {
self.stats.on_sent_message(&message);
self.stats.on_sent_message(&message); // TODO: only if successful?

trace!("> {:?}", message);
if let Some(permit) = DUMPER.acquire_m(&message) {
permit.record(Dump::message(&message, &kind, Direction::Out));
}

let envelope = Envelope::new(message, kind).upcast();
let envelope = Envelope::new(message, kind);
let addrs = self.demux.filter(&envelope);

if addrs.is_empty() {
Expand Down Expand Up @@ -305,7 +305,7 @@ impl<C, K> Context<C, K> {
let guard = EbrGuard::new();
let entry = self.book.get(recipient, &guard);
let object = ward!(entry, {
forget_and_replace(&mut unused, Some(envelope));
replace_unused(&mut unused, Some(envelope));
continue;
});
Object::send(object, Addr::NULL, envelope)
Expand All @@ -314,14 +314,14 @@ impl<C, K> Context<C, K> {
.err()
.map(|err| err.0);

forget_and_replace(&mut unused, returned_envelope);
replace_unused(&mut unused, returned_envelope);
if unused.is_none() {
success = true;
}
}

if success {
forget_and_replace(&mut unused, None);
replace_unused(&mut unused, None);
Ok(())
} else {
Err(SendError(e2m(unused.unwrap())))
Expand Down Expand Up @@ -362,7 +362,7 @@ impl<C, K> Context<C, K> {
message: M,
kind: MessageKind,
) -> Result<(), SendError<M>> {
self.stats.on_sent_message(&message);
self.stats.on_sent_message(&message); // TODO: only if successful?

trace!(to = %recipient, "> {:?}", message);
if let Some(permit) = DUMPER.acquire_m(&message) {
Expand All @@ -374,7 +374,7 @@ impl<C, K> Context<C, K> {
let entry = self.book.get(recipient, &guard);
let object = ward!(entry, return Err(SendError(message)));
let envelope = Envelope::new(message, kind);
Object::send(object, recipient, envelope.upcast())
Object::send(object, recipient, envelope)
}
.await
.map_err(|err| SendError(e2m(err.0)))
Expand Down Expand Up @@ -402,7 +402,7 @@ impl<C, K> Context<C, K> {
recipient: Addr,
message: M,
) -> Result<(), TrySendError<M>> {
self.stats.on_sent_message(&message);
self.stats.on_sent_message(&message); // TODO: only if successful?

let kind = MessageKind::Regular {
sender: self.actor_addr,
Expand All @@ -419,7 +419,7 @@ impl<C, K> Context<C, K> {
let envelope = Envelope::new(message, kind);

object
.try_send(recipient, envelope.upcast())
.try_send(recipient, envelope)
.map_err(|err| err.map(e2m))
}

Expand All @@ -442,7 +442,7 @@ impl<C, K> Context<C, K> {
let token = token.into_untyped();
let recipient = token.sender();
let message = R::Wrapper::from(message);
self.stats.on_sent_message(&message);
self.stats.on_sent_message(&message); // TODO: only if successful?

let kind = MessageKind::Response {
sender: self.addr(),
Expand All @@ -454,7 +454,7 @@ impl<C, K> Context<C, K> {
permit.record(Dump::message(&message, &kind, Direction::Out));
}

let envelope = Envelope::new(message, kind).upcast();
let envelope = Envelope::new(message, kind);
let guard = EbrGuard::new();
let object = ward!(self.book.get(recipient, &guard));
object.respond(token, Ok(envelope));
Expand Down Expand Up @@ -696,7 +696,7 @@ impl<C, K> Context<C, K> {
let kind = MessageKind::Regular {
sender: self.actor_addr,
};
let envelope = Envelope::new(message, kind).upcast();
let envelope = Envelope::new(message, kind);
self.respond(token, Ok(()));
envelope
}
Expand All @@ -705,9 +705,9 @@ impl<C, K> Context<C, K> {

let message = envelope.message();
trace!("< {:?}", message);
if let Some(permit) = DUMPER.acquire_m(message) {
if let Some(permit) = DUMPER.acquire_m(&*message) {
let kind = envelope.message_kind();
permit.record(Dump::message(message, kind, Direction::In));
permit.record(Dump::message(&*message, kind, Direction::In));
}

// We should change the status after dumping the original message
Expand Down Expand Up @@ -826,11 +826,9 @@ impl<C, K> Context<C, K> {
}
}

#[cold]
fn e2m<M: Message>(envelope: Envelope) -> M {
envelope
.unpack_regular()
.downcast()
.expect("invalid message")
envelope.unpack().expect("invalid message").0
}

#[cold]
Expand Down Expand Up @@ -862,10 +860,9 @@ fn addrs_with_envelope(
})
}

fn forget_and_replace(dest: &mut Option<Envelope>, value: Option<Envelope>) {
if let Some(old_value) = dest.take() {
let (_, token) = old_value.unpack_request();
token.forget();
fn replace_unused(dest: &mut Option<Envelope>, value: Option<Envelope>) {
if let Some(old) = dest.take() {
old.drop_as_unused();
}
*dest = value;
}
Expand Down Expand Up @@ -1019,14 +1016,13 @@ fn prepare_response<R: Request>(
response: Result<Envelope, RequestError>,
) -> Result<R::Response, RequestError> {
let envelope = response?;
let message = envelope.message().downcast2::<R::Wrapper>();
let (message, kind) = envelope.unpack::<R::Wrapper>().expect("invalid response");

// TODO: increase a counter.
trace!("< {:?}", message);
if let Some(permit) = DUMPER.acquire_m(message) {
let kind = envelope.message_kind();
permit.record(Dump::message(message, kind, Direction::In));
if let Some(permit) = DUMPER.acquire_m(&message) {
permit.record(Dump::message(&message, &kind, Direction::In));
}

Ok(envelope.unpack_regular().downcast2::<R::Wrapper>().into())
Ok(message.into())
}
Loading

0 comments on commit 63fa14f

Please sign in to comment.