diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs index 7de66f59cf..aa96472da7 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs @@ -125,10 +125,9 @@ pub(crate) struct ConnectionsMap(pub(crate) DashMap { pub handle: JoinHandle<()>, // The currect running refresh task - pub node_conn: Option> // The refreshed connection after the task is done + pub node_conn: Option>, // The refreshed connection after the task is done } - impl std::fmt::Display for ConnectionsMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for item in self.0.iter() { @@ -148,7 +147,6 @@ pub(crate) struct ConnectionsContainer { read_from_replica_strategy: ReadFromReplicaStrategy, topology_hash: TopologyHash, - // Holds all the failed addresses that started a refresh task. pub(crate) refresh_addresses_started: DashSet, // Follow the refresh ops on the connections diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 679098ef02..b2a00026d7 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -43,10 +43,18 @@ use crate::{ use connections_container::RefreshState; use dashmap::DashMap; use std::{ - collections::{HashMap, HashSet}, fmt, io, iter::once, mem, net::{IpAddr, SocketAddr}, pin::Pin, sync::{ + collections::{HashMap, HashSet}, + fmt, io, + iter::once, + mem, + net::{IpAddr, SocketAddr}, + pin::Pin, + sync::{ atomic::{self, AtomicUsize, Ordering}, Arc, Mutex, - }, task::{self, Poll}, time::SystemTime + }, + task::{self, Poll}, + time::SystemTime, }; use strum_macros::Display; #[cfg(feature = "tokio-comp")] @@ -1360,7 +1368,8 @@ where inner.clone(), addrs_to_refresh, RefreshConnectionType::AllConnections, - ).await; + ) + .await; } } @@ -1379,21 +1388,34 @@ where info!("Skipping refresh for {}: already in progress", address); continue; } - + let inner_clone = inner.clone(); let address_clone = address.clone(); let address_clone_for_task = address.clone(); let handle = tokio::spawn(async move { - info!("Refreshing connection task to {:?} started", address_clone_for_task); + info!( + "Refreshing connection task to {:?} started", + address_clone_for_task + ); let _ = async { // Add this address to be removed in poll_flush so all requests see a consistent connection map. - // See next comment for elaborated explanation. - inner_clone.conn_lock.read().expect(MUTEX_READ_ERR).refresh_addresses_done.insert(address_clone_for_task.clone()); - - let mut cluster_params = inner_clone.cluster_params.read().expect(MUTEX_READ_ERR).clone(); + // See next comment for elaborated explanation. + inner_clone + .conn_lock + .read() + .expect(MUTEX_READ_ERR) + .refresh_addresses_done + .insert(address_clone_for_task.clone()); + + let mut cluster_params = inner_clone + .cluster_params + .read() + .expect(MUTEX_READ_ERR) + .clone(); let subs_guard = inner_clone.subscriptions_by_address.read().await; - cluster_params.pubsub_subscriptions = subs_guard.get(&address_clone_for_task).cloned(); + cluster_params.pubsub_subscriptions = + subs_guard.get(&address_clone_for_task).cloned(); drop(subs_guard); let node_result = get_or_create_conn( @@ -1409,7 +1431,7 @@ where Ok(node) => { // Maintain the newly refreshed connection separately from the main connection map. // This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation. - // This approach ensures that all requests within the current batch interact with a consistent connection map, + // This approach ensures that all requests within the current batch interact with a consistent connection map, // preventing potential reordering issues. // // By delaying the integration of the refreshed connection: @@ -1421,11 +1443,17 @@ where // This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is // updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information // with the requirement for consistent request handling within each processing cycle. - let connection_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); - if let Some(mut refresh_state) = connection_container.refresh_operations.get_mut(&address_clone_for_task) { + let connection_container = + inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); + if let Some(mut refresh_state) = connection_container + .refresh_operations + .get_mut(&address_clone_for_task) + { refresh_state.node_conn = Some(node); } - connection_container.refresh_addresses_done.insert(address_clone_for_task); + connection_container + .refresh_addresses_done + .insert(address_clone_for_task); Ok(()) } Err(err) => { @@ -1436,17 +1464,24 @@ where Err(err) } } - }.await; + } + .await; info!("Refreshing connection task to {:?} is done", address_clone); }); // Keep the task handle into the RefreshState of this address - info!("Inserting tokio task to refresh_ops map of address {:?}", address.clone()); - refresh_ops_map.insert(address, RefreshState { - handle, + info!( + "Inserting tokio task to refresh_ops map of address {:?}", + address.clone() + ); + refresh_ops_map.insert( + address, + RefreshState { + handle, node_conn: None, - }); + }, + ); } debug!("refresh connection tasts initiated"); } @@ -1786,7 +1821,8 @@ where inner.clone(), addrs_to_refresh, RefreshConnectionType::AllConnections, - ).await; + ) + .await; } } @@ -2291,7 +2327,12 @@ where ConnectionCheck::Found((address, connection)) => (address, connection.await), ConnectionCheck::OnlyAddress(addr) => { // No connection in for this address in the conn_map - Self::refresh_connections(core, HashSet::from_iter(once(addr)),RefreshConnectionType::AllConnections).await; + Self::refresh_connections( + core, + HashSet::from_iter(once(addr)), + RefreshConnectionType::AllConnections, + ) + .await; return Err(RedisError::from(( ErrorKind::AllConnectionsUnavailable, "No connection for the address, started a refresh task", @@ -2395,39 +2436,58 @@ where fn update_refreshed_connection(&mut self) { loop { let connections_container = self.inner.conn_lock.read().expect(MUTEX_WRITE_ERR); - + // Process refresh_addresses_started - let addresses_to_remove: Vec = connections_container.refresh_addresses_started.iter().map(|r| r.key().clone()).collect(); + let addresses_to_remove: Vec = connections_container + .refresh_addresses_started + .iter() + .map(|r| r.key().clone()) + .collect(); for address in addresses_to_remove { - connections_container.refresh_addresses_started.remove(&address); + connections_container + .refresh_addresses_started + .remove(&address); connections_container.remove_node(&address); } - + // Process refresh_addresses_done - let addresses_done: Vec = connections_container.refresh_addresses_done.iter().map(|r| r.key().clone()).collect(); + let addresses_done: Vec = connections_container + .refresh_addresses_done + .iter() + .map(|r| r.key().clone()) + .collect(); for address in addresses_done { - connections_container.refresh_addresses_done.remove(&address); - - if let Some(mut refresh_state) = connections_container.refresh_operations.get_mut(&address) { - info!("update_refreshed_connection: Update conn for addr: {}", address); - + connections_container + .refresh_addresses_done + .remove(&address); + + if let Some(mut refresh_state) = + connections_container.refresh_operations.get_mut(&address) + { + info!( + "update_refreshed_connection: Update conn for addr: {}", + address + ); + // Take the node_conn out of RefreshState, replacing it with None if let Some(node_conn) = mem::take(&mut refresh_state.node_conn) { info!("update_refreshed_connection: replacing/adding the conn"); // Move the node_conn to the function - connections_container.replace_or_add_connection_for_address(address.clone(), node_conn); + connections_container + .replace_or_add_connection_for_address(address.clone(), node_conn); } } // Remove this entry from refresh_ops_map connections_container.refresh_operations.remove(&address); - } - + // Check if both sets are empty - if connections_container.refresh_addresses_started.is_empty() && connections_container.refresh_addresses_done.is_empty() { + if connections_container.refresh_addresses_started.is_empty() + && connections_container.refresh_addresses_done.is_empty() + { break; } - + // Release the lock before the next iteration drop(connections_container); } @@ -2537,8 +2597,8 @@ where } } Next::Reconnect { request, target } => { - poll_flush_action = - poll_flush_action.change_state(PollFlushAction::Reconnect(HashSet::from_iter([target]))); + poll_flush_action = poll_flush_action + .change_state(PollFlushAction::Reconnect(HashSet::from_iter([target]))); if let Some(request) = request { self.inner.pending_requests.lock().unwrap().push(request); } @@ -2621,18 +2681,18 @@ where fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> { let Message { cmd, sender } = msg; - + let info = RequestInfo { cmd }; - + self.inner .pending_requests .lock() .unwrap() .push(PendingRequest { - retry: 0, - sender, - info, - }); + retry: 0, + sender, + info, + }); Ok(()) } @@ -2659,7 +2719,7 @@ where // Updating the connection_map with all the refreshed_connections // In case of active poll_recovery, the work should - // take care of the refreshed_connection, add them if still relevant, and kill the refresh_tasks of + // take care of the refreshed_connection, add them if still relevant, and kill the refresh_tasks of // non-relevant addresses. self.update_refreshed_connection();