diff --git a/elfo-core/src/address_book.rs b/elfo-core/src/address_book.rs index d254f1d3..8ba3760d 100644 --- a/elfo-core/src/address_book.rs +++ b/elfo-core/src/address_book.rs @@ -36,16 +36,29 @@ impl AddressBook { #[cfg(feature = "network")] pub(crate) fn register_remote( &self, + network_actor_addr: Addr, local_group: GroupNo, remote_group: (NodeNo, GroupNo), handle_addr: Addr, ) { - self.remote.insert(local_group, remote_group, handle_addr); + self.remote + .insert(network_actor_addr, local_group, remote_group, handle_addr); + } + + #[cfg(feature = "network")] + pub(crate) fn deregister_remote( + &self, + network_actor_addr: Addr, + local_group: GroupNo, + remote_group: (NodeNo, GroupNo), + handle_addr: Addr, + ) { + self.remote + .remove(network_actor_addr, local_group, remote_group, handle_addr); } pub fn get(&self, addr: Addr) -> Option> { let addr = self.prepare_addr(addr)?; - self.local .get(addr.slot_key(self.launch_id)) // sharded-slab doesn't check top bits, so we need to check them manually. @@ -117,15 +130,21 @@ cfg_network!({ use arc_swap::ArcSwap; use fxhash::FxHashMap; - #[derive(Default)] - pub(super) struct RemoteToHandleMap { + #[derive(Clone, Default)] + struct RemoteToHandleMapInner { // (local_group_no, remote_node_no_group_no) -> handle_addr - map: ArcSwap>, + map: FxHashMap, + // network_actor_addr -> handle_addr + fallback: FxHashMap, } + #[derive(Default)] + pub(super) struct RemoteToHandleMap(ArcSwap); + impl RemoteToHandleMap { pub(super) fn insert( &self, + network_actor_addr: Addr, local_group: GroupNo, remote_group: (NodeNo, GroupNo), handle_addr: Addr, @@ -134,21 +153,50 @@ cfg_network!({ | u64::from(remote_group.0.into_bits()) << 8 | u64::from(remote_group.1.into_bits()); - self.map.rcu(|map| { - let mut map = (**map).clone(); - map.insert(key, handle_addr); - map + self.0.rcu(|inner| { + let mut inner = (**inner).clone(); + inner.map.insert(key, handle_addr); + inner.fallback.insert(network_actor_addr, handle_addr); + inner + }); + } + + pub(super) fn remove( + &self, + network_actor_addr: Addr, + local_group: GroupNo, + remote_group: (NodeNo, GroupNo), + handle_addr: Addr, + ) { + let key = u64::from(local_group.into_bits()) << 32 + | u64::from(remote_group.0.into_bits()) << 8 + | u64::from(remote_group.1.into_bits()); + + self.0.rcu(|inner| { + // We don't want to remove a handle that was not registered by us. + let mut inner = (**inner).clone(); + if inner.map.get(&key) == Some(&handle_addr) { + inner.map.remove(&key); + } + // In the fallback map `network_actor_addr` is unique, so no lookups are needed. + inner.fallback.remove(&network_actor_addr); + inner }); } pub(super) fn get(&self, remote_addr: Addr) -> Option { debug_assert!(remote_addr.is_remote()); - let local = crate::scope::with(|scope| scope.group()).node_no_group_no(); + let local_actor = crate::scope::with(|scope| scope.actor()); let remote = remote_addr.node_no_group_no(); - let key = u64::from(local) << 32 | u64::from(remote); - - self.map.load().get(&key).copied() + let key = u64::from(local_actor.node_no_group_no()) << 32 | u64::from(remote); + + let inner = self.0.load(); + inner + .map + .get(&key) + .or_else(|| inner.fallback.get(&local_actor)) + .copied() } } }); diff --git a/elfo-core/src/topology.rs b/elfo-core/src/topology.rs index 66f9ba54..91511b8a 100644 --- a/elfo-core/src/topology.rs +++ b/elfo-core/src/topology.rs @@ -319,6 +319,7 @@ cfg_network!({ #[stability::unstable] pub fn register_remote( &self, + network_actor_addr: Addr, local_group: GroupNo, remote_group: (NodeNo, GroupNo), remote_group_name: &str, @@ -334,7 +335,7 @@ cfg_network!({ entry.insert(object); self.book - .register_remote(local_group, remote_group, handle_addr); + .register_remote(network_actor_addr, local_group, remote_group, handle_addr); // Update the demux to make `send()` work, // but only if there is a route between these groups. @@ -358,7 +359,9 @@ cfg_network!({ RegisterRemoteGroupGuard { book: &self.book, handle_addr, - remote_node: remote_group.0, + network_actor_addr, + local_group, + remote_group, nodes, } } @@ -470,11 +473,14 @@ cfg_network!({ // Nothing for now, reserved for future use. pub struct NodeDiscovery(()); + // TODO: should undo register_remote in drop #[stability::unstable] pub struct RegisterRemoteGroupGuard<'a> { book: &'a AddressBook, handle_addr: Addr, - remote_node: NodeNo, + network_actor_addr: Addr, + local_group: GroupNo, + remote_group: (NodeNo, GroupNo), nodes: Option, } @@ -486,6 +492,14 @@ cfg_network!({ impl Drop for RegisterRemoteGroupGuard<'_> { fn drop(&mut self) { + // Undo the registration. + self.book.deregister_remote( + self.network_actor_addr, + self.local_group, + self.remote_group, + self.handle_addr, + ); + // Disable direct messaging. self.book.remove(self.handle_addr); @@ -495,8 +509,8 @@ cfg_network!({ let mut nodes = (**nodes).clone(); // We don't want to remove the node if it was re-registered by another handle. - if nodes.get(&self.remote_node) == Some(&self.handle_addr) { - nodes.remove(&self.remote_node); + if nodes.get(&self.remote_group.0) == Some(&self.handle_addr) { + nodes.remove(&self.remote_group.0); } nodes diff --git a/elfo-network/src/worker/mod.rs b/elfo-network/src/worker/mod.rs index 5adf4b92..887fe7f3 100644 --- a/elfo-network/src/worker/mod.rs +++ b/elfo-network/src/worker/mod.rs @@ -126,6 +126,7 @@ impl Worker { tx_flows: tx_flows.clone(), }; let remote_group_guard = self.topology.register_remote( + self.ctx.addr(), self.local.group_no, (self.remote.node_no, self.remote.group_no), &self.remote.group_name,