diff --git a/glide-core/redis-rs/redis/src/aio/mod.rs b/glide-core/redis-rs/redis/src/aio/mod.rs
index 077046feba..918b0f1a10 100644
--- a/glide-core/redis-rs/redis/src/aio/mod.rs
+++ b/glide-core/redis-rs/redis/src/aio/mod.rs
@@ -146,7 +146,7 @@ where
 async fn setup_connection(
     connection_info: &RedisConnectionInfo,
     con: &mut C,
-    // This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity.
+    // This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity or AZAffinityAllNodes.
     // An INFO command will be triggered in the connection's setup to update the 'availability_zone' property.
     discover_az: bool,
 ) -> RedisResult<()>
diff --git a/glide-core/redis-rs/redis/src/client.rs b/glide-core/redis-rs/redis/src/client.rs
index 2b97671110..5fcdb5ea7a 100644
--- a/glide-core/redis-rs/redis/src/client.rs
+++ b/glide-core/redis-rs/redis/src/client.rs
@@ -86,7 +86,7 @@ pub struct GlideConnectionOptions {
     #[cfg(feature = "aio")]
     /// Passive disconnect notifier
     pub disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
-    /// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
+    /// If ReadFromReplica strategy is set to AZAffinity or AZAffinityAllNodes, this parameter will be set to 'true'.
     /// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
     pub discover_az: bool,
     /// Connection timeout duration.
diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
index 955d24d9e9..2a03de3383 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
@@ -288,6 +288,64 @@ where
         }
     }
 
+    /// Returns the node's connection in the same availability zone as `client_az`,
+    /// checking replicas first, then primary, and falling back to any available node.
+    pub(crate) fn round_robin_read_from_replica_with_az_awareness_all_nodes(
+        &self,
+        slot_map_value: &SlotMapValue,
+        client_az: String,
+    ) -> Option<ConnectionAndAddress<Connection>> {
+        let addrs = &slot_map_value.addrs;
+        let initial_index = slot_map_value.last_used_replica.load(Ordering::Relaxed);
+        let mut retries = 0usize;
+
+        // Step 1: Try to find a replica in the same AZ
+        loop {
+            retries = retries.saturating_add(1);
+            // Looped through all replicas; no connected replica found in the same availability zone.
+            if retries > addrs.replicas().len() {
+                break;
+            }
+
+            // Calculate index based on initial index and check count.
+ let index = (initial_index + retries) % addrs.replicas().len(); + let replica = &addrs.replicas()[index]; + + if let Some((address, connection_details)) = + self.connection_details_for_address(replica.as_str()) + { + if self.az_for_address(&address) == Some(client_az.clone()) { + // Found a replica in the same AZ + let _ = slot_map_value.last_used_replica.compare_exchange_weak( + initial_index, + index, + Ordering::Relaxed, + Ordering::Relaxed, + ); + return Some((address, connection_details.conn)); + } + } + } + + // Step 2: Check if primary is in the same AZ + if let Some((address, connection_details)) = + self.connection_details_for_address(addrs.primary().as_str()) + { + if self.az_for_address(&address) == Some(client_az) { + return Some((address, connection_details.conn)); + } + } + + // Step 3: Fall back to any available replica using round-robin + if !addrs.replicas().is_empty() { + return self.round_robin_read_from_replica(slot_map_value); + } + + // Step 4: Final fallback - use primary + self.connection_details_for_address(addrs.primary().as_str()) + .map(|(address, connection_details)| (address, connection_details.conn)) + } + fn lookup_route(&self, route: &Route) -> Option> { let slot_map_value = self.slot_map.slot_value_for_route(route)?; let addrs = &slot_map_value.addrs; @@ -311,6 +369,11 @@ where slot_map_value, az.to_string(), ), + ReadFromReplicaStrategy::AZAffinityAllNodes(az) => self + .round_robin_read_from_replica_with_az_awareness_all_nodes( + slot_map_value, + az.to_string(), + ), }, // when the user strategy per command is replica_preffered SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy { @@ -319,6 +382,11 @@ where slot_map_value, az.to_string(), ), + ReadFromReplicaStrategy::AZAffinityAllNodes(az) => self + .round_robin_read_from_replica_with_az_awareness_all_nodes( + slot_map_value, + az.to_string(), + ), _ => self.round_robin_read_from_replica(slot_map_value), }, } @@ -888,6 +956,163 @@ mod tests { assert_eq!(addresses, vec![31, 31, 33, 33]); } + // Helper function to create a container with AZAffinityAllNodes strategy + fn create_container_with_az_affinity_all_nodes_strategy( + use_management_connections: bool, + ) -> ConnectionsContainer { + let slot_map = SlotMap::new( + vec![ + Slot::new(1, 1000, "primary1".to_owned(), Vec::new()), + Slot::new( + 1002, + 2000, + "primary2".to_owned(), + vec!["replica2-1".to_owned()], + ), + Slot::new( + 2001, + 3000, + "primary3".to_owned(), + vec![ + "replica3-1".to_owned(), + "replica3-2".to_owned(), + "replica3-3".to_owned(), + ], + ), + ], + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let connection_map = DashMap::new(); + connection_map.insert( + "primary1".into(), + create_cluster_node(1, use_management_connections, Some("use-1b".to_string())), + ); + connection_map.insert( + "primary2".into(), + create_cluster_node(2, use_management_connections, Some("use-1c".to_string())), + ); + connection_map.insert( + "primary3".into(), + create_cluster_node(3, use_management_connections, Some("use-1b".to_string())), + ); + connection_map.insert( + "replica2-1".into(), + create_cluster_node(21, use_management_connections, Some("use-1c".to_string())), + ); + connection_map.insert( + "replica3-1".into(), + create_cluster_node(31, use_management_connections, Some("use-1a".to_string())), + ); + connection_map.insert( + "replica3-2".into(), + create_cluster_node(32, use_management_connections, Some("use-1b".to_string())), + ); + connection_map.insert( + "replica3-3".into(), + create_cluster_node(33, 
use_management_connections, Some("use-1a".to_string())), + ); + + ConnectionsContainer { + slot_map, + connection_map, + read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinityAllNodes( + "use-1a".to_string(), + ), + topology_hash: 0, + } + } + + #[test] + fn get_connection_for_az_affinity_all_nodes_route() { + // Create a container with AZAffinityAllNodes strategy + let container = create_container_with_az_affinity_all_nodes_strategy(false); + + // Slot number does not exist (slot 1001 wasn't assigned to any primary) + assert!(container + .connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional)) + .is_none()); + + // Test getting replica in client's AZ for slot 2001 + assert!(one_of( + container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)), + &[31, 33], + )); + + // Remove one replica in the client's AZ + remove_nodes(&container, &["replica3-3"]); + + // Should still get the remaining replica in the client's AZ + assert_eq!( + 31, + container + .connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)) + .unwrap() + .1 + ); + + // Remove all replicas in the client's AZ + remove_nodes(&container, &["replica3-1"]); + + // Test falling back to replica in different AZ + assert_eq!( + 32, + container + .connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)) + .unwrap() + .1 + ); + + // Set the primary to be in the client's AZ + container + .connection_map + .get_mut("primary3") + .unwrap() + .user_connection + .az = Some("use-1a".to_string()); + + // Remove the last replica + remove_nodes(&container, &["replica3-2"]); + + // Should now fall back to the primary in the client's AZ + assert_eq!( + 3, + container + .connection_for_route(&Route::new(2001, SlotAddr::Master)) + .unwrap() + .1 + ); + + // Move the primary out of the client's AZ + container + .connection_map + .get_mut("primary3") + .unwrap() + .user_connection + .az = Some("use-1b".to_string()); + + // Test falling back to replica under different primary + assert_eq!( + 21, + container + .connection_for_route(&Route::new(1002, SlotAddr::ReplicaRequired)) + .unwrap() + .1 + ); + + // Remove all replicas + remove_nodes(&container, &["replica2-1"]); + + // Test falling back to available primaries with their respective slots + assert!(one_of( + container.connection_for_route(&Route::new(1002, SlotAddr::Master)), + &[2], + )); + assert!(one_of( + container.connection_for_route(&Route::new(500, SlotAddr::Master)), + &[1], + )); + } + #[test] fn get_connection_by_address() { let container = create_container(); diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs index e5af8d1e50..d574056607 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs @@ -182,6 +182,7 @@ where let discover_az = matches!( params.read_from_replicas, crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_) + | crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityAllNodes(_) ); match create_connection::( diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 3d61efce29..8615f7aa7c 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1088,6 +1088,7 @@ where let discover_az = matches!( cluster_params.read_from_replicas, 
             crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
+                | crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityAllNodes(_)
         );
 
         let glide_connection_options = GlideConnectionOptions {
diff --git a/glide-core/redis-rs/redis/src/cluster_client.rs b/glide-core/redis-rs/redis/src/cluster_client.rs
index c4dc0103dc..0f471834c5 100644
--- a/glide-core/redis-rs/redis/src/cluster_client.rs
+++ b/glide-core/redis-rs/redis/src/cluster_client.rs
@@ -392,6 +392,8 @@ impl ClusterClientBuilder {
     /// The parameter `read_strategy` can be one of:
     /// `ReadFromReplicaStrategy::AZAffinity(availability_zone)` - attempt to access replicas in the same availability zone.
     /// If no suitable replica is found (i.e. no replica could be found in the requested availability zone), choose any replica. Falling back to primary if needed.
+    /// `ReadFromReplicaStrategy::AZAffinityAllNodes(availability_zone)` - attempt to access nodes in the same availability zone,
+    /// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
    /// `ReadFromReplicaStrategy::RoundRobin` - reads are distributed across replicas for load balancing using round-robin algorithm. Falling back to primary if needed.
     /// `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries are directed to the primary node.
     ///
diff --git a/glide-core/redis-rs/redis/src/cluster_slotmap.rs b/glide-core/redis-rs/redis/src/cluster_slotmap.rs
index f2e43b4449..45eb42d0c4 100644
--- a/glide-core/redis-rs/redis/src/cluster_slotmap.rs
+++ b/glide-core/redis-rs/redis/src/cluster_slotmap.rs
@@ -32,6 +32,9 @@ pub enum ReadFromReplicaStrategy {
     /// Spread the read requests between replicas in the same client's Aviliablity zone in a round robin manner,
     /// falling back to other replicas or the primary if needed.
     AZAffinity(String),
+    /// Spread the read requests among nodes within the client's Availability Zone (AZ) in a round robin manner,
+    /// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
+ AZAffinityAllNodes(String), } #[derive(Debug, Default)] @@ -60,6 +63,7 @@ fn get_address_from_slot( addrs.replicas()[index].clone() } ReadFromReplicaStrategy::AZAffinity(_az) => todo!(), // Drop sync client + ReadFromReplicaStrategy::AZAffinityAllNodes(_az) => todo!(), // Drop sync client } } diff --git a/glide-core/redis-rs/redis/tests/test_cluster_async.rs b/glide-core/redis-rs/redis/tests/test_cluster_async.rs index c69a9f933f..473cc428b7 100644 --- a/glide-core/redis-rs/redis/tests/test_cluster_async.rs +++ b/glide-core/redis-rs/redis/tests/test_cluster_async.rs @@ -302,6 +302,103 @@ mod cluster_async { ); } + #[tokio::test] + async fn test_routing_by_slot_to_replica_with_az_affinity_all_nodes_strategy_to_half_replicas() + { + // Skip test if version is less then Valkey 8.0 + if crate::engine_version_less_than("8.0").await { + return; + } + + let replica_num: u16 = 4; + let primaries_num: u16 = 3; + let replicas_num_in_client_az = replica_num / 2; + let cluster = + TestClusterContext::new((replica_num * primaries_num) + primaries_num, replica_num); + let az: String = "us-east-1a".to_string(); + + let mut connection = cluster.async_connection(None).await; + let cluster_addresses: Vec<_> = cluster + .cluster + .servers + .iter() + .map(|server| server.connection_info()) + .collect(); + + let mut cmd = redis::cmd("CONFIG"); + cmd.arg(&["SET", "availability-zone", &az.clone()]); + + for _ in 0..replicas_num_in_client_az { + connection + .route_command( + &cmd, + RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new( + 12182, // foo key is mapping to 12182 slot + SlotAddr::ReplicaRequired, + ))), + ) + .await + .unwrap(); + } + + let mut client = ClusterClient::builder(cluster_addresses.clone()) + .read_from( + redis::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityAllNodes(az.clone()), + ) + .build() + .unwrap() + .get_async_connection(None) + .await + .unwrap(); + + // Each replica in the client az will return the value of foo n times + let n = 4; + for _ in 0..n * replicas_num_in_client_az { + let mut cmd = redis::cmd("GET"); + cmd.arg("foo"); + let _res: RedisResult = cmd.query_async(&mut client).await; + } + + let mut cmd = redis::cmd("INFO"); + cmd.arg("ALL"); + let info = connection + .route_command( + &cmd, + RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)), + ) + .await + .unwrap(); + + let info_result = redis::from_owned_redis_value::>(info).unwrap(); + let get_cmdstat = "cmdstat_get:calls=".to_string(); + let n_get_cmdstat = format!("cmdstat_get:calls={}", n); + let client_az = format!("availability_zone:{}", az); + + let mut matching_entries_count: usize = 0; + + for value in info_result.values() { + if value.contains(&get_cmdstat) { + if value.contains(&client_az) && value.contains(&n_get_cmdstat) { + matching_entries_count += 1; + } else { + panic!( + "Invalid entry found: {}. 
Expected cmdstat_get:calls={} and availability_zone={}", + value, n, az); + } + } + } + + assert_eq!( + (matching_entries_count.try_into() as Result).unwrap(), + replicas_num_in_client_az, + "Test failed: expected exactly '{}' entries with '{}' and '{}', found {}", + replicas_num_in_client_az, + get_cmdstat, + client_az, + matching_entries_count + ); + } + #[tokio::test] async fn test_routing_by_slot_to_replica_with_az_affinity_strategy_to_all_replicas() { // Skip test if version is less then Valkey 8.0 @@ -392,6 +489,209 @@ mod cluster_async { ); } + #[tokio::test] + async fn test_routing_by_slot_to_replica_with_az_affinity_all_nodes_strategy_to_all_replicas() { + // Skip test if version is less then Valkey 8.0 + if crate::engine_version_less_than("8.0").await { + return; + } + + let replica_num: u16 = 4; + let primaries_num: u16 = 3; + let cluster = + TestClusterContext::new((replica_num * primaries_num) + primaries_num, replica_num); + let az: String = "us-east-1a".to_string(); + + let mut connection = cluster.async_connection(None).await; + let cluster_addresses: Vec<_> = cluster + .cluster + .servers + .iter() + .map(|server| server.connection_info()) + .collect(); + + let mut cmd = redis::cmd("CONFIG"); + cmd.arg(&["SET", "availability-zone", &az.clone()]); + + connection + .route_command( + &cmd, + RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)), + ) + .await + .unwrap(); + + let mut client = ClusterClient::builder(cluster_addresses.clone()) + .read_from( + redis::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityAllNodes(az.clone()), + ) + .build() + .unwrap() + .get_async_connection(None) + .await + .unwrap(); + + // Each replica will return the value of foo n times + let n = 4; + for _ in 0..(n * replica_num) { + let mut cmd = redis::cmd("GET"); + cmd.arg("foo"); + let _res: RedisResult = cmd.query_async(&mut client).await; + } + + let mut cmd = redis::cmd("INFO"); + cmd.arg("ALL"); + let info = connection + .route_command( + &cmd, + RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)), + ) + .await + .unwrap(); + + let info_result: HashMap = + redis::from_owned_redis_value::>(info).unwrap(); + + let get_cmdstat = "cmdstat_get:calls=".to_string(); + let n_get_cmdstat: String = format!("cmdstat_get:calls={}", n); + let client_az = format!("availability_zone:{}", az); + + let mut matching_entries_count: usize = 0; + + for value in info_result.values() { + if value.contains(&get_cmdstat) { + if value.contains(&client_az) && value.contains(&n_get_cmdstat) { + matching_entries_count += 1; + } else { + panic!( + "Invalid entry found: {}. 
Expected cmdstat_get:calls={} and availability_zone={}", + value, n, az); + } + } + } + + assert_eq!( + (matching_entries_count.try_into() as Result).unwrap(), + replica_num, + "Test failed: expected exactly '{}' entries with '{}' and '{}', found {}", + replica_num, + get_cmdstat, + client_az, + matching_entries_count + ); + } + + #[tokio::test] + async fn test_az_affinity_all_nodes_prefers_local_primary() { + // Skip test if version is less than Valkey 8.0 + if crate::engine_version_less_than("8.0").await { + return; + } + + let replica_num: u16 = 4; + let primaries_num: u16 = 3; + let primary_in_same_az: u16 = 1; + + let cluster = + TestClusterContext::new((replica_num * primaries_num) + primaries_num, replica_num); + let client_az = "us-east-1a".to_string(); + let other_az = "us-east-1b".to_string(); + + let mut connection = cluster.async_connection(None).await; + let cluster_addresses: Vec<_> = cluster + .cluster + .servers + .iter() + .map(|server| server.connection_info()) + .collect(); + + // Set AZ for all nodes to a different AZ initially + let mut cmd = redis::cmd("CONFIG"); + cmd.arg(&["SET", "availability-zone", &other_az.clone()]); + + connection + .route_command( + &cmd, + RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)), + ) + .await + .unwrap(); + + // Set the client's AZ for one primary (the last one) + let mut cmd = redis::cmd("CONFIG"); + cmd.arg(&["SET", "availability-zone", &client_az]); + connection + .route_command( + &cmd, + RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new( + 10923, // This should target the third primary + SlotAddr::Master, + ))), + ) + .await + .unwrap(); + + let mut client = ClusterClient::builder(cluster_addresses.clone()) + .read_from( + redis::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityAllNodes( + client_az.clone(), + ), + ) + .build() + .unwrap() + .get_async_connection(None) + .await + .unwrap(); + + // Perform read operations + let n = 100; + for _ in 0..n { + let mut cmd = redis::cmd("GET"); + cmd.arg("foo"); // This key should hash to the third primary's slot + let _res: RedisResult = cmd.query_async(&mut client).await; + } + + // Gather INFO + let mut cmd = redis::cmd("INFO"); + cmd.arg("ALL"); + let info = connection + .route_command( + &cmd, + RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)), + ) + .await + .unwrap(); + + let info_result: HashMap = + redis::from_owned_redis_value::>(info).unwrap(); + let get_cmdstat = "cmdstat_get:calls=".to_string(); + let n_get_cmdstat = format!("cmdstat_get:calls={}", n); + let client_az2 = format!("availability-zone:{}", client_az); + let mut matching_entries_count: usize = 0; + + for value in info_result.values() { + if value.contains(&get_cmdstat) { + if value.contains(&client_az) && value.contains(&n_get_cmdstat) { + matching_entries_count += 1; + } else { + panic!( + "Invalid entry found: {}. 
Expected cmdstat_get:calls={} and availability_zone={}",
+                        value, n, client_az2);
+                }
+            }
+        }
+
+        assert_eq!(
+            (matching_entries_count.try_into() as Result<u16, _>).unwrap(),
+            primary_in_same_az,
+            "Test failed: expected exactly '{}' entries with '{}' and '{}', found {}",
+            primary_in_same_az,
+            get_cmdstat,
+            client_az,
+            matching_entries_count
+        );
+    }
+
     #[test]
     #[serial_test::serial]
     fn test_async_cluster_basic_eval() {
diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs
index 005a38a9ca..0cd2927ece 100644
--- a/glide-core/src/client/mod.rs
+++ b/glide-core/src/client/mod.rs
@@ -579,6 +579,7 @@ async fn create_cluster_client(
     let read_from_strategy = request.read_from.unwrap_or_default();
     builder = builder.read_from(match read_from_strategy {
         ReadFrom::AZAffinity(az) => ReadFromReplicaStrategy::AZAffinity(az),
+        ReadFrom::AZAffinityAllNodes(az) => ReadFromReplicaStrategy::AZAffinityAllNodes(az),
         ReadFrom::PreferReplica => ReadFromReplicaStrategy::RoundRobin,
         ReadFrom::Primary => ReadFromReplicaStrategy::AlwaysFromPrimary,
     });
@@ -733,6 +734,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
                 ReadFrom::Primary => "Only primary",
                 ReadFrom::PreferReplica => "Prefer replica",
                 ReadFrom::AZAffinity(_) => "Prefer replica in user's availability zone",
+                ReadFrom::AZAffinityAllNodes(_) =>
+                    "Prefer nodes (replica and primary) in user's availability zone",
             }
         )
     })
diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs
index c2c541c763..c273526e33 100644
--- a/glide-core/src/client/standalone_client.rs
+++ b/glide-core/src/client/standalone_client.rs
@@ -31,6 +31,10 @@ enum ReadFrom {
         client_az: String,
         last_read_replica_index: Arc<AtomicUsize>,
     },
+    AZAffinityAllNodes {
+        client_az: String,
+        last_read_replica_index: Arc<AtomicUsize>,
+    },
 }
 
 #[derive(Debug)]
@@ -129,7 +133,7 @@ impl StandaloneClient {
         let pubsub_addr = &connection_request.addresses[pubsub_node_index];
         let discover_az = matches!(
             connection_request.read_from,
-            Some(ClientReadFrom::AZAffinity(_))
+            Some(ClientReadFrom::AZAffinity(_)) | Some(ClientReadFrom::AZAffinityAllNodes(_))
         );
 
         let connection_timeout = to_duration(
@@ -306,6 +310,63 @@ impl StandaloneClient {
         }
     }
 
+    async fn round_robin_read_from_replica_az_awareness_all_nodes(
+        &self,
+        latest_read_replica_index: &Arc<AtomicUsize>,
+        client_az: String,
+    ) -> &ReconnectingConnection {
+        let initial_index = latest_read_replica_index.load(Ordering::Relaxed);
+        let mut retries = 0usize;
+
+        // Step 1: Try to find a replica in the same AZ
+        loop {
+            retries = retries.saturating_add(1);
+            // Looped through all replicas; no connected replica found in the same AZ.
+            if retries >= self.inner.nodes.len() {
+                break;
+            }
+
+            // Calculate index based on initial index and check count.
+            let index = (initial_index + retries) % self.inner.nodes.len();
+            let replica = &self.inner.nodes[index];
+
+            // Attempt to get a connection and retrieve the replica's AZ.
+            if let Ok(connection) = replica.get_connection().await {
+                if let Some(replica_az) = connection.get_az().as_deref() {
+                    if replica_az == client_az {
+                        // Update `latest_used_replica` with the index of this replica.
+ let _ = latest_read_replica_index.compare_exchange_weak( + initial_index, + index, + Ordering::Relaxed, + Ordering::Relaxed, + ); + return replica; + } + } + } + } + + // Step 2: Check if primary is in the same AZ + let primary = self.get_primary_connection(); + if let Ok(connection) = primary.get_connection().await { + if let Some(primary_az) = connection.get_az().as_deref() { + if primary_az == client_az { + return primary; + } + } + } + + // Step 3: Fall back to any available replica using round-robin + let replica = self.round_robin_read_from_replica(latest_read_replica_index); + if replica as *const _ != primary as *const _ { + return replica; + } + + // Step 4: Final fallback - use primary + return primary; + } + async fn get_connection(&self, readonly: bool) -> &ReconnectingConnection { if self.inner.nodes.len() == 1 || !readonly { return self.get_primary_connection(); @@ -326,6 +387,16 @@ impl StandaloneClient { ) .await } + ReadFrom::AZAffinityAllNodes { + client_az, + last_read_replica_index, + } => { + self.round_robin_read_from_replica_az_awareness_all_nodes( + last_read_replica_index, + client_az.to_string(), + ) + .await + } } } @@ -608,6 +679,10 @@ fn get_read_from(read_from: Option) -> ReadFrom { client_az: az, last_read_replica_index: Default::default(), }, + Some(super::ReadFrom::AZAffinityAllNodes(az)) => ReadFrom::AZAffinityAllNodes { + client_az: az, + last_read_replica_index: Default::default(), + }, None => ReadFrom::Primary, } } diff --git a/glide-core/src/client/types.rs b/glide-core/src/client/types.rs index e2314a1ab6..f9013974e4 100644 --- a/glide-core/src/client/types.rs +++ b/glide-core/src/client/types.rs @@ -58,6 +58,7 @@ pub enum ReadFrom { Primary, PreferReplica, AZAffinity(String), + AZAffinityAllNodes(String), } #[derive(PartialEq, Eq, Clone, Copy, Default)] @@ -113,6 +114,20 @@ impl From for ConnectionRequest { ReadFrom::PreferReplica } } + protobuf::ReadFrom::AZAffinityAllNodes => { + if let Some(client_az) = chars_to_string_option(&value.client_az) { + ReadFrom::AZAffinityAllNodes(client_az) + } else { + log_warn( + "types", + format!( + "Failed to convert availability zone string: '{:?}'. 
Falling back to `ReadFrom::PreferReplica`", + value.client_az + ), + ); + ReadFrom::PreferReplica + } + }, }); let client_name = chars_to_string_option(&value.client_name); diff --git a/glide-core/src/protobuf/connection_request.proto b/glide-core/src/protobuf/connection_request.proto index 8e33b39da3..6e72ca899f 100644 --- a/glide-core/src/protobuf/connection_request.proto +++ b/glide-core/src/protobuf/connection_request.proto @@ -11,6 +11,7 @@ enum ReadFrom { PreferReplica = 1; LowestLatency = 2; AZAffinity = 3; + AZAffinityAllNodes = 4; } enum TlsMode { diff --git a/glide-core/tests/test_standalone_client.rs b/glide-core/tests/test_standalone_client.rs index 77363b5c18..786f4057b0 100644 --- a/glide-core/tests/test_standalone_client.rs +++ b/glide-core/tests/test_standalone_client.rs @@ -272,6 +272,7 @@ mod standalone_client_tests { }); } + // TODO - Current test falls back to PreferReplica when run, need to integrate the az here also #[rstest] #[serial_test::serial] #[timeout(SHORT_STANDALONE_TEST_TIMEOUT)] @@ -283,6 +284,18 @@ mod standalone_client_tests { ..Default::default() }); } + // TODO - Needs changes in the struct and the create_primary_mock + #[rstest] + #[serial_test::serial] + #[timeout(SHORT_STANDALONE_TEST_TIMEOUT)] + fn test_read_from_replica_az_affinity_all_nodes() { + test_read_from_replica(ReadFromReplicaTestConfig { + read_from: ReadFrom::AZAffinity, + expected_primary_reads: 0, + expected_replica_reads: vec![1, 1, 1], + ..Default::default() + }); + } #[rstest] #[serial_test::serial] diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 025d0218bf..01ebdb95e7 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -501,7 +501,10 @@ export type ReadFrom = | "preferReplica" /** Spread the requests between replicas in the same client's Aviliablity zone in a round robin manner. If no replica is available, route the requests to the primary.*/ - | "AZAffinity"; + | "AZAffinity" + /** Spread the read requests among all nodes within the client's Availability Zone (AZ) in a round robin manner, + prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.*/ + | "AZAffinityAllNodes"; /** * Configuration settings for creating a client. Shared settings for standalone and cluster clients. @@ -531,11 +534,11 @@ export type ReadFrom = * * ### Read Strategy * - * - Use `readFrom` to specify the client's read strategy (e.g., primary, preferReplica, AZAffinity). + * - Use `readFrom` to specify the client's read strategy (e.g., primary, preferReplica, AZAffinity, AZAffinityAllNodes). * * ### Availability Zone * - * - Use `clientAz` to specify the client's availability zone, which can influence read operations when using `readFrom: 'AZAffinity'`. + * - Use `clientAz` to specify the client's availability zone, which can influence read operations when using `readFrom: 'AZAffinity'or `readFrom: 'AZAffinityAllNodes'`. * * ### Decoder Settings * @@ -637,13 +640,15 @@ export interface BaseClientConfiguration { inflightRequestsLimit?: number; /** * Availability Zone of the client. - * If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. + * If ReadFrom strategy is AZAffinity or AZAffinityAllNodes, this setting ensures that readonly commands are directed to nodes within the specified AZ if they exist. 
* * @example * ```typescript * // Example configuration for setting client availability zone and read strategy * configuration.clientAz = 'us-east-1a'; // Sets the client's availability zone * configuration.readFrom = 'AZAffinity'; // Directs read operations to nodes within the same AZ + * Or + * configuration.readFrom = 'AZAffinityAllNodes'; // Directs read operations to any node (primary or replica) within the same AZ * ``` */ clientAz?: string; @@ -6069,6 +6074,7 @@ export class BaseClient { primary: connection_request.ReadFrom.Primary, preferReplica: connection_request.ReadFrom.PreferReplica, AZAffinity: connection_request.ReadFrom.AZAffinity, + AZAffinityAllNodes: connection_request.ReadFrom.AZAffinityAllNodes, }; /**
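Usage sketch (not part of the patch): the snippet below mirrors the builder calls exercised by the new cluster tests above and shows how a caller would opt into the AZAffinityAllNodes strategy; the address list, availability zone, and key are illustrative placeholders.

    // Hypothetical example; the addresses, AZ name, and key below are placeholders.
    use redis::cluster::ClusterClient;
    use redis::cluster_slotmap::ReadFromReplicaStrategy;

    async fn read_az_aware(addresses: Vec<String>) -> redis::RedisResult<Option<String>> {
        let client = ClusterClient::builder(addresses)
            // Prefer any node (replica or primary) in the client's AZ, falling back as described above.
            .read_from(ReadFromReplicaStrategy::AZAffinityAllNodes("us-east-1a".to_string()))
            .build()?;
        let mut connection = client.get_async_connection(None).await?;
        // Read-only command; with the new strategy it is routed to an in-AZ node when one is available.
        redis::cmd("GET").arg("foo").query_async(&mut connection).await
    }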