Skip to content

Commit

Permalink
Release 1.2 uncoverd slots (#2869)
Browse files Browse the repository at this point in the history
* Add allow_non_covered_slots option to cluster scan across Node, Python, and Java implementations

Signed-off-by: avifenesh <[email protected]>

* Merge python requirement files. (#2597)

* Merge python requirement files.

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: avifenesh <[email protected]>

---------

Signed-off-by: avifenesh <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Co-authored-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
avifenesh and Yury-Fridlyand authored Dec 25, 2024
1 parent 4de4847 commit 8bfa0d8
Show file tree
Hide file tree
Showing 30 changed files with 1,683 additions and 758 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ jobs:
working-directory: ./python
run: |
source .env/bin/activate
pip install -r dev_requirements.txt
pip install -r requirements.txt
cd python/tests/
pytest --asyncio-mode=auto --html=pytest_report.html --self-contained-html
Expand Down Expand Up @@ -178,7 +178,7 @@ jobs:
working-directory: ./python
run: |
source .env/bin/activate
pip install -r dev_requirements.txt
pip install -r requirements.txt
cd python/tests/
pytest --asyncio-mode=auto -k test_pubsub --html=pytest_report.html --self-contained-html
Expand Down
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#### Changes

* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860))
* Java: Bump protobuf (protoc) version ([#2561](https://github.com/valkey-io/valkey-glide/pull/2561), [#2802](https://github.com/valkey-io/valkey-glide/pull/2802)
* Java: bump `netty` version ([#2777](https://github.com/valkey-io/valkey-glide/pull/2777))
* Node: Remove native package references for MacOs x64 architecture ([#2799](https://github.com/valkey-io/valkey-glide/issues/2799))
Expand Down Expand Up @@ -535,4 +535,3 @@
Preview release of **GLIDE for Redis** a Polyglot Redis client.

See the [README](README.md) for additional information.

1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ python-test: .build/python_deps check-redis-server
@cd python && python3 -m venv .env
@echo "$(GREEN)Installing requirements...$(RESET)"
@cd python && .env/bin/pip install -r requirements.txt
@cd python && .env/bin/pip install -r dev_requirements.txt
@mkdir -p .build/ && touch .build/python_deps

##
Expand Down
8 changes: 8 additions & 0 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import eslint from "@eslint/js";
import prettierConfig from "eslint-config-prettier";
import tseslint from "typescript-eslint";
import jsdoc from "eslint-plugin-jsdoc";

export default tseslint.config(
eslint.configs.recommended,
Expand Down Expand Up @@ -54,6 +55,13 @@ export default tseslint.config(
next: "*",
},
],
"@typescript-eslint/indent": ["error", 4, {
"SwitchCase": 1,
"ObjectExpression": 1,
"FunctionDeclaration": {"parameters": "first"},
"FunctionExpression": {"parameters": "first"},
"ignoredNodes": ["TSTypeParameterInstantiation"]
}],
},
},
prettierConfig,
Expand Down
150 changes: 37 additions & 113 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ pub mod testing {
use crate::{
client::GlideConnectionOptions,
cluster_routing::{Routable, RoutingInfo, ShardUpdateResult},
cluster_slotmap::SlotMap,
cluster_topology::{
calculate_topology, get_slot, SlotRefreshState, DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS, DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR,
SLOT_SIZE,
},
cmd,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
FromRedisValue, InfoDict, ToRedisArgs,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC},
FromRedisValue, InfoDict,
};
use dashmap::DashMap;
use std::{
Expand Down Expand Up @@ -111,7 +109,7 @@ use crate::types::RetryMethod;

pub(crate) const MUTEX_READ_ERR: &str = "Failed to obtain read lock. Poisoned mutex?";
const MUTEX_WRITE_ERR: &str = "Failed to obtain write lock. Poisoned mutex?";
/// This represents an async Redis Cluster connection. It stores the
/// This represents an async Cluster connection. It stores the
/// underlying connections maintained for each node in the cluster, as well
/// as common parameters for connecting to nodes and executing commands.
#[derive(Clone)]
Expand Down Expand Up @@ -142,79 +140,18 @@ where
})
}

/// Special handling for `SCAN` command, using `cluster_scan`.
/// If you wish to use a match pattern, use [`cluster_scan_with_pattern`].
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
/// # Arguments
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
///
/// # Returns
///
/// A [`ScanStateRC`] for the updated state of the scan and the vector of keys that were found in the scan.
/// structure of returned value:
/// `Ok((ScanStateRC, Vec<Value>))`
///
/// When the scan is finished [`ScanStateRC`] will be None, and can be checked by calling `scan_state_wrapper.is_finished()`.
///
/// # Example
/// ```rust,no_run
/// use redis::cluster::ClusterClient;
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
///
/// async fn scan_all_cluster() -> Vec<String> {
/// let nodes = vec!["redis://127.0.0.1/"];
/// let client = ClusterClient::new(nodes).unwrap();
/// let mut connection = client.get_async_connection(None).await.unwrap();
/// let mut scan_state_rc = ScanStateRC::new();
/// let mut keys: Vec<String> = vec![];
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan(scan_state_rc, None, None).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
/// .map(|v| from_redis_value(&v).unwrap())
/// .collect::<Vec<String>>(); // Change the type of `keys` to `Vec<String>`
/// keys.append(&mut scan_keys);
/// if scan_state_rc.is_finished() {
/// break;
/// }
/// }
/// keys
/// }
/// ```
pub async fn cluster_scan(
&mut self,
scan_state_rc: ScanStateRC,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(scan_state_rc, None, count, object_type);
self.route_cluster_scan(cluster_scan_args).await
}

/// Special handling for `SCAN` command, using `cluster_scan_with_pattern`.
/// It is a special case of [`cluster_scan`], with an additional match pattern.
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
/// Perform a `SCAN` command on a cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
/// # Arguments
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `match_pattern` - A match pattern of requested keys.
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
/// * `cluster_scan_args` - A [`ClusterScanArgs`] struct containing the arguments for the cluster scan command - match pattern, count,
/// object type and the allow_non_covered_slots flag.
///
/// # Returns
///
Expand All @@ -227,17 +164,18 @@ where
/// # Example
/// ```rust,no_run
/// use redis::cluster::ClusterClient;
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
/// use redis::{ScanStateRC, from_redis_value, Value, ObjectType, ClusterScanArgs};
///
/// async fn scan_all_cluster() -> Vec<String> {
/// let nodes = vec!["redis://127.0.0.1/"];
/// let client = ClusterClient::new(nodes).unwrap();
/// let mut connection = client.get_async_connection(None).await.unwrap();
/// let mut scan_state_rc = ScanStateRC::new();
/// let mut keys: Vec<String> = vec![];
/// let cluster_scan_args = ClusterScanArgs::builder().with_count(1000).with_object_type(ObjectType::String).build();
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None).await.unwrap();
/// connection.cluster_scan(scan_state_rc, cluster_scan_args.clone()).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
Expand All @@ -251,19 +189,12 @@ where
/// keys
/// }
/// ```
pub async fn cluster_scan_with_pattern<K: ToRedisArgs>(
pub async fn cluster_scan(
&mut self,
scan_state_rc: ScanStateRC,
match_pattern: K,
count: Option<usize>,
object_type: Option<ObjectType>,
mut cluster_scan_args: ClusterScanArgs,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(
scan_state_rc,
Some(match_pattern.to_redis_args().concat()),
count,
object_type,
);
cluster_scan_args.set_scan_state_cursor(scan_state_rc);
self.route_cluster_scan(cluster_scan_args).await
}

Expand All @@ -279,18 +210,18 @@ where
sender,
})
.await
.map_err(|_| {
.map_err(|e| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
format!("Cluster: Error occurred while trying to send SCAN command to internal send task. {e:?}"),
))
})?;
receiver
.await
.unwrap_or_else(|_| {
.unwrap_or_else(|e| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
format!("Cluster: Failed to receive SCAN command response from internal send task. {e:?}"),
)))
})
.map(|response| match response {
Expand All @@ -316,18 +247,20 @@ where
sender,
})
.await
.map_err(|_| {
.map_err(|e| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
format!("Cluster: Error occurred while trying to send command to internal sender. {e:?}"),
))
})?;
receiver
.await
.unwrap_or_else(|_| {
.unwrap_or_else(|e| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
format!(
"Cluster: Failed to receive command response from internal sender. {e:?}"
),
)))
})
.map(|response| match response {
Expand Down Expand Up @@ -489,21 +422,8 @@ where
.map_err(|_| RedisError::from((ErrorKind::ClientError, MUTEX_WRITE_ERR)))
}

// return address of node for slot
pub(crate) async fn get_address_from_slot(
&self,
slot: u16,
slot_addr: SlotAddr,
) -> Option<Arc<String>> {
self.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.slot_map
.get_node_address_for_slot(slot, slot_addr)
}

// return epoch of node
pub(crate) async fn get_address_epoch(&self, node_address: &str) -> Result<u64, RedisError> {
pub(crate) async fn address_epoch(&self, node_address: &str) -> Result<u64, RedisError> {
let command = cmd("CLUSTER").arg("INFO").to_owned();
let node_conn = self
.conn_lock
Expand Down Expand Up @@ -541,14 +461,26 @@ where
}
}

// return slots of node
pub(crate) async fn get_slots_of_address(&self, node_address: Arc<String>) -> Vec<u16> {
/// return slots of node
pub(crate) async fn slots_of_address(&self, node_address: Arc<String>) -> Vec<u16> {
self.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.slot_map
.get_slots_of_node(node_address)
}

/// Get connection for address
pub(crate) async fn connection_for_address(
&self,
address: &str,
) -> Option<ConnectionFuture<C>> {
self.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.connection_for_address(address)
.map(|(_, conn)| conn)
}
}

pub(crate) struct ClusterConnInner<C> {
Expand Down Expand Up @@ -1884,14 +1816,6 @@ where
Self::refresh_slots_inner(inner, curr_retry).await
}

pub(crate) fn check_if_all_slots_covered(slot_map: &SlotMap) -> bool {
let mut slots_covered = 0;
for (end, slots) in slot_map.slots.iter() {
slots_covered += end.saturating_sub(slots.start).saturating_add(1);
}
slots_covered == SLOT_SIZE
}

// Query a node to discover slot-> master mappings
async fn refresh_slots_inner(inner: Arc<InnerCore<C>>, curr_retry: usize) -> RedisResult<()> {
let num_of_nodes = inner.conn_lock.read().expect(MUTEX_READ_ERR).len();
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/cluster_slotmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl SlotMap {
.collect()
}

pub(crate) fn get_node_address_for_slot(
pub(crate) fn node_address_for_slot(
&self,
slot: u16,
slot_addr: SlotAddr,
Expand Down
Loading

0 comments on commit 8bfa0d8

Please sign in to comment.