Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Add AzAffinityAllNodes Read Strategy #2986

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ where
async fn setup_connection<C>(
connection_info: &RedisConnectionInfo,
con: &mut C,
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity.
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity or AZAffinityAllNodes.
// An INFO command will be triggered in the connection's setup to update the 'availability_zone' property.
discover_az: bool,
) -> RedisResult<()>
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub struct GlideConnectionOptions {
#[cfg(feature = "aio")]
/// Passive disconnect notifier
pub disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// If ReadFromReplica strategy is set to AZAffinity or AZAffinityAllNodes, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
/// Connection timeout duration.
Expand Down
225 changes: 225 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,64 @@ where
}
}

/// Returns the node's connection in the same availability zone as `client_az`,
/// checking replicas first, then the primary, and falling back to any available node.
///
/// Fallback order:
/// 1. A connected replica in the client's AZ (round-robin among them).
/// 2. The primary, if it is in the client's AZ.
/// 3. Any connected replica (round-robin).
/// 4. The primary, regardless of AZ.
pub(crate) fn round_robin_read_from_replica_with_az_awareness_all_nodes(
    &self,
    slot_map_value: &SlotMapValue,
    client_az: String,
) -> Option<ConnectionAndAddress<Connection>> {
    let addrs = &slot_map_value.addrs;
    let initial_index = slot_map_value.last_used_replica.load(Ordering::Relaxed);
    let mut retries = 0usize;

    // Step 1: Try to find a replica in the same AZ, scanning in round-robin
    // order starting just after the last used replica.
    loop {
        retries = retries.saturating_add(1);
        // Looped through all replicas; no connected replica found in the same availability zone.
        if retries > addrs.replicas().len() {
            break;
        }

        // Calculate index based on initial index and check count.
        let index = (initial_index + retries) % addrs.replicas().len();
        let replica = &addrs.replicas()[index];

        if let Some((address, connection_details)) =
            self.connection_details_for_address(replica.as_str())
        {
            // Compare AZs by string slice to avoid cloning `client_az` on every iteration.
            if self.az_for_address(&address).as_deref() == Some(client_az.as_str()) {
                // Found a replica in the same AZ; advance the round-robin cursor.
                // A failed CAS only means another thread already moved it — harmless.
                let _ = slot_map_value.last_used_replica.compare_exchange_weak(
                    initial_index,
                    index,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                );
                return Some((address, connection_details.conn));
            }
        }
    }

    // Step 2: Check if primary is in the same AZ
    if let Some((address, connection_details)) =
        self.connection_details_for_address(addrs.primary().as_str())
    {
        if self.az_for_address(&address).as_deref() == Some(client_az.as_str()) {
            return Some((address, connection_details.conn));
        }
    }

    // Step 3: Fall back to any available replica using round-robin
    if !addrs.replicas().is_empty() {
        return self.round_robin_read_from_replica(slot_map_value);
    }

    // Step 4: Final fallback - use primary
    self.connection_details_for_address(addrs.primary().as_str())
        .map(|(address, connection_details)| (address, connection_details.conn))
}

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
let addrs = &slot_map_value.addrs;
Expand All @@ -311,6 +369,11 @@ where
slot_map_value,
az.to_string(),
),
ReadFromReplicaStrategy::AZAffinityAllNodes(az) => self
.round_robin_read_from_replica_with_az_awareness_all_nodes(
slot_map_value,
az.to_string(),
),
},
// when the user strategy per command is replica_preffered
SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy {
Expand All @@ -319,6 +382,11 @@ where
slot_map_value,
az.to_string(),
),
ReadFromReplicaStrategy::AZAffinityAllNodes(az) => self
.round_robin_read_from_replica_with_az_awareness_all_nodes(
slot_map_value,
az.to_string(),
),
_ => self.round_robin_read_from_replica(slot_map_value),
},
}
Expand Down Expand Up @@ -888,6 +956,163 @@ mod tests {
assert_eq!(addresses, vec![31, 31, 33, 33]);
}

// Helper function to create a container with AZAffinityAllNodes strategy.
// Topology: three primaries; slot range 2001-3000 has three replicas, two of
// which (31, 33) sit in the client's AZ "use-1a".
fn create_container_with_az_affinity_all_nodes_strategy(
    use_management_connections: bool,
) -> ConnectionsContainer<usize> {
    let slot_map = SlotMap::new(
        vec![
            Slot::new(1, 1000, "primary1".to_owned(), Vec::new()),
            Slot::new(
                1002,
                2000,
                "primary2".to_owned(),
                vec!["replica2-1".to_owned()],
            ),
            Slot::new(
                2001,
                3000,
                "primary3".to_owned(),
                vec![
                    "replica3-1".to_owned(),
                    "replica3-2".to_owned(),
                    "replica3-3".to_owned(),
                ],
            ),
        ],
        ReadFromReplicaStrategy::AlwaysFromPrimary,
    );

    // (address, connection id, availability zone) for every node in the topology.
    let nodes = [
        ("primary1", 1, "use-1b"),
        ("primary2", 2, "use-1c"),
        ("primary3", 3, "use-1b"),
        ("replica2-1", 21, "use-1c"),
        ("replica3-1", 31, "use-1a"),
        ("replica3-2", 32, "use-1b"),
        ("replica3-3", 33, "use-1a"),
    ];
    let connection_map = DashMap::new();
    for (address, id, az) in nodes {
        connection_map.insert(
            address.into(),
            create_cluster_node(id, use_management_connections, Some(az.to_string())),
        );
    }

    ConnectionsContainer {
        slot_map,
        connection_map,
        read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinityAllNodes(
            "use-1a".to_string(),
        ),
        topology_hash: 0,
    }
}

