Skip to content

Commit

Permalink
Merge branch 'feat/idr-ebr'
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Apr 16, 2024
2 parents 4198532 + 8000b8a commit 441b7b0
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 204 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- telemeter: the `elfo_metrics_storage_shards` gauge metric.

### Changed
- core: replace sharded-slab with idr-ebr to reduce contention on messaging.
- macros/msg: replace a chain of `is()` with type id to improve codegen.
- telemeter: a new sharded-by-threads storage, it increases perf and dramatically reduces contention.
- telemeter: revise default DDSketch parameters. It improves stability for some cases.
- telemeter: rename the `Prometheus` sink to `OpenMetrics` with aliasing.
Expand Down
2 changes: 1 addition & 1 deletion elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ elfo-macros = { version = "0.2.0-alpha.14", path = "../elfo-macros" }
elfo-utils = { version = "0.2.5", path = "../elfo-utils" }

tokio = { version = "1.16", features = ["rt", "sync", "time", "signal", "macros"] }
sharded-slab = "0.1.7"
idr-ebr = "0.1.1"
futures-intrusive = "0.5"
parking_lot = "0.12"
smallbox = "0.8.0"
Expand Down
100 changes: 43 additions & 57 deletions elfo-core/src/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
};

use derive_more::Display;
use idr_ebr::Key;
use serde::{Deserialize, Serialize};

