From aa2d48375f3b6f4ad08df3c666c9c2188d60dc73 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Thu, 11 Jul 2024 15:32:31 +0400 Subject: [PATCH] feat(core/context): add `Context::unbounded_send(_to)` --- CHANGELOG.md | 2 + elfo-core/src/actor.rs | 46 ++-- elfo-core/src/context.rs | 349 ++++++++++++++++++++++-------- elfo-core/src/envelope.rs | 11 +- elfo-core/src/errors.rs | 30 +++ elfo-core/src/mailbox.rs | 10 + elfo-core/src/object.rs | 70 ++++++ elfo-core/src/remote.rs | 5 + elfo-core/src/signal.rs | 2 +- elfo-core/src/stream.rs | 2 +- elfo-core/src/time/delay.rs | 2 +- elfo-core/src/time/interval.rs | 2 +- elfo-network/src/discovery/mod.rs | 4 +- elfo-network/src/worker/mod.rs | 24 +- elfo-test/src/utils.rs | 6 +- 15 files changed, 437 insertions(+), 128 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d6a1799..3437a91d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Specify MSRV as 1.76. - logger: log truncation up to the `max_line_size` configuration parameter ([#128]). - core: directly accept never returning functions in `ActorGroup::exec()` ([#127]). +- core/context: add `Context::unbounded_send(_to)` methods. +- errors: add `From for TrySendError` and `SendError::{into_inner,map}` methods. ### Changed - **BREAKING** macros: remove the `network` feature ([#127]). diff --git a/elfo-core/src/actor.rs b/elfo-core/src/actor.rs index 7b8037d0..efafe892 100644 --- a/elfo-core/src/actor.rs +++ b/elfo-core/src/actor.rs @@ -223,36 +223,40 @@ impl Actor { self.send_status_to_subscribers(&self.control.read()); } + pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError> { + match self.handle_system(envelope) { + Some(envelope) => self.mailbox.send(envelope).await, + None => Ok(()), + } + } + pub(crate) fn try_send(&self, envelope: Envelope) -> Result<(), TrySendError> { - msg!(match &envelope { - Terminate { closing } => { - if *closing || self.termination_policy.close_mailbox { - if self.close() { - return Ok(()); - } else { - return Err(TrySendError::Closed(envelope)); - } - } - } - }); + match self.handle_system(envelope) { + Some(envelope) => self.mailbox.try_send(envelope), + None => Ok(()), + } + } - self.mailbox.try_send(envelope) + pub(crate) fn unbounded_send(&self, envelope: Envelope) -> Result<(), SendError> { + match self.handle_system(envelope) { + Some(envelope) => self.mailbox.unbounded_send(envelope), + None => Ok(()), + } } - pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError> { + #[inline(always)] + fn handle_system(&self, envelope: Envelope) -> Option { msg!(match &envelope { Terminate { closing } => { - if *closing || self.termination_policy.close_mailbox { - if self.close() { - return Ok(()); - } else { - return Err(SendError(envelope)); - } + if (*closing || self.termination_policy.close_mailbox) && self.close() { + // First closing `Terminate` is considered successful. + return None; } } }); - self.mailbox.send(envelope).await + // If the mailbox is closed, all following `*_send()` returns an error. + Some(envelope) } pub(crate) async fn recv(&self) -> RecvResult { @@ -311,6 +315,8 @@ impl Actor { // or use another actor to listen all statuses for this. } + #[cold] + #[inline(never)] pub(crate) fn close(&self) -> bool { self.mailbox.close(scope::trace_id()) } diff --git a/elfo-core/src/context.rs b/elfo-core/src/context.rs index 05fea7dd..67f1bf58 100644 --- a/elfo-core/src/context.rs +++ b/elfo-core/src/context.rs @@ -20,7 +20,7 @@ use crate::{ mailbox::RecvResult, message::{Message, Request}, messages, msg, - object::{Object, OwnedObject}, + object::{BorrowedObject, Object, OwnedObject}, request_table::ResponseToken, restarting::RestartPolicy, routers::Singleton, @@ -85,6 +85,9 @@ impl Context { } /// Attaches the provided source to the context. + /// + /// Messages produced by the source will be available via + /// [`Context::recv()`] and [`Context::try_recv()`] methods. pub fn attach(&mut self, source: UnattachedSource) -> S1 { source.attach_to(&mut self.sources) } @@ -130,31 +133,43 @@ impl Context { ward!(self.actor.as_ref().and_then(|o| o.as_actor()), return false).close() } - /// Sends a message using the routing system. + /// Sends a message using the [inter-group routing] system. + /// + /// It's possible to send requests if the response is not needed. /// /// Returns `Err` if the message hasn't reached any mailboxes. /// + /// # Cancel safety + /// + /// If cancelled, recipients with full mailboxes wont't receive the message. + /// /// # Example - /// ```ignore + /// ``` + /// # use elfo_core as elfo; + /// # async fn exec(mut ctx: elfo::Context) { + /// # use elfo::{message, msg}; + /// #[message] + /// struct SomethingHappened; + /// /// // Fire and forget. /// let _ = ctx.send(SomethingHappened).await; /// - /// // Fire or fail. - /// ctx.send(SomethingHappened).await?; - /// /// // Fire or log. - /// if let Ok(err) = ctx.send(SomethingHappened).await { - /// warn!("...", error = err); + /// if let Err(error) = ctx.send(SomethingHappened).await { + /// tracing::warn!(%error, "..."); /// } + /// # } /// ``` + /// + /// [inter-group routing]: https://actoromicon.rs/ch04-01-routing.html pub async fn send(&self, message: M) -> Result<(), SendError> { - let kind = MessageKind::Regular { - sender: self.actor_addr, - }; - self.do_send(message, kind).await + let kind = MessageKind::regular(self.actor_addr); + self.do_send_async(message, kind).await } - /// Tries to send a message using the routing system. + /// Tries to send a message using the [inter-group routing] system. + /// + /// It's possible to send requests if the response is not needed. /// /// Returns /// * `Ok(())` if the message has been added to any mailbox. @@ -162,22 +177,28 @@ impl Context { /// * `Err(Closed(_))` otherwise. /// /// # Example - /// ```ignore + /// ``` + /// # use elfo_core as elfo; + /// # async fn exec(mut ctx: elfo::Context) { + /// # use elfo::{message, msg}; + /// #[message] + /// struct SomethingHappened; + /// /// // Fire and forget. /// let _ = ctx.try_send(SomethingHappened); /// - /// // Fire or fail. - /// ctx.try_send(SomethingHappened)?; - /// /// // Fire or log. - /// if let Err(err) = ctx.try_send(SomethingHappened) { - /// warn!("...", error = err); + /// if let Err(error) = ctx.try_send(SomethingHappened) { + /// tracing::warn!(%error, "..."); /// } + /// # } /// ``` + /// + /// [inter-group routing]: https://actoromicon.rs/ch04-01-routing.html pub fn try_send(&self, message: M) -> Result<(), TrySendError> { - let kind = MessageKind::Regular { - sender: self.actor_addr, - }; + // XXX: avoid duplication with `unbounded_send()` and `send()`. + + let kind = MessageKind::regular(self.actor_addr); self.stats.on_sent_message(&message); // TODO: only if successful? @@ -230,7 +251,87 @@ impl Context { } } - /// Returns a request builder. + /// Sends a message using the [inter-group routing] system. + /// Ignores the capacity of the recipient's mailbox and doesn't change its + /// size, thus it doesn't affect other senders. + /// + /// Usually this method shouldn't be used because it can lead to high memory + /// usage and even OOM if the recipient works too slowly. + /// Prefer [`Context::try_send()`] or [`Context::send()`] instead. + /// + /// It's possible to send requests if the response is not needed. + /// + /// Returns `Err` if the message hasn't reached mailboxes or they are full. + /// + /// # Example + /// ``` + /// # use elfo_core as elfo; + /// # async fn exec(mut ctx: elfo::Context) { + /// # use elfo::{message, msg}; + /// #[message] + /// struct SomethingHappened; + /// + /// // Fire and forget. + /// let _ = ctx.unbounded_send(SomethingHappened); + /// + /// // Fire or log. + /// if let Err(error) = ctx.unbounded_send(SomethingHappened) { + /// tracing::warn!(%error, "..."); + /// } + /// # } + /// ``` + /// + /// [inter-group routing]: https://actoromicon.rs/ch04-01-routing.html + pub fn unbounded_send(&self, message: M) -> Result<(), SendError> { + let kind = MessageKind::regular(self.actor_addr); + + self.stats.on_sent_message(&message); // TODO: only if successful? + + trace!("> {:?}", message); + if let Some(permit) = DUMPER.acquire_m(&message) { + permit.record(Dump::message(&message, &kind, Direction::Out)); + } + + let envelope = Envelope::new(message, kind); + let addrs = self.demux.filter(&envelope); + + if addrs.is_empty() { + return Err(SendError(e2m(envelope))); + } + + let guard = EbrGuard::new(); + + if addrs.len() == 1 { + return match self.book.get(addrs[0], &guard) { + Some(object) => object + .unbounded_send(Addr::NULL, envelope) + .map_err(|err| err.map(e2m)), + None => Err(SendError(e2m(envelope))), + }; + } + + let mut unused = None; + let mut success = false; + + for (addr, envelope) in addrs_with_envelope(envelope, &addrs) { + match self.book.get(addr, &guard) { + Some(object) => match object.unbounded_send(Addr::NULL, envelope) { + Ok(()) => success = true, + Err(err) => unused = Some(err.into_inner()), + }, + None => unused = Some(envelope), + }; + } + + if success { + Ok(()) + } else { + Err(SendError(e2m(unused.unwrap()))) + } + } + + /// Returns a request builder to send a request (on `resolve()`) using + /// the [inter-group routing] system. /// /// # Example /// ```ignore @@ -242,12 +343,14 @@ impl Context { /// // ... /// } /// ``` + /// + /// [inter-group routing]: https://actoromicon.rs/ch04-01-routing.html #[inline] pub fn request(&self, request: R) -> RequestBuilder<'_, C, K, R, Any> { RequestBuilder::new(self, request) } - /// Returns a request builder to the specified recipient. + /// Returns a request builder to send a request to the specified recipient. /// /// # Example /// ```ignore @@ -268,7 +371,11 @@ impl Context { RequestBuilder::new(self, request).to(recipient) } - async fn do_send(&self, message: M, kind: MessageKind) -> Result<(), SendError> { + async fn do_send_async( + &self, + message: M, + kind: MessageKind, + ) -> Result<(), SendError> { self.stats.on_sent_message(&message); // TODO: only if successful? trace!("> {:?}", message); @@ -292,7 +399,7 @@ impl Context { Object::send(object, Addr::NULL, envelope) } .await - .map_err(|err| SendError(e2m(err.0))); + .map_err(|err| err.map(e2m)); } let mut unused = None; @@ -311,7 +418,7 @@ impl Context { } .await .err() - .map(|err| err.0); + .map(|err| err.into_inner()); unused = returned_envelope; if unused.is_none() { @@ -327,84 +434,136 @@ impl Context { } /// Sends a message to the specified recipient. + /// Waits if the recipient's mailbox is full. + /// + /// It's possible to send requests if the response is not needed. /// /// Returns `Err` if the message hasn't reached any mailboxes. /// + /// # Cancel safety + /// + /// If cancelled, recipients with full mailboxes wont't receive the message. + /// /// # Example - /// ```ignore - /// // Fire and forget. - /// let _ = ctx.send_to(addr, SomethingHappened).await; + /// ``` + /// # use elfo_core as elfo; + /// # async fn exec(mut ctx: elfo::Context, addr: elfo::Addr) { + /// # use elfo::{message, msg}; + /// #[message] + /// struct SomethingHappened; /// - /// // Fire or fail. - /// ctx.send_to(addr, SomethingHappened).await?; + /// let _ = ctx.send_to(addr, SomethingHappened).await; /// /// // Fire or log. - /// if let Some(err) = ctx.send_to(addr, SomethingHappened).await { - /// warn!("...", error = err); + /// if let Err(error) = ctx.send_to(addr, SomethingHappened).await { + /// tracing::warn!(%error, "..."); /// } + /// # } /// ``` pub async fn send_to( &self, recipient: Addr, message: M, ) -> Result<(), SendError> { - let kind = MessageKind::Regular { - sender: self.actor_addr, - }; - self.do_send_to(recipient, message, kind).await - } - - async fn do_send_to( - &self, - recipient: Addr, - message: M, - kind: MessageKind, - ) -> Result<(), SendError> { - self.stats.on_sent_message(&message); // TODO: only if successful? - - trace!(to = %recipient, "> {:?}", message); - if let Some(permit) = DUMPER.acquire_m(&message) { - permit.record(Dump::message(&message, &kind, Direction::Out)); - } - - { - let guard = EbrGuard::new(); - let entry = self.book.get(recipient, &guard); - let object = ward!(entry, return Err(SendError(message))); - let envelope = Envelope::new(message, kind); + let kind = MessageKind::regular(self.actor_addr); + self.do_send_to(recipient, message, kind, |object, envelope| { Object::send(object, recipient, envelope) - } + })? .await - .map_err(|err| SendError(e2m(err.0))) + .map_err(|err| err.map(e2m)) } /// Tries to send a message to the specified recipient. + /// Returns an error if the recipient's mailbox is full. /// - /// Returns `Err` if the message hasn't reached mailboxes or they are full. + /// It's possible to send requests if the response is not needed. + /// + /// Returns + /// * `Ok(())` if the message has been added to any mailbox. + /// * `Err(Full(_))` if some mailboxes are full. + /// * `Err(Closed(_))` otherwise. /// /// # Example - /// ```ignore + /// ``` + /// # use elfo_core as elfo; + /// # async fn exec(mut ctx: elfo::Context, addr: elfo::Addr) { + /// # use elfo::{message, msg}; + /// #[message] + /// struct SomethingHappened; + /// /// // Fire and forget. /// let _ = ctx.try_send_to(addr, SomethingHappened); /// - /// // Fire or fail. - /// ctx.try_send_to(addr, SomethingHappened)?; - /// /// // Fire or log. - /// if let Some(err) = ctx.try_send_to(addr, SomethingHappened) { - /// warn!("...", error = err); + /// if let Err(error) = ctx.try_send_to(addr, SomethingHappened) { + /// tracing::warn!(%error, "..."); /// } + /// # } /// ``` pub fn try_send_to( &self, recipient: Addr, message: M, ) -> Result<(), TrySendError> { - self.stats.on_sent_message(&message); // TODO: only if successful? + let kind = MessageKind::regular(self.actor_addr); + self.do_send_to(recipient, message, kind, |object, envelope| { + object + .try_send(recipient, envelope) + .map_err(|err| err.map(e2m)) + })? + } - let kind = MessageKind::Regular { - sender: self.actor_addr, - }; + /// Sends a message to the specified recipient. + /// Ignores the capacity of the recipient's mailbox and doesn't change its + /// size, thus it doesn't affect other senders. + /// + /// Usually this method shouldn't be used because it can lead to high memory + /// usage and even OOM if the recipient works too slowly. + /// Prefer [`Context::try_send_to()`] or [`Context::send_to()`] instead. + /// + /// It's possible to send requests if the response is not needed. + /// + /// Returns `Err` if the message hasn't reached mailboxes or they are full. + /// + /// # Example + /// ``` + /// # use elfo_core as elfo; + /// # async fn exec(mut ctx: elfo::Context, addr: elfo::Addr) { + /// # use elfo::{message, msg}; + /// #[message] + /// struct SomethingHappened; + /// + /// // Fire and forget. + /// let _ = ctx.unbounded_send_to(addr, SomethingHappened); + /// + /// // Fire or log. + /// if let Err(error) = ctx.unbounded_send_to(addr, SomethingHappened) { + /// tracing::warn!(%error, "..."); + /// } + /// # } + /// ``` + pub fn unbounded_send_to( + &self, + recipient: Addr, + message: M, + ) -> Result<(), SendError> { + let kind = MessageKind::regular(self.actor_addr); + self.do_send_to(recipient, message, kind, |object, envelope| { + object + .unbounded_send(recipient, envelope) + .map_err(|err| err.map(e2m)) + })? + } + + #[inline(always)] + fn do_send_to( + &self, + recipient: Addr, + message: M, + kind: MessageKind, + f: impl FnOnce(BorrowedObject<'_>, Envelope) -> R, + ) -> Result> { + self.stats.on_sent_message(&message); // TODO: only if successful? trace!(to = %recipient, "> {:?}", message); if let Some(permit) = DUMPER.acquire_m(&message) { @@ -413,12 +572,10 @@ impl Context { let guard = EbrGuard::new(); let entry = self.book.get(recipient, &guard); - let object = ward!(entry, return Err(TrySendError::Closed(message))); + let object = ward!(entry, return Err(SendError(message))); let envelope = Envelope::new(message, kind); - object - .try_send(recipient, envelope) - .map_err(|err| err.map(e2m)) + Ok(f(object, envelope)) } /// Responds to the requester with the provided response. @@ -478,7 +635,6 @@ impl Context { /// If the method is called again after `None` is returned. /// /// # Example - /// /// ``` /// # use elfo_core as elfo; /// # async fn exec(mut ctx: elfo::Context) { @@ -491,6 +647,7 @@ impl Context { /// }); /// } /// # } + /// ``` pub async fn recv(&mut self) -> Option where C: 'static, @@ -637,7 +794,6 @@ impl Context { /// required information is no longer available. /// /// # Example - /// /// ``` /// # use elfo_core as elfo; /// # use elfo_core::{ActorStartCause, ActorStartInfo}; @@ -691,9 +847,7 @@ impl Context { self.config = config.get_user::().clone(); info!("config updated"); let message = messages::ConfigUpdated {}; - let kind = MessageKind::Regular { - sender: self.actor_addr, - }; + let kind = MessageKind::regular(self.actor_addr); let envelope = Envelope::new(message, kind); self.respond(token, Ok(())); envelope @@ -927,13 +1081,30 @@ impl<'c, C, K, R> RequestBuilder<'c, C, K, R, Any> { } } -impl<'c, C, K, R, M> RequestBuilder<'c, C, K, R, M> { +impl<'c, C, K, R: Request, M> RequestBuilder<'c, C, K, R, M> { /// Specified the recipient of the request. #[inline] fn to(mut self, addr: Addr) -> Self { self.to = Some(addr); self } + + async fn do_send(self, kind: MessageKind) -> bool { + if let Some(recipient) = self.to { + let res = self + .context + .do_send_to(recipient, self.request, kind, |o, e| { + Object::send(o, recipient, e) + }); + + match res { + Ok(fut) => fut.await.is_ok(), + Err(_) => false, + } + } else { + self.context.do_send_async(self.request, kind).await.is_ok() + } + } } // TODO: add `pub async fn id() { ... }` @@ -951,13 +1122,7 @@ impl<'c, C: 'static, K, R: Request> RequestBuilder<'c, C, K, R, Any> { let request_id = token.request_id(); let kind = MessageKind::RequestAny(token); - let res = if let Some(recipient) = self.to { - self.context.do_send_to(recipient, self.request, kind).await - } else { - self.context.do_send(self.request, kind).await - }; - - if res.is_err() { + if !self.do_send(kind).await { actor.request_table().cancel_request(request_id); return Err(RequestError::Failed); } @@ -982,13 +1147,7 @@ impl<'c, C: 'static, K, R: Request> RequestBuilder<'c, C, K, R, All> { let request_id = token.request_id(); let kind = MessageKind::RequestAll(token); - let res = if let Some(recipient) = self.to { - self.context.do_send_to(recipient, self.request, kind).await - } else { - self.context.do_send(self.request, kind).await - }; - - if res.is_err() { + if !self.do_send(kind).await { actor.request_table().cancel_request(request_id); return vec![Err(RequestError::Failed)]; } diff --git a/elfo-core/src/envelope.rs b/elfo-core/src/envelope.rs index f3c38ba7..b3ccf560 100644 --- a/elfo-core/src/envelope.rs +++ b/elfo-core/src/envelope.rs @@ -54,6 +54,13 @@ pub enum MessageKind { Response { sender: Addr, request_id: RequestId }, } +impl MessageKind { + #[inline] + pub fn regular(sender: Addr) -> Self { + Self::Regular { sender } + } +} + // Called if the envelope hasn't been unpacked at all. // For instance, if an actor dies with a non-empty mailbox. // Usually, an envelope goes to `std::mem:forget()` in `unpack_*` methods. @@ -123,7 +130,7 @@ impl Envelope { pub(crate) fn stub() -> Self { Self::with_trace_id( crate::messages::Ping, - MessageKind::Regular { sender: Addr::NULL }, + MessageKind::regular(Addr::NULL), TraceId::try_from(1).unwrap(), ) } @@ -402,7 +409,7 @@ mod tests_miri { time::with_instant_mock(|_mock| { let addr = Addr::NULL; let trace_id = TraceId::try_from(1).unwrap(); - Envelope::with_trace_id(message, MessageKind::Regular { sender: addr }, trace_id) + Envelope::with_trace_id(message, MessageKind::regular(addr), trace_id) }) } diff --git a/elfo-core/src/errors.rs b/elfo-core/src/errors.rs index 5b6cbcd8..2c50bc11 100644 --- a/elfo-core/src/errors.rs +++ b/elfo-core/src/errors.rs @@ -5,6 +5,8 @@ use std::{ use derive_more::{Display, Error}; +// === StartError === + #[derive(Error)] #[non_exhaustive] pub struct StartError { @@ -82,10 +84,27 @@ pub struct StartGroupError { pub reason: String, } +// === SendError === + #[derive(Debug, Display, Error)] #[display(fmt = "mailbox closed")] pub struct SendError(#[error(not(source))] pub T); +impl SendError { + #[inline] + pub fn into_inner(self) -> T { + self.0 + } + + /// Transforms the inner message. + #[inline] + pub fn map(self, f: impl FnOnce(T) -> U) -> SendError { + SendError(f(self.0)) + } +} + +// === TrySendError === + #[derive(Debug, Display, Error)] pub enum TrySendError { /// The mailbox is full. @@ -128,6 +147,15 @@ impl TrySendError { } } +impl From> for TrySendError { + #[inline] + fn from(err: SendError) -> Self { + TrySendError::Closed(err.0) + } +} + +// === RequestError === + #[derive(Debug, Display, Error)] pub enum RequestError { /// Receiver hasn't got the request. @@ -152,6 +180,8 @@ impl RequestError { } } +// === TryRecvError === + #[derive(Debug, Clone, Display, Error)] pub enum TryRecvError { /// The mailbox is empty. diff --git a/elfo-core/src/mailbox.rs b/elfo-core/src/mailbox.rs index 7aa056a9..f86cc36d 100644 --- a/elfo-core/src/mailbox.rs +++ b/elfo-core/src/mailbox.rs @@ -133,6 +133,16 @@ impl Mailbox { } } + pub(crate) fn unbounded_send(&self, envelope: Envelope) -> Result<(), SendError> { + if !self.tx_semaphore.is_closed() { + self.queue.enqueue(envelope); + self.rx_notify.notify_one(); + Ok(()) + } else { + Err(SendError(envelope)) + } + } + pub(crate) async fn recv(&self) -> RecvResult { loop { // TODO: it should be possible to use `dequeue_unchecked()` here. diff --git a/elfo-core/src/object.rs b/elfo-core/src/object.rs index 77b86035..25933210 100644 --- a/elfo-core/src/object.rs +++ b/elfo-core/src/object.rs @@ -127,6 +127,24 @@ impl Object { } } + #[stability::unstable] + pub fn unbounded_send( + &self, + recipient: Addr, + envelope: Envelope, + ) -> Result<(), SendError> { + match &self.kind { + ObjectKind::Actor(handle) => handle.unbounded_send(envelope), + ObjectKind::Group(handle) => { + let mut visitor = UnboundedSendGroupVisitor::default(); + handle.handle(envelope, &mut visitor); + visitor.finish() + } + #[cfg(feature = "network")] + ObjectKind::Remote(handle) => handle.unbounded_send(recipient, envelope), + } + } + #[stability::unstable] pub fn respond(&self, token: ResponseToken, response: Result) { match &self.kind { @@ -409,3 +427,55 @@ impl GroupVisitor for TrySendGroupVisitor { self.try_send(object, envelope); } } + +// === UnboundedSendGroupVisitor === + +#[derive(Default)] +struct UnboundedSendGroupVisitor { + extra: Option, + has_ok: bool, +} + +impl UnboundedSendGroupVisitor { + // We must send while visiting to ensure that a message starting a new actor + // is actually the first message that the actor receives. + fn try_send(&mut self, object: &OwnedObject, envelope: Envelope) { + let actor = object.as_actor().expect("group stores only actors"); + match actor.unbounded_send(envelope) { + Ok(()) => self.has_ok = true, + Err(err) => self.extra = Some(err.0), + } + } + + fn finish(mut self) -> Result<(), SendError> { + if self.has_ok { + Ok(()) + } else { + let envelope = self.extra.take().expect("missing envelope"); + Err(SendError(envelope)) + } + } +} + +impl GroupVisitor for UnboundedSendGroupVisitor { + fn done(&mut self) { + debug_assert!(self.extra.is_none()); + debug_assert!(!self.has_ok); + self.has_ok = true; + } + + fn empty(&mut self, envelope: Envelope) { + debug_assert!(self.extra.is_none()); + debug_assert!(!self.has_ok); + self.extra = Some(envelope); + } + + fn visit(&mut self, object: &OwnedObject, envelope: &Envelope) { + let envelope = self.extra.take().unwrap_or_else(|| envelope.duplicate()); + self.try_send(object, envelope); + } + + fn visit_last(&mut self, object: &OwnedObject, envelope: Envelope) { + self.try_send(object, envelope); + } +} diff --git a/elfo-core/src/remote.rs b/elfo-core/src/remote.rs index 4779aada..658a9324 100644 --- a/elfo-core/src/remote.rs +++ b/elfo-core/src/remote.rs @@ -9,6 +9,11 @@ use crate::{ pub trait RemoteHandle: Send + Sync + 'static { fn send(&self, recipient: Addr, envelope: Envelope) -> SendResult; fn try_send(&self, recipient: Addr, envelope: Envelope) -> Result<(), TrySendError>; + fn unbounded_send( + &self, + recipient: Addr, + envelope: Envelope, + ) -> Result<(), SendError>; fn respond(&self, token: ResponseToken, response: Result); } diff --git a/elfo-core/src/signal.rs b/elfo-core/src/signal.rs index c5cc31c0..55e1a063 100644 --- a/elfo-core/src/signal.rs +++ b/elfo-core/src/signal.rs @@ -202,7 +202,7 @@ impl SourceStream for SignalSource { } let message = this.message.clone(); - let kind = MessageKind::Regular { sender: Addr::NULL }; + let kind = MessageKind::regular(Addr::NULL); let trace_id = TraceId::generate(); let envelope = Envelope::with_trace_id(message, kind, trace_id); Poll::Ready(Some(envelope)) diff --git a/elfo-core/src/stream.rs b/elfo-core/src/stream.rs index d65d7e99..0a886905 100644 --- a/elfo-core/src/stream.rs +++ b/elfo-core/src/stream.rs @@ -290,7 +290,7 @@ impl StreamItem for M { /// This method is private. #[doc(hidden)] fn pack(self, trace_id: TraceId) -> Envelope { - let kind = MessageKind::Regular { sender: Addr::NULL }; + let kind = MessageKind::regular(Addr::NULL); Envelope::with_trace_id(self, kind, trace_id) } } diff --git a/elfo-core/src/time/delay.rs b/elfo-core/src/time/delay.rs index 3f4c05b9..377f8790 100644 --- a/elfo-core/src/time/delay.rs +++ b/elfo-core/src/time/delay.rs @@ -124,7 +124,7 @@ impl SourceStream for DelaySource { // Emit the message. let message = this.message.take().unwrap(); - let kind = MessageKind::Regular { sender: Addr::NULL }; + let kind = MessageKind::regular(Addr::NULL); let trace_id = this.trace_id.take().unwrap_or_else(TraceId::generate); let envelope = Envelope::with_trace_id(message, kind, trace_id); diff --git a/elfo-core/src/time/interval.rs b/elfo-core/src/time/interval.rs index f7dbfeee..28cfba86 100644 --- a/elfo-core/src/time/interval.rs +++ b/elfo-core/src/time/interval.rs @@ -246,7 +246,7 @@ impl SourceStream for IntervalSource { // Emit the message. let message = this.message.clone(); - let kind = MessageKind::Regular { sender: Addr::NULL }; + let kind = MessageKind::regular(Addr::NULL); let trace_id = TraceId::generate(); let envelope = Envelope::with_trace_id(message, kind, trace_id); diff --git a/elfo-network/src/discovery/mod.rs b/elfo-network/src/discovery/mod.rs index 4174df11..f9b24c2d 100644 --- a/elfo-network/src/discovery/mod.rs +++ b/elfo-network/src/discovery/mod.rs @@ -520,9 +520,7 @@ async fn recv(socket: &mut Socket) -> Result { Ok(Envelope::new( message, - MessageKind::Regular { - sender: envelope.sender.into_remote(), - }, + MessageKind::regular(envelope.sender.into_remote()), )) } diff --git a/elfo-network/src/worker/mod.rs b/elfo-network/src/worker/mod.rs index f4722145..2932c132 100644 --- a/elfo-network/src/worker/mod.rs +++ b/elfo-network/src/worker/mod.rs @@ -663,6 +663,7 @@ impl SocketReader { return; } + // TODO: use `unbounded_send` if the envelope has been sent unboundedly. let result = object.try_send(Addr::NULL, envelope); // If the recipient has gone, close the flow and return. @@ -712,7 +713,7 @@ impl SocketReader { } fn make_system_envelope(message: impl Message) -> Envelope { - Envelope::new(message, MessageKind::Regular { sender: Addr::NULL }) + Envelope::new(message, MessageKind::regular(Addr::NULL)) } // === Pusher === @@ -756,6 +757,8 @@ impl Pusher { let fut = { let guard = EbrGuard::new(); let object = ward!(self.ctx.book().get(self.actor_addr, &guard), return false); + + // TODO: use `unbounded_send` if the envelope has been sent unboundedly. Object::send(object, Addr::NULL, envelope) }; @@ -852,6 +855,25 @@ impl remote::RemoteHandle for RemoteHandle { } } + fn unbounded_send( + &self, + recipient: Addr, + envelope: Envelope, + ) -> Result<(), SendError> { + let recipient = NetworkAddr::from_remote(recipient); + + if likely(self.tx_flows.do_acquire(recipient)) { + let mut item = Some(KanalItem::simple(recipient, envelope)); + match self.tx.try_send_option(&mut item) { + Ok(true) => Ok(()), + Ok(false) => unreachable!(), + Err(_) => Err(SendError(item.take().unwrap().envelope.unwrap())), + } + } else { + Err(SendError(envelope)) + } + } + fn respond(&self, token: ResponseToken, envelope: Result) { debug_assert!(!token.is_forgotten()); debug_assert!(token.sender().is_remote()); diff --git a/elfo-test/src/utils.rs b/elfo-test/src/utils.rs index afdc8e5e..f0530c6d 100644 --- a/elfo-test/src/utils.rs +++ b/elfo-test/src/utils.rs @@ -44,7 +44,7 @@ mod tests { #[test] fn extract_message_test() { create_scope().sync_within(|| { - let envelop = Envelope::new(TestMessage, MessageKind::Regular { sender: Addr::NULL }); + let envelop = Envelope::new(TestMessage, MessageKind::regular(Addr::NULL)); let resp = extract_message::(envelop); assert_eq!(resp, TestMessage); }); @@ -54,7 +54,7 @@ mod tests { #[should_panic(expected = "expected TestMessage, got TestRequest")] fn extract_message_panic_test() { create_scope().sync_within(|| { - let envelop = Envelope::new(TestRequest, MessageKind::Regular { sender: Addr::NULL }); + let envelop = Envelope::new(TestRequest, MessageKind::regular(Addr::NULL)); extract_message::(envelop); }); } @@ -62,7 +62,7 @@ mod tests { #[test] fn extract_request_test() { create_scope().sync_within(|| { - let envelop = Envelope::new(TestRequest, MessageKind::Regular { sender: Addr::NULL }); + let envelop = Envelope::new(TestRequest, MessageKind::regular(Addr::NULL)); let (resp, _token) = extract_request::(envelop); assert_eq!(resp, TestRequest); });