Skip to content

Commit

Permalink
Core: Add AzAffinityAllNodes Read Strategy
Browse files Browse the repository at this point in the history
Signed-off-by: Muhammad Awawdi <[email protected]>
  • Loading branch information
Muhammad-awawdi-amazon committed Jan 21, 2025
1 parent 918162b commit a36c302
Show file tree
Hide file tree
Showing 14 changed files with 653 additions and 7 deletions.
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ where
async fn setup_connection<C>(
connection_info: &RedisConnectionInfo,
con: &mut C,
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity.
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity or AZAffinityAllNodes.
// An INFO command will be triggered in the connection's setup to update the 'availability_zone' property.
discover_az: bool,
) -> RedisResult<()>
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub struct GlideConnectionOptions {
#[cfg(feature = "aio")]
/// Passive disconnect notifier
pub disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// If ReadFromReplica strategy is set to AZAffinity or AZAffinityAllNodes, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
/// Connection timeout duration.
Expand Down
225 changes: 225 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,64 @@ where
}
}

/// Picks a connection for a read under the `AZAffinityAllNodes` strategy.
///
/// Candidates are tried in this order, and the first hit is returned:
/// 1. a connected replica whose availability zone equals `client_az`, scanning the
///    slot's replicas round-robin starting right after `last_used_replica`;
/// 2. the slot's primary, if it is connected and located in `client_az`;
/// 3. whatever `round_robin_read_from_replica` selects, when the slot has any replicas;
/// 4. the slot's primary, regardless of its availability zone.
///
/// Returns `None` when none of the steps above yields a connection.
pub(crate) fn round_robin_read_from_replica_with_az_awareness_all_nodes(
    &self,
    slot_map_value: &SlotMapValue,
    client_az: String,
) -> Option<ConnectionAndAddress<Connection>> {
    let addrs = &slot_map_value.addrs;
    let replicas = addrs.replicas();
    let replica_count = replicas.len();
    let initial_index = slot_map_value.last_used_replica.load(Ordering::Relaxed);
    // Borrowed view of the requested AZ, so each comparison below is allocation-free
    // (the zone string is no longer cloned once per probed node).
    let wanted_az = Some(client_az.as_str());

    // Step 1: Try to find a connected replica in the same AZ.
    // The range is empty when there are no replicas, which also keeps the modulo
    // below from ever dividing by zero.
    for offset in 1..=replica_count {
        let index = (initial_index + offset) % replica_count;
        let replica = &replicas[index];

        if let Some((address, connection_details)) =
            self.connection_details_for_address(replica.as_str())
        {
            if self.az_for_address(&address).as_deref() == wanted_az {
                // Best-effort advance of the round-robin cursor: the result is ignored,
                // so a failed exchange simply leaves the stored index as it was.
                let _ = slot_map_value.last_used_replica.compare_exchange_weak(
                    initial_index,
                    index,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                );
                return Some((address, connection_details.conn));
            }
        }
    }

    // Step 2: No local replica was found; check whether the primary is in the same AZ.
    if let Some((address, connection_details)) =
        self.connection_details_for_address(addrs.primary().as_str())
    {
        if self.az_for_address(&address).as_deref() == wanted_az {
            return Some((address, connection_details.conn));
        }
    }

    // Step 3: Nothing local is available; fall back to the plain round-robin replica selection.
    if !replicas.is_empty() {
        return self.round_robin_read_from_replica(slot_map_value);
    }

    // Step 4: Final fallback - the slot has no replicas at all, so use the primary.
    self.connection_details_for_address(addrs.primary().as_str())
        .map(|(address, connection_details)| (address, connection_details.conn))
}

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
let addrs = &slot_map_value.addrs;
Expand All @@ -311,6 +369,11 @@ where
slot_map_value,
az.to_string(),
),
ReadFromReplicaStrategy::AZAffinityAllNodes(az) => self
.round_robin_read_from_replica_with_az_awareness_all_nodes(
slot_map_value,
az.to_string(),
),
},
// when the user strategy per command is replica_preferred
SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy {
Expand All @@ -319,6 +382,11 @@ where
slot_map_value,
az.to_string(),
),
ReadFromReplicaStrategy::AZAffinityAllNodes(az) => self
.round_robin_read_from_replica_with_az_awareness_all_nodes(
slot_map_value,
az.to_string(),
),
_ => self.round_robin_read_from_replica(slot_map_value),
},
}
Expand Down Expand Up @@ -888,6 +956,163 @@ mod tests {
assert_eq!(addresses, vec![31, 31, 33, 33]);
}

