Skip to content

Commit

Permalink
Merge branch 'release-1.2' into main
Browse files Browse the repository at this point in the history
Signed-off-by: ikolomi <[email protected]>
  • Loading branch information
ikolomi committed Dec 30, 2024
2 parents 2d6ae0b + 151d9b1 commit 0edf200
Show file tree
Hide file tree
Showing 41 changed files with 1,176 additions and 396 deletions.
15 changes: 6 additions & 9 deletions .github/workflows/create-test-matrices/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ runs:
echo 'Select server engines to run tests against'
if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then
echo 'Pick engines marked as `"run": "always"` only - on PR, push or manually triggered job which does not require full matrix'
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
else
echo 'Pick all engines - on cron (schedule) or if manually triggered job requires a full matrix'
jq -c . < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c . < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
fi
cat $GITHUB_OUTPUT
- name: Load host matrix
id: load-host-matrix
Expand All @@ -57,12 +56,11 @@ runs:
echo 'Select runners (VMs) to run tests on'
if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then
echo 'Pick runners marked as '"run": "always"' only - on PR, push or manually triggered job which does not require full matrix'
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
else
echo 'Pick all runners assigned for the chosen client (language) - on cron (schedule) or if manually triggered job requires a full matrix'
jq -c "[.[] | select(.languages? and any(.languages[] == \"${{ inputs.language-name }}\"; .) and $CONDITION)]" < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c "[.[] | select(.languages? and any(.languages[] == \"${{ inputs.language-name }}\"; .) and $CONDITION)]" < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
fi
cat $GITHUB_OUTPUT
- name: Create language version matrix
id: create-lang-version-matrix
Expand All @@ -72,9 +70,8 @@ runs:
echo 'Select language (framework/SDK) versions to run tests on'
if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then
echo 'Pick language versions listed in 'always-run-versions' only - on PR, push or manually triggered job which does not require full matrix'
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .["always-run-versions"]][0] // []' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .["always-run-versions"]][0] // []' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
else
echo 'Pick language versions listed in 'versions' - on cron (schedule) or if manually triggered job requires a full matrix'
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .versions][0]' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .versions][0]' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
fi
cat $GITHUB_OUTPUT
4 changes: 4 additions & 0 deletions .github/workflows/java-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ jobs:
host: ${{ fromJson(needs.load-platform-matrix.outputs.PLATFORM_MATRIX) }}
runs-on: ${{ matrix.host.RUNNER }}
steps:
- name: Setup self-hosted runner access
if: ${{matrix.host.TARGET == 'aarch64-unknown-linux-gnu' }}
run: sudo chown -R $USER:$USER /home/ubuntu/action-runner-ilia/_work/valkey-glide

- name: Checkout
uses: actions/checkout@v4

Expand Down
20 changes: 11 additions & 9 deletions .github/workflows/npm-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -308,23 +308,18 @@ jobs:
if: ${{ matrix.build.TARGET == 'aarch64-unknown-linux-gnu' }}
run: sudo chown -R $USER:$USER /home/ubuntu/actions-runner/_work/valkey-glide

- name: install Redis and git for alpine
- name: install redis and git for alpine
if: ${{ contains(matrix.build.TARGET, 'musl') }}
run: |
apk update
apk add redis git
apk add git redis
node -v
- name: install Redis and Python for ubuntu
- name: install Python for ubuntu
if: ${{ contains(matrix.build.TARGET, 'linux-gnu') }}
run: |
sudo apt-get update
sudo apt-get install redis-server python3
- name: install Redis, Python for macos
if: ${{ contains(matrix.build.RUNNER, 'mac') }}
run: |
brew install redis python3
sudo apt-get install python3
- name: Checkout
if: ${{ matrix.build.TARGET != 'aarch64-unknown-linux-musl'}}
Expand All @@ -339,6 +334,13 @@ jobs:
npm-auth-token: ${{ secrets.NPM_AUTH_TOKEN }}
arch: ${{ matrix.build.ARCH }}

- name: Install engine
if: ${{ !contains(matrix.build.TARGET, 'musl') }}
uses: ./.github/workflows/install-engine
with:
engine-version: "8.0"
target: ${{ matrix.build.target }}

- name: Setup node
if: ${{ !contains(matrix.build.TARGET, 'musl') }}
uses: actions/setup-node@v4
Expand Down
29 changes: 5 additions & 24 deletions .github/workflows/pypi-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -232,31 +232,12 @@ jobs:
with:
python-version: 3.12

