diff --git a/CHANGELOG.md b/CHANGELOG.md index 817b975922..af4e2672e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,13 +84,14 @@ * Node: Added PUBSUB * commands ([#2090](https://github.com/valkey-io/valkey-glide/pull/2090)) * Python: Added PUBSUB * commands ([#2043](https://github.com/valkey-io/valkey-glide/pull/2043)) * Node: Added XGROUP CREATE & XGROUP DESTROY commands ([#2084](https://github.com/valkey-io/valkey-glide/pull/2084)) -* Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077)) +* Node: Added BZPOPMAX & BZPOPMIN command ([#2077](https://github.com/valkey-io/valkey-glide/pull/2077)) * Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088)) * Node: Added GETEX command ([#2107]((https://github.com/valkey-io/valkey-glide/pull/2107)) * Node: Added ZINTER and ZUNION commands ([#2146](https://github.com/aws/glide-for-redis/pull/2146)) #### Breaking Changes * Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005)) +* Core: Change FUNCTION STATS command to return multi node response for standalone mode ([#2117](https://github.com/valkey-io/valkey-glide/pull/2117)) #### Fixes * Java: Add overloads for XADD to allow duplicate entry keys ([#1970](https://github.com/valkey-io/valkey-glide/pull/1970)) diff --git a/glide-core/src/client/reconnecting_connection.rs b/glide-core/src/client/reconnecting_connection.rs index c76da9cf42..4d962e40dd 100644 --- a/glide-core/src/client/reconnecting_connection.rs +++ b/glide-core/src/client/reconnecting_connection.rs @@ -165,7 +165,7 @@ impl ReconnectingConnection { create_connection(backend, connection_retry_strategy, push_sender).await } - fn node_address(&self) -> String { + pub(crate) fn node_address(&self) -> String { self.inner .backend .connection_info diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs index a8350651e9..fd17c538e8 100644 --- a/glide-core/src/client/standalone_client.rs +++ b/glide-core/src/client/standalone_client.rs @@ -312,7 +312,22 @@ impl StandaloneClient { Some(ResponsePolicy::CombineMaps) => future::try_join_all(requests) .await .and_then(cluster_routing::combine_map_results), - Some(ResponsePolicy::Special) | None => { + Some(ResponsePolicy::Special) => { + // Await all futures and collect results + let results = future::try_join_all(requests).await?; + // Create key-value pairs where the key is the node address and the value is the corresponding result + let node_result_pairs = self + .inner + .nodes + .iter() + .zip(results) + .map(|(node, result)| (Value::BulkString(node.node_address().into()), result)) + .collect(); + + Ok(Value::Map(node_result_pairs)) + } + + None => { // This is our assumption - if there's no coherent way to aggregate the responses, we just collect them in an array, and pass it to the user. // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes. future::try_join_all(requests).await.map(Value::Array) diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 91182e797e..0638201f9e 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -208,6 +208,7 @@ import glide.api.commands.StreamBaseCommands; import glide.api.commands.StringBaseCommands; import glide.api.commands.TransactionsBaseCommands; +import glide.api.models.ClusterValue; import glide.api.models.GlideString; import glide.api.models.PubSubMessage; import glide.api.models.Script; @@ -696,7 +697,7 @@ protected Map[] handleFunctionListResponseBinary(Object[] r return data; } - /** Process a FUNCTION STATS standalone response. */ + /** Process a FUNCTION STATS response from one node. */ protected Map> handleFunctionStatsResponse( Map> response) { Map runningScriptInfo = response.get("running_script"); @@ -707,7 +708,7 @@ protected Map> handleFunctionStatsResponse( return response; } - /** Process a FUNCTION STATS standalone response. */ + /** Process a FUNCTION STATS response from one node. */ protected Map> handleFunctionStatsBinaryResponse( Map> response) { Map runningScriptInfo = response.get(gs("running_script")); @@ -718,6 +719,36 @@ protected Map> handleFunctionStatsBinaryRe return response; } + /** Process a FUNCTION STATS cluster response. */ + protected ClusterValue>> handleFunctionStatsResponse( + Response response, boolean isSingleValue) { + if (isSingleValue) { + return ClusterValue.ofSingleValue(handleFunctionStatsResponse(handleMapResponse(response))); + } else { + Map>> data = handleMapResponse(response); + for (var nodeInfo : data.entrySet()) { + nodeInfo.setValue(handleFunctionStatsResponse(nodeInfo.getValue())); + } + return ClusterValue.ofMultiValue(data); + } + } + + /** Process a FUNCTION STATS cluster response. */ + protected ClusterValue>> + handleFunctionStatsBinaryResponse(Response response, boolean isSingleValue) { + if (isSingleValue) { + return ClusterValue.ofSingleValue( + handleFunctionStatsBinaryResponse(handleBinaryStringMapResponse(response))); + } else { + Map>> data = + handleBinaryStringMapResponse(response); + for (var nodeInfo : data.entrySet()) { + nodeInfo.setValue(handleFunctionStatsBinaryResponse(nodeInfo.getValue())); + } + return ClusterValue.ofMultiValueBinary(data); + } + } + /** Process a LCS key1 key2 IDX response */ protected Map handleLcsIdxResponse(Map response) throws GlideException { diff --git a/java/client/src/main/java/glide/api/GlideClient.java b/java/client/src/main/java/glide/api/GlideClient.java index ee85b4b393..32c5a5cc28 100644 --- a/java/client/src/main/java/glide/api/GlideClient.java +++ b/java/client/src/main/java/glide/api/GlideClient.java @@ -444,19 +444,20 @@ public CompletableFuture functionKill() { } @Override - public CompletableFuture>> functionStats() { + public CompletableFuture>>> functionStats() { return commandManager.submitNewCommand( FunctionStats, new String[0], - response -> handleFunctionStatsResponse(handleMapResponse(response))); + response -> handleFunctionStatsResponse(response, false).getMultiValue()); } @Override - public CompletableFuture>> functionStatsBinary() { + public CompletableFuture>>> + functionStatsBinary() { return commandManager.submitNewCommand( FunctionStats, new GlideString[0], - response -> handleFunctionStatsBinaryResponse(handleBinaryStringMapResponse(response))); + response -> handleFunctionStatsBinaryResponse(response, false).getMultiValue()); } @Override diff --git a/java/client/src/main/java/glide/api/GlideClusterClient.java b/java/client/src/main/java/glide/api/GlideClusterClient.java index bd01bce6b1..2eb52c10ef 100644 --- a/java/client/src/main/java/glide/api/GlideClusterClient.java +++ b/java/client/src/main/java/glide/api/GlideClusterClient.java @@ -946,36 +946,6 @@ public CompletableFuture functionKill(@NonNull Route route) { FunctionKill, new String[0], route, this::handleStringResponse); } - /** Process a FUNCTION STATS cluster response. */ - protected ClusterValue>> handleFunctionStatsResponse( - Response response, boolean isSingleValue) { - if (isSingleValue) { - return ClusterValue.ofSingleValue(handleFunctionStatsResponse(handleMapResponse(response))); - } else { - Map>> data = handleMapResponse(response); - for (var nodeInfo : data.entrySet()) { - nodeInfo.setValue(handleFunctionStatsResponse(nodeInfo.getValue())); - } - return ClusterValue.ofMultiValue(data); - } - } - - /** Process a FUNCTION STATS cluster response. */ - protected ClusterValue>> - handleFunctionStatsBinaryResponse(Response response, boolean isSingleValue) { - if (isSingleValue) { - return ClusterValue.ofSingleValue( - handleFunctionStatsBinaryResponse(handleBinaryStringMapResponse(response))); - } else { - Map>> data = - handleBinaryStringMapResponse(response); - for (var nodeInfo : data.entrySet()) { - nodeInfo.setValue(handleFunctionStatsBinaryResponse(nodeInfo.getValue())); - } - return ClusterValue.ofMultiValueBinary(data); - } - } - @Override public CompletableFuture>>> functionStats() { return commandManager.submitNewCommand( diff --git a/java/client/src/main/java/glide/api/commands/ScriptingAndFunctionsClusterCommands.java b/java/client/src/main/java/glide/api/commands/ScriptingAndFunctionsClusterCommands.java index 885fc16b81..0c4a9a891f 100644 --- a/java/client/src/main/java/glide/api/commands/ScriptingAndFunctionsClusterCommands.java +++ b/java/client/src/main/java/glide/api/commands/ScriptingAndFunctionsClusterCommands.java @@ -865,7 +865,7 @@ CompletableFuture> fcallReadOnly( /** * Kills a function that is currently executing.
* FUNCTION KILL terminates read-only functions only.
- * The command will be routed to all primary nodes. + * The command will be routed to all nodes. * * @since Valkey 7.0 and above. * @see valkey.io for details. diff --git a/java/client/src/main/java/glide/api/commands/ScriptingAndFunctionsCommands.java b/java/client/src/main/java/glide/api/commands/ScriptingAndFunctionsCommands.java index f6ca4890bb..7cb5bb3f36 100644 --- a/java/client/src/main/java/glide/api/commands/ScriptingAndFunctionsCommands.java +++ b/java/client/src/main/java/glide/api/commands/ScriptingAndFunctionsCommands.java @@ -328,7 +328,8 @@ CompletableFuture[]> functionListBinary( /** * Kills a function that is currently executing.
- * FUNCTION KILL terminates read-only functions only. + * FUNCTION KILL terminates read-only functions only. FUNCTION KILL runs + * on all nodes of the server, including primary and replicas. * * @since Valkey 7.0 and above. * @see valkey.io for details. @@ -343,11 +344,14 @@ CompletableFuture[]> functionListBinary( /** * Returns information about the function that's currently running and information about the - * available execution engines. + * available execution engines.
+ * FUNCTION STATS runs on all nodes of the server, including primary and replicas. + * The response includes a mapping from node address to the command response for that node. * * @since Valkey 7.0 and above. * @see valkey.io for details. - * @return A Map with two keys: + * @return A Map from node address to the command response for that node, where the + * command contains a Map with two keys: *
    *
  • running_script with information about the running script. *
  • engines with information about available engines and their stats. @@ -355,30 +359,35 @@ CompletableFuture[]> functionListBinary( * See example for more details. * @example *
    {@code
    -     * Map> response = client.functionStats().get();
    -     * Map runningScriptInfo = response.get("running_script");
    -     * if (runningScriptInfo != null) {
    -     *   String[] commandLine = (String[]) runningScriptInfo.get("command");
    -     *   System.out.printf("Server is currently running function '%s' with command line '%s', which has been running for %d ms%n",
    -     *       runningScriptInfo.get("name"), String.join(" ", commandLine), (long)runningScriptInfo.get("duration_ms"));
    -     * }
    -     * Map enginesInfo = response.get("engines");
    -     * for (String engineName : enginesInfo.keySet()) {
    -     *   Map engine = (Map) enginesInfo.get(engineName);
    -     *   System.out.printf("Server supports engine '%s', which has %d libraries and %d functions in total%n",
    -     *       engineName, engine.get("libraries_count"), engine.get("functions_count"));
    +     * Map>> response = client.functionStats().get();
    +     * for (String node : response.keySet()) {
    +     *   Map runningScriptInfo = response.get(node).get("running_script");
    +     *   if (runningScriptInfo != null) {
    +     *     String[] commandLine = (String[]) runningScriptInfo.get("command");
    +     *     System.out.printf("Node '%s' is currently running function '%s' with command line '%s', which has been running for %d ms%n",
    +     *         node, runningScriptInfo.get("name"), String.join(" ", commandLine), (long)runningScriptInfo.get("duration_ms"));
    +     *   }
    +     *   Map enginesInfo = response.get(node).get("engines");
    +     *   for (String engineName : enginesInfo.keySet()) {
    +     *     Map engine = (Map) enginesInfo.get(engineName);
    +     *     System.out.printf("Node '%s' supports engine '%s', which has %d libraries and %d functions in total%n",
    +     *         node, engineName, engine.get("libraries_count"), engine.get("functions_count"));
    +     *   }
          * }
          * }
    */ - CompletableFuture>> functionStats(); + CompletableFuture>>> functionStats(); /** * Returns information about the function that's currently running and information about the - * available execution engines. + * available execution engines.
    + * FUNCTION STATS runs on all nodes of the server, including primary and replicas. + * The response includes a mapping from node address to the command response for that node. * * @since Valkey 7.0 and above. * @see valkey.io for details. - * @return A Map with two keys: + * @return A Map from node address to the command response for that node, where the + * command contains a Map with two keys: *
      *
    • running_script with information about the running script. *
    • engines with information about available engines and their stats. @@ -386,20 +395,22 @@ CompletableFuture[]> functionListBinary( * See example for more details. * @example *
      {@code
      -     * Map> response = client.functionStats().get();
      -     * Map runningScriptInfo = response.get(gs("running_script"));
      -     * if (runningScriptInfo != null) {
      -     *   GlideString[] commandLine = (GlideString[]) runningScriptInfo.get(gs("command"));
      -     *   System.out.printf("Server is currently running function '%s' with command line '%s', which has been running for %d ms%n",
      -     *       runningScriptInfo.get(gs("name")), String.join(" ", Arrays.toString(commandLine)), (long)runningScriptInfo.get(gs("duration_ms")));
      -     * }
      -     * Map enginesInfo = response.get(gs("engines"));
      -     * for (GlideString engineName : enginesInfo.keySet()) {
      -     *   Map engine = (Map) enginesInfo.get(gs(engineName));
      -     *   System.out.printf("Server supports engine '%s', which has %d libraries and %d functions in total%n",
      -     *       engineName, engine.get(gs("libraries_count")), engine.get(gs("functions_count")));
      +     * Map>> response = client.functionStats().get();
      +     * for (String node : response.keySet()) {
      +     *   Map runningScriptInfo = response.get(gs(node)).get(gs("running_script"));
      +     *   if (runningScriptInfo != null) {
      +     *     GlideString[] commandLine = (GlideString[]) runningScriptInfo.get(gs("command"));
      +     *     System.out.printf("Node '%s' is currently running function '%s' with command line '%s', which has been running for %d ms%n",
      +     *         node, runningScriptInfo.get(gs("name")), String.join(" ", Arrays.toString(commandLine)), (long)runningScriptInfo.get(gs("duration_ms")));
      +     *   }
      +     *   Map enginesInfo = response.get(gs(node)).get(gs("engines"));
      +     *   for (GlideString engineName : enginesInfo.keySet()) {
      +     *     Map engine = (Map) enginesInfo.get(gs(engineName));
      +     *     System.out.printf("Node '%s' supports engine '%s', which has %d libraries and %d functions in total%n",
      +     *         node, engineName, engine.get(gs("libraries_count")), engine.get(gs("functions_count")));
      +     *   }
            * }
            * }
      */ - CompletableFuture>> functionStatsBinary(); + CompletableFuture>>> functionStatsBinary(); } diff --git a/java/client/src/test/java/glide/api/GlideClientTest.java b/java/client/src/test/java/glide/api/GlideClientTest.java index 50e812a511..2170218b36 100644 --- a/java/client/src/test/java/glide/api/GlideClientTest.java +++ b/java/client/src/test/java/glide/api/GlideClientTest.java @@ -11485,18 +11485,21 @@ public void functionKill_returns_success() { public void functionStats_returns_success() { // setup String[] args = new String[0]; - Map> value = Map.of("1", Map.of("2", 2)); - CompletableFuture>> testResponse = new CompletableFuture<>(); + Map>> value = + Map.of("::1", Map.of("1", Map.of("2", 2))); + CompletableFuture>>> testResponse = + new CompletableFuture<>(); testResponse.complete(value); // match on protobuf request - when(commandManager.>>submitNewCommand( + when(commandManager.>>>submitNewCommand( eq(FunctionStats), eq(args), any())) .thenReturn(testResponse); // exercise - CompletableFuture>> response = service.functionStats(); - Map> payload = response.get(); + CompletableFuture>>> response = + service.functionStats(); + Map>> payload = response.get(); // verify assertEquals(testResponse, response); @@ -11508,20 +11511,21 @@ public void functionStats_returns_success() { public void functionStatsBinary_returns_success() { // setup GlideString[] args = new GlideString[0]; - Map> value = Map.of(gs("1"), Map.of(gs("2"), 2)); - CompletableFuture>> testResponse = + Map>> value = + Map.of("::1", Map.of(gs("1"), Map.of(gs("2"), 2))); + CompletableFuture>>> testResponse = new CompletableFuture<>(); testResponse.complete(value); // match on protobuf request - when(commandManager.>>submitNewCommand( + when(commandManager.>>>submitNewCommand( eq(FunctionStats), eq(args), any())) .thenReturn(testResponse); // exercise - CompletableFuture>> response = + CompletableFuture>>> response = service.functionStatsBinary(); - Map> payload = response.get(); + Map>> payload = response.get(); // verify assertEquals(testResponse, response); diff --git a/java/integTest/src/test/java/glide/standalone/CommandTests.java b/java/integTest/src/test/java/glide/standalone/CommandTests.java index 5cdac40b9c..eeb8f6660d 100644 --- a/java/integTest/src/test/java/glide/standalone/CommandTests.java +++ b/java/integTest/src/test/java/glide/standalone/CommandTests.java @@ -986,7 +986,9 @@ public void functionStats() { assertEquals(libName, regularClient.functionLoad(code, true).get()); var response = regularClient.functionStats().get(); - checkFunctionStatsResponse(response, new String[0], 1, 1); + for (var nodeResponse : response.values()) { + checkFunctionStatsResponse(nodeResponse, new String[0], 1, 1); + } code = generateLuaLibCode( @@ -996,12 +998,16 @@ public void functionStats() { assertEquals(libName + "_2", regularClient.functionLoad(code, true).get()); response = regularClient.functionStats().get(); - checkFunctionStatsResponse(response, new String[0], 2, 3); + for (var nodeResponse : response.values()) { + checkFunctionStatsResponse(nodeResponse, new String[0], 2, 3); + } assertEquals(OK, regularClient.functionFlush(SYNC).get()); response = regularClient.functionStats().get(); - checkFunctionStatsResponse(response, new String[0], 0, 0); + for (var nodeResponse : response.values()) { + checkFunctionStatsResponse(nodeResponse, new String[0], 0, 0); + } } @Test @@ -1019,7 +1025,9 @@ public void functionStatsBinary() { assertEquals(libName, regularClient.functionLoad(code, true).get()); var response = regularClient.functionStatsBinary().get(); - checkFunctionStatsBinaryResponse(response, new GlideString[0], 1, 1); + for (var nodeResponse : response.values()) { + checkFunctionStatsBinaryResponse(nodeResponse, new GlideString[0], 1, 1); + } code = generateLuaLibCodeBinary( @@ -1033,12 +1041,16 @@ public void functionStatsBinary() { assertEquals(gs(libName.toString() + "_2"), regularClient.functionLoad(code, true).get()); response = regularClient.functionStatsBinary().get(); - checkFunctionStatsBinaryResponse(response, new GlideString[0], 2, 3); + for (var nodeResponse : response.values()) { + checkFunctionStatsBinaryResponse(nodeResponse, new GlideString[0], 2, 3); + } assertEquals(OK, regularClient.functionFlush(SYNC).get()); response = regularClient.functionStatsBinary().get(); - checkFunctionStatsBinaryResponse(response, new GlideString[0], 0, 0); + for (var nodeResponse : response.values()) { + checkFunctionStatsBinaryResponse(nodeResponse, new GlideString[0], 0, 0); + } } @Test diff --git a/node/npm/glide/index.ts b/node/npm/glide/index.ts index 276ebfa0b4..733de07d6a 100644 --- a/node/npm/glide/index.ts +++ b/node/npm/glide/index.ts @@ -107,7 +107,8 @@ function initialize() { GlideString, FunctionListOptions, FunctionListResponse, - FunctionStatsResponse, + FunctionStatsSingleResponse, + FunctionStatsFullResponse, FunctionRestorePolicy, SlotIdTypes, SlotKeyTypes, @@ -205,7 +206,8 @@ function initialize() { GlideClientConfiguration, FunctionListOptions, FunctionListResponse, - FunctionStatsResponse, + FunctionStatsSingleResponse, + FunctionStatsFullResponse, FunctionRestorePolicy, SlotIdTypes, SlotKeyTypes, diff --git a/node/src/Commands.ts b/node/src/Commands.ts index ba07f24a41..80206db1af 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -2324,12 +2324,24 @@ export function createFunctionList( return createCommand(RequestType.FunctionList, args); } -/** Type of the response of `FUNCTION STATS` command. */ -export type FunctionStatsResponse = Record< +/** Response for `FUNCTION STATS` command on a single node. + * The response is a map with 2 keys: + * 1. Information about the current running function/script (or null if none). + * 2. Details about the execution engines. + */ +export type FunctionStatsSingleResponse = Record< string, | null - | Record - | Record> + | Record // Running function/script information + | Record> // Execution engines information +>; + +/** Full response for `FUNCTION STATS` command across multiple nodes. + * It maps node addresses to the per-node response. + */ +export type FunctionStatsFullResponse = Record< + string, // Node address + FunctionStatsSingleResponse >; /** @internal */ diff --git a/node/src/GlideClient.ts b/node/src/GlideClient.ts index e5bfb9a1b4..d542a4057d 100644 --- a/node/src/GlideClient.ts +++ b/node/src/GlideClient.ts @@ -17,7 +17,7 @@ import { FunctionListOptions, FunctionListResponse, FunctionRestorePolicy, - FunctionStatsResponse, + FunctionStatsFullResponse, InfoOptions, LolwutOptions, SortOptions, @@ -597,55 +597,54 @@ export class GlideClient extends BaseClient { * Returns information about the function that's currently running and information about the * available execution engines. * + * FUNCTION STATS runs on all nodes of the server, including primary and replicas. + * The response includes a mapping from node address to the command response for that node. + * * @see {@link https://valkey.io/commands/function-stats/|valkey.io} for details. * @remarks Since Valkey version 7.0.0. * - * @returns A `Record` with two keys: - * - `"running_script"` with information about the running script. - * - `"engines"` with information about available engines and their stats. - * - see example for more details. - * + * @returns A Record where the key is the node address and the value is a Record with two keys: + * - `"running_script"`: Information about the running script, or `null` if no script is running. + * - `"engines"`: Information about available engines and their stats. + * - see example for more details. * @example * ```typescript * const response = await client.functionStats(); - * console.log(response); // Output: + * console.log(response); // Example output: * // { - * // "running_script": - * // { - * // "name": "deep_thought", - * // "command": ["fcall", "deep_thought", "0"], - * // "duration_ms": 5008 - * // }, - * // "engines": - * // { - * // "LUA": - * // { - * // "libraries_count": 2, - * // "functions_count": 3 + * // "127.0.0.1:6379": { // Response from the primary node + * // "running_script": { + * // "name": "foo", + * // "command": ["FCALL", "foo", "0", "hello"], + * // "duration_ms": 7758 + * // }, + * // "engines": { + * // "LUA": { + * // "libraries_count": 1, + * // "functions_count": 1 + * // } * // } - * // } - * // } - * // Output if no scripts running: - * // { - * // "running_script": null - * // "engines": - * // { - * // "LUA": - * // { - * // "libraries_count": 2, - * // "functions_count": 3 + * // }, + * // "127.0.0.1:6380": { // Response from a replica node + * // "running_script": null, + * // "engines": { + * // "LUA": { + * // "libraries_count": 1, + * // "functions_count": 1 + * // } * // } * // } * // } * ``` */ - public async functionStats(): Promise { + public async functionStats(): Promise { return this.createWritePromise(createFunctionStats()); } /** * Kills a function that is currently executing. * `FUNCTION KILL` terminates read-only functions only. + * `FUNCTION KILL` runs on all nodes of the server, including primary and replicas. * * @see {@link https://valkey.io/commands/function-kill/|valkey.io} for details. * @remarks Since Valkey version 7.0.0. diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index f4f36e51da..c9dc39f067 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -17,7 +17,7 @@ import { FunctionListOptions, FunctionListResponse, FunctionRestorePolicy, - FunctionStatsResponse, + FunctionStatsSingleResponse, InfoOptions, LolwutOptions, SortClusterOptions, @@ -914,12 +914,14 @@ export class GlideClusterClient extends BaseClient { /** * Returns information about the function that's currently running and information about the * available execution engines. + * + * * @see {@link https://valkey.io/commands/function-stats/|valkey.io} for details. * @remarks Since Valkey version 7.0.0. * - * @param route - The client will route the command to the nodes defined by `route`. - * If not defined, the command will be routed to all primary nodes. + * @param route - The command will be routed automatically to all nodes, unless `route` is provided, in which + * case the client will route the command to the nodes defined by `route`. * @returns A `Record` with two keys: * - `"running_script"` with information about the running script. * - `"engines"` with information about available engines and their stats. @@ -961,7 +963,7 @@ export class GlideClusterClient extends BaseClient { */ public async functionStats( route?: Routes, - ): Promise> { + ): Promise> { return this.createWritePromise(createFunctionStats(), { route: toProtobufRoute(route), }); @@ -975,7 +977,7 @@ export class GlideClusterClient extends BaseClient { * @remarks Since Valkey version 7.0.0. * * @param route - (Optional) The client will route the command to the nodes defined by `route`. - * If not defined, the command will be routed to all primary nodes. + * If not defined, the command will be routed to all nodes. * @returns `OK` if function is terminated. Otherwise, throws an error. * * @example diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index df89635de6..8d5d633fc9 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -27,7 +27,7 @@ import { FlushMode, FunctionListOptions, FunctionListResponse, // eslint-disable-line @typescript-eslint/no-unused-vars - FunctionStatsResponse, // eslint-disable-line @typescript-eslint/no-unused-vars + FunctionStatsSingleResponse, // eslint-disable-line @typescript-eslint/no-unused-vars GeoAddOptions, GeoBoxShape, // eslint-disable-line @typescript-eslint/no-unused-vars GeoCircleShape, // eslint-disable-line @typescript-eslint/no-unused-vars @@ -3182,7 +3182,7 @@ export class BaseTransaction> { * @see {@link https://valkey.io/commands/function-stats/|valkey.io} for details. * @remarks Since Valkey version 7.0.0. * - * Command Response - A `Record` of type {@link FunctionStatsResponse} with two keys: + * Command Response - A `Record` of type {@link FunctionStatsSingleResponse} with two keys: * * - `"running_script"` with information about the running script. * - `"engines"` with information about available engines and their stats. diff --git a/node/tests/GlideClient.test.ts b/node/tests/GlideClient.test.ts index f5526104bb..6c93134fae 100644 --- a/node/tests/GlideClient.test.ts +++ b/node/tests/GlideClient.test.ts @@ -675,7 +675,10 @@ describe("GlideClient", () => { ).toEqual("one"); let functionStats = await client.functionStats(); - checkFunctionStatsResponse(functionStats, [], 1, 1); + + for (const response of Object.values(functionStats)) { + checkFunctionStatsResponse(response, [], 1, 1); + } let functionList = await client.functionList({ libNamePattern: libName, @@ -736,7 +739,10 @@ describe("GlideClient", () => { ); functionStats = await client.functionStats(); - checkFunctionStatsResponse(functionStats, [], 1, 2); + + for (const response of Object.values(functionStats)) { + checkFunctionStatsResponse(response, [], 1, 2); + } expect( await client.fcall(func2Name, [], ["one", "two"]), @@ -747,7 +753,11 @@ describe("GlideClient", () => { } finally { expect(await client.functionFlush()).toEqual("OK"); const functionStats = await client.functionStats(); - checkFunctionStatsResponse(functionStats, [], 0, 0); + + for (const response of Object.values(functionStats)) { + checkFunctionStatsResponse(response, [], 0, 0); + } + client.close(); } }, diff --git a/node/tests/GlideClusterClient.test.ts b/node/tests/GlideClusterClient.test.ts index a4669086b5..8cd18e943a 100644 --- a/node/tests/GlideClusterClient.test.ts +++ b/node/tests/GlideClusterClient.test.ts @@ -30,7 +30,7 @@ import { RedisCluster } from "../../utils/TestUtils.js"; import { FlushMode, FunctionRestorePolicy, - FunctionStatsResponse, + FunctionStatsSingleResponse, GeoUnit, SortOrder, } from "../build-ts/src/Commands"; @@ -833,7 +833,7 @@ describe("GlideClusterClient", () => { singleNodeRoute, (value) => checkFunctionStatsResponse( - value as FunctionStatsResponse, + value as FunctionStatsSingleResponse, [], 0, 0, @@ -875,7 +875,7 @@ describe("GlideClusterClient", () => { singleNodeRoute, (value) => checkFunctionStatsResponse( - value as FunctionStatsResponse, + value as FunctionStatsSingleResponse, [], 1, 1, @@ -966,7 +966,7 @@ describe("GlideClusterClient", () => { singleNodeRoute, (value) => checkFunctionStatsResponse( - value as FunctionStatsResponse, + value as FunctionStatsSingleResponse, [], 1, 2, diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index 1e6ada1a34..4605f89aa9 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -19,7 +19,7 @@ import { ClusterTransaction, FlushMode, FunctionListResponse, - FunctionStatsResponse, + FunctionStatsSingleResponse, GeoUnit, GeospatialData, GlideClient, @@ -461,7 +461,7 @@ export function checkFunctionListResponse( * @param functionCount - Expected functions count. */ export function checkFunctionStatsResponse( - response: FunctionStatsResponse, + response: FunctionStatsSingleResponse, runningFunction: string[], libCount: number, functionCount: number, diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index 4371d23125..f42e25d032 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -18,7 +18,7 @@ TClusterResponse, TEncodable, TFunctionListResponse, - TFunctionStatsResponse, + TFunctionStatsSingleNodeResponse, TResult, TSingleNodeRoute, ) @@ -493,7 +493,7 @@ async def function_kill(self, route: Optional[Route] = None) -> TOK: See https://valkey.io/commands/function-kill/ for more details. Args: - route (Optional[Route]): The command will be routed to all primary nodes, unless `route` is provided, + route (Optional[Route]): The command will be routed to all nodes, unless `route` is provided, in which case the client will route the command to the nodes defined by `route`. Returns: @@ -588,7 +588,7 @@ async def fcall_ro_route( async def function_stats( self, route: Optional[Route] = None - ) -> TClusterResponse[TFunctionStatsResponse]: + ) -> TClusterResponse[TFunctionStatsSingleNodeResponse]: """ Returns information about the function that's currently running and information about the available execution engines. @@ -596,11 +596,11 @@ async def function_stats( See https://valkey.io/commands/function-stats/ for more details Args: - route (Optional[Route]): Specifies the routing configuration for the command. The client - will route the command to the nodes defined by `route`. + route (Optional[Route]): The command will be routed automatically to all nodes, unless `route` is provided, in which + case the client will route the command to the nodes defined by `route`. Defaults to None. Returns: - TClusterResponse[TFunctionStatsResponse]: A `Mapping` with two keys: + TClusterResponse[TFunctionStatsSingleNodeResponse]: A `Mapping` with two keys: - `running_script` with information about the running script. - `engines` with information about available engines and their stats. See example for more details. @@ -624,7 +624,7 @@ async def function_stats( Since: Valkey version 7.0.0. """ return cast( - TClusterResponse[TFunctionStatsResponse], + TClusterResponse[TFunctionStatsSingleNodeResponse], await self._execute_command(RequestType.FunctionStats, [], route), ) diff --git a/python/python/glide/async_commands/standalone_commands.py b/python/python/glide/async_commands/standalone_commands.py index 37815e14c4..1659134984 100644 --- a/python/python/glide/async_commands/standalone_commands.py +++ b/python/python/glide/async_commands/standalone_commands.py @@ -18,7 +18,7 @@ TOK, TEncodable, TFunctionListResponse, - TFunctionStatsResponse, + TFunctionStatsFullResponse, TResult, ) from glide.protobuf.command_request_pb2 import RequestType @@ -374,6 +374,8 @@ async def function_kill(self) -> TOK: Kills a function that is currently executing. This command only terminates read-only functions. + FUNCTION KILL runs on all nodes of the server, including primary and replicas. + See https://valkey.io/commands/function-kill/ for more details. Returns: @@ -390,39 +392,51 @@ async def function_kill(self) -> TOK: await self._execute_command(RequestType.FunctionKill, []), ) - async def function_stats(self) -> TFunctionStatsResponse: + async def function_stats(self) -> TFunctionStatsFullResponse: """ Returns information about the function that's currently running and information about the available execution engines. + FUNCTION STATS runs on all nodes of the server, including primary and replicas. + The response includes a mapping from node address to the command response for that node. + See https://valkey.io/commands/function-stats/ for more details Returns: - TFunctionStatsResponse: A `Mapping` with two keys: + TFunctionStatsFullResponse: A Map where the key is the node address and the value is a Map of two keys: - `running_script` with information about the running script. - `engines` with information about available engines and their stats. See example for more details. Examples: >>> await client.function_stats() - { - 'running_script': { - 'name': 'foo', - 'command': ['FCALL', 'foo', '0', 'hello'], - 'duration_ms': 7758 + {b"addr": { # Response from the master node + b'running_script': { + b'name': b'foo', + b'command': [b'FCALL', b'foo', b'0', b'hello'], + b'duration_ms': 7758 }, - 'engines': { - 'LUA': { - 'libraries_count': 1, - 'functions_count': 1, + b'engines': { + b'LUA': { + b'libraries_count': 1, + b'functions_count': 1, + } + } + }, + b"addr2": { # Response from a replica + b'running_script': None, + b"engines": { + b'LUA': { + b'libraries_count': 1, + b'functions_count': 1, } } - } + }} Since: Valkey version 7.0.0. """ return cast( - TFunctionStatsResponse, + TFunctionStatsFullResponse, await self._execute_command(RequestType.FunctionStats, []), ) diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 3a03351224..8aa0f0fa1d 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -2028,7 +2028,7 @@ def function_stats(self: TTransaction) -> TTransaction: See https://valkey.io/commands/function-stats/ for more details Command Response: - TFunctionStatsResponse: A `Mapping` with two keys: + TFunctionStatsSingleNodeResponse: A `Mapping` with two keys: - `running_script` with information about the running script. - `engines` with information about available engines and their stats. See example for more details. diff --git a/python/python/glide/constants.py b/python/python/glide/constants.py index 24c30d8de7..754aacf6fa 100644 --- a/python/python/glide/constants.py +++ b/python/python/glide/constants.py @@ -42,15 +42,25 @@ Union[bytes, List[Mapping[bytes, Union[bytes, Set[bytes]]]]], ] ] -TFunctionStatsResponse = Mapping[ +# Response for function stats command on a single node. +# The response holds a map with 2 keys: Current running function / script and information about it, and the engines and the information about it. +TFunctionStatsSingleNodeResponse = Mapping[ bytes, Union[ None, Mapping[ - bytes, Union[Mapping[bytes, Mapping[bytes, int]], bytes, int, List[bytes]] + bytes, + Union[Mapping[bytes, Mapping[bytes, int]], bytes, int, List[bytes]], ], ], ] +# Full response for function stats command across multiple nodes. +# It maps node address to the per-node response. +TFunctionStatsFullResponse = Mapping[ + bytes, + TFunctionStatsSingleNodeResponse, +] + TXInfoStreamResponse = Mapping[ bytes, Union[bytes, int, Mapping[bytes, Optional[List[List[bytes]]]]] diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index d46e490b70..ce8008856a 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -76,7 +76,7 @@ ProtocolVersion, ServerCredentials, ) -from glide.constants import OK, TEncodable, TFunctionStatsResponse, TResult +from glide.constants import OK, TEncodable, TFunctionStatsSingleNodeResponse, TResult from glide.exceptions import TimeoutError as GlideTimeoutError from glide.glide_client import GlideClient, GlideClusterClient, TGlideClient from glide.routes import ( @@ -8206,14 +8206,14 @@ async def test_function_delete_with_routing( await glide_client.function_delete(lib_name) assert "Library not found" in str(e) - @pytest.mark.parametrize("cluster_mode", [False]) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_function_stats(self, glide_client: GlideClient): + async def test_function_stats(self, glide_client: TGlideClient): min_version = "7.0.0" if await check_if_server_version_lt(glide_client, min_version): return pytest.mark.skip(reason=f"Redis version required >= {min_version}") - lib_name = "functionStats" + lib_name = "functionStats_without_route" func_name = lib_name assert await glide_client.function_flush(FlushMode.SYNC) == OK @@ -8222,7 +8222,10 @@ async def test_function_stats(self, glide_client: GlideClient): assert await glide_client.function_load(code, True) == lib_name.encode() response = await glide_client.function_stats() - check_function_stats_response(response, [], 1, 1) + for node_response in response.values(): + check_function_stats_response( + cast(TFunctionStatsSingleNodeResponse, node_response), [], 1, 1 + ) code = generate_lua_lib_code( lib_name + "_2", @@ -8234,56 +8237,74 @@ async def test_function_stats(self, glide_client: GlideClient): ) response = await glide_client.function_stats() - check_function_stats_response(response, [], 2, 3) + for node_response in response.values(): + check_function_stats_response( + cast(TFunctionStatsSingleNodeResponse, node_response), [], 2, 3 + ) assert await glide_client.function_flush(FlushMode.SYNC) == OK response = await glide_client.function_stats() - check_function_stats_response(response, [], 0, 0) + for node_response in response.values(): + check_function_stats_response( + cast(TFunctionStatsSingleNodeResponse, node_response), [], 0, 0 + ) - @pytest.mark.parametrize("cluster_mode", [True]) + @pytest.mark.parametrize("cluster_mode", [False, True]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_function_stats_cluster(self, glide_client: GlideClusterClient): + async def test_function_stats_running_script( + self, request, cluster_mode, protocol, glide_client: TGlideClient + ): min_version = "7.0.0" if await check_if_server_version_lt(glide_client, min_version): return pytest.mark.skip(reason=f"Redis version required >= {min_version}") - lib_name = "functionStats_without_route" - func_name = lib_name - assert await glide_client.function_flush(FlushMode.SYNC) == OK - - # function $funcName returns first argument - code = generate_lua_lib_code(lib_name, {func_name: "return args[1]"}, False) - assert await glide_client.function_load(code, True) == lib_name.encode() + lib_name = f"mylib1C{get_random_string(5)}" + func_name = f"myfunc1c{get_random_string(5)}" + code = create_lua_lib_with_long_running_function(lib_name, func_name, 10, True) - response = await glide_client.function_stats() - for node_response in response.values(): - check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 1, 1 - ) + # load the library + assert await glide_client.function_load(code, replace=True) == lib_name.encode() - code = generate_lua_lib_code( - lib_name + "_2", - {func_name + "_2": "return 'OK'", func_name + "_3": "return 42"}, - False, + # create a second client to run fcall + test_client = await create_client( + request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 ) - assert ( - await glide_client.function_load(code, True) == (lib_name + "_2").encode() + + test_client2 = await create_client( + request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 ) - response = await glide_client.function_stats() - for node_response in response.values(): - check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 2, 3 - ) + async def endless_fcall_route_call(): + await test_client.fcall_ro(func_name, arguments=[]) - assert await glide_client.function_flush(FlushMode.SYNC) == OK + async def wait_and_function_stats(): + # it can take a few seconds for FCALL to register as running + await asyncio.sleep(3) + result = await test_client2.function_stats() + running_scripts = False + for res in result.values(): + if res.get(b"running_script"): + if running_scripts: + raise Exception("Already running script on a different node") + running_scripts = True + assert res.get(b"running_script").get(b"name") == func_name.encode() + assert res.get(b"running_script").get(b"command") == [ + b"FCALL_RO", + func_name.encode(), + b"0", + ] + assert res.get(b"running_script").get(b"duration_ms") > 0 - response = await glide_client.function_stats() - for node_response in response.values(): - check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 0, 0 - ) + assert running_scripts + + await asyncio.gather( + endless_fcall_route_call(), + wait_and_function_stats(), + ) + + await test_client.close() + await test_client2.close() @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) @@ -8311,12 +8332,12 @@ async def test_function_stats_with_routing( response = await glide_client.function_stats(route) if single_route: check_function_stats_response( - cast(TFunctionStatsResponse, response), [], 1, 1 + cast(TFunctionStatsSingleNodeResponse, response), [], 1, 1 ) else: for node_response in response.values(): check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 1, 1 + cast(TFunctionStatsSingleNodeResponse, node_response), [], 1, 1 ) code = generate_lua_lib_code( @@ -8332,12 +8353,12 @@ async def test_function_stats_with_routing( response = await glide_client.function_stats(route) if single_route: check_function_stats_response( - cast(TFunctionStatsResponse, response), [], 2, 3 + cast(TFunctionStatsSingleNodeResponse, response), [], 2, 3 ) else: for node_response in response.values(): check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 2, 3 + cast(TFunctionStatsSingleNodeResponse, node_response), [], 2, 3 ) assert await glide_client.function_flush(FlushMode.SYNC, route) == OK @@ -8345,12 +8366,12 @@ async def test_function_stats_with_routing( response = await glide_client.function_stats(route) if single_route: check_function_stats_response( - cast(TFunctionStatsResponse, response), [], 0, 0 + cast(TFunctionStatsSingleNodeResponse, response), [], 0, 0 ) else: for node_response in response.values(): check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 0, 0 + cast(TFunctionStatsSingleNodeResponse, node_response), [], 0, 0 ) @pytest.mark.parametrize("cluster_mode", [True, False]) @@ -8379,20 +8400,10 @@ async def test_function_kill_no_write( request, cluster_mode=cluster_mode, protocol=protocol, timeout=15000 ) - # call fcall to run the function - # make sure that fcall routes to a primary node, and not a replica - # if this happens, function_kill and function_stats won't find the function and will fail - primaryRoute = SlotKeyRoute(SlotType.PRIMARY, lib_name) - async def endless_fcall_route_call(): # fcall is supposed to be killed, and will return a RequestError with pytest.raises(RequestError) as e: - if cluster_mode: - await test_client.fcall_ro_route( - func_name, arguments=[], route=primaryRoute - ) - else: - await test_client.fcall_ro(func_name, arguments=[]) + await test_client.fcall_ro(func_name, arguments=[]) assert "Script killed by user" in str(e) async def wait_and_function_kill(): diff --git a/python/python/tests/utils/utils.py b/python/python/tests/utils/utils.py index 96f08a7b5a..691bf98c42 100644 --- a/python/python/tests/utils/utils.py +++ b/python/python/tests/utils/utils.py @@ -8,7 +8,7 @@ from glide.constants import ( TClusterResponse, TFunctionListResponse, - TFunctionStatsResponse, + TFunctionStatsSingleNodeResponse, TResult, ) from glide.glide_client import TGlideClient @@ -309,7 +309,7 @@ def check_function_list_response( def check_function_stats_response( - response: TFunctionStatsResponse, + response: TFunctionStatsSingleNodeResponse, running_function: List[bytes], lib_count: int, function_count: int, @@ -318,7 +318,7 @@ def check_function_stats_response( Validate whether `FUNCTION STATS` response contains required info. Args: - response (TFunctionStatsResponse): The response from server. + response (TFunctionStatsSingleNodeResponse): The response from server. running_function (List[bytes]): Command line of running function expected. Empty, if nothing expected. lib_count (int): Expected libraries count. function_count (int): Expected functions count. diff --git a/submodules/redis-rs b/submodules/redis-rs index de53b2b5c6..b43a07e7f7 160000 --- a/submodules/redis-rs +++ b/submodules/redis-rs @@ -1 +1 @@ -Subproject commit de53b2b5c68e7ef4667e2b195eb9b1d0dd460722 +Subproject commit b43a07e7f76e3f341661b95b40df0d60ba6f89f8