// Test fixture: builds a container configured with the AZAffinityAllNodes strategy
// for a client located in "use-1a".
//
// Topology:
//   slots    1-1000 -> primary1 (use-1b), no replicas
//   slots 1002-2000 -> primary2 (use-1c), replica2-1 (use-1c)
//   slots 2001-3000 -> primary3 (use-1b), replica3-1 (use-1a), replica3-2 (use-1b),
//                      replica3-3 (use-1a)
// Slot 1001 is intentionally left unassigned.
fn create_container_with_az_affinity_all_nodes_strategy(
    use_management_connections: bool,
) -> ConnectionsContainer<usize> {
    let slots = vec![
        Slot::new(1, 1000, String::from("primary1"), vec![]),
        Slot::new(
            1002,
            2000,
            String::from("primary2"),
            vec![String::from("replica2-1")],
        ),
        Slot::new(
            2001,
            3000,
            String::from("primary3"),
            vec![
                String::from("replica3-1"),
                String::from("replica3-2"),
                String::from("replica3-3"),
            ],
        ),
    ];
    let slot_map = SlotMap::new(slots, ReadFromReplicaStrategy::AlwaysFromPrimary);

    // (address, connection id, availability zone) for every node in the cluster.
    let nodes = [
        ("primary1", 1, "use-1b"),
        ("primary2", 2, "use-1c"),
        ("primary3", 3, "use-1b"),
        ("replica2-1", 21, "use-1c"),
        ("replica3-1", 31, "use-1a"),
        ("replica3-2", 32, "use-1b"),
        ("replica3-3", 33, "use-1a"),
    ];

    let connection_map = DashMap::new();
    for (address, id, az) in nodes {
        connection_map.insert(
            address.into(),
            create_cluster_node(id, use_management_connections, Some(az.to_string())),
        );
    }

    let read_from_replica_strategy =
        ReadFromReplicaStrategy::AZAffinityAllNodes("use-1a".to_string());

    ConnectionsContainer {
        slot_map,
        connection_map,
        read_from_replica_strategy,
        topology_hash: 0,
    }
}

// Walks the AZAffinityAllNodes fallback chain for a client in "use-1a" by removing
// nodes step by step and checking which connection is handed out for each route.
#[test]
fn get_connection_for_az_affinity_all_nodes_route() {
    // Create a container with AZAffinityAllNodes strategy
    let container = create_container_with_az_affinity_all_nodes_strategy(false);

    // Slot number does not exist (slot 1001 wasn't assigned to any primary)
    assert!(container
        .connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional))
        .is_none());

    // Test getting replica in client's AZ for slot 2001
    assert!(one_of(
        container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
        &[31, 33],
    ));

    // Remove one replica in the client's AZ
    remove_nodes(&container, &["replica3-3"]);

    // Should still get the remaining replica in the client's AZ
    assert_eq!(
        31,
        container
            .connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
            .unwrap()
            .1
    );

    // Remove all replicas in the client's AZ
    remove_nodes(&container, &["replica3-1"]);

    // Test falling back to replica in different AZ
    assert_eq!(
        32,
        container
            .connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
            .unwrap()
            .1
    );

    // Set the primary to be in the client's AZ
    container
        .connection_map
        .get_mut("primary3")
        .unwrap()
        .user_connection
        .az = Some("use-1a".to_string());

    // Remove the last replica
    remove_nodes(&container, &["replica3-2"]);

    // Should now fall back to the primary in the client's AZ.
    // The route deliberately asks for a replica so that the request goes through the
    // AZAffinityAllNodes selection and lands on the "primary in the same AZ" step,
    // instead of addressing the primary directly.
    assert_eq!(
        3,
        container
            .connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
            .unwrap()
            .1
    );

    // Move the primary out of the client's AZ
    container
        .connection_map
        .get_mut("primary3")
        .unwrap()
        .user_connection
        .az = Some("use-1b".to_string());

    // Test falling back to replica under different primary
    assert_eq!(
        21,
        container
            .connection_for_route(&Route::new(1002, SlotAddr::ReplicaRequired))
            .unwrap()
            .1
    );

    // Remove all replicas
    remove_nodes(&container, &["replica2-1"]);

    // Test falling back to available primaries with their respective slots
    assert!(one_of(
        container.connection_for_route(&Route::new(1002, SlotAddr::Master)),
        &[2],
    ));
    assert!(one_of(
        container.connection_for_route(&Route::new(500, SlotAddr::Master)),
        &[1],
    ));
}

#[test]
fn get_connection_by_address() {
let container = create_container();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ where
let discover_az = matches!(
params.read_from_replicas,
crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
| crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityAllNodes(_)
);

match create_connection::<C>(
Expand Down
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,7 @@ where
let discover_az = matches!(
cluster_params.read_from_replicas,
crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
| crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityAllNodes(_)
);

let glide_connection_options = GlideConnectionOptions {
Expand Down
2 changes: 2 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ impl ClusterClientBuilder {
/// The parameter `read_strategy` can be one of:
/// `ReadFromReplicaStrategy::AZAffinity(availability_zone)` - attempt to access replicas in the same availability zone.
/// If no suitable replica is found (i.e. no replica could be found in the requested availability zone), choose any replica. Falling back to primary if needed.
/// `ReadFromReplicaStrategy::AZAffinityAllNodes(availability_zone)` - attempt to access nodes in the same availability zone,
/// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
/// `ReadFromReplicaStrategy::RoundRobin` - reads are distributed across replicas for load balancing using round-robin algorithm. Falling back to primary if needed.
/// `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries are directed to the primary node.
///
Expand Down
4 changes: 4 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_slotmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub enum ReadFromReplicaStrategy {
/// Spread the read requests between replicas in the client's Availability Zone (AZ) in a round robin manner,
/// falling back to other replicas or the primary if needed.
AZAffinity(String),
/// Spread the read requests among nodes within the client's Availability Zone (AZ) in a round robin manner,
/// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
AZAffinityAllNodes(String),
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -60,6 +63,7 @@ fn get_address_from_slot(
addrs.replicas()[index].clone()
}
ReadFromReplicaStrategy::AZAffinity(_az) => todo!(), // Drop sync client
ReadFromReplicaStrategy::AZAffinityAllNodes(_az) => todo!(), // Drop sync client
}
}

Expand Down
Loading

0 comments on commit a36c302

Please sign in to comment.