From 2bb04edc651fddbe36a6c2e8bc3252fb840c0757 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Thu, 11 Jul 2024 21:49:20 +0400 Subject: [PATCH] feat(core/mailbox): configure capacity and change the default value --- CHANGELOG.md | 3 + benches/messaging.rs | 4 ++ elfo-core/Cargo.toml | 2 +- elfo-core/src/actor.rs | 39 +++++++++++-- elfo-core/src/config.rs | 1 + elfo-core/src/context.rs | 23 +++++++- elfo-core/src/init.rs | 3 +- elfo-core/src/mailbox.rs | 74 ++++++++++++++++++++---- elfo-core/src/supervisor.rs | 46 +++++++++------ elfo-test/src/proxy.rs | 1 + elfo/tests/mailbox_capacity.rs | 89 +++++++++++++++++++++++++++++ examples/examples/usage/config.toml | 3 + 12 files changed, 252 insertions(+), 36 deletions(-) create mode 100644 elfo/tests/mailbox_capacity.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index eed4cf91..aff3b351 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - core: directly accept never returning functions in `ActorGroup::exec()` ([#127]). - core/context: add `Context::unbounded_send(_to)` methods. - errors: add `From for TrySendError` and `SendError::{into_inner,map}` methods. +- core/config: add the `system.mailbox.capacity` parameter to set the mailbox capacity. +- core/context: add `Context::set_mailbox_capacity()`. ### Changed +- **BREAKING** core/mailbox: default capacity is `100` now. - **BREAKING** macros: remove the `network` feature ([#127]). - **BREAKING** core/message: remove `AnyMessage::upcast()` in favor of `AnyMessage::new()` ([#127]). - **BREAKING** core/envelope: `Envelope::message()` returns `AnyMessageRef` ([#127]). diff --git a/benches/messaging.rs b/benches/messaging.rs index e13190bd..87a9486a 100644 --- a/benches/messaging.rs +++ b/benches/messaging.rs @@ -149,6 +149,10 @@ fn make_consumers(actor_count: u32) -> Blueprint { }) })) .exec(move |mut ctx| async move { + // Measure throughput without extra context switches. + // The number of switches are controlled by `yield_now()` in producers. + ctx.set_mailbox_capacity(1_000_000); + while let Some(envelope) = ctx.recv().await { msg!(match envelope { msg @ Sample => { diff --git a/elfo-core/Cargo.toml b/elfo-core/Cargo.toml index 176d7569..dbb7a24d 100644 --- a/elfo-core/Cargo.toml +++ b/elfo-core/Cargo.toml @@ -26,7 +26,7 @@ elfo-utils = { version = "0.2.5", path = "../elfo-utils" } stability.workspace = true metrics.workspace = true -tokio = { version = "1.16", features = ["rt", "sync", "time", "signal", "macros"] } +tokio = { version = "1.37", features = ["rt", "sync", "time", "signal", "macros"] } idr-ebr = "0.2" futures-intrusive = "0.5" cordyceps = "0.3.2" diff --git a/elfo-core/src/actor.rs b/elfo-core/src/actor.rs index efafe892..087ec0f3 100644 --- a/elfo-core/src/actor.rs +++ b/elfo-core/src/actor.rs @@ -10,7 +10,7 @@ use crate::{ envelope::Envelope, errors::{SendError, TrySendError}, group::TerminationPolicy, - mailbox::{Mailbox, RecvResult}, + mailbox::{Mailbox, MailboxConfig, RecvResult}, messages::{ActorStatusReport, Terminate}, msg, request_table::RequestTable, @@ -182,32 +182,39 @@ pub(crate) struct Actor { termination_policy: TerminationPolicy, mailbox: Mailbox, request_table: RequestTable, - control: RwLock, + control: RwLock, finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`? status_subscription: Arc, } -struct ControlBlock { +struct Control { status: ActorStatus, /// If `None`, a group's policy will be used. restart_policy: Option, + /// A mailbox capacity set in the config. + mailbox_capacity_config: usize, + /// Explicitly set mailbox capacity via `Context::set_mailbox_capacity()`. + mailbox_capacity_override: Option, } impl Actor { pub(crate) fn new( meta: Arc, addr: Addr, + mailbox_config: &MailboxConfig, termination_policy: TerminationPolicy, status_subscription: Arc, ) -> Self { Actor { meta, termination_policy, - mailbox: Mailbox::new(), + mailbox: Mailbox::new(mailbox_config), request_table: RequestTable::new(addr), - control: RwLock::new(ControlBlock { + control: RwLock::new(Control { status: ActorStatus::INITIALIZING, restart_policy: None, + mailbox_capacity_config: mailbox_config.capacity, + mailbox_capacity_override: None, }), finished: ManualResetEvent::new(false), status_subscription, @@ -271,6 +278,26 @@ impl Actor { &self.request_table } + pub(crate) fn set_mailbox_capacity_config(&self, capacity: usize) { + self.control.write().mailbox_capacity_config = capacity; + self.update_mailbox_capacity(); + } + + pub(crate) fn set_mailbox_capacity_override(&self, capacity: Option) { + self.control.write().mailbox_capacity_override = capacity; + self.update_mailbox_capacity(); + } + + fn update_mailbox_capacity(&self) { + let control = self.control.read(); + + let capacity = control + .mailbox_capacity_override + .unwrap_or(control.mailbox_capacity_config); + + self.mailbox.set_capacity(capacity); + } + pub(crate) fn restart_policy(&self) -> Option { self.control.read().restart_policy.clone() } @@ -348,7 +375,7 @@ impl Actor { }) } - fn send_status_to_subscribers(&self, control: &ControlBlock) { + fn send_status_to_subscribers(&self, control: &Control) { self.status_subscription.send(ActorStatusReport { meta: self.meta.clone(), status: control.status.clone(), diff --git a/elfo-core/src/config.rs b/elfo-core/src/config.rs index 0612156c..929b07d6 100644 --- a/elfo-core/src/config.rs +++ b/elfo-core/src/config.rs @@ -167,6 +167,7 @@ impl<'de> Deserializer<'de> for AnyConfig { #[derive(Debug, Default, Deserialize)] #[serde(default)] pub(crate) struct SystemConfig { + pub(crate) mailbox: crate::mailbox::MailboxConfig, pub(crate) logging: crate::logging::LoggingConfig, pub(crate) dumping: crate::dumping::DumpingConfig, pub(crate) telemetry: crate::telemetry::TelemetryConfig, diff --git a/elfo-core/src/context.rs b/elfo-core/src/context.rs index 67f1bf58..4ae91449 100644 --- a/elfo-core/src/context.rs +++ b/elfo-core/src/context.rs @@ -105,7 +105,28 @@ impl Context { ward!(self.actor.as_ref().and_then(|o| o.as_actor())).set_status(status); } - /// Overrides the group's default restart policy. + /// Overrides the group's default mailbox capacity, which set in the config. + /// + /// Note: after restart the actor will be created from scratch, so this + /// override will be also reset to the group's default mailbox capacity. + /// + /// # Example + /// ``` + /// # use elfo_core as elfo; + /// # fn exec(ctx: elfo::Context) { + /// // Override the group's default mailbox capacity. + /// ctx.set_mailbox_capacity(42); + /// + /// // Set the group's default mailbox capacity. + /// ctx.set_mailbox_capacity(None); + /// # } + /// ``` + pub fn set_mailbox_capacity(&self, capacity: impl Into>) { + ward!(self.actor.as_ref().and_then(|o| o.as_actor())) + .set_mailbox_capacity_override(capacity.into()); + } + + /// Overrides the group's default restart policy, which set in the config. /// /// Note: after restart the actor will be created from scratch, so this /// override will be also reset to the group's default restart policy. diff --git a/elfo-core/src/init.rs b/elfo-core/src/init.rs index 96210c2b..60dc2ddb 100644 --- a/elfo-core/src/init.rs +++ b/elfo-core/src/init.rs @@ -174,7 +174,8 @@ pub async fn do_start( let actor = Actor::new( meta.clone(), addr, - Default::default(), + &<_>::default(), + <_>::default(), Arc::new(SubscriptionManager::new(ctx.clone())), ); diff --git a/elfo-core/src/mailbox.rs b/elfo-core/src/mailbox.rs index f86cc36d..1a405c01 100644 --- a/elfo-core/src/mailbox.rs +++ b/elfo-core/src/mailbox.rs @@ -42,6 +42,22 @@ use crate::{ tracing::TraceId, }; +// === MailboxConfig === + +#[derive(Debug, PartialEq, serde::Deserialize)] +#[serde(default)] +pub(crate) struct MailboxConfig { + pub(crate) capacity: usize, +} + +impl Default for MailboxConfig { + fn default() -> Self { + Self { capacity: 100 } + } +} + +// === Mailbox === + pub(crate) type Link = Links; assert_not_impl_any!(EnvelopeHeader: Unpin); @@ -77,9 +93,6 @@ unsafe impl Linked for EnvelopeHeader { } } -// TODO: make configurable (a config + `ctx.set_mailbox_capacity(_)`). -const LIMIT: usize = 100_000; - pub(crate) struct Mailbox { /// A storage for envelopes based on an intrusive linked list. /// Note: `cordyceps` uses terms "head" and "tail" in the opposite way. @@ -93,18 +106,52 @@ pub(crate) struct Mailbox { // TODO: replace with `diatomic-waker` (3-5% faster). rx_notify: CachePadded, + /// Use `Mutex` here for synchronization on close/configure. + control: Mutex, +} + +struct Control { /// A trace ID that should be assigned once the mailbox is closed. - /// Use `Mutex` here for synchronization on close, more in `close()`. - closed_trace_id: Mutex>, + closed_trace_id: Option, + /// A real capacity of the mailbox. + capacity: usize, } impl Mailbox { - pub(crate) fn new() -> Self { + pub(crate) fn new(config: &MailboxConfig) -> Self { + let capacity = clamp_capacity(config.capacity); + Self { queue: MpscQueue::new_with_stub(Envelope::stub()), - tx_semaphore: Semaphore::new(LIMIT), + tx_semaphore: Semaphore::new(capacity), rx_notify: CachePadded::new(Notify::new()), - closed_trace_id: Mutex::new(None), + control: Mutex::new(Control { + closed_trace_id: None, + capacity, + }), + } + } + + pub(crate) fn set_capacity(&self, capacity: usize) { + let mut control = self.control.lock(); + + if capacity == control.capacity { + return; + } + + if capacity < control.capacity { + let delta = control.capacity - capacity; + let real_delta = self.tx_semaphore.forget_permits(delta); + + // Note that we cannot reduce the number of active permits + // (relates to messages that already stored in the queue) in tokio impl. + // Sadly, in such cases, we violate provided `capacity`. + debug_assert!(real_delta <= delta); + control.capacity -= real_delta; + } else { + let real_delta = clamp_capacity(capacity) - control.capacity; + self.tx_semaphore.add_permits(real_delta); + control.capacity += real_delta; } } @@ -180,13 +227,13 @@ impl Mailbox { // channel. If we take a lock after closing the channel, data race is // possible when we try to `recv()` after the channel is closed, but // before the `closed_trace_id` is assigned. - let mut closed_trace_id = self.closed_trace_id.lock(); + let mut control = self.control.lock(); if self.tx_semaphore.is_closed() { return false; } - *closed_trace_id = Some(trace_id); + control.closed_trace_id = Some(trace_id); self.tx_semaphore.close(); self.rx_notify.notify_one(); @@ -204,7 +251,8 @@ impl Mailbox { match self.queue.dequeue() { Some(envelope) => RecvResult::Data(envelope), None => { - let trace_id = self.closed_trace_id.lock().expect("called before close()"); + let control = self.control.lock(); + let trace_id = control.closed_trace_id.expect("called before close()"); RecvResult::Closed(trace_id) } } @@ -215,3 +263,7 @@ pub(crate) enum RecvResult { Data(Envelope), Closed(TraceId), } + +fn clamp_capacity(capacity: usize) -> usize { + capacity.min(Semaphore::MAX_PERMITS) +} diff --git a/elfo-core/src/supervisor.rs b/elfo-core/src/supervisor.rs index e6cdbc22..ecdee72c 100644 --- a/elfo-core/src/supervisor.rs +++ b/elfo-core/src/supervisor.rs @@ -42,13 +42,13 @@ pub(crate) struct Supervisor, C, X> { objects: DashMap, router: R, exec: X, - control: CachePadded>>, + control: CachePadded>>, scope_shared: Arc, status_subscription: Arc, rt_manager: RuntimeManager, } -struct ControlBlock { +struct Control { system_config: Arc, user_config: Option>, is_started: bool, @@ -87,7 +87,7 @@ where termination_policy: TerminationPolicy, rt_manager: RuntimeManager, ) -> Self { - let control = ControlBlock { + let control = Control { system_config: Default::default(), user_config: None, is_started: false, @@ -303,6 +303,18 @@ where .with_key(key.clone()) .with_config(user_config); + let meta = Arc::new(ActorMeta { + group: self.meta.group.clone(), + key: key_str, + }); + let actor = Actor::new( + meta.clone(), + addr, + &system_config.mailbox, + self.termination_policy.clone(), + self.status_subscription.clone(), + ); + drop(control); let sv = self.clone(); @@ -389,19 +401,8 @@ where sv.context.book().remove(addr); }; - let meta = Arc::new(ActorMeta { - group: self.meta.group.clone(), - key: key_str, - }); - let rt = self.rt_manager.get(&meta); - let actor = Actor::new( - meta.clone(), - addr, - self.termination_policy.clone(), - self.status_subscription.clone(), - ); entry.insert(Object::new(addr, actor)); let scope = Scope::new(scope::trace_id(), addr, meta, self.scope_shared.clone()) @@ -436,17 +437,30 @@ where } } - fn update_config(&self, control: &mut ControlBlock, config: &AnyConfig) { + fn update_config(&self, control: &mut Control, config: &AnyConfig) { let system = config.get_system(); self.scope_shared.configure(system); + let need_to_update_actors = control.system_config.mailbox != system.mailbox; + // Update user's config. - control.system_config = config.get_system().clone(); + control.system_config = system.clone(); control.user_config = Some(config.get_user::().clone()); self.router .update(control.user_config.as_ref().expect("just saved")); + if need_to_update_actors { + for object in self.objects.iter() { + let actor = object + .value() + .as_actor() + .expect("a supervisor stores only actors"); + + actor.set_mailbox_capacity_config(system.mailbox.capacity); + } + } + self.in_scope(|| { debug!( message = "config updated", diff --git a/elfo-test/src/proxy.rs b/elfo-test/src/proxy.rs index 1fd1c45f..f144d75a 100644 --- a/elfo-test/src/proxy.rs +++ b/elfo-test/src/proxy.rs @@ -179,6 +179,7 @@ impl Proxy { /// Now it's implemented as multiple calls `yield_now()`, /// but the implementation can be changed in the future. pub async fn sync(&mut self) { + // TODO: it should probably be `request(Ping).await`. for _ in 0..SYNC_YIELD_COUNT { task::yield_now().await; } diff --git a/elfo/tests/mailbox_capacity.rs b/elfo/tests/mailbox_capacity.rs new file mode 100644 index 00000000..4fd5540d --- /dev/null +++ b/elfo/tests/mailbox_capacity.rs @@ -0,0 +1,89 @@ +#![cfg(feature = "test-util")] + +use std::time::Duration; + +use serde::Deserialize; +use toml::toml; + +use elfo::{ + config::AnyConfig, + messages::{Ping, UpdateConfig}, + prelude::*, +}; + +#[message] +struct Dummy; + +#[message(ret = ())] +struct Freeze; + +#[message] +struct SetCapacity(Option); + +fn testee() -> Blueprint { + ActorGroup::new().exec(move |mut ctx| async move { + while let Some(envelope) = ctx.recv().await { + msg!(match envelope { + Dummy => {} + (Freeze, token) => { + ctx.respond(token, ()); + tokio::time::sleep(Duration::from_secs(60)).await + } + SetCapacity(capacity) => ctx.set_mailbox_capacity(capacity), + }); + } + }) +} + +fn testee_config(capacity: usize) -> AnyConfig { + AnyConfig::deserialize(toml! { + system.mailbox.capacity = capacity + }) + .unwrap() +} + +#[tokio::test(start_paused = true)] +async fn config() { + let proxy = elfo::test::proxy(testee(), testee_config(0)).await; + + for capacity in [1, 10, 100, 1000, 500, 50, 5, 15, 150] { + proxy.send(UpdateConfig::new(testee_config(capacity))).await; + proxy.request(Freeze).await; + + for i in 1..=capacity { + assert!(proxy.try_send(Dummy).is_ok(), "shoud pass [{i}/{capacity}]"); + } + assert!(proxy.try_send(Dummy).is_err(), "should reject [{capacity}]"); + + // Ensure that all sent messages are handled. + proxy.request(Ping::default()).await; + } +} + +#[tokio::test(start_paused = true)] +async fn explicit_override() { + let configured_capacity = 42; + let proxy = elfo::test::proxy(testee(), testee_config(configured_capacity)).await; + + for capacity in [1, 10, 100, 1000, 500, 50, 5, 15, 150] { + proxy.send(SetCapacity(Some(capacity))).await; + proxy.request(Freeze).await; + + for i in 1..=capacity { + assert!(proxy.try_send(Dummy).is_ok(), "shoud pass [{i}/{capacity}]"); + } + assert!(proxy.try_send(Dummy).is_err(), "should reject [{capacity}]"); + + // Ensure that all sent messages are handled. + proxy.request(Ping::default()).await; + } + + // Reset to the configured value. + proxy.send(SetCapacity(None)).await; + proxy.request(Freeze).await; + + for i in 1..=configured_capacity { + assert!(proxy.try_send(Dummy).is_ok(), "shoud pass [{i}/configured]"); + } + assert!(proxy.try_send(Dummy).is_err(), "should reject [configured]"); +} diff --git a/examples/examples/usage/config.toml b/examples/examples/usage/config.toml index c3b4e39d..4f9fd6c7 100644 --- a/examples/examples/usage/config.toml +++ b/examples/examples/usage/config.toml @@ -3,6 +3,9 @@ # The primary purpose is to define default values of system settings (logging, dumping, and so on). # Parameters and their defaults +# Mailbox +#system.mailbox.capacity = 100 +# # Logging #system.logging.max_level = "Info" # one of: Trace, Debug, Info, Warn, Error, Off. #system.logging.max_rate_per_level = 1000 # per second