Skip to content

Commit

Permalink
feat(core): the restarting policy can be overridden via configuration…
Browse files Browse the repository at this point in the history
… for each actor.

`RestartParams` has been added as the argument of `RestartPolicy::always(..)` and `RestartPolicy::on_failure(..)`. The linear backoff has been replaced with an exponential one, with a configurable limit for restarts.

BREAKING CHANGE: The default RestartPolicy is set to `RestartPolicy::never()`.
  • Loading branch information
sargarass committed Dec 16, 2023
1 parent 63bc111 commit 766a04b
Show file tree
Hide file tree
Showing 25 changed files with 634 additions and 200 deletions.
6 changes: 5 additions & 1 deletion elfo-configurer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use elfo_core::{
},
msg, scope,
signal::{Signal, SignalKind},
ActorGroup, ActorStatus, Addr, Blueprint, Context, Topology,
ActorGroup, ActorStatus, Addr, Blueprint, Context, RestartParams, RestartPolicy, Topology,
};

pub use self::protocol::*;
Expand All @@ -48,6 +48,10 @@ fn blueprint(topology: &Topology, source: ConfigSource) -> Blueprint {
let topology = topology.clone();
ActorGroup::new()
.stop_order(100)
.restart_policy(RestartPolicy::on_failure(RestartParams::new(
Duration::from_secs(5),
Duration::from_secs(30),
)))
.exec(move |ctx| Configurer::new(ctx, topology.clone(), source.clone()).main())
}

Expand Down
1 change: 1 addition & 0 deletions elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ regex = "1.6.0"
thread_local = { version = "1.1.3", optional = true }
unicycle = "0.9.3"
rmp-serde = { version = "1.1.0", optional = true }
humantime-serde = "1"

[dev-dependencies]
elfo-utils = { version = "0.2.3", path = "../elfo-utils", features = ["test-util"] }
Expand Down
60 changes: 59 additions & 1 deletion elfo-core/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use tracing::{error, info, warn};
use crate::{
envelope::Envelope,
errors::{SendError, TrySendError},
group::{RestartPolicy, TerminationPolicy},
group::TerminationPolicy,
mailbox::{Mailbox, RecvResult},
messages::{ActorStatusReport, Terminate},
msg,
request_table::RequestTable,
restarting::RestartPolicy,
scope,
subscription::SubscriptionManager,
Addr,
Expand Down Expand Up @@ -117,6 +118,63 @@ impl ActorStatusKind {
}
}

// === ActorStartInfo ===

/// The reason an actor is being started.
///
/// The enum is `#[non_exhaustive]`, so matches written outside of this crate
/// need a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ActorStartCause {
    /// The group the actor belongs to was mounted.
    GroupMounted,
    /// A message caused the actor to start.
    OnMessage,
    /// The restart policy started the actor again.
    Restarted,
}

impl ActorStartCause {
    /// Returns `true` if the cause is [`ActorStartCause::GroupMounted`].
    pub fn is_group_mounted(&self) -> bool {
        matches!(*self, Self::GroupMounted)
    }

    /// Returns `true` if the cause is [`ActorStartCause::OnMessage`].
    pub fn is_on_message(&self) -> bool {
        matches!(*self, Self::OnMessage)
    }

    /// Returns `true` if the cause is [`ActorStartCause::Restarted`].
    pub fn is_restarted(&self) -> bool {
        matches!(*self, Self::Restarted)
    }
}

/// Information about the start of an actor.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ActorStartInfo {
    /// Why the actor is being started.
    pub cause: ActorStartCause,
}

impl ActorStartInfo {
    /// Builds the info from an already chosen cause.
    fn with_cause(cause: ActorStartCause) -> Self {
        Self { cause }
    }

    /// Start info with the [`ActorStartCause::GroupMounted`] cause.
    pub(crate) fn on_group_mounted() -> Self {
        Self::with_cause(ActorStartCause::GroupMounted)
    }

    /// Start info with the [`ActorStartCause::OnMessage`] cause.
    pub(crate) fn on_message() -> Self {
        Self::with_cause(ActorStartCause::OnMessage)
    }

    /// Start info with the [`ActorStartCause::Restarted`] cause.
    pub(crate) fn on_restart() -> Self {
        Self::with_cause(ActorStartCause::Restarted)
    }
}

// === Actor ===

