From 8bfa0d87b3ab4a4aec58dc34b08af6f6cab32801 Mon Sep 17 00:00:00 2001 From: Avi Fenesh <55848801+avifenesh@users.noreply.github.com> Date: Wed, 25 Dec 2024 19:01:31 +0200 Subject: [PATCH] Release 1.2 uncoverd slots (#2869) * Add allow_non_covered_slots option to cluster scan across Node, Python, and Java implementations Signed-off-by: avifenesh * Merge python requirement files. (#2597) * Merge python requirement files. --------- Signed-off-by: Yury-Fridlyand Signed-off-by: avifenesh --------- Signed-off-by: avifenesh Signed-off-by: Yury-Fridlyand Co-authored-by: Yury-Fridlyand --- .github/workflows/python.yml | 4 +- CHANGELOG.md | 3 +- Makefile | 1 - eslint.config.mjs | 8 + .../redis-rs/redis/src/cluster_async/mod.rs | 150 +-- .../redis-rs/redis/src/cluster_slotmap.rs | 2 +- .../redis/src/commands/cluster_scan.rs | 1095 +++++++++-------- glide-core/redis-rs/redis/src/commands/mod.rs | 3 + glide-core/redis-rs/redis/src/lib.rs | 3 + .../redis-rs/redis/tests/test_cluster_scan.rs | 549 ++++++++- glide-core/src/client/mod.rs | 28 +- glide-core/src/protobuf/command_request.proto | 1 + glide-core/src/socket_listener.rs | 39 +- .../api/models/commands/scan/ScanOptions.java | 15 + .../java/glide/managers/CommandManager.java | 4 + node/src/Commands.ts | 13 + node/src/GlideClusterClient.ts | 17 +- node/tests/ScanTest.test.ts | 162 +++ node/tests/TestUtilities.ts | 45 + package.json | 12 +- python/DEVELOPER.md | 3 +- python/dev_requirements.txt | 7 - .../glide/async_commands/cluster_commands.py | 89 +- python/python/glide/async_commands/core.py | 1 + python/python/glide/glide_client.py | 2 + python/python/tests/conftest.py | 8 +- python/python/tests/test_scan.py | 157 ++- python/python/tests/test_transaction.py | 5 +- python/requirements.txt | 7 +- utils/TestUtils.ts | 8 +- 30 files changed, 1683 insertions(+), 758 deletions(-) delete mode 100644 python/dev_requirements.txt diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 699033cf1a..11df78697a 
100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -115,7 +115,7 @@ jobs: working-directory: ./python run: | source .env/bin/activate - pip install -r dev_requirements.txt + pip install -r requirements.txt cd python/tests/ pytest --asyncio-mode=auto --html=pytest_report.html --self-contained-html @@ -178,7 +178,7 @@ jobs: working-directory: ./python run: | source .env/bin/activate - pip install -r dev_requirements.txt + pip install -r requirements.txt cd python/tests/ pytest --asyncio-mode=auto -k test_pubsub --html=pytest_report.html --self-contained-html diff --git a/CHANGELOG.md b/CHANGELOG.md index f820f45e05..a1bc2cd15f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ #### Changes - +* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860)) * Java: Bump protobuf (protoc) version ([#2561](https://github.com/valkey-io/valkey-glide/pull/2561), [#2802](https://github.com/valkey-io/valkey-glide/pull/2802) * Java: bump `netty` version ([#2777](https://github.com/valkey-io/valkey-glide/pull/2777)) * Node: Remove native package references for MacOs x64 architecture ([#2799](https://github.com/valkey-io/valkey-glide/issues/2799)) @@ -535,4 +535,3 @@ Preview release of **GLIDE for Redis** a Polyglot Redis client. See the [README](README.md) for additional information. 
- diff --git a/Makefile b/Makefile index 877d78f6e9..d4088fb898 100644 --- a/Makefile +++ b/Makefile @@ -58,7 +58,6 @@ python-test: .build/python_deps check-redis-server @cd python && python3 -m venv .env @echo "$(GREEN)Installing requirements...$(RESET)" @cd python && .env/bin/pip install -r requirements.txt - @cd python && .env/bin/pip install -r dev_requirements.txt @mkdir -p .build/ && touch .build/python_deps ## diff --git a/eslint.config.mjs b/eslint.config.mjs index 21995480f4..a96d4fdecd 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -2,6 +2,7 @@ import eslint from "@eslint/js"; import prettierConfig from "eslint-config-prettier"; import tseslint from "typescript-eslint"; +import jsdoc from "eslint-plugin-jsdoc"; export default tseslint.config( eslint.configs.recommended, @@ -54,6 +55,13 @@ export default tseslint.config( next: "*", }, ], + "@typescript-eslint/indent": ["error", 4, { + "SwitchCase": 1, + "ObjectExpression": 1, + "FunctionDeclaration": {"parameters": "first"}, + "FunctionExpression": {"parameters": "first"}, + "ignoredNodes": ["TSTypeParameterInstantiation"] + }], }, }, prettierConfig, diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 3726d7a674..8164d09413 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -32,15 +32,13 @@ pub mod testing { use crate::{ client::GlideConnectionOptions, cluster_routing::{Routable, RoutingInfo, ShardUpdateResult}, - cluster_slotmap::SlotMap, cluster_topology::{ calculate_topology, get_slot, SlotRefreshState, DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES, DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS, DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR, - SLOT_SIZE, }, cmd, - commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC}, - FromRedisValue, InfoDict, ToRedisArgs, + commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC}, + 
FromRedisValue, InfoDict, }; use dashmap::DashMap; use std::{ @@ -111,7 +109,7 @@ use crate::types::RetryMethod; pub(crate) const MUTEX_READ_ERR: &str = "Failed to obtain read lock. Poisoned mutex?"; const MUTEX_WRITE_ERR: &str = "Failed to obtain write lock. Poisoned mutex?"; -/// This represents an async Redis Cluster connection. It stores the +/// This represents an async Cluster connection. It stores the /// underlying connections maintained for each node in the cluster, as well /// as common parameters for connecting to nodes and executing commands. #[derive(Clone)] @@ -142,68 +140,9 @@ where }) } - /// Special handling for `SCAN` command, using `cluster_scan`. - /// If you wish to use a match pattern, use [`cluster_scan_with_pattern`]. - /// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology - /// and make sure that all keys that were in the cluster from start to end of the scan are scanned. - /// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance. - /// - /// # Arguments - /// - /// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`], - /// for each subsequent iteration use the returned [`ScanStateRC`]. - /// * `count` - An optional count of keys requested, - /// the amount returned can vary and not obligated to return exactly count. - /// * `object_type` - An optional [`ObjectType`] enum of requested key redis type. - /// - /// # Returns - /// - /// A [`ScanStateRC`] for the updated state of the scan and the vector of keys that were found in the scan. - /// structure of returned value: - /// `Ok((ScanStateRC, Vec))` - /// - /// When the scan is finished [`ScanStateRC`] will be None, and can be checked by calling `scan_state_wrapper.is_finished()`. 
- /// - /// # Example - /// ```rust,no_run - /// use redis::cluster::ClusterClient; - /// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType}; - /// - /// async fn scan_all_cluster() -> Vec { - /// let nodes = vec!["redis://127.0.0.1/"]; - /// let client = ClusterClient::new(nodes).unwrap(); - /// let mut connection = client.get_async_connection(None).await.unwrap(); - /// let mut scan_state_rc = ScanStateRC::new(); - /// let mut keys: Vec = vec![]; - /// loop { - /// let (next_cursor, scan_keys): (ScanStateRC, Vec) = - /// connection.cluster_scan(scan_state_rc, None, None).await.unwrap(); - /// scan_state_rc = next_cursor; - /// let mut scan_keys = scan_keys - /// .into_iter() - /// .map(|v| from_redis_value(&v).unwrap()) - /// .collect::>(); // Change the type of `keys` to `Vec` - /// keys.append(&mut scan_keys); - /// if scan_state_rc.is_finished() { - /// break; - /// } - /// } - /// keys - /// } - /// ``` - pub async fn cluster_scan( - &mut self, - scan_state_rc: ScanStateRC, - count: Option, - object_type: Option, - ) -> RedisResult<(ScanStateRC, Vec)> { - let cluster_scan_args = ClusterScanArgs::new(scan_state_rc, None, count, object_type); - self.route_cluster_scan(cluster_scan_args).await - } - /// Special handling for `SCAN` command, using `cluster_scan_with_pattern`. /// It is a special case of [`cluster_scan`], with an additional match pattern. - /// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology + /// Perform a `SCAN` command on a cluster, using scan state object in order to handle changes in topology /// and make sure that all keys that were in the cluster from start to end of the scan are scanned. /// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance. 
/// @@ -211,10 +150,8 @@ where /// /// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`], /// for each subsequent iteration use the returned [`ScanStateRC`]. - /// * `match_pattern` - A match pattern of requested keys. - /// * `count` - An optional count of keys requested, - /// the amount returned can vary and not obligated to return exactly count. - /// * `object_type` - An optional [`ObjectType`] enum of requested key redis type. + /// * `cluster_scan_args` - A [`ClusterScanArgs`] struct containing the arguments for the cluster scan command - match pattern, count, + /// object type and the allow_non_covered_slots flag. /// /// # Returns /// @@ -227,7 +164,7 @@ where /// # Example /// ```rust,no_run /// use redis::cluster::ClusterClient; - /// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType}; + /// use redis::{ScanStateRC, from_redis_value, Value, ObjectType, ClusterScanArgs}; /// /// async fn scan_all_cluster() -> Vec { /// let nodes = vec!["redis://127.0.0.1/"]; @@ -235,9 +172,10 @@ where /// let mut connection = client.get_async_connection(None).await.unwrap(); /// let mut scan_state_rc = ScanStateRC::new(); /// let mut keys: Vec = vec![]; + /// let cluster_scan_args = ClusterScanArgs::builder().with_count(1000).with_object_type(ObjectType::String).build(); /// loop { /// let (next_cursor, scan_keys): (ScanStateRC, Vec) = - /// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None).await.unwrap(); + /// connection.cluster_scan(scan_state_rc, cluster_scan_args.clone()).await.unwrap(); /// scan_state_rc = next_cursor; /// let mut scan_keys = scan_keys /// .into_iter() @@ -251,19 +189,12 @@ where /// keys /// } /// ``` - pub async fn cluster_scan_with_pattern( + pub async fn cluster_scan( &mut self, scan_state_rc: ScanStateRC, - match_pattern: K, - count: Option, - object_type: Option, + mut cluster_scan_args: ClusterScanArgs, ) -> RedisResult<(ScanStateRC, Vec)> 
{ - let cluster_scan_args = ClusterScanArgs::new( - scan_state_rc, - Some(match_pattern.to_redis_args().concat()), - count, - object_type, - ); + cluster_scan_args.set_scan_state_cursor(scan_state_rc); self.route_cluster_scan(cluster_scan_args).await } @@ -279,18 +210,18 @@ where sender, }) .await - .map_err(|_| { + .map_err(|e| { RedisError::from(io::Error::new( io::ErrorKind::BrokenPipe, - "redis_cluster: Unable to send command", + format!("Cluster: Error occurred while trying to send SCAN command to internal send task. {e:?}"), )) })?; receiver .await - .unwrap_or_else(|_| { + .unwrap_or_else(|e| { Err(RedisError::from(io::Error::new( io::ErrorKind::BrokenPipe, - "redis_cluster: Unable to receive command", + format!("Cluster: Failed to receive SCAN command response from internal send task. {e:?}"), ))) }) .map(|response| match response { @@ -316,18 +247,20 @@ where sender, }) .await - .map_err(|_| { + .map_err(|e| { RedisError::from(io::Error::new( io::ErrorKind::BrokenPipe, - "redis_cluster: Unable to send command", + format!("Cluster: Error occurred while trying to send command to internal sender. {e:?}"), )) })?; receiver .await - .unwrap_or_else(|_| { + .unwrap_or_else(|e| { Err(RedisError::from(io::Error::new( io::ErrorKind::BrokenPipe, - "redis_cluster: Unable to receive command", + format!( + "Cluster: Failed to receive command response from internal sender. 
{e:?}" + ), ))) }) .map(|response| match response { @@ -489,21 +422,8 @@ where .map_err(|_| RedisError::from((ErrorKind::ClientError, MUTEX_WRITE_ERR))) } - // return address of node for slot - pub(crate) async fn get_address_from_slot( - &self, - slot: u16, - slot_addr: SlotAddr, - ) -> Option> { - self.conn_lock - .read() - .expect(MUTEX_READ_ERR) - .slot_map - .get_node_address_for_slot(slot, slot_addr) - } - // return epoch of node - pub(crate) async fn get_address_epoch(&self, node_address: &str) -> Result { + pub(crate) async fn address_epoch(&self, node_address: &str) -> Result { let command = cmd("CLUSTER").arg("INFO").to_owned(); let node_conn = self .conn_lock @@ -541,14 +461,26 @@ where } } - // return slots of node - pub(crate) async fn get_slots_of_address(&self, node_address: Arc) -> Vec { + /// return slots of node + pub(crate) async fn slots_of_address(&self, node_address: Arc) -> Vec { self.conn_lock .read() .expect(MUTEX_READ_ERR) .slot_map .get_slots_of_node(node_address) } + + /// Get connection for address + pub(crate) async fn connection_for_address( + &self, + address: &str, + ) -> Option> { + self.conn_lock + .read() + .expect(MUTEX_READ_ERR) + .connection_for_address(address) + .map(|(_, conn)| conn) + } } pub(crate) struct ClusterConnInner { @@ -1884,14 +1816,6 @@ where Self::refresh_slots_inner(inner, curr_retry).await } - pub(crate) fn check_if_all_slots_covered(slot_map: &SlotMap) -> bool { - let mut slots_covered = 0; - for (end, slots) in slot_map.slots.iter() { - slots_covered += end.saturating_sub(slots.start).saturating_add(1); - } - slots_covered == SLOT_SIZE - } - // Query a node to discover slot-> master mappings async fn refresh_slots_inner(inner: Arc>, curr_retry: usize) -> RedisResult<()> { let num_of_nodes = inner.conn_lock.read().expect(MUTEX_READ_ERR).len(); diff --git a/glide-core/redis-rs/redis/src/cluster_slotmap.rs b/glide-core/redis-rs/redis/src/cluster_slotmap.rs index 88e7549323..f2e43b4449 100644 --- 
a/glide-core/redis-rs/redis/src/cluster_slotmap.rs +++ b/glide-core/redis-rs/redis/src/cluster_slotmap.rs @@ -202,7 +202,7 @@ impl SlotMap { .collect() } - pub(crate) fn get_node_address_for_slot( + pub(crate) fn node_address_for_slot( &self, slot: u16, slot_addr: SlotAddr, diff --git a/glide-core/redis-rs/redis/src/commands/cluster_scan.rs b/glide-core/redis-rs/redis/src/commands/cluster_scan.rs index 109aceca22..517d43c6b3 100644 --- a/glide-core/redis-rs/redis/src/commands/cluster_scan.rs +++ b/glide-core/redis-rs/redis/src/commands/cluster_scan.rs @@ -1,73 +1,269 @@ -//! This module contains the implementation of scanning operations in a Redis cluster. +//! This module implements cluster-wide scanning operations for clusters. //! -//! The [`ClusterScanArgs`] struct represents the arguments for a cluster scan operation, -//! including the scan state reference, match pattern, count, and object type. +//! # Overview //! -//! The [[`ScanStateRC`]] struct is a wrapper for managing the state of a scan operation in a cluster. -//! It holds a reference to the scan state and provides methods for accessing the state. +//! The module provides functionality to scan keys across all nodes in a cluster, +//! handling topology changes, failovers, and partial cluster coverage scenarios. +//! It maintains state between scan iterations and ensures consistent scanning even +//! when cluster topology changes. //! -//! The [[`ClusterInScan`]] trait defines the methods for interacting with a Redis cluster during scanning, -//! including retrieving address information, refreshing slot mapping, and routing commands to specific address. +//! # Key Components //! -//! The [[`ScanState`]] struct represents the state of a scan operation in a Redis cluster. -//! It holds information about the current scan state, including the cursor position, scanned slots map, -//! address being scanned, and address's epoch. +//! 
- [`ClusterScanArgs`]: Configuration for scan operations including filtering and behavior options +//! - [`ScanStateRC`]: Thread-safe reference-counted wrapper for scan state management +//! - [`ScanState`]: Internal state tracking for cluster-wide scanning progress +//! - [`ObjectType`]: Supported data types for filtering scan results +//! +//! # Key Features +//! +//! - Resumable scanning across cluster nodes +//! - Automatic handling of cluster topology changes +//! - Support for all regular SCAN options +//! - Resilient to node failures and resharding +//! +//! # Implementation Details +//! +//! The scanning process is implemented using a bitmap to track scanned slots and +//! maintains epoch information to handle topology changes. The implementation: +//! +//! - Uses a 64-bit aligned bitmap for efficient slot tracking +//! - Maintains cursor position per node +//! - Handles partial cluster coverage scenarios +//! - Provides automatic recovery from node failures +//! - Ensures consistent scanning across topology changes +//! +//! # Error Handling +//! +//! The module handles various error scenarios including: +//! - Node failures during scanning +//! - Cluster topology changes +//! - Network connectivity issues +//! 
- Invalid routing scenarios use crate::aio::ConnectionLike; -use crate::cluster_async::{ - ClusterConnInner, Connect, Core, InternalRoutingInfo, InternalSingleNodeRouting, RefreshPolicy, - Response, MUTEX_READ_ERR, -}; +use crate::cluster_async::{ClusterConnInner, Connect, InnerCore, RefreshPolicy, MUTEX_READ_ERR}; use crate::cluster_routing::SlotAddr; use crate::cluster_topology::SLOT_SIZE; -use crate::{cmd, from_redis_value, Cmd, ErrorKind, RedisError, RedisResult, Value}; -use async_trait::async_trait; +use crate::{cmd, from_redis_value, ErrorKind, RedisError, RedisResult, Value}; use std::sync::Arc; -use strum_macros::Display; +use strum_macros::{Display, EnumString}; + +const BITS_PER_U64: u16 = u64::BITS as u16; +const NUM_OF_SLOTS: u16 = SLOT_SIZE; +const BITS_ARRAY_SIZE: u16 = NUM_OF_SLOTS / BITS_PER_U64; +const END_OF_SCAN: u16 = NUM_OF_SLOTS; +type SlotsBitsArray = [u64; BITS_ARRAY_SIZE as usize]; + +/// Holds configuration for a cluster scan operation. +/// +/// # Fields +/// - `scan_state_cursor`: Internal state tracking scan progress +/// - `match_pattern`: Optional pattern to filter keys +/// - `count`: Optional limit on number of keys returned per iteration +/// - `object_type`: Optional filter for specific data types +/// - `allow_non_covered_slots`: Whether to continue if some slots are uncovered +/// +/// See examples below for usage with the builder pattern. 
+/// # Examples +/// +/// Create a new `ClusterScanArgs` instance using the builder pattern: +/// +/// ```rust,no_run +/// use redis::ClusterScanArgs; +/// use redis::ObjectType; +/// +/// // Create basic scan args with defaults +/// let basic_scan = ClusterScanArgs::builder().build(); +/// +/// // Create scan args with custom options +/// let custom_scan = ClusterScanArgs::builder() +/// .with_match_pattern("user:*") // Match keys starting with "user:" +/// .with_count(100) // Return 100 keys per iteration +/// .with_object_type(ObjectType::Hash) // Only scan hash objects +/// .allow_non_covered_slots(true) // Continue scanning even if some slots aren't covered +/// .build(); +/// +/// // The builder can be used to create multiple configurations +/// let another_scan = ClusterScanArgs::builder() +/// .with_match_pattern("session:*") +/// .with_object_type(ObjectType::String) +/// .build(); +/// ``` -const BITS_PER_U64: usize = u64::BITS as usize; -const NUM_OF_SLOTS: usize = SLOT_SIZE as usize; -const BITS_ARRAY_SIZE: usize = NUM_OF_SLOTS / BITS_PER_U64; -const END_OF_SCAN: u16 = NUM_OF_SLOTS as u16 + 1; -type SlotsBitsArray = [u64; BITS_ARRAY_SIZE]; +#[derive(Clone, Default)] +pub struct ClusterScanArgs { + /// Reference-counted scan state cursor, managed internally. + pub scan_state_cursor: ScanStateRC, -#[derive(Clone)] -pub(crate) struct ClusterScanArgs { - pub(crate) scan_state_cursor: ScanStateRC, + /// Optional pattern to match keys during the scan. + pub match_pattern: Option>, + + /// A "hint" to the cluster of how much keys to return per scan iteration, if none is sent to the server, the default value is 10. + pub count: Option, + + /// Optional filter to include only keys of a specific data type. + pub object_type: Option, + + /// Flag indicating whether to allow scanning when there are slots not covered by the cluster, by default it is set to false and the scan will stop if some slots are not covered. 
+ pub allow_non_covered_slots: bool, +} + +impl ClusterScanArgs { + /// Creates a new [`ClusterScanArgsBuilder`] instance. + /// + /// # Returns + /// + /// A [`ClusterScanArgsBuilder`] instance for configuring cluster scan arguments. + pub fn builder() -> ClusterScanArgsBuilder { + ClusterScanArgsBuilder::default() + } + pub(crate) fn set_scan_state_cursor(&mut self, scan_state_cursor: ScanStateRC) { + self.scan_state_cursor = scan_state_cursor; + } +} + +#[derive(Default)] +/// Builder pattern for creating cluster scan arguments. +/// +/// This struct allows configuring various parameters for scanning keys in a cluster: +/// * Pattern matching for key filtering +/// * Count limit for returned keys +/// * Filtering by object type +/// * Control over scanning non-covered slots +/// +/// # Example +/// ``` +/// use redis::{ClusterScanArgs, ObjectType}; +/// +/// let args = ClusterScanArgs::builder() +/// .with_match_pattern(b"user:*") +/// .with_count(100) +/// .with_object_type(ObjectType::Hash) +/// .build(); +/// ``` +pub struct ClusterScanArgsBuilder { + /// By default, the match pattern is set to `None` and no filtering is applied. match_pattern: Option>, - count: Option, + /// A "hint" to the cluster of how much keys to return per scan iteration, by default none is sent to the server, the default value is 10. + count: Option, + /// By default, the object type is set to `None` and no filtering is applied. object_type: Option, + /// By default, the flag to allow scanning non-covered slots is set to `false`, meaning scanning will stop if some slots are not covered. + allow_non_covered_slots: Option, } -#[derive(Debug, Clone, Display)] -/// Represents the type of an object in Redis. +impl ClusterScanArgsBuilder { + /// Sets the match pattern for the scan operation. + /// + /// # Arguments + /// + /// * `pattern` - The pattern to match keys against. + /// + /// # Returns + /// + /// The updated [`ClusterScanArgsBuilder`] instance. 
+ pub fn with_match_pattern>>(mut self, pattern: T) -> Self { + self.match_pattern = Some(pattern.into()); + self + } + + /// Sets the count for the scan operation. + /// + /// # Arguments + /// + /// * `count` - A "hint" to the cluster of how much keys to return per scan iteration. + /// + /// The actual number of keys returned may be less or more than the count specified. + /// + /// 4,294,967,295 is the maximum keys possible in a cluster, so higher values will be capped. + /// Hence the count is represented as a `u32` instead of `usize`. + /// + /// The default value is 10, if nothing is sent to the server, meaning nothing set in the builder. + /// + /// # Returns + /// + /// The updated [`ClusterScanArgsBuilder`] instance. + pub fn with_count(mut self, count: u32) -> Self { + self.count = Some(count); + self + } + + /// Sets the object type for the scan operation. + /// + /// # Arguments + /// + /// * `object_type` - The type of object to filter keys by. + /// + /// See [`ObjectType`] for supported data types. + /// + /// # Returns + /// + /// The updated [`ClusterScanArgsBuilder`] instance. + pub fn with_object_type(mut self, object_type: ObjectType) -> Self { + self.object_type = Some(object_type); + self + } + + /// Sets the flag to allow scanning non-covered slots. + /// + /// # Arguments + /// + /// * `allow` - A boolean flag indicating whether to allow scanning non-covered slots. + /// + /// # Returns + /// + /// The updated [`ClusterScanArgsBuilder`] instance. + pub fn allow_non_covered_slots(mut self, allow: bool) -> Self { + self.allow_non_covered_slots = Some(allow); + self + } + + /// Builds the [`ClusterScanArgs`] instance with the provided configuration. + /// + /// # Returns + /// + /// A [`ClusterScanArgs`] instance with the configured options. 
+ pub fn build(self) -> ClusterScanArgs { + ClusterScanArgs { + scan_state_cursor: ScanStateRC::new(), + match_pattern: self.match_pattern, + count: self.count, + object_type: self.object_type, + allow_non_covered_slots: self.allow_non_covered_slots.unwrap_or(false), + } + } +} + +/// Represents the type of an object used to filter keys by data type. +/// +/// This enum is used with the `TYPE` option in the `SCAN` command to +/// filter keys by their data type. +#[derive(Debug, Clone, Display, PartialEq, EnumString)] pub enum ObjectType { - /// Represents a string object in Redis. + /// String data type. String, - /// Represents a list object in Redis. + /// List data type. List, - /// Represents a set object in Redis. + /// Set data type. Set, - /// Represents a sorted set object in Redis. + /// Sorted set data type. ZSet, - /// Represents a hash object in Redis. + /// Hash data type. Hash, - /// Represents a stream object in Redis. + /// Stream data type. Stream, } -impl ClusterScanArgs { - pub(crate) fn new( - scan_state_cursor: ScanStateRC, - match_pattern: Option>, - count: Option, - object_type: Option, - ) -> Self { - Self { - scan_state_cursor, - match_pattern, - count, - object_type, +impl From for ObjectType { + fn from(s: String) -> Self { + match s.to_lowercase().as_str() { + "string" => ObjectType::String, + "list" => ObjectType::List, + "set" => ObjectType::Set, + "zset" => ObjectType::ZSet, + "hash" => ObjectType::Hash, + "stream" => ObjectType::Stream, + _ => ObjectType::String, } } } @@ -80,10 +276,11 @@ pub enum ScanStateStage { Finished, } +/// Wrapper struct for managing the state of a cluster scan operation. +/// +/// This struct holds an `Arc` to the actual scan state and a status indicating +/// whether the scan is initiating, in progress, or finished. #[derive(Debug, Clone, Default)] -/// A wrapper struct for managing the state of a scan operation in a cluster. 
-/// It holds a reference to the scan state and provides methods for accessing the state. -/// The `status` field indicates the status of the scan operation. pub struct ScanStateRC { scan_state_rc: Arc>, status: ScanStateStage, @@ -121,7 +318,7 @@ impl ScanStateRC { } /// Returns a clone of the scan state, if it exist. - pub(crate) fn get_state_from_wrapper(&self) -> Option { + pub(crate) fn state_from_wrapper(&self) -> Option { if self.status == ScanStateStage::Initiating || self.status == ScanStateStage::Finished { None } else { @@ -130,33 +327,10 @@ impl ScanStateRC { } } -/// This trait defines the methods for interacting with a Redis cluster during scanning. -#[async_trait] -pub(crate) trait ClusterInScan { - /// Retrieves the address associated with a given slot in the cluster. - async fn get_address_by_slot(&self, slot: u16) -> RedisResult>; - - /// Retrieves the epoch of a given address in the cluster. - /// The epoch represents the version of the address, which is updated when a failover occurs or slots migrate in. - async fn get_address_epoch(&self, address: &str) -> Result; - - /// Retrieves the slots assigned to a given address in the cluster. - async fn get_slots_of_address(&self, address: Arc) -> Vec; - - /// Routes a Redis command to a specific address in the cluster. - async fn route_command(&self, cmd: Cmd, address: &str) -> RedisResult; - - /// Check if all slots are covered by the cluster - async fn are_all_slots_covered(&self) -> bool; - - /// Check if the topology of the cluster has changed and refresh the slots if needed - async fn refresh_if_topology_changed(&self) -> RedisResult; -} - -/// Represents the state of a scan operation in a Redis cluster. +/// Represents the state of a cluster scan operation. /// -/// This struct holds information about the current scan state, including the cursor position, -/// the scanned slots map, the address being scanned, and the address's epoch. 
+/// This struct keeps track of the current cursor, which slots have been scanned, +/// the address currently being scanned, and the epoch of that address. #[derive(PartialEq, Debug, Clone)] pub(crate) struct ScanState { // the real cursor in the scan operation @@ -205,7 +379,7 @@ impl ScanState { fn create_finished_state() -> Self { Self { cursor: 0, - scanned_slots_map: [0; BITS_ARRAY_SIZE], + scanned_slots_map: [0; BITS_ARRAY_SIZE as usize], address_in_scan: Default::default(), address_epoch: 0, scan_status: ScanStateStage::Finished, @@ -217,63 +391,68 @@ impl ScanState { /// and the address set to the address associated with slot 0. /// The address epoch is set to the epoch of the address. /// If the address epoch cannot be retrieved, the method returns an error. - async fn initiate_scan(connection: &C) -> RedisResult { - let new_scanned_slots_map: SlotsBitsArray = [0; BITS_ARRAY_SIZE]; + async fn initiate_scan( + core: &InnerCore, + allow_non_covered_slots: bool, + ) -> RedisResult + where + C: ConnectionLike + Connect + Clone + Send + Sync + 'static, + { + let mut new_scanned_slots_map: SlotsBitsArray = [0; BITS_ARRAY_SIZE as usize]; let new_cursor = 0; - let address = connection.get_address_by_slot(0).await?; - let address_epoch = connection.get_address_epoch(&address).await.unwrap_or(0); - Ok(ScanState::new( - new_cursor, - new_scanned_slots_map, - address, - address_epoch, - ScanStateStage::InProgress, - )) - } + let address = + next_address_to_scan(core, 0, &mut new_scanned_slots_map, allow_non_covered_slots)?; - /// Get the next slot to be scanned based on the scanned slots map. - /// If all slots have been scanned, the method returns [`END_OF_SCAN`]. 
- fn get_next_slot(&self, scanned_slots_map: &SlotsBitsArray) -> Option { - let all_slots_scanned = scanned_slots_map.iter().all(|&word| word == u64::MAX); - if all_slots_scanned { - return Some(END_OF_SCAN); - } - for (i, slot) in scanned_slots_map.iter().enumerate() { - let mut mask = 1; - for j in 0..BITS_PER_U64 { - if (slot & mask) == 0 { - return Some((i * BITS_PER_U64 + j) as u16); - } - mask <<= 1; + match address { + NextNodeResult::AllSlotsCompleted => Ok(ScanState::create_finished_state()), + NextNodeResult::Address(address) => { + let address_epoch = core.address_epoch(&address).await.unwrap_or(0); + Ok(ScanState::new( + new_cursor, + new_scanned_slots_map, + address, + address_epoch, + ScanStateStage::InProgress, + )) } } - None } /// Update the scan state without updating the scanned slots map. /// This method is used when the address epoch has changed, and we can't determine which slots are new. /// In this case, we skip updating the scanned slots map and only update the address and cursor. - async fn creating_state_without_slot_changes( + async fn new_scan_state( &self, - connection: &C, - ) -> RedisResult { - let next_slot = self.get_next_slot(&self.scanned_slots_map).unwrap_or(0); - let new_address = if next_slot == END_OF_SCAN { - return Ok(ScanState::create_finished_state()); - } else { - connection.get_address_by_slot(next_slot).await - }; - match new_address { - Ok(address) => { - let new_epoch = connection.get_address_epoch(&address).await.unwrap_or(0); + core: Arc>, + allow_non_covered_slots: bool, + new_scanned_slots_map: Option, + ) -> RedisResult + where + C: ConnectionLike + Connect + Clone + Send + Sync + 'static, + { + // If the new scanned slots map is not provided, use the current scanned slots map. + // The new scanned slots map is provided in the general case when the address epoch has not changed, + // meaning that we could safely update the scanned slots map with the slots owned by the node. 
+ // Epoch change means that some slots are new, and we can't determine which slots been there from the beginning and which are new. + let mut scanned_slots_map = new_scanned_slots_map.unwrap_or(self.scanned_slots_map); + let next_slot = next_slot(&scanned_slots_map).unwrap_or(0); + match next_address_to_scan( + &core, + next_slot, + &mut scanned_slots_map, + allow_non_covered_slots, + ) { + Ok(NextNodeResult::Address(new_address)) => { + let new_epoch = core.address_epoch(&new_address).await.unwrap_or(0); Ok(ScanState::new( 0, - self.scanned_slots_map, - address, + scanned_slots_map, + new_address, new_epoch, ScanStateStage::InProgress, )) } + Ok(NextNodeResult::AllSlotsCompleted) => Ok(ScanState::create_finished_state()), Err(err) => Err(err), } } @@ -284,210 +463,204 @@ impl ScanState { /// If the address epoch has changed, the method skips updating the scanned slots map and only updates the address and cursor. /// If the address epoch has not changed, the method updates the scanned slots map with the slots owned by the address. /// The method returns the new scan state with the updated cursor, scanned slots map, address, and epoch. 
- async fn create_updated_scan_state_for_completed_address( + async fn create_updated_scan_state_for_completed_address( &mut self, - connection: &C, - ) -> RedisResult { - connection - .refresh_if_topology_changed() - .await - .map_err(|err| { - RedisError::from(( - ErrorKind::ResponseError, - "Error during cluster scan: failed to refresh slots", - format!("{:?}", err), - )) - })?; + core: Arc>, + allow_non_covered_slots: bool, + ) -> RedisResult + where + C: ConnectionLike + Connect + Clone + Send + Sync + 'static, + { + ClusterConnInner::check_topology_and_refresh_if_diff( + core.clone(), + &RefreshPolicy::NotThrottable, + ) + .await?; + let mut scanned_slots_map = self.scanned_slots_map; // If the address epoch changed it mean that some slots in the address are new, so we cant know which slots been there from the beginning and which are new, or out and in later. // In this case we will skip updating the scanned_slots_map and will just update the address and the cursor - let new_address_epoch = connection - .get_address_epoch(&self.address_in_scan) - .await - .unwrap_or(0); + let new_address_epoch = core.address_epoch(&self.address_in_scan).await.unwrap_or(0); if new_address_epoch != self.address_epoch { - return self.creating_state_without_slot_changes(connection).await; + return self + .new_scan_state(core, allow_non_covered_slots, None) + .await; } // If epoch wasn't changed, the slots owned by the address after the refresh are all valid as slots that been scanned // So we will update the scanned_slots_map with the slots owned by the address - let slots_scanned = connection - .get_slots_of_address(self.address_in_scan.clone()) - .await; + let slots_scanned = core.slots_of_address(self.address_in_scan.clone()).await; for slot in slots_scanned { - let slot_index = slot as usize / BITS_PER_U64; - let slot_bit = slot as usize % BITS_PER_U64; - scanned_slots_map[slot_index] |= 1 << slot_bit; + mark_slot_as_scanned(&mut scanned_slots_map, slot); } // Get the next 
address to scan and its param base on the next slot set to 0 in the scanned_slots_map - let next_slot = self.get_next_slot(&scanned_slots_map).unwrap_or(0); - let new_address = if next_slot == END_OF_SCAN { - return Ok(ScanState::create_finished_state()); - } else { - connection.get_address_by_slot(next_slot).await - }; - match new_address { - Ok(new_address) => { - let new_epoch = connection - .get_address_epoch(&new_address) - .await - .unwrap_or(0); - let new_cursor = 0; - Ok(ScanState::new( - new_cursor, - scanned_slots_map, - new_address, - new_epoch, - ScanStateStage::InProgress, - )) - } - Err(err) => Err(err), - } + self.new_scan_state(core, allow_non_covered_slots, Some(scanned_slots_map)) + .await } } -// Implement the [`ClusterInScan`] trait for [`InnerCore`] of async cluster connection. -#[async_trait] -impl ClusterInScan for Core +fn mark_slot_as_scanned(scanned_slots_map: &mut SlotsBitsArray, slot: u16) { + let slot_index = (slot as u64 / BITS_PER_U64 as u64) as usize; + let slot_bit = slot as u64 % (BITS_PER_U64 as u64); + scanned_slots_map[slot_index] |= 1 << slot_bit; +} + +#[derive(PartialEq, Debug, Clone)] +/// The address type representing a connection address +/// +/// # Fields +/// +/// * `Address` - A thread-safe shared string containing the server address +/// * `AllSlotsCompleted` - Indicates that all slots have been scanned +enum NextNodeResult { + Address(Arc), + AllSlotsCompleted, +} + +/// Determines the next node address to scan within the cluster. +/// +/// This asynchronous function iterates through cluster slots to find the next available +/// node responsible for scanning. If a slot is not covered and `allow_non_covered_slots` +/// is enabled, it marks the slot as scanned and proceeds to the next one. The process +/// continues until a valid address is found or all slots have been scanned. +/// +/// # Arguments +/// +/// * `core` - Reference to the cluster's inner core connection. +/// * `slot` - The current slot number to scan. 
+/// * `scanned_slots_map` - Mutable reference to the bitmap tracking scanned slots. +/// * `allow_non_covered_slots` - Flag indicating whether to allow scanning of uncovered slots. +/// +/// # Returns +/// +/// * `RedisResult` - Returns the next node address to scan or indicates completion. +/// +/// # Type Parameters +/// +/// * `C`: The connection type that must implement `ConnectionLike`, `Connect`, `Clone`, `Send`, `Sync`, and `'static`. +/// +fn next_address_to_scan( + core: &InnerCore, + mut slot: u16, + scanned_slots_map: &mut SlotsBitsArray, + allow_non_covered_slots: bool, +) -> RedisResult where C: ConnectionLike + Connect + Clone + Send + Sync + 'static, { - async fn get_address_by_slot(&self, slot: u16) -> RedisResult> { - let address = self - .get_address_from_slot(slot, SlotAddr::ReplicaRequired) - .await; - match address { - Some(addr) => Ok(addr), - None => { - if self.are_all_slots_covered().await { - Err(RedisError::from(( - ErrorKind::IoError, - "Failed to get connection to the node cover the slot, please check the cluster configuration ", - ))) - } else { - Err(RedisError::from(( - ErrorKind::NotAllSlotsCovered, - "All slots are not covered by the cluster, please check the cluster configuration ", - ))) - } - } + loop { + if slot == END_OF_SCAN { + return Ok(NextNodeResult::AllSlotsCompleted); } - } - async fn get_address_epoch(&self, address: &str) -> Result { - self.as_ref().get_address_epoch(address).await - } - async fn get_slots_of_address(&self, address: Arc) -> Vec { - self.as_ref().get_slots_of_address(address).await - } - async fn route_command(&self, cmd: Cmd, address: &str) -> RedisResult { - let routing = InternalRoutingInfo::SingleNode(InternalSingleNodeRouting::ByAddress( - address.to_string(), - )); - let core = self.to_owned(); - let response = ClusterConnInner::::try_cmd_request(Arc::new(cmd), routing, core) - .await - .map_err(|err| err.1)?; - match response { - Response::Single(value) => Ok(value), - _ => 
Err(RedisError::from(( - ErrorKind::ClientError, - "Expected single response, got unexpected response", - ))), + if let Some(addr) = core + .conn_lock + .read() + .expect(MUTEX_READ_ERR) + .slot_map + .node_address_for_slot(slot, SlotAddr::ReplicaRequired) + { + // Found a valid address for the slot + return Ok(NextNodeResult::Address(addr)); + } else if allow_non_covered_slots { + // Mark the current slot as scanned + mark_slot_as_scanned(scanned_slots_map, slot); + slot = next_slot(scanned_slots_map).unwrap(); + } else { + // Error if slots are not covered and scanning is not allowed + return Err(RedisError::from(( + ErrorKind::NotAllSlotsCovered, + "Could not find an address covering a slot, SCAN operation cannot continue \n + If you want to continue scanning even if some slots are not covered, set allow_non_covered_slots to true \n + Note that this may lead to incomplete scanning, and the SCAN operation lose its all guarantees ", + ))); } } - async fn are_all_slots_covered(&self) -> bool { - ClusterConnInner::::check_if_all_slots_covered( - &self.conn_lock.read().expect(MUTEX_READ_ERR).slot_map, - ) - } - async fn refresh_if_topology_changed(&self) -> RedisResult { - ClusterConnInner::check_topology_and_refresh_if_diff( - self.to_owned(), - // The cluster SCAN implementation must refresh the slots when a topology change is found - // to ensure the scan logic is correct. - &RefreshPolicy::NotThrottable, - ) - .await +} + +/// Get the next slot to be scanned based on the scanned slots map. +/// If all slots have been scanned, the method returns [`END_OF_SCAN`]. 
+fn next_slot(scanned_slots_map: &SlotsBitsArray) -> Option { + let all_slots_scanned = scanned_slots_map.iter().all(|&word| word == u64::MAX); + if all_slots_scanned { + return Some(END_OF_SCAN); + } + for (i, slot) in scanned_slots_map.iter().enumerate() { + let mut mask = 1; + for j in 0..BITS_PER_U64 { + if (slot & mask) == 0 { + return Some(i as u16 * BITS_PER_U64 + j); + } + mask <<= 1; + } } + None } -/// Perform a cluster scan operation. -/// This function performs a scan operation in a Redis cluster using the given [`ClusterInScan`] connection. -/// It scans the cluster for keys based on the given `ClusterScanArgs` arguments. -/// The function returns a tuple containing the new scan state cursor and the keys found in the scan operation. -/// If the scan operation fails, an error is returned. +/// Performs a cluster-wide `SCAN` operation. +/// +/// This function scans the cluster for keys based on the provided arguments. +/// It handles the initiation of a new scan or continues an existing scan, manages +/// scan state, handles routing failures, and ensures consistent scanning across +/// cluster topology changes. /// /// # Arguments -/// * `core` - The connection to the Redis cluster. -/// * `cluster_scan_args` - The arguments for the cluster scan operation. +/// +/// * `core` - An `Arc`-wrapped reference to the cluster connection (`InnerCore`). +/// * `cluster_scan_args` - Configuration and arguments for the scan operation. /// /// # Returns -/// A tuple containing the new scan state cursor and the keys found in the scan operation. -/// If the scan operation fails, an error is returned. +/// +/// * `RedisResult<(ScanStateRC, Vec)>` - +/// - On success: A tuple containing the updated scan state (`ScanStateRC`) and a vector of `Value`s representing the found keys. +/// - On failure: A `RedisError` detailing the reason for the failure. 
+/// +/// # Type Parameters +/// +/// * `C`: The connection type that must implement `ConnectionLike`, `Connect`, `Clone`, `Send`, `Sync`, and `'static`. +/// pub(crate) async fn cluster_scan( - core: C, + core: Arc>, cluster_scan_args: ClusterScanArgs, ) -> RedisResult<(ScanStateRC, Vec)> where - C: ClusterInScan, + C: ConnectionLike + Connect + Clone + Send + Sync + 'static, { - let ClusterScanArgs { - scan_state_cursor, - match_pattern, - count, - object_type, - } = cluster_scan_args; - // If scan_state is None, meaning we start a new scan - let scan_state = match scan_state_cursor.get_state_from_wrapper() { + // Extract the current scan state cursor and the flag for non-covered slots + let scan_state_cursor = &cluster_scan_args.scan_state_cursor; + let allow_non_covered_slots = cluster_scan_args.allow_non_covered_slots; + + // Determine the current scan state: + // - If an existing scan state is present, use it. + // - Otherwise, initiate a new scan. + let scan_state = match scan_state_cursor.state_from_wrapper() { Some(state) => state, - None => match ScanState::initiate_scan(&core).await { + None => match ScanState::initiate_scan(&core, allow_non_covered_slots).await { Ok(state) => state, Err(err) => { + // Early return if initiating the scan fails return Err(err); } }, }; - // Send the actual scan command to the address in the scan_state - let scan_result = send_scan( - &scan_state, - &core, - match_pattern.clone(), - count, - object_type.clone(), - ) - .await; - let ((new_cursor, new_keys), mut scan_state): ((u64, Vec), ScanState) = match scan_result - { - Ok(scan_result) => (from_redis_value(&scan_result)?, scan_state.clone()), - Err(err) => match err.kind() { - // If the scan command failed to route to the address because the address is not found in the cluster or - // the connection to the address cant be reached from different reasons, we will check we want to check if - // the problem is problem that we can recover from like failover or scale down or 
some network issue - // that we can retry the scan command to an address that own the next slot we are at. - ErrorKind::IoError - | ErrorKind::AllConnectionsUnavailable - | ErrorKind::ConnectionNotFoundForRoute => { - let retry = - retry_scan(&scan_state, &core, match_pattern, count, object_type).await?; - (from_redis_value(&retry.0?)?, retry.1) - } - _ => return Err(err), - }, - }; + // Send the SCAN command using the current scan state and scan arguments + let ((new_cursor, new_keys), mut scan_state) = + try_scan(&scan_state, &cluster_scan_args, core.clone()).await?; - // If the cursor is 0, meaning we finished scanning the address - // we will update the scan state to get the next address to scan + // Check if the cursor indicates the end of the current scan segment if new_cursor == 0 { + // Update the scan state to move to the next address/node in the cluster scan_state = scan_state - .create_updated_scan_state_for_completed_address(&core) + .create_updated_scan_state_for_completed_address(core, allow_non_covered_slots) .await?; } - // If the address is empty, meaning we finished scanning all the address + // Verify if the entire cluster has been scanned if scan_state.scan_status == ScanStateStage::Finished { + // Return the final scan state and the collected keys return Ok((ScanStateRC::create_finished(), new_keys)); } + // Update the scan state with the new cursor and maintain the progress scan_state = ScanState::new( new_cursor, scan_state.scanned_slots_map, @@ -495,256 +668,214 @@ where scan_state.address_epoch, ScanStateStage::InProgress, ); + + // Return the updated scan state and the newly found keys Ok((ScanStateRC::from_scan_state(scan_state), new_keys)) } -// Send the scan command to the address in the scan_state +/// Sends the `SCAN` command to the specified address. +/// +/// # Arguments +/// +/// * `scan_state` - The current scan state. 
+/// * `cluster_scan_args` - Arguments for the scan operation, including match pattern, count, object type, and allow_non_covered_slots. +/// * `core` - The cluster connection. +/// +/// # Returns +/// +/// A `RedisResult` containing the response from the `SCAN` command. async fn send_scan( scan_state: &ScanState, - core: &C, - match_pattern: Option>, - count: Option, - object_type: Option, + cluster_scan_args: &ClusterScanArgs, + core: Arc>, ) -> RedisResult where - C: ClusterInScan, + C: ConnectionLike + Connect + Clone + Send + Sync + 'static, { - let mut scan_command = cmd("SCAN"); - scan_command.arg(scan_state.cursor); - if let Some(match_pattern) = match_pattern { - scan_command.arg("MATCH").arg(match_pattern); - } - if let Some(count) = count { - scan_command.arg("COUNT").arg(count); - } - if let Some(object_type) = object_type { - scan_command.arg("TYPE").arg(object_type.to_string()); + if let Some(conn_future) = core + .connection_for_address(&scan_state.address_in_scan) + .await + { + let mut conn = conn_future.await; + let mut scan_command = cmd("SCAN"); + scan_command.arg(scan_state.cursor); + if let Some(match_pattern) = cluster_scan_args.match_pattern.as_ref() { + scan_command.arg("MATCH").arg(match_pattern); + } + if let Some(count) = cluster_scan_args.count { + scan_command.arg("COUNT").arg(count); + } + if let Some(object_type) = &cluster_scan_args.object_type { + scan_command.arg("TYPE").arg(object_type.to_string()); + } + conn.req_packed_command(&scan_command).await + } else { + Err(RedisError::from(( + ErrorKind::ConnectionNotFoundForRoute, + "Cluster scan failed. No connection available for address: ", + format!("{}", scan_state.address_in_scan), + ))) } +} - core.route_command(scan_command, &scan_state.address_in_scan) - .await +/// Checks if the error is retryable during scanning. +/// Retryable errors include network issues, cluster topology changes, and unavailable connections. 
+/// Scan operations are not keyspace operations, so they are not affected by keyspace errors like `MOVED`. +fn is_scanwise_retryable_error(err: &RedisError) -> bool { + matches!( + err.kind(), + ErrorKind::IoError + | ErrorKind::AllConnectionsUnavailable + | ErrorKind::ConnectionNotFoundForRoute + | ErrorKind::ClusterDown + | ErrorKind::FatalSendError + ) } -// If the scan command failed to route to the address we will check we will first refresh the slots, we will check if all slots are covered by cluster, -// and if so we will try to get a new address to scan for handling case of failover. -// if all slots are not covered by the cluster we will return an error indicating that the cluster is not well configured. -// if all slots are covered by cluster but we failed to get a new address to scan we will return an error indicating that we failed to get a new address to scan. -// if we got a new address to scan but the scan command failed to route to the address we will return an error indicating that we failed to route the command. -async fn retry_scan( +/// Gets the next scan state by finding the next address to scan. +/// The method updates the scanned slots map and retrieves the next address to scan. +/// If the address epoch has changed, the method creates a new scan state without updating the scanned slots map. +/// If the address epoch has not changed, the method updates the scanned slots map with the slots owned by the address. +/// The method returns the new scan state with the updated cursor, scanned slots map, address, and epoch. +/// The method is used to continue scanning the cluster after completing a scan segment. 
+async fn next_scan_state( + core: &Arc>, scan_state: &ScanState, - core: &C, - match_pattern: Option>, - count: Option, - object_type: Option, -) -> RedisResult<(RedisResult, ScanState)> + cluster_scan_args: &ClusterScanArgs, +) -> RedisResult> where - C: ClusterInScan, + C: ConnectionLike + Connect + Clone + Send + Sync + 'static, { - // TODO: This mechanism of refreshing on failure to route to address should be part of the routing mechanism - // After the routing mechanism is updated to handle this case, this refresh in the case bellow should be removed - core.refresh_if_topology_changed().await.map_err(|err| { - RedisError::from(( - ErrorKind::ResponseError, - "Error during cluster scan: failed to refresh slots", - format!("{:?}", err), - )) - })?; - if !core.are_all_slots_covered().await { - return Err(RedisError::from(( - ErrorKind::NotAllSlotsCovered, - "Not all slots are covered by the cluster, please check the cluster configuration", - ))); + let next_slot = next_slot(&scan_state.scanned_slots_map).unwrap_or(0); + let mut scanned_slots_map = scan_state.scanned_slots_map; + match next_address_to_scan( + core, + next_slot, + &mut scanned_slots_map, + cluster_scan_args.allow_non_covered_slots, + ) { + Ok(NextNodeResult::Address(new_address)) => { + let new_epoch = core.address_epoch(&new_address).await.unwrap_or(0); + Ok(Some(ScanState::new( + 0, + scanned_slots_map, + new_address, + new_epoch, + ScanStateStage::InProgress, + ))) + } + Ok(NextNodeResult::AllSlotsCompleted) => Ok(None), + Err(err) => Err(err), + } +} + +/// Attempts to scan the cluster for keys based on the current scan state. +/// Sends the `SCAN` command to the current address and processes the response. +/// On retryable errors, refreshes the cluster topology and retries the scan. +/// Returns the new cursor and keys found upon success. 
+async fn try_scan( + scan_state: &ScanState, + cluster_scan_args: &ClusterScanArgs, + core: Arc>, +) -> RedisResult<((u64, Vec), ScanState)> +where + C: ConnectionLike + Connect + Clone + Send + Sync + 'static, +{ + let mut new_scan_state = scan_state.clone(); + + loop { + match send_scan(&new_scan_state, cluster_scan_args, core.clone()).await { + Ok(scan_response) => { + let (new_cursor, new_keys) = from_redis_value::<(u64, Vec)>(&scan_response)?; + return Ok(((new_cursor, new_keys), new_scan_state)); + } + Err(err) if is_scanwise_retryable_error(&err) => { + ClusterConnInner::check_topology_and_refresh_if_diff( + core.clone(), + &RefreshPolicy::NotThrottable, + ) + .await?; + + if let Some(next_scan_state) = + next_scan_state(&core, &new_scan_state, cluster_scan_args).await? + { + new_scan_state = next_scan_state; + } else { + return Ok(((0, Vec::new()), ScanState::create_finished_state())); + } + } + Err(err) => return Err(err), + } } - // If for some reason we failed to reach the address we don't know if its a scale down or a failover. - // Since it might be scale down we cant just keep going with the current state we the same cursor as we are at - // the same point in the new address, so we need to get the new address own the next slot that haven't been scanned - // and start from the beginning of this address. 
- let next_slot = scan_state - .get_next_slot(&scan_state.scanned_slots_map) - .unwrap_or(0); - let address = core.get_address_by_slot(next_slot).await?; - - let new_epoch = core.get_address_epoch(&address).await.unwrap_or(0); - let scan_state = &ScanState::new( - 0, - scan_state.scanned_slots_map, - address, - new_epoch, - ScanStateStage::InProgress, - ); - let res = ( - send_scan(scan_state, core, match_pattern, count, object_type).await, - scan_state.clone(), - ); - Ok(res) } #[cfg(test)] mod tests { - use super::*; - #[test] - fn test_creation_of_empty_scan_wrapper() { - let scan_state_wrapper = ScanStateRC::new(); - assert!(scan_state_wrapper.status == ScanStateStage::Initiating); - } + #[tokio::test] + async fn test_cluster_scan_args_builder() { + let args = ClusterScanArgs::builder() + .with_match_pattern("user:*") + .with_count(100) + .with_object_type(ObjectType::Hash) + .allow_non_covered_slots(true) + .build(); - #[test] - fn test_creation_of_scan_state_wrapper_from() { - let scan_state = ScanState { - cursor: 0, - scanned_slots_map: [0; BITS_ARRAY_SIZE], - address_in_scan: String::from("address1").into(), - address_epoch: 1, - scan_status: ScanStateStage::InProgress, - }; - - let scan_state_wrapper = ScanStateRC::from_scan_state(scan_state); - assert!(!scan_state_wrapper.is_finished()); + assert_eq!(args.match_pattern, Some(b"user:*".to_vec())); + assert_eq!(args.count, Some(100)); + assert_eq!(args.object_type, Some(ObjectType::Hash)); + assert!(args.allow_non_covered_slots); } - #[test] - // Test the get_next_slot method - fn test_scan_state_get_next_slot() { - let scanned_slots_map: SlotsBitsArray = [0; BITS_ARRAY_SIZE]; - let scan_state = ScanState { - cursor: 0, - scanned_slots_map, - address_in_scan: String::from("address1").into(), - address_epoch: 1, - scan_status: ScanStateStage::InProgress, - }; - let next_slot = scan_state.get_next_slot(&scanned_slots_map); - assert_eq!(next_slot, Some(0)); - // Set the first slot to 1 - let mut 
scanned_slots_map: SlotsBitsArray = [0; BITS_ARRAY_SIZE]; - scanned_slots_map[0] = 1; - let scan_state = ScanState { - cursor: 0, - scanned_slots_map, - address_in_scan: String::from("address1").into(), - address_epoch: 1, - scan_status: ScanStateStage::InProgress, - }; - let next_slot = scan_state.get_next_slot(&scanned_slots_map); - assert_eq!(next_slot, Some(1)); - } - // Create a mock connection - struct MockConnection; - #[async_trait] - impl ClusterInScan for MockConnection { - async fn refresh_if_topology_changed(&self) -> RedisResult { - Ok(true) - } - async fn get_address_by_slot(&self, _slot: u16) -> RedisResult> { - Ok("mock_address".to_string().into()) - } - async fn get_address_epoch(&self, _address: &str) -> Result { - Ok(0) - } - async fn get_slots_of_address(&self, address: Arc) -> Vec { - if address.as_str() == "mock_address" { - vec![3, 4, 5] - } else { - vec![0, 1, 2] - } - } - async fn route_command(&self, _: Cmd, _: &str) -> RedisResult { - unimplemented!() - } - async fn are_all_slots_covered(&self) -> bool { - true - } - } - // Test the initiate_scan function #[tokio::test] - async fn test_initiate_scan() { - let connection = MockConnection; - let scan_state = ScanState::initiate_scan(&connection).await.unwrap(); - - // Assert that the scan state is initialized correctly - assert_eq!(scan_state.cursor, 0); - assert_eq!(scan_state.scanned_slots_map, [0; BITS_ARRAY_SIZE]); - assert_eq!( - scan_state.address_in_scan, - "mock_address".to_string().into() + async fn test_scan_state_new() { + let address = Arc::new("127.0.0.1:6379".to_string()); + let scan_state = ScanState::new( + 0, + [0; BITS_ARRAY_SIZE as usize], + address.clone(), + 1, + ScanStateStage::InProgress, ); - assert_eq!(scan_state.address_epoch, 0); - } - // Test the get_next_slot function - #[test] - fn test_get_next_slot() { - let scan_state = ScanState { - cursor: 0, - scanned_slots_map: [0; BITS_ARRAY_SIZE], - address_in_scan: "".to_string().into(), - address_epoch: 0, - 
scan_status: ScanStateStage::InProgress, - }; - // Test when all first bits of each u6 are set to 1, the next slots should be 1 - let scanned_slots_map: SlotsBitsArray = [1; BITS_ARRAY_SIZE]; - let next_slot = scan_state.get_next_slot(&scanned_slots_map); - assert_eq!(next_slot, Some(1)); - - // Test when all slots are scanned, the next slot should be 0 - let scanned_slots_map: SlotsBitsArray = [u64::MAX; BITS_ARRAY_SIZE]; - let next_slot = scan_state.get_next_slot(&scanned_slots_map); - assert_eq!(next_slot, Some(16385)); - - // Test when first, second, fourth, sixth and eighth slots scanned, the next slot should be 2 - let mut scanned_slots_map: SlotsBitsArray = [0; BITS_ARRAY_SIZE]; - scanned_slots_map[0] = 171; // 10101011 - let next_slot = scan_state.get_next_slot(&scanned_slots_map); - assert_eq!(next_slot, Some(2)); + assert_eq!(scan_state.cursor, 0); + assert_eq!(scan_state.scanned_slots_map, [0; BITS_ARRAY_SIZE as usize]); + assert_eq!(scan_state.address_in_scan, address); + assert_eq!(scan_state.address_epoch, 1); + assert_eq!(scan_state.scan_status, ScanStateStage::InProgress); } - // Test the update_scan_state_and_get_next_address function #[tokio::test] - async fn test_update_scan_state_and_get_next_address() { - let connection = MockConnection; - let scan_state = ScanState::initiate_scan(&connection).await; - let updated_scan_state = scan_state - .unwrap() - .create_updated_scan_state_for_completed_address(&connection) - .await - .unwrap(); + async fn test_scan_state_create_finished() { + let scan_state = ScanState::create_finished_state(); - // cursor should be reset to 0 - assert_eq!(updated_scan_state.cursor, 0); + assert_eq!(scan_state.cursor, 0); + assert_eq!(scan_state.scanned_slots_map, [0; BITS_ARRAY_SIZE as usize]); + assert_eq!(scan_state.address_in_scan, Arc::new(String::new())); + assert_eq!(scan_state.address_epoch, 0); + assert_eq!(scan_state.scan_status, ScanStateStage::Finished); + } - // address_in_scan should be updated to the new 
address - assert_eq!( - updated_scan_state.address_in_scan, - "mock_address".to_string().into() - ); + #[tokio::test] + async fn test_mark_slot_as_scanned() { + let mut scanned_slots_map = [0; BITS_ARRAY_SIZE as usize]; + mark_slot_as_scanned(&mut scanned_slots_map, 5); - // address_epoch should be updated to the new address epoch - assert_eq!(updated_scan_state.address_epoch, 0); + assert_eq!(scanned_slots_map[0], 1 << 5); } #[tokio::test] - async fn test_update_scan_state_without_updating_scanned_map() { - let connection = MockConnection; + async fn test_next_slot() { let scan_state = ScanState::new( 0, - [0; BITS_ARRAY_SIZE], - "address".to_string().into(), - 0, + [0; BITS_ARRAY_SIZE as usize], + Arc::new("127.0.0.1:6379".to_string()), + 1, ScanStateStage::InProgress, ); - let scanned_slots_map = scan_state.scanned_slots_map; - let updated_scan_state = scan_state - .creating_state_without_slot_changes(&connection) - .await - .unwrap(); - assert_eq!(updated_scan_state.scanned_slots_map, scanned_slots_map); - assert_eq!(updated_scan_state.cursor, 0); - assert_eq!( - updated_scan_state.address_in_scan, - "mock_address".to_string().into() - ); - assert_eq!(updated_scan_state.address_epoch, 0); + let next_slot = next_slot(&scan_state.scanned_slots_map); + + assert_eq!(next_slot, Some(0)); } } diff --git a/glide-core/redis-rs/redis/src/commands/mod.rs b/glide-core/redis-rs/redis/src/commands/mod.rs index 22a68cc987..19dae750c2 100644 --- a/glide-core/redis-rs/redis/src/commands/mod.rs +++ b/glide-core/redis-rs/redis/src/commands/mod.rs @@ -16,6 +16,9 @@ mod json; #[cfg(feature = "cluster-async")] pub use cluster_scan::ScanStateRC; +#[cfg(feature = "cluster-async")] +pub use cluster_scan::ClusterScanArgs; + #[cfg(feature = "cluster-async")] pub(crate) mod cluster_scan; diff --git a/glide-core/redis-rs/redis/src/lib.rs b/glide-core/redis-rs/redis/src/lib.rs index 0c960f3b4e..7121ee03c7 100644 --- a/glide-core/redis-rs/redis/src/lib.rs +++ 
b/glide-core/redis-rs/redis/src/lib.rs @@ -457,6 +457,9 @@ pub use crate::commands::ScanStateRC; #[cfg(feature = "cluster-async")] pub use crate::commands::ObjectType; +#[cfg(feature = "cluster-async")] +pub use crate::commands::ClusterScanArgs; + #[cfg(feature = "cluster")] mod cluster_client; diff --git a/glide-core/redis-rs/redis/tests/test_cluster_scan.rs b/glide-core/redis-rs/redis/tests/test_cluster_scan.rs index cfc4bae594..96910fe7f8 100644 --- a/glide-core/redis-rs/redis/tests/test_cluster_scan.rs +++ b/glide-core/redis-rs/redis/tests/test_cluster_scan.rs @@ -5,9 +5,68 @@ mod support; mod test_cluster_scan_async { use crate::support::*; use rand::Rng; - use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo}; - use redis::{cmd, from_redis_value, ObjectType, RedisResult, ScanStateRC, Value}; + use redis::cluster_routing::{ + MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo, SingleNodeRoutingInfo, + }; + use redis::{ + cmd, from_redis_value, ClusterScanArgs, ObjectType, RedisResult, ScanStateRC, Value, + }; use std::time::Duration; + use tokio::time::{sleep, Instant}; + + async fn del_slots_range( + cluster: &TestClusterContext, + range: (u16, u16), + ) -> Result<(), &'static str> { + let mut cluster_conn = cluster.async_connection(None).await; + let mut del_slots_cmd = cmd("CLUSTER"); + let (start, end) = range; + del_slots_cmd.arg("DELSLOTSRANGE").arg(start).arg(end); + let _: RedisResult = cluster_conn + .route_command( + &del_slots_cmd, + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllNodes, + Some(ResponsePolicy::AllSucceeded), + )), + ) + .await; + + let timeout = Duration::from_secs(10); + let mut invalid = false; + loop { + sleep(Duration::from_millis(500)).await; + + let now = Instant::now(); + if now.elapsed() > timeout { + return Err("Timeout while waiting for slots to be deleted"); + } + + let slot_distribution = + cluster.get_slots_ranges_distribution(&cluster.get_cluster_nodes().await); + for (_, _, _, slot_ranges) in 
slot_distribution { + println!("slot_ranges: {:?}", slot_ranges); + for slot_range in slot_ranges { + let (slot_start, slot_end) = (slot_range[0], slot_range[1]); + + println!("slot_start: {}, slot_end: {}", slot_start, slot_end); + if slot_start >= start && slot_start <= end { + invalid = true; + continue; + } + if slot_end >= start && slot_end <= end { + invalid = true; + continue; + } + } + } + + if invalid { + continue; + } + return Ok(()); + } + } async fn kill_one_node( cluster: &TestClusterContext, @@ -49,7 +108,12 @@ mod test_cluster_scan_async { #[tokio::test] #[serial_test::serial] async fn test_async_cluster_scan() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(1), + false, + ); let mut connection = cluster.async_connection(None).await; // Set some keys @@ -67,14 +131,14 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None) + .cluster_scan(scan_state_rc, ClusterScanArgs::default()) .await .unwrap(); scan_state_rc = next_cursor; let mut scan_keys = scan_keys .into_iter() .map(|v| from_redis_value(&v).unwrap()) - .collect::>(); // Change the type of `keys` to `Vec` + .collect::>(); keys.append(&mut scan_keys); if scan_state_rc.is_finished() { break; @@ -88,10 +152,114 @@ mod test_cluster_scan_async { } } + #[tokio::test] + #[serial_test::serial] + async fn test_async_cluster_scan_with_allow_non_covered_slots() { + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(1), + false, + ); + + let mut connection = cluster.async_connection(None).await; + let mut expected_keys: Vec = Vec::new(); + + for i in 0..1000 { + let key = format!("key{}", i); + let _: Result<(), redis::RedisError> = redis::cmd("SET") + .arg(&key) + .arg("value") + .query_async(&mut connection) + .await; + 
expected_keys.push(key); + } + + let mut scan_state_rc = ScanStateRC::new(); + let mut keys: Vec = Vec::new(); + loop { + let cluster_scan_args = ClusterScanArgs::builder() + .allow_non_covered_slots(true) + .build(); + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection + .cluster_scan(scan_state_rc, cluster_scan_args) + .await + .unwrap(); + scan_state_rc = next_cursor; + let mut scan_keys = scan_keys + .into_iter() + .map(|v| from_redis_value(&v).unwrap()) + .collect::>(); + keys.append(&mut scan_keys); + if scan_state_rc.is_finished() { + break; + } + } + + keys.sort(); + expected_keys.sort(); + assert_eq!(keys, expected_keys); + } + + #[tokio::test] + #[serial_test::serial] + async fn test_async_cluster_scan_with_delslots() { + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(1), + false, + ); + let mut connection = cluster.async_connection(None).await; + let mut expected_keys: Vec = Vec::new(); + + for i in 0..1000 { + let key = format!("key{}", i); + let _: Result<(), redis::RedisError> = redis::cmd("SET") + .arg(&key) + .arg("value") + .query_async(&mut connection) + .await; + expected_keys.push(key); + } + + del_slots_range(&cluster, (1, 100)).await.unwrap(); + + let mut scan_state_rc = ScanStateRC::new(); + let mut keys: Vec = Vec::new(); + loop { + let cluster_scan_args = ClusterScanArgs::builder() + .allow_non_covered_slots(true) + .build(); + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection + .cluster_scan(scan_state_rc, cluster_scan_args) + .await + .unwrap(); + scan_state_rc = next_cursor; + let mut scan_keys = scan_keys + .into_iter() + .map(|v| from_redis_value(&v).unwrap()) + .collect::>(); + keys.append(&mut scan_keys); + if scan_state_rc.is_finished() { + break; + } + } + + keys.sort(); + expected_keys.sort(); + assert_eq!(keys, expected_keys); + } + #[tokio::test] #[serial_test::serial] // test cluster scan with slot migration in the middle async fn 
test_async_cluster_scan_with_migration() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(1), + false, + ); let mut connection = cluster.async_connection(None).await; // Set some keys @@ -114,7 +282,7 @@ mod test_cluster_scan_async { loop { count += 1; let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None) + .cluster_scan(scan_state_rc, ClusterScanArgs::default()) .await .unwrap(); scan_state_rc = next_cursor; @@ -189,18 +357,21 @@ mod test_cluster_scan_async { let mut keys: Vec = Vec::new(); let mut count = 0; let mut result: RedisResult = Ok(Value::Nil); + let mut next_cursor = ScanStateRC::new(); + let mut scan_keys; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = - connection.cluster_scan(scan_state_rc, None, None).await; - let (next_cursor, scan_keys) = match scan_response { - Ok((cursor, keys)) => (cursor, keys), + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, ClusterScanArgs::default()) + .await; + (next_cursor, scan_keys) = match scan_response { + Ok((cursor, keys)) => (cursor.clone(), keys), Err(e) => { result = Err(e); break; } }; - scan_state_rc = next_cursor; + scan_state_rc = next_cursor.clone(); keys.extend(scan_keys.into_iter().map(|v| from_redis_value(&v).unwrap())); if scan_state_rc.is_finished() { break; @@ -225,6 +396,47 @@ mod test_cluster_scan_async { } // We expect an error of finding address assert!(result.is_err()); + + // Test we can continue scanning after the fail using allow_non_covered_slots=true + scan_state_rc = next_cursor; + // config cluster to allow missing slots + let mut config_cmd = cmd("CONFIG"); + config_cmd + .arg("SET") + .arg("cluster-require-full-coverage") + .arg("no"); + let res: RedisResult = connection + .route_command( + &config_cmd, + 
RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)), + ) + .await; + print!("config result: {:?}", res); + let args = ClusterScanArgs::builder() + .allow_non_covered_slots(true) + .build(); + loop { + let res = connection + .cluster_scan(scan_state_rc.clone(), args.clone()) + .await; + let (next_cursor, scan_keys): (ScanStateRC, Vec) = match res { + Ok((cursor, keys)) => (cursor.clone(), keys), + Err(e) => { + println!("error: {:?}", e); + break; + } + }; + scan_state_rc = next_cursor; + let mut scan_keys = scan_keys + .into_iter() + .map(|v| from_redis_value(&v).unwrap()) + .collect::>(); + keys.append(&mut scan_keys); + if scan_state_rc.is_finished() { + break; + } + } + assert!(scan_state_rc.is_finished()); } #[tokio::test] @@ -268,8 +480,9 @@ mod test_cluster_scan_async { let mut count = 0; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = - connection.cluster_scan(scan_state_rc, None, None).await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, ClusterScanArgs::default()) + .await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -439,12 +652,13 @@ mod test_cluster_scan_async { } // Scan the keys let mut scan_state_rc = ScanStateRC::new(); - let mut keys: Vec = Vec::new(); + let mut keys: Vec = vec![]; let mut count = 0; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = - connection.cluster_scan(scan_state_rc, None, None).await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, ClusterScanArgs::default()) + .await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -513,7 +727,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None) + .cluster_scan(scan_state_rc, ClusterScanArgs::default()) .await .unwrap(); scan_state_rc = next_cursor; @@ 
-574,7 +788,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None) + .cluster_scan(scan_state_rc, ClusterScanArgs::default()) .await .unwrap(); scan_state_rc = next_cursor; @@ -642,15 +856,20 @@ mod test_cluster_scan_async { let mut scan_state_rc = ScanStateRC::new(); let mut keys: Vec = vec![]; loop { + let cluster_scan_args = ClusterScanArgs::builder() + .with_match_pattern("key:pattern:*") + .allow_non_covered_slots(false) + .build(); + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan_with_pattern(scan_state_rc, "key:pattern:*", None, None) + .cluster_scan(scan_state_rc, cluster_scan_args) .await .unwrap(); scan_state_rc = next_cursor; let mut scan_keys = scan_keys .into_iter() .map(|v| from_redis_value(&v).unwrap()) - .collect::>(); // Change the type of `keys` to `Vec` + .collect::>(); keys.append(&mut scan_keys); if scan_state_rc.is_finished() { break; @@ -665,7 +884,7 @@ mod test_cluster_scan_async { for key in expected_keys.iter() { assert!(keys.contains(key)); } - assert!(keys.len() == expected_keys.len()); + assert_eq!(keys.len(), expected_keys.len()); } #[tokio::test] @@ -701,15 +920,20 @@ mod test_cluster_scan_async { let mut scan_state_rc = ScanStateRC::new(); let mut keys: Vec = vec![]; loop { + let cluster_scan_args = ClusterScanArgs::builder() + .with_object_type(ObjectType::Set) + .allow_non_covered_slots(false) + .build(); + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, Some(ObjectType::Set)) + .cluster_scan(scan_state_rc, cluster_scan_args) .await .unwrap(); scan_state_rc = next_cursor; let mut scan_keys = scan_keys .into_iter() .map(|v| from_redis_value(&v).unwrap()) - .collect::>(); // Change the type of `keys` to `Vec` + .collect::>(); keys.append(&mut scan_keys); if scan_state_rc.is_finished() { break; @@ -724,7 +948,7 @@ mod 
test_cluster_scan_async { for key in expected_keys.iter() { assert!(keys.contains(key)); } - assert!(keys.len() == expected_keys.len()); + assert_eq!(keys.len(), expected_keys.len()); } #[tokio::test] @@ -755,24 +979,35 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; let mut comparing_times = 0; loop { + let cluster_scan_args = ClusterScanArgs::builder() + .with_count(100) + .allow_non_covered_slots(false) + .build(); + + let cluster_scan_args_no_count = ClusterScanArgs::builder() + .allow_non_covered_slots(false) + .build(); + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc.clone(), Some(100), None) + .cluster_scan(scan_state_rc.clone(), cluster_scan_args) .await .unwrap(); + let (_, scan_without_count_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, Some(100), None) + .cluster_scan(scan_state_rc, cluster_scan_args_no_count) .await .unwrap(); + if !scan_keys.is_empty() && !scan_without_count_keys.is_empty() { assert!(scan_keys.len() >= scan_without_count_keys.len()); - comparing_times += 1; } + scan_state_rc = next_cursor; let mut scan_keys = scan_keys .into_iter() .map(|v| from_redis_value(&v).unwrap()) - .collect::>(); // Change the type of `keys` to `Vec` + .collect::>(); keys.append(&mut scan_keys); if scan_state_rc.is_finished() { break; @@ -788,7 +1023,7 @@ mod test_cluster_scan_async { for key in expected_keys.iter() { assert!(keys.contains(key)); } - assert!(keys.len() == expected_keys.len()); + assert_eq!(keys.len(), expected_keys.len()); } #[tokio::test] @@ -821,8 +1056,9 @@ mod test_cluster_scan_async { let mut count = 0; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = - connection.cluster_scan(scan_state_rc, None, None).await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, ClusterScanArgs::default()) + .await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -835,7 +1071,7 @@ 
mod test_cluster_scan_async { if count == 5 { drop(cluster); let scan_response: RedisResult<(ScanStateRC, Vec)> = connection - .cluster_scan(scan_state_rc.clone(), None, None) + .cluster_scan(scan_state_rc.clone(), ClusterScanArgs::default()) .await; assert!(scan_response.is_err()); break; @@ -844,8 +1080,9 @@ mod test_cluster_scan_async { cluster = TestClusterContext::new(3, 0); connection = cluster.async_connection(None).await; loop { - let scan_response: RedisResult<(ScanStateRC, Vec)> = - connection.cluster_scan(scan_state_rc, None, None).await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, ClusterScanArgs::default()) + .await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -857,4 +1094,246 @@ mod test_cluster_scan_async { } } } + + #[tokio::test] + #[serial_test::serial] + /// Test a case where a node is killed, key set into the cluster, and the client is still able to scan all keys + async fn test_async_cluster_scan_uncovered_slots_of_missing_node() { + // Create a cluster with 3 nodes + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(0), + false, + ); + let mut connection = cluster.async_connection(None).await; + + let mut config_cmd = cmd("CONFIG"); + config_cmd + .arg("SET") + .arg("cluster-require-full-coverage") + .arg("no"); + let _: RedisResult = connection + .route_command( + &config_cmd, + RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)), + ) + .await; + // Kill one node + let mut cluster_nodes = cluster.get_cluster_nodes().await; + let slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes); + let killed_node_routing = kill_one_node(&cluster, slot_distribution.clone()).await; + let ready = cluster.wait_for_fail_to_finish(&killed_node_routing).await; + match ready { + Ok(_) => {} + Err(e) => { + println!("error: {:?}", e); + } + } + + // Compare slot distribution before and after 
killing a node + cluster_nodes = cluster.get_cluster_nodes().await; + let new_slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes); + assert_ne!(slot_distribution, new_slot_distribution); + let mut excepted_keys: Vec = vec![]; + // Set some keys + for i in 0..100 { + let key = format!("key{}", i); + let res: Result<(), redis::RedisError> = redis::cmd("SET") + .arg(&key) + .arg("value") + .query_async(&mut connection) + .await; + if res.is_ok() { + excepted_keys.push(key); + } + } + + // Scan the keys + let mut scan_state_rc = ScanStateRC::new(); + let mut keys: Vec = vec![]; + let args = ClusterScanArgs::builder() + .allow_non_covered_slots(true) + .build(); + loop { + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection + .cluster_scan(scan_state_rc, args.clone()) + .await + .unwrap(); + scan_state_rc = next_cursor; + let mut scan_keys = scan_keys + .into_iter() + .map(|v| from_redis_value(&v).unwrap()) + .collect::>(); // Change the type of `keys` to `Vec` + keys.append(&mut scan_keys); + if scan_state_rc.is_finished() { + break; + } + } + // Check if all keys available scanned + keys.sort(); + keys.dedup(); + excepted_keys.sort(); + excepted_keys.dedup(); + for key in excepted_keys.iter() { + assert!(keys.contains(key)); + } + assert!(keys.len() > 0); + } + + #[tokio::test] + #[serial_test::serial] + /// Test scanning after killing a node and compare with "KEYS *" from remaining nodes + async fn test_async_cluster_scan_after_node_killed() { + // Create a cluster with 3 nodes + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(0), + false, + ); + let mut connection = cluster.async_connection(None).await; + + // Set cluster-require-full-coverage to no + let mut config_cmd = cmd("CONFIG"); + config_cmd + .arg("SET") + .arg("cluster-require-full-coverage") + .arg("no"); + let _: RedisResult = connection + .route_command( + &config_cmd, + 
RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)), + ) + .await; + + for i in 0..100 { + let key = format!("key{}", i); + let _res: RedisResult<()> = redis::cmd("SET") + .arg(&key) + .arg("value") + .query_async(&mut connection) + .await; + } + + // Kill one node + let cluster_nodes = cluster.get_cluster_nodes().await; + let slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes); + let killed_node_routing = kill_one_node(&cluster, slot_distribution.clone()).await; + let ready = cluster.wait_for_fail_to_finish(&killed_node_routing).await; + match ready { + Ok(_) => {} + Err(e) => { + println!("error: {:?}", e); + } + } + + // Scan the keys + let mut scan_state_rc = ScanStateRC::new(); + let mut keys: Vec = vec![]; + let args = ClusterScanArgs::builder() + .allow_non_covered_slots(true) + .build(); + loop { + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection + .cluster_scan(scan_state_rc, args.clone()) + .await + .unwrap(); + scan_state_rc = next_cursor; + let mut scan_keys = scan_keys + .into_iter() + .map(|v| from_redis_value(&v).unwrap()) + .collect::>(); // Change the type of `keys` to `Vec` + keys.append(&mut scan_keys); + if scan_state_rc.is_finished() { + break; + } + } + + // Get keys from remaining nodes using "KEYS *" + let mut keys_from_keys_command: Vec = Vec::new(); + let key_res: RedisResult = connection + .route_command( + cmd("KEYS").arg("*"), + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllNodes, + Some(ResponsePolicy::CombineArrays), + )), + ) + .await; + if let Ok(value) = key_res { + let values: Vec = from_redis_value(&value).unwrap(); + keys_from_keys_command + .extend(values.into_iter().map(|v| from_redis_value(&v).unwrap())); + } + + // Sort and dedup keys + keys.sort(); + keys.dedup(); + keys_from_keys_command.sort(); + keys_from_keys_command.dedup(); + + // Check if scanned keys match keys from "KEYS *" + assert_eq!(keys, keys_from_keys_command); + } + + #[tokio::test] + 
#[serial_test::serial] + /// Test scanning with allow_non_covered_slots as false after killing a node + async fn test_async_cluster_scan_uncovered_slots_fail() { + // Create a cluster with 3 nodes + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(0), + false, + ); + let mut connection = cluster.async_connection(None).await; + + // Kill one node + let cluster_nodes = cluster.get_cluster_nodes().await; + let slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes); + let killed_node_routing = kill_one_node(&cluster, slot_distribution.clone()).await; + let ready = cluster.wait_for_fail_to_finish(&killed_node_routing).await; + match ready { + Ok(_) => {} + Err(e) => { + println!("error: {:?}", e); + } + } + + for i in 0..100 { + let key = format!("key{}", i); + let _res: RedisResult<()> = redis::cmd("SET") + .arg(&key) + .arg("value") + .query_async(&mut connection) + .await; + } + + // Try scanning with allow_non_covered_slots as false + let mut scan_state_rc = ScanStateRC::new(); + let mut had_error = false; + loop { + let result = connection + .cluster_scan(scan_state_rc.clone(), ClusterScanArgs::default()) + .await; + + match result { + Ok((next_cursor, _)) => { + scan_state_rc = next_cursor; + if scan_state_rc.is_finished() { + break; + } + } + Err(e) => { + had_error = true; + assert_eq!(e.kind(), redis::ErrorKind::NotAllSlotsCovered); + break; + } + } + } + + assert!(had_error); + } } diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index a560e17697..73eee144b1 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -13,7 +13,7 @@ use redis::cluster_routing::{ }; use redis::cluster_slotmap::ReadFromReplicaStrategy; use redis::{ - Cmd, ErrorKind, FromRedisValue, InfoDict, ObjectType, PushInfo, RedisError, RedisResult, + ClusterScanArgs, Cmd, ErrorKind, FromRedisValue, PushInfo, RedisError, RedisResult, ScanStateRC, Value, }; pub use 
standalone_client::StandaloneClient; @@ -27,6 +27,7 @@ use self::value_conversion::{convert_to_expected_type, expected_type_for_cmd, ge mod reconnecting_connection; mod standalone_client; mod value_conversion; +use redis::InfoDict; use tokio::sync::mpsc; use versions::Versioning; @@ -310,33 +311,16 @@ impl Client { pub async fn cluster_scan<'a>( &'a mut self, scan_state_cursor: &'a ScanStateRC, - match_pattern: &'a Option>, - count: Option, - object_type: Option, + cluster_scan_args: ClusterScanArgs, ) -> RedisResult { match self.internal_client { ClientWrapper::Standalone(_) => { unreachable!("Cluster scan is not supported in standalone mode") } ClientWrapper::Cluster { ref mut client } => { - let (cursor, keys) = match match_pattern { - Some(pattern) => { - client - .cluster_scan_with_pattern( - scan_state_cursor.clone(), - pattern, - count, - object_type, - ) - .await? - } - None => { - client - .cluster_scan(scan_state_cursor.clone(), count, object_type) - .await? - } - }; - + let (cursor, keys) = client + .cluster_scan(scan_state_cursor.clone(), cluster_scan_args) + .await?; let cluster_cursor_id = if cursor.is_finished() { Value::BulkString(FINISHED_SCAN_CURSOR.into()) } else { diff --git a/glide-core/src/protobuf/command_request.proto b/glide-core/src/protobuf/command_request.proto index 30b33362af..d7c693cfd6 100644 --- a/glide-core/src/protobuf/command_request.proto +++ b/glide-core/src/protobuf/command_request.proto @@ -506,6 +506,7 @@ message ClusterScan { optional bytes match_pattern = 2; optional int64 count = 3; optional string object_type = 4; + bool allow_non_covered_slots = 5; } message UpdateConnectionPassword { diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index f148bbdede..4896f83565 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -19,7 +19,7 @@ use redis::cluster_routing::{ MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, }; use 
redis::cluster_routing::{ResponsePolicy, Routable}; -use redis::{Cmd, PushInfo, RedisError, ScanStateRC, Value}; +use redis::{ClusterScanArgs, Cmd, PushInfo, RedisError, ScanStateRC, Value}; use std::cell::Cell; use std::collections::HashSet; use std::rc::Rc; @@ -321,30 +321,23 @@ async fn cluster_scan(cluster_scan: ClusterScan, mut client: Client) -> ClientUs } else { get_cluster_scan_cursor(cursor)? }; - - let match_pattern = cluster_scan.match_pattern.map(|pattern| pattern.into()); - let count = cluster_scan.count.map(|count| count as usize); - - let object_type = match cluster_scan.object_type { - Some(char_object_type) => match char_object_type.to_string().to_lowercase().as_str() { - STRING => Some(redis::ObjectType::String), - LIST => Some(redis::ObjectType::List), - SET => Some(redis::ObjectType::Set), - ZSET => Some(redis::ObjectType::ZSet), - HASH => Some(redis::ObjectType::Hash), - STREAM => Some(redis::ObjectType::Stream), - _ => { - return Err(ClientUsageError::Internal(format!( - "Received invalid object type: {:?}", - char_object_type - ))) - } - }, - None => None, - }; + let mut cluster_scan_args_builder = + ClusterScanArgs::builder().allow_non_covered_slots(cluster_scan.allow_non_covered_slots); + if let Some(match_pattern) = cluster_scan.match_pattern { + cluster_scan_args_builder = + cluster_scan_args_builder.with_match_pattern::(match_pattern); + } + if let Some(count) = cluster_scan.count { + cluster_scan_args_builder = cluster_scan_args_builder.with_count(count as u32); + } + if let Some(object_type) = cluster_scan.object_type { + cluster_scan_args_builder = + cluster_scan_args_builder.with_object_type(object_type.to_string().into()); + } + let cluster_scan_args = cluster_scan_args_builder.build(); client - .cluster_scan(&cluster_scan_cursor, &match_pattern, count, object_type) + .cluster_scan(&cluster_scan_cursor, cluster_scan_args) .await .map_err(|err| err.into()) } diff --git 
a/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java b/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java index 6fffc46f35..2b7102bc2f 100644 --- a/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java @@ -6,6 +6,7 @@ import glide.api.models.GlideString; import glide.ffi.resolvers.ObjectTypeResolver; import glide.utils.ArrayTransformUtils; +import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.experimental.SuperBuilder; @@ -28,6 +29,13 @@ public class ScanOptions extends BaseScanOptions { */ private final ObjectType type; + /** + * If set to true, the scan will perform even if some slots are not covered by any node. It's + * important to note that when set to true, the scan has no guarantee to cover all keys in the + * cluster, and the method loses its way to validate the progress of the scan. Defaults to false. + */ + @Builder.Default private final Boolean allowNonCoveredSlots = false; + /** Defines the complex data types available for a SCAN request. */ public enum ObjectType { STRING(ObjectTypeResolver.OBJECT_TYPE_STRING_NATIVE_NAME), @@ -86,4 +94,11 @@ public Long getCount() { public ObjectType getType() { return type; } + + /** + * @return whether non-covered slots are allowed. 
+ */ + public Boolean getAllowNonCoveredSlots() { + return allowNonCoveredSlots; + } } diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index d069c6bd72..47b0de7d75 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -428,6 +428,10 @@ protected CommandRequest.Builder prepareCursorRequest( clusterScanBuilder.setObjectType(options.getType().getNativeName()); } + if (options.getAllowNonCoveredSlots() != null) { + clusterScanBuilder.setAllowNonCoveredSlots(options.getAllowNonCoveredSlots()); + } + return CommandRequest.newBuilder().setClusterScan(clusterScanBuilder.build()); } diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 8411cf212d..972c74777c 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -3842,6 +3842,19 @@ export interface ScanOptions extends BaseScanOptions { type?: ObjectType; } +/** + * Options for the SCAN command. + * `match`: The match filter is applied to the result of the command and will only include keys that match the pattern specified. + * `count`: `COUNT` is a just a hint for the command for how many elements to fetch from the server, the default is 10. + * `type`: The type of the object to scan. + * Types are the data types of Valkey: `string`, `list`, `set`, `zset`, `hash`, `stream`. + * `allowNonCoveredSlots`: If true, the scan will keep scanning even if slots are not covered by the cluster. + * By default, the scan will stop if slots are not covered by the cluster. + */ +export interface ClusterScanOptions extends ScanOptions { + allowNonCoveredSlots?: boolean; +} + /** * Options specific to the ZSCAN command, extending from the base scan options. 
*/ diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index 0524128dd5..4e9aee579d 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -23,7 +23,7 @@ import { FunctionStatsSingleResponse, InfoOptions, LolwutOptions, - ScanOptions, + ClusterScanOptions, createClientGetName, createClientId, createConfigGet, @@ -146,7 +146,7 @@ export namespace GlideClusterClientConfiguration { /** * Configuration options for creating a {@link GlideClusterClient | GlideClusterClient}. * - * Extends `BaseClientConfiguration` with properties specific to `GlideClusterClient`, such as periodic topology checks + * Extends {@link BaseClientConfiguration | BaseClientConfiguration} with properties specific to `GlideClusterClient`, such as periodic topology checks * and Pub/Sub subscription settings. * * @remarks @@ -579,7 +579,7 @@ export class GlideClusterClient extends BaseClient { */ protected scanOptionsToProto( cursor: string, - options?: ScanOptions, + options?: ClusterScanOptions, ): command_request.ClusterScan { const command = command_request.ClusterScan.create(); command.cursor = cursor; @@ -596,6 +596,7 @@ export class GlideClusterClient extends BaseClient { command.objectType = options.type; } + command.allowNonCoveredSlots = options?.allowNonCoveredSlots ?? false; return command; } @@ -604,7 +605,7 @@ export class GlideClusterClient extends BaseClient { */ protected createClusterScanPromise( cursor: ClusterScanCursor, - options?: ScanOptions & DecoderOption, + options?: ClusterScanOptions & DecoderOption, ): Promise<[ClusterScanCursor, GlideString[]]> { // separate decoder option from scan options const { decoder, ...scanOptions } = options || {}; @@ -633,7 +634,7 @@ export class GlideClusterClient extends BaseClient { * * @param cursor - The cursor object that wraps the scan state. * To start a new scan, create a new empty `ClusterScanCursor` using {@link ClusterScanCursor}. 
- * @param options - (Optional) The scan options, see {@link ScanOptions} and {@link DecoderOption}. + * @param options - (Optional) The scan options, see {@link ClusterScanOptions} and {@link DecoderOption}. * @returns A Promise resolving to an array containing the next cursor and an array of keys, * formatted as [`ClusterScanCursor`, `string[]`]. * @@ -651,14 +652,14 @@ export class GlideClusterClient extends BaseClient { * console.log(allKeys); // ["key1", "key2", "key3"] * * // Iterate over keys matching a pattern - * await client.mset([{key: "key1", value: "value1"}, {key: "key2", value: "value2"}, {key: "notMykey", value: "value3"}, {key: "somethingElse", value: "value4"}]); + * await client.mset([{key: "key1", value: "value1"}, {key: "key2", value: "value2"}, {key: "notMyKey", value: "value3"}, {key: "somethingElse", value: "value4"}]); * let cursor = new ClusterScanCursor(); * const matchedKeys: GlideString[] = []; * while (!cursor.isFinished()) { * const [cursor, keys] = await client.scan(cursor, { match: "*key*", count: 10 }); * matchedKeys.push(...keys); * } - * console.log(matchedKeys); // ["key1", "key2", "notMykey"] + * console.log(matchedKeys); // ["key1", "key2", "notMyKey"] * * // Iterate over keys of a specific type * await client.mset([{key: "key1", value: "value1"}, {key: "key2", value: "value2"}, {key: "key3", value: "value3"}]); @@ -674,7 +675,7 @@ export class GlideClusterClient extends BaseClient { */ public async scan( cursor: ClusterScanCursor, - options?: ScanOptions & DecoderOption, + options?: ClusterScanOptions & DecoderOption, ): Promise<[ClusterScanCursor, GlideString[]]> { return this.createClusterScanPromise(cursor, options); } diff --git a/node/tests/ScanTest.test.ts b/node/tests/ScanTest.test.ts index bff90bab36..bb370a81db 100644 --- a/node/tests/ScanTest.test.ts +++ b/node/tests/ScanTest.test.ts @@ -12,6 +12,7 @@ import { GlideString, ObjectType, ProtocolVersion, + GlideClusterClientConfiguration, } from ".."; import { 
ValkeyCluster } from "../../utils/TestUtils.js"; import { @@ -19,6 +20,7 @@ import { getClientConfigurationOption, getServerVersion, parseEndpoints, + waitForClusterReady as isClusterReadyWithExpectedNodeCount, } from "./TestUtilities"; const TIMEOUT = 50000; @@ -376,6 +378,166 @@ describe("Scan GlideClusterClient", () => { }, TIMEOUT, ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `GlideClusterClient scan with allowNonCoveredSlots %p`, + async (protocol) => { + const testCluster = await ValkeyCluster.createCluster( + true, + 3, + 0, + getServerVersion, + ); + const config: GlideClusterClientConfiguration = { + addresses: testCluster + .getAddresses() + .map(([host, port]) => ({ host, port })), + protocol, + }; + const testClient = await GlideClusterClient.createClient(config); + + try { + for (let i = 0; i < 10000; i++) { + const result = await testClient.set(`${uuidv4()}`, "value"); + expect(result).toBe("OK"); + } + + // Perform an initial scan to ensure all works as expected + let cursor = new ClusterScanCursor(); + let result = await testClient.scan(cursor); + cursor = result[0]; + expect(cursor.isFinished()).toBe(false); + + // Set 'cluster-require-full-coverage' to 'no' to allow operations with missing slots + await testClient.configSet({ + "cluster-require-full-coverage": "no", + }); + + // Forget one server to simulate a node failure + const addresses = testCluster.getAddresses(); + const addressToForget = addresses[0]; + const allOtherAddresses = addresses.slice(1); + const idToForget = await testClient.customCommand( + ["CLUSTER", "MYID"], + { + route: { + type: "routeByAddress", + host: addressToForget[0], + port: addressToForget[1], + }, + }, + ); + + for (const address of allOtherAddresses) { + await testClient.customCommand( + ["CLUSTER", "FORGET", idToForget as string], + { + route: { + type: "routeByAddress", + host: address[0], + port: address[1], + }, + }, + ); + } + + // Wait for the cluster to stabilize after forgetting a 
node + const ready = await isClusterReadyWithExpectedNodeCount( + testClient, + allOtherAddresses.length, + ); + expect(ready).toBe(true); + + // Attempt to scan without 'allowNonCoveredSlots', expecting an error + // Since it might take time for the inner core to forget the missing node, + // we retry the scan until the expected error is thrown. + + const maxRetries = 10; + let retries = 0; + let errorReceived = false; + + while (retries < maxRetries && !errorReceived) { + retries++; + cursor = new ClusterScanCursor(); + + try { + while (!cursor.isFinished()) { + result = await testClient.scan(cursor); + cursor = result[0]; + } + + // If scan completes without error, wait and retry + await new Promise((resolve) => + setTimeout(resolve, 1000), + ); + } catch (error) { + if ( + error instanceof Error && + error.message.includes( + "Could not find an address covering a slot, SCAN operation cannot continue", + ) + ) { + // Expected error occurred + errorReceived = true; + } else { + // Unexpected error, rethrow + throw error; + } + } + } + + expect(errorReceived).toBe(true); + + // Perform scan with 'allowNonCoveredSlots: true' + cursor = new ClusterScanCursor(); + + while (!cursor.isFinished()) { + result = await testClient.scan(cursor, { + allowNonCoveredSlots: true, + }); + cursor = result[0]; + } + + expect(cursor.isFinished()).toBe(true); + + // Get keys using 'KEYS *' from the remaining nodes + const keys: GlideString[] = []; + + for (const address of allOtherAddresses) { + const result = await testClient.customCommand( + ["KEYS", "*"], + { + route: { + type: "routeByAddress", + host: address[0], + port: address[1], + }, + }, + ); + keys.push(...(result as GlideString[])); + } + + // Scan again with 'allowNonCoveredSlots: true' and collect results + cursor = new ClusterScanCursor(); + const results: GlideString[] = []; + + while (!cursor.isFinished()) { + result = await testClient.scan(cursor, { + allowNonCoveredSlots: true, + }); + results.push(...result[1]); + 
cursor = result[0]; + } + + // Compare the sets of keys obtained from 'KEYS *' and 'SCAN' + expect(new Set(results)).toEqual(new Set(keys)); + } finally { + testClient.close(); + await testCluster.close(); + } + }, + TIMEOUT, + ); }); //standalone tests diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index 6d79c768fc..cd21d20367 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -83,6 +83,51 @@ function intoArrayInternal(obj: any, builder: string[]) { } } +// The function is used to check if the cluster is ready with the count nodes known command using the client supplied. +// The way it works is by parsing the response of the CLUSTER INFO command and checking if the cluster_state is ok and the cluster_known_nodes is equal to the count. +// If so, we know the cluster is ready, and it has the amount of nodes we expect. +export async function waitForClusterReady( + client: GlideClusterClient, + count: number, +): Promise { + const timeout = 20000; // 20 seconds timeout in milliseconds + const startTime = Date.now(); + + while (true) { + if (Date.now() - startTime > timeout) { + return false; + } + + const clusterInfo = await client.customCommand(["CLUSTER", "INFO"]); + // parse the response + const clusterInfoMap = new Map(); + + if (clusterInfo) { + const clusterInfoLines = clusterInfo + .toString() + .split("\n") + .filter((line) => line.length > 0); + + for (const line of clusterInfoLines) { + const [key, value] = line.split(":"); + + clusterInfoMap.set(key.trim(), value.trim()); + } + + if ( + clusterInfoMap.get("cluster_state") == "ok" && + Number(clusterInfoMap.get("cluster_known_nodes")) == count + ) { + break; + } + } + + await new Promise((resolve) => setTimeout(resolve, 2000)); + } + + return true; +} + /** * accept any variable `v` and convert it into String, recursively */ diff --git a/package.json b/package.json index 3f61298feb..c6676131a2 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,14 @@ { 
"devDependencies": { - "@eslint/js": "^9.10.0", + "@eslint/js": "9.17.0", "@types/eslint__js": "^8.42.3", "@types/eslint-config-prettier": "^6.11.3", - "eslint": "9.14.0", + "eslint": "9.17.0", "eslint-config-prettier": "^9.1.0", - "prettier": "^3.3.3", - "typescript": "^5.6.2", - "typescript-eslint": "^8.13" + "eslint-plugin-jsdoc": "^50.6.1", + "prettier": "3.4.2", + "prettier-eslint": "16.3.0", + "typescript": "5.7.2", + "typescript-eslint": "8.18.1" } } diff --git a/python/DEVELOPER.md b/python/DEVELOPER.md index 66127913c3..ae945b5835 100644 --- a/python/DEVELOPER.md +++ b/python/DEVELOPER.md @@ -109,7 +109,6 @@ cd python python3 -m venv .env source .env/bin/activate pip install -r requirements.txt -pip install -r dev_requirements.txt ``` ## Build the package (in release mode): @@ -211,7 +210,7 @@ Run from the main `/python` folder ```bash cd $HOME/src/valkey-glide/python source .env/bin/activate - pip install -r dev_requirements.txt + pip install -r requirements.txt isort . --profile black --skip-glob python/glide/protobuf --skip-glob .env black . --exclude python/glide/protobuf --exclude .env flake8 . 
--count --select=E9,F63,F7,F82 --show-source --statistics \ diff --git a/python/dev_requirements.txt b/python/dev_requirements.txt deleted file mode 100644 index 02e9c4fd53..0000000000 --- a/python/dev_requirements.txt +++ /dev/null @@ -1,7 +0,0 @@ -black >= 24.3.0 -flake8 == 5.0 -isort == 5.10 -mypy == 1.2 -mypy-protobuf == 3.5 -packaging >= 22.0 -pyrsistent diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index 584091ab32..6167e1abaa 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -1076,67 +1076,78 @@ async def scan( match: Optional[TEncodable] = None, count: Optional[int] = None, type: Optional[ObjectType] = None, + allow_non_covered_slots: bool = False, ) -> List[Union[ClusterScanCursor, List[bytes]]]: """ - Incrementally iterates over the keys in the Cluster. + Incrementally iterates over the keys in the cluster. The method returns a list containing the next cursor and a list of keys. - This command is similar to the SCAN command, but it is designed to work in a Cluster environment. - For each iteration the new cursor object should be used to continue the scan. + This command is similar to the SCAN command but is designed to work in a cluster environment. + For each iteration, the new cursor object should be used to continue the scan. Using the same cursor object for multiple iterations will result in the same keys or unexpected behavior. - For more information about the Cluster Scan implementation, - see [Cluster Scan](https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#cluster-scan). + For more information about the Cluster Scan implementation, see [Cluster Scan](https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#cluster-scan). 
- As the SCAN command, the method can be used to iterate over the keys in the database, - to return all keys the database have from the time the scan started till the scan ends. - The same key can be returned in multiple scans iteration. + Like the SCAN command, the method can be used to iterate over the keys in the database, + returning all keys the database has from when the scan started until the scan ends. + The same key can be returned in multiple scan iterations. See https://valkey.io/commands/scan/ for more details. Args: cursor (ClusterScanCursor): The cursor object that wraps the scan state. - To start a new scan, create a new empty ClusterScanCursor using ClusterScanCursor(). + To start a new scan, create a new empty ClusterScanCursor using ClusterScanCursor(). match (Optional[TEncodable]): A pattern to match keys against. count (Optional[int]): The number of keys to return in a single iteration. - The actual number returned can vary and is not guaranteed to match this count exactly. - This parameter serves as a hint to the server on the number of steps to perform in each iteration. - The default value is 10. + The actual number returned can vary and is not guaranteed to match this count exactly. + This parameter serves as a hint to the server on the number of steps to perform in each iteration. + The default value is 10. type (Optional[ObjectType]): The type of object to scan for. + allow_non_covered_slots (bool): If set to True, the scan will perform even if some slots are not covered by any node. + It's important to note that when set to True, the scan has no guarantee to cover all keys in the cluster, + and the method loses its way to validate the progress of the scan. Defaults to False. Returns: List[Union[ClusterScanCursor, List[TEncodable]]]: A list containing the next cursor and a list of keys, - formatted as [ClusterScanCursor, [key1, key2, ...]]. + formatted as [ClusterScanCursor, [key1, key2, ...]]. 
Examples: - >>> # In the following example, we will iterate over the keys in the cluster. - await client.mset({b'key1': b'value1', b'key2': b'value2', b'key3': b'value3'}) - cursor = ClusterScanCursor() - all_keys = [] - while not cursor.is_finished(): - cursor, keys = await client.scan(cursor, count=10) - all_keys.extend(keys) - print(all_keys) # [b'key1', b'key2', b'key3'] - >>> # In the following example, we will iterate over the keys in the cluster that match the pattern "*key*". - await client.mset({b"key1": b"value1", b"key2": b"value2", b"not_my_key": b"value3", b"something_else": b"value4"}) - cursor = ClusterScanCursor() - all_keys = [] - while not cursor.is_finished(): - cursor, keys = await client.scan(cursor, match=b"*key*", count=10) - all_keys.extend(keys) - print(all_keys) # [b'my_key1', b'my_key2', b'not_my_key'] - >>> # In the following example, we will iterate over the keys in the cluster that are of type STRING. - await client.mset({b'key1': b'value1', b'key2': b'value2', b'key3': b'value3'}) - await client.sadd(b"this_is_a_set", [b"value4"]) - cursor = ClusterScanCursor() - all_keys = [] - while not cursor.is_finished(): - cursor, keys = await client.scan(cursor, type=ObjectType.STRING) - all_keys.extend(keys) - print(all_keys) # [b'key1', b'key2', b'key3'] + >>> # Iterate over all keys in the cluster. + >>> await client.mset({b'key1': b'value1', b'key2': b'value2', b'key3': b'value3'}) + >>> cursor = ClusterScanCursor() + >>> all_keys = [] + >>> while not cursor.is_finished(): + >>> cursor, keys = await client.scan(cursor, count=10, allow_non_covered_slots=False) + >>> all_keys.extend(keys) + >>> print(all_keys) # [b'key1', b'key2', b'key3'] + + >>> # Iterate over keys matching the pattern "*key*". 
+ >>> await client.mset({b"key1": b"value1", b"key2": b"value2", b"not_my_key": b"value3", b"something_else": b"value4"}) + >>> cursor = ClusterScanCursor() + >>> all_keys = [] + >>> while not cursor.is_finished(): + >>> cursor, keys = await client.scan(cursor, match=b"*key*", count=10, allow_non_covered_slots=False) + >>> all_keys.extend(keys) + >>> print(all_keys) # [b'key1', b'key2', b'not_my_key'] + + >>> # Iterate over keys of type STRING. + >>> await client.mset({b'key1': b'value1', b'key2': b'value2', b'key3': b'value3'}) + >>> await client.sadd(b"this_is_a_set", [b"value4"]) + >>> cursor = ClusterScanCursor() + >>> all_keys = [] + >>> while not cursor.is_finished(): + >>> cursor, keys = await client.scan(cursor, type=ObjectType.STRING, allow_non_covered_slots=False) + >>> all_keys.extend(keys) + >>> print(all_keys) # [b'key1', b'key2', b'key3'] """ return cast( List[Union[ClusterScanCursor, List[bytes]]], - await self._cluster_scan(cursor, match, count, type), + await self._cluster_scan( + cursor=cursor, + match=match, + count=count, + type=type, + allow_non_covered_slots=allow_non_covered_slots, + ), ) async def script_exists( diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index 6ebc8d2ab6..3f1be75d98 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -390,6 +390,7 @@ async def _cluster_scan( match: Optional[TEncodable] = ..., count: Optional[int] = ..., type: Optional[ObjectType] = ..., + allow_non_covered_slots: bool = ..., ) -> TResult: ... 
async def _update_connection_password( diff --git a/python/python/glide/glide_client.py b/python/python/glide/glide_client.py index 6178b997a7..5ed558e709 100644 --- a/python/python/glide/glide_client.py +++ b/python/python/glide/glide_client.py @@ -566,6 +566,7 @@ async def _cluster_scan( match: Optional[TEncodable] = None, count: Optional[int] = None, type: Optional[ObjectType] = None, + allow_non_covered_slots: bool = False, ) -> List[Union[ClusterScanCursor, List[bytes]]]: if self._is_closed: raise ClosingError( @@ -576,6 +577,7 @@ async def _cluster_scan( # Take out the id string from the wrapping object cursor_string = cursor.get_cursor() request.cluster_scan.cursor = cursor_string + request.cluster_scan.allow_non_covered_slots = allow_non_covered_slots if match is not None: request.cluster_scan.match_pattern = ( self._encode_arg(match) if isinstance(match, str) else match diff --git a/python/python/tests/conftest.py b/python/python/tests/conftest.py index 0937ca2067..0ab5c9d6e9 100644 --- a/python/python/tests/conftest.py +++ b/python/python/tests/conftest.py @@ -252,14 +252,16 @@ async def create_client( inflight_requests_limit: Optional[int] = None, read_from: ReadFrom = ReadFrom.PRIMARY, client_az: Optional[str] = None, + valkey_cluster: Optional[ValkeyCluster] = None, ) -> Union[GlideClient, GlideClusterClient]: # Create async socket client use_tls = request.config.getoption("--tls") if cluster_mode: - assert type(pytest.valkey_cluster) is ValkeyCluster + valkey_cluster = valkey_cluster or pytest.valkey_cluster + assert type(valkey_cluster) is ValkeyCluster assert database_id == 0 - k = min(3, len(pytest.valkey_cluster.nodes_addr)) - seed_nodes = random.sample(pytest.valkey_cluster.nodes_addr, k=k) + k = min(3, len(valkey_cluster.nodes_addr)) + seed_nodes = random.sample(valkey_cluster.nodes_addr, k=k) cluster_config = GlideClusterClientConfiguration( addresses=seed_nodes if addresses is None else addresses, use_tls=use_tls, diff --git 
a/python/python/tests/test_scan.py b/python/python/tests/test_scan.py index 907dc703d5..69d5243aaf 100644 --- a/python/python/tests/test_scan.py +++ b/python/python/tests/test_scan.py @@ -1,16 +1,96 @@ -from __future__ import annotations - -from typing import List, cast +import asyncio +from typing import AsyncGenerator, List, cast import pytest -from glide import ClusterScanCursor +from glide import ByAddressRoute from glide.async_commands.command_args import ObjectType from glide.config import ProtocolVersion from glide.exceptions import RequestError +from glide.glide import ClusterScanCursor from glide.glide_client import GlideClient, GlideClusterClient +from tests.conftest import create_client +from tests.utils.cluster import ValkeyCluster from tests.utils.utils import get_random_string +# Helper function to get a number of nodes, and ask the cluster till we get the number of nodes +async def is_cluster_ready(client: GlideClusterClient, count: int) -> bool: + # we allow max 20 seconds to get the nodes + timeout = 20 + start_time = asyncio.get_event_loop().time() + + while True: + if asyncio.get_event_loop().time() - start_time > timeout: + return False + + cluster_info = await client.custom_command(["CLUSTER", "INFO"]) + cluster_info_map = {} + + if cluster_info: + info_str = ( + cluster_info + if isinstance(cluster_info, str) + else ( + cluster_info.decode() + if isinstance(cluster_info, bytes) + else str(cluster_info) + ) + ) + cluster_info_lines = info_str.split("\n") + cluster_info_lines = [line for line in cluster_info_lines if line] + + for line in cluster_info_lines: + key, value = line.split(":") + cluster_info_map[key.strip()] = value.strip() + + if ( + cluster_info_map.get("cluster_state") == "ok" + and int(cluster_info_map.get("cluster_known_nodes", "0")) == count + ): + break + + await asyncio.sleep(2) + + return True + + +# The slots not covered testing is messing with the cluster by removing a node, and then scanning the cluster +# When a node is 
forgotten, it gets into a banned state for one minute, so in order to bring back the cluster to normal state +# we need to wait for the node to be unbanned, and then we can continue with the tests +# In order to avoid wasting time and the chance that the restoration will not happen, we will run the test on a separate cluster +@pytest.fixture(scope="function") +async def function_scoped_cluster(): +    """ +    Function-scoped fixture to create a new cluster for each test invocation. +    """ +    cluster = ValkeyCluster( +        tls=False, cluster_mode=True, shard_count=3, replica_count=0 +    ) +    yield cluster +    del cluster + + +# Since the cluster for the non-covered-slots test is created separately, we need to create a client for that specific cluster +# The client is created with a 100 timeout so looping over the keys with scan will return the error before we finish the loop +# otherwise the test will be flaky +@pytest.fixture(scope="function") +async def glide_client_scoped( +    request, function_scoped_cluster: ValkeyCluster, protocol: ProtocolVersion +) -> AsyncGenerator[GlideClusterClient, None]: +    """
+    Get client for tests, adjusted to use the function-scoped cluster.
+ """ + client = await create_client( + request, + True, + valkey_cluster=function_scoped_cluster, + protocol=protocol, + ) + assert isinstance(client, GlideClusterClient) + yield client + await client.close() + + @pytest.mark.asyncio class TestScan: # Cluster scan tests @@ -251,6 +331,75 @@ async def test_cluster_scan_all_types(self, glide_client: GlideClusterClient): assert not set(encoded_list_keys).intersection(set(keys)) assert not set(encoded_zset_keys).intersection(set(keys)) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_cluster_scan_non_covered_slots( + self, + protocol: ProtocolVersion, + function_scoped_cluster: ValkeyCluster, + glide_client_scoped: GlideClusterClient, + ): + key = get_random_string(10) + for i in range(1000): + await glide_client_scoped.set(f"{key}:{i}", "value") + cursor = ClusterScanCursor() + result = await glide_client_scoped.scan(cursor) + cursor = cast(ClusterScanCursor, result[0]) + assert not cursor.is_finished() + await glide_client_scoped.config_set({"cluster-require-full-coverage": "no"}) + # forget one server + address_to_forget = glide_client_scoped.config.addresses[0] + all_other_addresses = glide_client_scoped.config.addresses[1:] + id_to_forget = await glide_client_scoped.custom_command( + ["CLUSTER", "MYID"], + ByAddressRoute(address_to_forget.host, address_to_forget.port), + ) + for address in all_other_addresses: + await glide_client_scoped.custom_command( + ["CLUSTER", "FORGET", cast(str, id_to_forget)], + ByAddressRoute(address.host, address.port), + ) + # now we let it few seconds gossip to get the new cluster configuration + await is_cluster_ready(glide_client_scoped, len(all_other_addresses)) + # Iterate scan until error is returned, as it might take time for the inner core to forget the missing node + cursor = ClusterScanCursor() + while True: + try: + while not cursor.is_finished(): + result = await glide_client_scoped.scan(cursor) + cursor = 
cast(ClusterScanCursor, result[0]) + # Reset cursor for next iteration + cursor = ClusterScanCursor() + except RequestError as e_info: + assert ( + "Could not find an address covering a slot, SCAN operation cannot continue" + in str(e_info) + ) + break + # Scan with allow_non_covered_slots=True + while not cursor.is_finished(): + result = await glide_client_scoped.scan( + cursor, allow_non_covered_slots=True + ) + cursor = cast(ClusterScanCursor, result[0]) + assert cursor.is_finished() + # check the keys we can get with keys command, and scan from the beginning + keys = [] + for address in all_other_addresses: + result = await glide_client_scoped.custom_command( + ["KEYS", "*"], ByAddressRoute(address.host, address.port) + ) + keys.extend(cast(List[bytes], result)) + + cursor = ClusterScanCursor() + results = [] + while not cursor.is_finished(): + result = await glide_client_scoped.scan( + cursor, allow_non_covered_slots=True + ) + results.extend(cast(List[bytes], result[1])) + cursor = cast(ClusterScanCursor, result[0]) + assert set(keys) == set(results) + # Standalone scan tests @pytest.mark.parametrize("cluster_mode", [False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index ccdb309f58..c623f4e8c9 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -1033,10 +1033,7 @@ async def test_standalone_transaction(self, glide_client: GlideClient): assert result[5:13] == [2, 2, 2, [b"Bob", b"Alice"], 2, OK, None, 0] assert result[13:] == expected - @pytest.mark.filterwarnings( - action="ignore", message="The test " - ) - def test_transaction_clear(self): + async def test_transaction_clear(self): transaction = Transaction() transaction.info() transaction.select(1) diff --git a/python/requirements.txt b/python/requirements.txt index b39d1d96c8..b5880e6287 100644 --- 
a/python/requirements.txt +++ b/python/requirements.txt @@ -1,7 +1,12 @@ async-timeout==4.0.2;python_version<"3.11" maturin==0.14.17 # higher version break the needs structure changes, the name of the project is not the same as the package name, and the naming both glide create a circular dependency - TODO: fix this -protobuf==3.20.* pytest pytest-asyncio typing_extensions==4.8.0;python_version<"3.11" pytest-html +black >= 24.3.0 +flake8 == 5.0 +isort == 5.10 +mypy == 1.13.0 +mypy-protobuf == 3.5 +packaging >= 22.0 diff --git a/utils/TestUtils.ts b/utils/TestUtils.ts index 423bf8e9cb..9c89788528 100644 --- a/utils/TestUtils.ts +++ b/utils/TestUtils.ts @@ -21,9 +21,9 @@ function parseOutput(input: string): { .split(",") .map((address) => address.split(":")) .map((address) => [address[0], Number(address[1])]) as [ - string, - number, - ][]; + string, + number, + ][]; if (clusterFolder === undefined || ports === undefined) { throw new Error(`Insufficient data in input: ${input}`); @@ -82,7 +82,7 @@ export class ValkeyCluster { execFile( "python3", [PY_SCRIPT_PATH, ...command.split(" ")], - (error, stdout, stderr) => { + (error, stdout) => { if (error) { reject(error); } else {