#[test]
fn get_connection_for_az_affinity_all_nodes_route() {
    // Build a container whose client AZ is "use-1a" with the AZAffinityAllNodes strategy.
    let container = create_container_with_az_affinity_all_nodes_strategy(false);

    // Convenience: resolve a route to its connection id, if any.
    let conn_for = |slot, slot_addr| {
        container
            .connection_for_route(&Route::new(slot, slot_addr))
            .map(|(_, conn)| conn)
    };

    // Slot 1001 was never assigned to any primary, so no connection can be found.
    assert!(conn_for(1001, SlotAddr::ReplicaOptional).is_none());

    // Slot 2001 has two replicas (31, 33) in the client's AZ — one of them must be picked.
    assert!(matches!(
        conn_for(2001, SlotAddr::ReplicaRequired),
        Some(31) | Some(33)
    ));

    // Drop one same-AZ replica; the remaining one (31) must still be chosen.
    remove_nodes(&container, &["replica3-3"]);
    assert_eq!(conn_for(2001, SlotAddr::ReplicaRequired), Some(31));

    // With no same-AZ replica left, fall back to a replica in a different AZ (32).
    remove_nodes(&container, &["replica3-1"]);
    assert_eq!(conn_for(2001, SlotAddr::ReplicaRequired), Some(32));

    // Move the primary into the client's AZ and drop the last replica:
    // the primary (3) becomes the same-AZ choice.
    container
        .connection_map
        .get_mut("primary3")
        .unwrap()
        .user_connection
        .az = Some("use-1a".to_string());
    remove_nodes(&container, &["replica3-2"]);
    assert_eq!(conn_for(2001, SlotAddr::Master), Some(3));

    // Move the primary back out of the client's AZ; a replica under a different
    // primary (21) serves the replica-required route for slot 1002.
    container
        .connection_map
        .get_mut("primary3")
        .unwrap()
        .user_connection
        .az = Some("use-1b".to_string());
    assert_eq!(conn_for(1002, SlotAddr::ReplicaRequired), Some(21));

    // With every replica removed, each slot falls back to its own primary.
    remove_nodes(&container, &["replica2-1"]);
    assert_eq!(conn_for(1002, SlotAddr::Master), Some(2));
    assert_eq!(conn_for(500, SlotAddr::Master), Some(1));
}

#[test]
fn get_connection_by_address() {
let container = create_container();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ where
let discover_az = matches!(
params.read_from_replicas,
crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
| crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityAllNodes(_)
);

match create_connection::<C>(
Expand Down
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,7 @@ where
let discover_az = matches!(
cluster_params.read_from_replicas,
crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
| crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityAllNodes(_)
);

let glide_connection_options = GlideConnectionOptions {
Expand Down
2 changes: 2 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ impl ClusterClientBuilder {
/// The parameter `read_strategy` can be one of:
/// `ReadFromReplicaStrategy::AZAffinity(availability_zone)` - attempt to access replicas in the same availability zone.
/// If no suitable replica is found (i.e. no replica could be found in the requested availability zone), choose any replica. Falling back to primary if needed.
/// `ReadFromReplicaStrategy::AZAffinityAllNodes(availability_zone)` - attempt to access nodes in the same availability zone,
/// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
/// `ReadFromReplicaStrategy::RoundRobin` - reads are distributed across replicas for load balancing using round-robin algorithm. Falling back to primary if needed.
/// `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries are directed to the primary node.
///
Expand Down
4 changes: 4 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_slotmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub enum ReadFromReplicaStrategy {
/// Spread the read requests between replicas in the same client's Availability zone in a round robin manner,
/// falling back to other replicas or the primary if needed.
AZAffinity(String),
/// Spread the read requests among nodes within the client's Availability Zone (AZ) in a round robin manner,
/// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
AZAffinityAllNodes(String),
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -60,6 +63,7 @@ fn get_address_from_slot(
addrs.replicas()[index].clone()
}
ReadFromReplicaStrategy::AZAffinity(_az) => todo!(), // Drop sync client
ReadFromReplicaStrategy::AZAffinityAllNodes(_az) => todo!(), // Drop sync client
}
}

Expand Down
Loading
Loading