- name: Install engine Ubuntu ARM
if: ${{ matrix.build.TARGET == 'aarch64-unknown-linux-gnu' }}
shell: bash
# in self hosted runner we first want to check that engine is not already installed
run: |
if [[ $(`which redis-server`) == '' ]]
then
sudo apt-get update
sudo apt-get install -y redis-server
else
echo "Redis is already installed"
fi
- name: Install engine Ubuntu x86
if: ${{ matrix.build.TARGET == 'x86_64-unknown-linux-gnu' }}
shell: bash
run: |
sudo apt-get update
sudo apt-get install -y redis-server
- name: Install engine
uses: ./.github/workflows/install-engine
with:
engine-version: "8.0"
target: ${{ matrix.build.target }}

- name: Install engine MacOS
if: ${{ matrix.build.OS == 'macos' }}
shell: bash
run: |
brew install redis

- name: Check if RC and set a distribution tag for the package
shell: bash
Expand Down
24 changes: 20 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#### Changes
* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860))

* Go: Add HINCRBY command ([#2847](https://github.com/valkey-io/valkey-glide/pull/2847))
* Go: Add HINCRBYFLOAT command ([#2846](https://github.com/valkey-io/valkey-glide/pull/2846))
* Go: Add SUNIONSTORE command ([#2805](https://github.com/valkey-io/valkey-glide/pull/2805))
* Go: Add SUNION ([#2787](https://github.com/valkey-io/valkey-glide/pull/2787))
* Java: bump `netty` version ([#2795](https://github.com/valkey-io/valkey-glide/pull/2795))
* Java: Bump protobuf (protoc) version ([#2796](https://github.com/valkey-io/valkey-glide/pull/2796), [#2800](https://github.com/valkey-io/valkey-glide/pull/2800))
* Go: Add `SInterStore` ([#2779](https://github.com/valkey-io/valkey-glide/issues/2779))
* Node: Remove native package references for MacOs x64 architecture ([#2799](https://github.com/valkey-io/valkey-glide/issues/2799))
* Go: Add `ZIncrBy` command ([#2830](https://github.com/valkey-io/valkey-glide/pull/2830))
* Go: Add `SScan` and `SMove` ([#2789](https://github.com/valkey-io/valkey-glide/issues/2789))
* Go: Add `ZADD` ([#2813](https://github.com/valkey-io/valkey-glide/issues/2813))
Expand All @@ -19,6 +18,25 @@

#### Operational Enhancements

## 1.2.1 (2024-12-29)

#### Changes

* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860))
* Java: Bump protobuf (protoc) version ([#2561](https://github.com/valkey-io/valkey-glide/pull/2561), [#2802](https://github.com/valkey-io/valkey-glide/pull/2802))
* Java: bump `netty` version ([#2777](https://github.com/valkey-io/valkey-glide/pull/2777))
* Node: Remove native package references for MacOs x64 architecture ([#2799](https://github.com/valkey-io/valkey-glide/issues/2799))
* Node, Python, Java: Add connection timeout to client configuration ([#2823](https://github.com/valkey-io/valkey-glide/issues/2823))

#### Breaking Changes

#### Fixes

* Core: Fix RESP2 multi-node response from cluster ([#2381](https://github.com/valkey-io/valkey-glide/pull/2381))
* Core: Ensure cluster client creation fail when engine is < 7.0 and sharded subscriptions are configured ([#2819](https://github.com/valkey-io/valkey-glide/pull/2819))

#### Operational Enhancements

## 1.2.0 (2024-11-27)

#### Changes
Expand Down Expand Up @@ -117,8 +135,6 @@

#### Breaking Changes

**None**

#### Fixes

* Core: UDS Socket Handling Rework ([#2482](https://github.com/valkey-io/valkey-glide/pull/2482))
Expand Down
1 change: 1 addition & 0 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ nanoid = "0.4.0"
async-trait = { version = "0.1.24" }
serde_json = "1"
serde = { version = "1", features = ["derive"] }
versions = "6.3"

[features]
socket-layer = [
Expand Down
5 changes: 5 additions & 0 deletions glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ pub struct GlideConnectionOptions {
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
/// Connection timeout duration.
///
/// This optional field sets the maximum duration to wait when attempting to establish
/// a connection. If `None`, the connection will use `DEFAULT_CONNECTION_TIMEOUT`.
pub connection_timeout: Option<Duration>,
}

/// To enable async support you need to enable the feature: `tokio-comp`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ where
push_sender: None,
disconnect_notifier,
discover_az,
connection_timeout: Some(params.connection_timeout),
},
)
.await
Expand Down
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,7 @@ where
push_sender,
disconnect_notifier,
discover_az,
connection_timeout: Some(cluster_params.connection_timeout),
};

let connections = Self::create_initial_connections(
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"FUNCTION STATS" => RouteBy::AllNodes,

b"DBSIZE"
| b"DEBUG"
| b"FLUSHALL"
| b"FLUSHDB"
| b"FT._ALIASLIST"
Expand Down Expand Up @@ -717,7 +718,6 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"COMMAND LIST"
| b"COMMAND"
| b"CONFIG GET"
| b"DEBUG"
| b"ECHO"
| b"FUNCTION LIST"
| b"LASTSAVE"
Expand Down
69 changes: 62 additions & 7 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use redis::cluster_routing::{
};
use redis::cluster_slotmap::ReadFromReplicaStrategy;
use redis::{
ClusterScanArgs, Cmd, ErrorKind, PushInfo, RedisError, RedisResult, ScanStateRC, Value,
ClusterScanArgs, Cmd, ErrorKind, FromRedisValue, PushInfo, RedisError, RedisResult,
ScanStateRC, Value,
};
pub use standalone_client::StandaloneClient;
use std::io;
Expand All @@ -26,14 +27,17 @@ use self::value_conversion::{convert_to_expected_type, expected_type_for_cmd, ge
mod reconnecting_connection;
mod standalone_client;
mod value_conversion;
use redis::InfoDict;
use tokio::sync::mpsc;
use versions::Versioning;

pub const HEARTBEAT_SLEEP_DURATION: Duration = Duration::from_secs(1);
pub const DEFAULT_RETRIES: u32 = 3;
/// Note: If you change the default value, make sure to change the documentation in *all* wrappers.
pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
/// Note: If you change the default value, make sure to change the documentation in *all* wrappers.
pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
pub const FINISHED_SCAN_CURSOR: &str = "finished";

/// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory:
Expand Down Expand Up @@ -568,8 +572,9 @@ async fn create_cluster_client(
Some(PeriodicCheck::ManualInterval(interval)) => Some(interval),
None => Some(DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL),
};
let connection_timeout = to_duration(request.connection_timeout, DEFAULT_CONNECTION_TIMEOUT);
let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes)
.connection_timeout(INTERNAL_CONNECTION_TIMEOUT)
.connection_timeout(connection_timeout)
.retries(DEFAULT_RETRIES);
let read_from_strategy = request.read_from.unwrap_or_default();
builder = builder.read_from(match read_from_strategy {
Expand All @@ -592,15 +597,63 @@ async fn create_cluster_client(
};
builder = builder.tls(tls);
}
if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions {
if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions.clone() {
builder = builder.pubsub_subscriptions(pubsub_subscriptions);
}

// Always use with Glide
builder = builder.periodic_connections_checks(CONNECTION_CHECKS_INTERVAL);

let client = builder.build()?;
client.get_async_connection(push_sender).await
let mut con = client.get_async_connection(push_sender).await?;

// This validation ensures that sharded subscriptions are not applied to Redis engines older than version 7.0,
// preventing scenarios where the client becomes inoperable or, worse, unaware that sharded pubsub messages are not being received.
// The issue arises because `client.get_async_connection()` might succeed even if the engine does not support sharded pubsub.
// For example, initial connections may exclude the target node for sharded subscriptions, allowing the creation to succeed,
// but subsequent resubscription tasks will fail when `setup_connection()` cannot establish a connection to the node.
//
// One approach to handle this would be to check the engine version inside `setup_connection()` and skip applying sharded subscriptions.
// However, this approach would leave the application unaware that the subscriptions were not applied, requiring the user to analyze logs to identify the issue.
// Instead, we explicitly check the engine version here and fail the connection creation if it is incompatible with sharded subscriptions.

if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions {
if pubsub_subscriptions.contains_key(&redis::PubSubSubscriptionKind::Sharded) {
let info_res = con
.route_command(
redis::cmd("INFO").arg("SERVER"),
RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random),
)
.await?;
let info_dict: InfoDict = FromRedisValue::from_redis_value(&info_res)?;
match info_dict.get::<String>("redis_version") {
Some(version) => match (Versioning::new(version), Versioning::new("7.0")) {
(Some(server_ver), Some(min_ver)) => {
if server_ver < min_ver {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Sharded subscriptions provided, but the engine version is < 7.0",
)));
}
}
_ => {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Failed to parse engine version",
)))
}
},
_ => {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Could not determine engine version from INFO result",
)))
}
}
}
}

Ok(con)
}

#[derive(thiserror::Error)]
Expand Down Expand Up @@ -667,6 +720,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
"\nStandalone mode"
};
let request_timeout = format_optional_value("Request timeout", request.request_timeout);
let connection_timeout =
format_optional_value("Connection timeout", request.connection_timeout);
let database_id = format!("\ndatabase ID: {}", request.database_id);
let rfr_strategy = request
.read_from
Expand Down Expand Up @@ -723,7 +778,7 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
);

format!(
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}",
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{connection_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}",
)
}

Expand Down
Loading

0 comments on commit 0edf200

Please sign in to comment.