Skip to content

Commit

Permalink
Merge pull request #116 from sargarass/fix/unrouted-network-requests
Browse files Browse the repository at this point in the history
fix(network): unrouted network requests on the remote node don't lead to the caller getting stuck
  • Loading branch information
loyd authored Nov 7, 2023
2 parents 98684fd + 3e80bc0 commit e540b5c
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 18 deletions.
74 changes: 61 additions & 13 deletions elfo-core/src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,29 @@ impl AddressBook {
#[cfg(feature = "network")]
pub(crate) fn register_remote(
&self,
network_actor_addr: Addr,
local_group: GroupNo,
remote_group: (NodeNo, GroupNo),
handle_addr: Addr,
) {
self.remote.insert(local_group, remote_group, handle_addr);
self.remote
.insert(network_actor_addr, local_group, remote_group, handle_addr);
}

/// Undoes a previous `register_remote` call for the given handle.
///
/// All arguments are forwarded unchanged to the remote-to-handle map's
/// `remove`. Compiled only when the `network` feature is enabled.
#[cfg(feature = "network")]
pub(crate) fn deregister_remote(
    &self,
    network_actor_addr: Addr,
    local_group: GroupNo,
    remote_group: (NodeNo, GroupNo),
    handle_addr: Addr,
) {
    let remote_handles = &self.remote;
    remote_handles.remove(network_actor_addr, local_group, remote_group, handle_addr);
}

pub fn get(&self, addr: Addr) -> Option<ObjectRef<'_>> {
let addr = self.prepare_addr(addr)?;

self.local
.get(addr.slot_key(self.launch_id))
// sharded-slab doesn't check top bits, so we need to check them manually.
Expand Down Expand Up @@ -117,15 +130,21 @@ cfg_network!({
use arc_swap::ArcSwap;
use fxhash::FxHashMap;

#[derive(Default)]
pub(super) struct RemoteToHandleMap {
#[derive(Clone, Default)]
struct RemoteToHandleMapInner {
// (local_group_no, remote_node_no_group_no) -> handle_addr
map: ArcSwap<FxHashMap<u64, Addr>>,
map: FxHashMap<u64, Addr>,
// network_actor_addr -> handle_addr
fallback: FxHashMap<Addr, Addr>,
}

#[derive(Default)]
pub(super) struct RemoteToHandleMap(ArcSwap<RemoteToHandleMapInner>);

impl RemoteToHandleMap {
pub(super) fn insert(
&self,
network_actor_addr: Addr,
local_group: GroupNo,
remote_group: (NodeNo, GroupNo),
handle_addr: Addr,
Expand All @@ -134,21 +153,50 @@ cfg_network!({
| u64::from(remote_group.0.into_bits()) << 8
| u64::from(remote_group.1.into_bits());

self.map.rcu(|map| {
let mut map = (**map).clone();
map.insert(key, handle_addr);
map
self.0.rcu(|inner| {
let mut inner = (**inner).clone();
inner.map.insert(key, handle_addr);
inner.fallback.insert(network_actor_addr, handle_addr);
inner
});
}

/// Drops the registration made for `handle_addr`.
///
/// The `(local_group, remote_group)` entry is removed only if it still
/// points at `handle_addr`; the fallback entry keyed by
/// `network_actor_addr` is removed unconditionally.
pub(super) fn remove(
    &self,
    network_actor_addr: Addr,
    local_group: GroupNo,
    remote_group: (NodeNo, GroupNo),
    handle_addr: Addr,
) {
    // Key layout: local group bits above bit 32, remote node bits shifted by 8,
    // remote group bits in the lowest positions.
    let (remote_node_no, remote_group_no) = remote_group;
    let local_part = u64::from(local_group.into_bits()) << 32;
    let node_part = u64::from(remote_node_no.into_bits()) << 8;
    let group_part = u64::from(remote_group_no.into_bits());
    let key = local_part | node_part | group_part;

    self.0.rcu(|current| {
        let mut updated = (**current).clone();

        // Leave the entry alone if it was registered by somebody else.
        let registered_by_us = updated.map.get(&key).copied() == Some(handle_addr);
        if registered_by_us {
            updated.map.remove(&key);
        }

        // `network_actor_addr` is unique in the fallback map, so the entry
        // can be dropped without checking what it points at.
        updated.fallback.remove(&network_actor_addr);

        updated
    });
}

pub(super) fn get(&self, remote_addr: Addr) -> Option<Addr> {
debug_assert!(remote_addr.is_remote());

let local = crate::scope::with(|scope| scope.group()).node_no_group_no();
let local_actor = crate::scope::with(|scope| scope.actor());
let remote = remote_addr.node_no_group_no();
let key = u64::from(local) << 32 | u64::from(remote);

self.map.load().get(&key).copied()
let key = u64::from(local_actor.node_no_group_no()) << 32 | u64::from(remote);

let inner = self.0.load();
inner
.map
.get(&key)
.or_else(|| inner.fallback.get(&local_actor))
.copied()
}
}
});
24 changes: 19 additions & 5 deletions elfo-core/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ cfg_network!({
#[stability::unstable]
pub fn register_remote(
&self,
network_actor_addr: Addr,
local_group: GroupNo,
remote_group: (NodeNo, GroupNo),
remote_group_name: &str,
Expand All @@ -334,7 +335,7 @@ cfg_network!({
entry.insert(object);

self.book
.register_remote(local_group, remote_group, handle_addr);
.register_remote(network_actor_addr, local_group, remote_group, handle_addr);

// Update the demux to make `send()` work,
// but only if there is a route between these groups.
Expand All @@ -358,7 +359,9 @@ cfg_network!({
RegisterRemoteGroupGuard {
book: &self.book,
handle_addr,
remote_node: remote_group.0,
network_actor_addr,
local_group,
remote_group,
nodes,
}
}
Expand Down Expand Up @@ -470,11 +473,14 @@ cfg_network!({
// Nothing for now, reserved for future use.
pub struct NodeDiscovery(());

// TODO: should undo register_remote in drop
#[stability::unstable]
pub struct RegisterRemoteGroupGuard<'a> {
book: &'a AddressBook,
handle_addr: Addr,
remote_node: NodeNo,
network_actor_addr: Addr,
local_group: GroupNo,
remote_group: (NodeNo, GroupNo),
nodes: Option<Nodes>,
}

Expand All @@ -486,6 +492,14 @@ cfg_network!({

impl Drop for RegisterRemoteGroupGuard<'_> {
fn drop(&mut self) {
// Undo the registration.
self.book.deregister_remote(
self.network_actor_addr,
self.local_group,
self.remote_group,
self.handle_addr,
);

// Disable direct messaging.
self.book.remove(self.handle_addr);

Expand All @@ -495,8 +509,8 @@ cfg_network!({
let mut nodes = (**nodes).clone();

// We don't want to remove the node if it was re-registered by another handle.
if nodes.get(&self.remote_node) == Some(&self.handle_addr) {
nodes.remove(&self.remote_node);
if nodes.get(&self.remote_group.0) == Some(&self.handle_addr) {
nodes.remove(&self.remote_group.0);
}

nodes
Expand Down
1 change: 1 addition & 0 deletions elfo-network/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl Worker {
tx_flows: tx_flows.clone(),
};
let remote_group_guard = self.topology.register_remote(
self.ctx.addr(),
self.local.group_no,
(self.remote.node_no, self.remote.group_no),
&self.remote.group_name,
Expand Down

0 comments on commit e540b5c

Please sign in to comment.