pub(crate) struct Actor {
Expand Down
1 change: 1 addition & 0 deletions elfo-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub(crate) struct SystemConfig {
pub(crate) logging: crate::logging::LoggingConfig,
pub(crate) dumping: crate::dumping::DumpingConfig,
pub(crate) telemetry: crate::telemetry::TelemetryConfig,
pub(crate) restart_policy: crate::restarting::RestartPolicyConfig,
}

// === Secret ===
Expand Down
48 changes: 46 additions & 2 deletions elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ use tracing::{info, trace};
use elfo_utils::unlikely;

use crate::{
actor::{Actor, ActorStatus},
actor::{Actor, ActorStartInfo, ActorStatus},
addr::Addr,
address_book::AddressBook,
config::AnyConfig,
demux::Demux,
dumping::{Direction, Dump, Dumper, INTERNAL_CLASS},
envelope::{AnyMessageBorrowed, AnyMessageOwned, Envelope, EnvelopeOwned, MessageKind},
errors::{RequestError, SendError, TryRecvError, TrySendError},
group::RestartPolicy,
mailbox::RecvResult,
message::{Message, Request},
messages, msg,
object::ObjectArc,
request_table::ResponseToken,
restarting::RestartPolicy,
routers::Singleton,
scope,
source::{SourceHandle, Sources, UnattachedSource},
Expand All @@ -38,6 +38,7 @@ pub struct Context<C = (), K = Singleton> {
book: AddressBook,
actor: Option<ObjectArc>, // `None` for group's and pruned context.
actor_addr: Addr,
actor_start_info: Option<ActorStartInfo>, // `None` for group's context,
group_addr: Addr,
demux: Demux,
config: Arc<C>,
Expand Down Expand Up @@ -624,6 +625,39 @@ impl<C, K> Context<C, K> {
}
}

/// Retrieves information related to the start of the actor.
///
/// # Panics
///
/// This method will panic if it is called on a group's context, because
/// no start information is attached to such a context.
///
/// # Example
///
/// ```
/// # use elfo_core as elfo;
/// # use elfo_core::{ActorStartCause, ActorStartInfo};
/// # async fn exec(mut ctx: elfo::Context) {
/// match ctx.start_info().cause {
/// ActorStartCause::GroupMounted => {
/// // The actor started because its group was mounted.
/// }
/// ActorStartCause::OnMessage => {
/// // The actor started in response to a message.
/// }
/// ActorStartCause::Restarted => {
/// // The actor started due to the restart policy.
/// }
/// _ => {}
/// }
/// # }
/// ```
pub fn start_info(&self) -> &ActorStartInfo {
self.actor_start_info
.as_ref()
.expect("start_info is not available for a group context")
}

fn pre_recv(&mut self) {
self.stats.on_recv();

Expand Down Expand Up @@ -718,6 +752,7 @@ impl<C, K> Context<C, K> {
book: self.book.clone(),
actor: None,
actor_addr: self.actor_addr,
actor_start_info: self.actor_start_info.clone(),
group_addr: self.group_addr,
demux: self.demux.clone(),
config: Arc::new(()),
Expand All @@ -740,6 +775,7 @@ impl<C, K> Context<C, K> {
book: self.book,
actor: self.actor,
actor_addr: self.actor_addr,
actor_start_info: self.actor_start_info,
group_addr: self.group_addr,
demux: self.demux,
config,
Expand All @@ -764,11 +800,17 @@ impl<C, K> Context<C, K> {
self
}

/// Stores the given start information in this context and hands the
/// context back, so the call can be chained with the other `with_*` builders.
pub(crate) fn with_start_info(mut self, actor_start_info: ActorStartInfo) -> Self {
    // `Option<T>` implements `From<T>`, so `.into()` wraps the value in `Some`.
    self.actor_start_info = actor_start_info.into();
    self
}

pub(crate) fn with_key<K1>(self, key: K1) -> Context<C, K1> {
Context {
book: self.book,
actor: self.actor,
actor_addr: self.actor_addr,
actor_start_info: self.actor_start_info,
group_addr: self.group_addr,
demux: self.demux,
config: self.config,
Expand Down Expand Up @@ -832,6 +874,7 @@ impl Context {
actor: None,
actor_addr: Addr::NULL,
group_addr: Addr::NULL,
actor_start_info: None,
demux,
config: Arc::new(()),
key: Singleton,
Expand All @@ -850,6 +893,7 @@ impl<C, K: Clone> Clone for Context<C, K> {
book: self.book.clone(),
actor: self.book.get_owned(self.actor_addr),
actor_addr: self.actor_addr,
actor_start_info: self.actor_start_info.clone(),
group_addr: self.group_addr,
demux: self.demux.clone(),
config: self.config.clone(),
Expand Down
44 changes: 4 additions & 40 deletions elfo-core/src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
envelope::Envelope,
exec::{Exec, ExecResult},
object::{GroupHandle, GroupVisitor, Object},
restarting::RestartPolicy,
routers::Router,
runtime::RuntimeManager,
supervisor::Supervisor,
Expand Down Expand Up @@ -47,13 +48,15 @@ impl<R, C> ActorGroup<R, C> {
}

/// The behaviour on actor termination.
/// `RestartPolicy::on_failures` is used by default.
///
/// `RestartPolicy::never` is used by default.
pub fn restart_policy(mut self, policy: RestartPolicy) -> Self {
self.restart_policy = policy;
self
}

/// The behaviour on the `Terminate` message.
///
/// `TerminationPolicy::closing` is used by default.
pub fn termination_policy(mut self, policy: TerminationPolicy) -> Self {
self.termination_policy = policy;
Expand Down Expand Up @@ -183,42 +186,3 @@ impl TerminationPolicy {

// TODO: add `stop_spawning`?
}

/// The behaviour on actor termination.
///
/// A policy is a thin wrapper around a [`RestartMode`]; use one of the
/// constructors below to pick the mode.
#[derive(Debug, Clone)]
pub struct RestartPolicy {
    pub(crate) mode: RestartMode,
}

/// The modes a [`RestartPolicy`] can carry.
#[derive(Debug, Clone)]
pub(crate) enum RestartMode {
    Always,
    OnFailures,
    Never,
}

impl RestartPolicy {
    /// Wraps an already chosen mode into a policy.
    fn with_mode(mode: RestartMode) -> Self {
        Self { mode }
    }

    /// Creates a policy with the `Always` mode.
    pub fn always() -> Self {
        Self::with_mode(RestartMode::Always)
    }

    /// Creates a policy with the `OnFailures` mode.
    pub fn on_failures() -> Self {
        Self::with_mode(RestartMode::OnFailures)
    }

    /// Creates a policy with the `Never` mode.
    pub fn never() -> Self {
        Self::with_mode(RestartMode::Never)
    }
}

impl Default for RestartPolicy {
    /// The default policy is the same as [`RestartPolicy::on_failures`].
    fn default() -> Self {
        Self::with_mode(RestartMode::OnFailures)
    }
}
6 changes: 4 additions & 2 deletions elfo-core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use elfo_utils::time::Instant;
use crate::{memory_tracker::MemoryTracker, time::Interval};

use crate::{
actor::{Actor, ActorMeta, ActorStatus},
actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus},
addr::{Addr, GroupNo},
config::SystemConfig,
context::Context,
Expand Down Expand Up @@ -187,7 +187,9 @@ pub async fn do_start<F: Future>(
entry.insert(Object::new(addr, actor));

// It must be called after `entry.insert()`.
let ctx = ctx.with_addr(addr);
let ctx = ctx
.with_addr(addr)
.with_start_info(ActorStartInfo::on_group_mounted());

let init = async move {
start_entrypoints(&ctx, &topology, is_check_only).await?;
Expand Down
6 changes: 4 additions & 2 deletions elfo-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ extern crate self as elfo_core;

// TODO: revise this list
pub use crate::{
actor::{ActorMeta, ActorStatus, ActorStatusKind},
actor::{ActorMeta, ActorStartCause, ActorStartInfo, ActorStatus, ActorStatusKind},
addr::Addr,
config::Config,
context::{Context, RequestBuilder},
envelope::Envelope,
group::{ActorGroup, Blueprint, RestartPolicy, TerminationPolicy},
group::{ActorGroup, Blueprint, TerminationPolicy},
local::{Local, MoveOwnership},
message::{Message, Request},
request_table::ResponseToken,
restarting::{RestartParams, RestartPolicy},
source::{SourceHandle, UnattachedSource},
topology::Topology,
};
Expand Down Expand Up @@ -65,6 +66,7 @@ pub mod remote;
#[cfg(all(feature = "network", not(feature = "unstable")))]
mod remote;
mod request_table;
mod restarting;
mod runtime;
mod source;
mod subscription;
Expand Down
1 change: 1 addition & 0 deletions elfo-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ impl From<u16> for NodeNo {

static NODE_NO: AtomicU16 = AtomicU16::new(0);

#[stability::unstable]
/// Returns the current `node_no`.
pub fn node_no() -> Option<crate::addr::NodeNo> {
crate::addr::NodeNo::from_bits(NODE_NO.load(Ordering::Relaxed))
Expand Down
Loading

0 comments on commit 766a04b

Please sign in to comment.