// === NodeNo ===
Expand Down Expand Up @@ -117,7 +118,7 @@ impl GroupNo {
///
/// # Uniqueness
///
/// An address is based on a sharded slab to make it a simple sendable number
/// An address is based on an [IDR] to make it a simple sendable number
/// (as opposed to reference counting) and provide better performance of lookups
/// than hashmaps. However, it means deletions and insertions to the same
/// underlying slot multiple times can lead to reusing the address for a
Expand All @@ -127,7 +128,7 @@ impl GroupNo {
/// * Alive actors on the same node always have different addresses.
/// * Actors in different nodes have different address spaces.
/// * Actors in different groups have different address spaces.
/// * An address includes the version number to guard against the ABA problem.
/// * An address includes the version number to guard against the [ABA] problem.
/// * An address is randomized between restarts of the same node if the
/// `network` feature is enabled.
///
Expand All @@ -139,29 +140,24 @@ impl GroupNo {
/// The only way to get an address of remote actor is `envelope.sender()`.
/// If sending `Addr` inside a message is unavoidable, use `Local<Addr>`,
/// however it won't be possible to send such message to a remote actor.
///
/// [IDR]: https://crates.io/crates/idr-ebr
/// [ABA]: https://en.wikipedia.org/wiki/ABA_problem
// ~
// Structure (64b platform):
// 64 48 40 30 21 0
// +------------+----------+------------+-------+-----------------+
// | node_no | group_no | generation | TID | page + offset |
// | 16b | 8b | 10b | 9b | 21b |
// +------------+----------+------------+-------+-----------------+
// (0 if local) ^----------- slot key (40b) -----------^
// The structure:
// 64 48 40 25 0
// ┌────────────┬──────────┬────────────┬────────────────────────┐
// node_no group_no generation │ page_no + slot_no │
// 16b 8b 15b 25b
// └────────────┴──────────┴────────────┴────────────────────────┘
// (0 if local) └─────────── IDR key (40b) ───────────┘
//
// Structure (32b platform):
// 64 48 40 32 25 18 0
// +------------+----------+------+--------+------+---------------+
// | node_no | group_no | rand | genera | TID | page + offset |
// | 16b | 8b | 8b | 7b | 7b | 18b |
// +------------+----------+------+--------+------+---------------+
// (0 if local) ^------- slot key (32b) -------^
//
// Limits: 64b 32b
// - max nodes in a cluster 65535 65535 (1)
// - max groups in a node 255 255 (2, 3)
// - max active actors spawned by one thread 1048544 131040
// - slot generations to prevent ABA 1024 128
// - max threads spawning actors 256 64
// Limits:
// - max nodes in a cluster 65'535 (1)
// - max groups in a node 255 (2, 3)
// - max active actors 33'554'400
// - slot generations to prevent ABA 32'768
//
// 1. `0` is reserved to represent the local node.
// 2. `0` is reserved to represent `Addr::NULL` unambiguously.
Expand All @@ -172,7 +168,7 @@ impl GroupNo {
// node will have different addresses. The original address is never printed or even represented
// and the slot key part is restored only by calling private `Addr::slot_key(launch_no)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Addr(u64); // TODO: make it `NonZeroU64` instead of `Addr::NULL`?
pub struct Addr(u64); // TODO: make it `NonZeroU64` instead of `Addr::NULL`

const NODE_NO_SHIFT: u32 = 48;
const GROUP_NO_SHIFT: u32 = 40;
Expand Down Expand Up @@ -203,14 +199,16 @@ impl Addr {
pub const NULL: Addr = Addr(0);

#[cfg(feature = "network")]
pub(crate) fn new_local(slot_key: usize, group_no: GroupNo, launch_id: NodeLaunchId) -> Self {
pub(crate) fn new_local(slot_key: Key, group_no: GroupNo, launch_id: NodeLaunchId) -> Self {
let slot_key = u64::from(slot_key);
debug_assert!(slot_key < (1 << GROUP_NO_SHIFT));
let slot_key = ((slot_key as u64) ^ launch_id.into_bits()) & ((1 << GROUP_NO_SHIFT) - 1);
let slot_key = (slot_key ^ launch_id.into_bits()) & ((1 << GROUP_NO_SHIFT) - 1);
Self::new_local_inner(slot_key, group_no)
}

#[cfg(not(feature = "network"))]
pub(crate) fn new_local(slot_key: usize, group_no: GroupNo, _launch_id: NodeLaunchId) -> Self {
pub(crate) fn new_local(slot_key: Key, group_no: GroupNo, _launch_id: NodeLaunchId) -> Self {
let slot_key = u64::from(slot_key);
debug_assert!(slot_key < (1 << GROUP_NO_SHIFT));
Self::new_local_inner(slot_key as u64, group_no)
}
Expand Down Expand Up @@ -265,14 +263,14 @@ impl Addr {
}

#[cfg(feature = "network")]
pub(crate) fn slot_key(self, launch_id: NodeLaunchId) -> usize {
// sharded-slab uses the lower bits only, so we can xor the whole address.
(self.0 ^ launch_id.into_bits()) as usize
pub(crate) fn slot_key(self, launch_id: NodeLaunchId) -> Option<Key> {
// IDR uses the lower bits only, so we can xor the whole address.
(self.0 ^ launch_id.into_bits()).try_into().ok()
}

#[cfg(not(feature = "network"))]
pub(crate) fn slot_key(self, _launch_id: NodeLaunchId) -> usize {
self.0 as usize
pub(crate) fn slot_key(self, _launch_id: NodeLaunchId) -> Option<Key> {
self.0.try_into().ok()
}

#[cfg(feature = "network")]
Expand All @@ -293,35 +291,21 @@ impl Addr {
}
}

// === SlabConfig ===
// === IdrConfig ===

// Actually, it doesn't reexported.
pub struct SlabConfig;
// Actually, it doesn't reexported. TODO: remove comment?
pub(crate) struct IdrConfig;

#[cfg(target_pointer_width = "64")]
impl sharded_slab::Config for SlabConfig {
const INITIAL_PAGE_SIZE: usize = 32;
const MAX_PAGES: usize = 15;
const MAX_THREADS: usize = 256;
const RESERVED_BITS: usize = 24;
impl idr_ebr::Config for IdrConfig {
const INITIAL_PAGE_SIZE: u32 = 32;
const MAX_PAGES: u32 = 20;
const RESERVED_BITS: u32 = 24;
}
#[cfg(target_pointer_width = "64")]
const_assert_eq!(
sharded_slab::Slab::<crate::object::Object, SlabConfig>::USED_BITS,
GROUP_NO_SHIFT as usize
idr_ebr::Idr::<crate::object::Object, IdrConfig>::USED_BITS,
GROUP_NO_SHIFT
);

#[cfg(target_pointer_width = "32")]
impl sharded_slab::Config for SlabConfig {
const INITIAL_PAGE_SIZE: usize = 32;
const MAX_PAGES: usize = 12;
const MAX_THREADS: usize = 64;
const RESERVED_BITS: usize = 0;
}

#[cfg(target_pointer_width = "32")]
const_assert_eq!(Slab::<Object, SlabConfig>::USED_BITS, 32);

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand Down Expand Up @@ -360,7 +344,7 @@ mod tests {
proptest! {
#[test]
fn addr(
slot_keys in prop::collection::hash_set(0u64..(1 << GROUP_NO_SHIFT), 10),
slot_keys in prop::collection::hash_set(1u64..(1 << GROUP_NO_SHIFT), 10),
group_nos in prop::collection::hash_set(1..=u8::MAX, 10),
launch_ids in prop::collection::hash_set(prop::num::u64::ANY, 10),
) {
Expand All @@ -374,7 +358,7 @@ mod tests {
for slot_key in &slot_keys {
for group_no in &group_nos {
for launch_id in &launch_ids {
let slot_key = *slot_key as usize;
let slot_key = Key::try_from(*slot_key).unwrap();
let launch_id = NodeLaunchId::from_bits(*launch_id);
let group_no = GroupNo::new(*group_no, launch_id).unwrap();
let addr = Addr::new_local(slot_key, group_no, launch_id);
Expand All @@ -384,7 +368,9 @@ mod tests {
prop_assert!(addr.is_local());
prop_assert_eq!(addr.group_no(), Some(group_no));
prop_assert_eq!(addr.node_no(), None);
prop_assert_eq!(addr.slot_key(launch_id) & ((1 << GROUP_NO_SHIFT) - 1), slot_key);

let actual_slot_key = u64::from(addr.slot_key(launch_id).unwrap());
prop_assert_eq!(actual_slot_key & ((1 << GROUP_NO_SHIFT) - 1), u64::from(slot_key));
prop_assert_eq!(addr.into_local(), addr);
prop_assert_eq!(Addr::from_bits(addr.into_bits()), Some(addr));
prop_assert_eq!(addr.to_string().split('/').count(), 2);
Expand Down
31 changes: 16 additions & 15 deletions elfo-core/src/address_book.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use std::sync::Arc;

use sharded_slab::{self as slab, Slab};
use idr_ebr::{Guard as EbrGuard, Idr};

use crate::{
addr::{Addr, GroupNo, NodeLaunchId, NodeNo, SlabConfig},
object::{Object, ObjectArc, ObjectRef},
addr::{Addr, GroupNo, IdrConfig, NodeLaunchId, NodeNo},
object::{BorrowedObject, Object, OwnedObject},
};

// Reexported in `_priv`.
#[derive(Clone)]
pub struct AddressBook {
launch_id: NodeLaunchId,
local: Arc<Slab<Object, SlabConfig>>,
local: Arc<Idr<Object, IdrConfig>>,
#[cfg(feature = "network")]
remote: Arc<RemoteToHandleMap>, // TODO: use `arc_swap::cache::Cache` in TLS?
}
Expand All @@ -20,7 +20,7 @@ assert_impl_all!(AddressBook: Sync);

impl AddressBook {
pub(crate) fn new(launch_id: NodeLaunchId) -> Self {
let local = Arc::new(Slab::new_with_config::<SlabConfig>());
let local = Arc::new(Idr::new());

#[cfg(feature = "network")]
return Self {
Expand Down Expand Up @@ -57,22 +57,23 @@ impl AddressBook {
.remove(network_actor_addr, local_group, remote_group, handle_addr);
}

pub fn get(&self, addr: Addr) -> Option<ObjectRef<'_>> {
pub fn get<'g>(&self, addr: Addr, guard: &'g EbrGuard) -> Option<BorrowedObject<'g>> {
let addr = self.prepare_addr(addr)?;

self.local
.get(addr.slot_key(self.launch_id))
// sharded-slab doesn't check top bits, so we need to check them manually.
.get(addr.slot_key(self.launch_id)?, guard)
// idr-ebr doesn't check top bits, so we need to check them manually.
// It equals to checking the group number, but without extra operations.
.filter(|object| object.addr() == addr)
}

pub fn get_owned(&self, addr: Addr) -> Option<ObjectArc> {
pub fn get_owned(&self, addr: Addr) -> Option<OwnedObject> {
let addr = self.prepare_addr(addr)?;

self.local
.clone()
.get_owned(addr.slot_key(self.launch_id))
// sharded-slab doesn't check top bits, so we need to check them manually.
.get_owned(addr.slot_key(self.launch_id)?)
// idr-ebr doesn't check top bits, so we need to check them manually.
// It equals to checking the group number, but without extra operations.
.filter(|object| object.addr() == addr)
}
Expand All @@ -89,7 +90,7 @@ impl AddressBook {
}

pub(crate) fn remove(&self, addr: Addr) {
self.local.remove(addr.slot_key(self.launch_id));
self.local.remove(ward!(addr.slot_key(self.launch_id)));
}

#[inline(always)]
Expand All @@ -110,13 +111,13 @@ impl AddressBook {
}
}

pub(crate) struct VacantEntry<'b> {
pub(crate) struct VacantEntry<'g> {
launch_id: NodeLaunchId,
entry: slab::VacantEntry<'b, Object, SlabConfig>,
entry: idr_ebr::VacantEntry<'g, Object, IdrConfig>,
group_no: GroupNo,
}

impl<'b> VacantEntry<'b> {
impl<'g> VacantEntry<'g> {
pub(crate) fn insert(self, object: Object) {
self.entry.insert(object)
}
Expand Down
Loading

0 comments on commit 441b7b0

Please sign in to comment.