Core: Add new CommandRequest - Pipeline #2954

Open
wants to merge 5 commits into base: main

Conversation

shohamazon
Collaborator

@shohamazon shohamazon commented Jan 15, 2025

This PR introduces a new CommandRequest type: Pipeline. A Pipeline represents a batch of commands that are sent to Valkey for execution, similar to a Transaction. However, unlike a transaction, a pipeline has the following distinguishing characteristics:

  1. Non-Atomic Execution:
    Transactions in Valkey are atomic. Pipelines, in contrast, do not provide such a guarantee.
  2. Multi-Node Support:
    Transactions are limited to a single Valkey node, because all commands within a transaction must belong to the same slot in cluster mode. Pipelines, however, can span multiple nodes, allowing commands to target different slots or to use multi-node commands (e.g., PING, or an MSET whose keys map to different slots). In a transaction, such multi-node commands are simply routed to a single node. (A client-level sketch of the difference follows this list.)
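
As a rough client-level illustration of the difference, here is a minimal sketch against the plain redis-rs builder API that glide-core builds on (the key names and connection URL are placeholders, not part of this PR):

    fn main() -> redis::RedisResult<()> {
        let client = redis::Client::open("redis://127.0.0.1/")?;
        let mut con = client.get_connection()?;

        // Non-atomic pipeline: commands are batched on the wire but executed
        // independently, so in cluster mode they may target different slots/nodes.
        let (v1, v2): (String, String) = redis::pipe()
            .cmd("SET").arg("{slot1}key").arg("a").ignore()
            .cmd("SET").arg("{slot2}key").arg("b").ignore()
            .cmd("GET").arg("{slot1}key")
            .cmd("GET").arg("{slot2}key")
            .query(&mut con)?;

        // Transaction: the same builder with `.atomic()` wraps the commands in
        // MULTI/EXEC, so in cluster mode all keys must hash to the same slot.
        let (v3,): (String,) = redis::pipe()
            .atomic()
            .cmd("SET").arg("{slot1}key").arg("c").ignore()
            .cmd("GET").arg("{slot1}key")
            .query(&mut con)?;

        println!("{v1} {v2} {v3}");
        Ok(())
    }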

Implementation Details

To support the execution of pipelines in cluster mode, this PR introduces several changes (a rough end-to-end sketch follows this list):

1. Pipeline Splitting:

Since a pipeline can include commands targeting different slots, it needs to be split into sub-pipelines, each grouped by the node responsible for the relevant slots.
The process involves mapping each command in the pipeline to its corresponding node(s) based on the cluster's slot allocation. This mapping is handled by routing logic, which determines whether a command targets a single node or multiple nodes.

2. Node Communication:

Once the pipeline is split, each sub-pipeline is sent to its respective node for execution.
For commands that span multiple nodes, the implementation ensures the responses are tracked and aggregated to form a cohesive result.

3. Response Aggregation:

To handle multi-node commands, the responses from each node are stored along with the node's address. This allows for proper aggregation and processing of results, particularly when commands require combining responses (e.g., for commands like MGET).
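
The snippet below is a rough, self-contained sketch of this split-and-aggregate flow, not the actual glide-core code: commands are plain strings, the slot_owner helper is a hypothetical stand-in for slot hashing plus topology lookup, and replies are fabricated rather than read from real connections. Its only purpose is to show the index bookkeeping that keeps responses in the original pipeline order.

    use std::collections::HashMap;

    // Hypothetical stand-in for slot hashing + cluster topology lookup.
    fn slot_owner(cmd: &str) -> String {
        if cmd.contains("key_a") { "node-a:6379".into() } else { "node-b:6379".into() }
    }

    fn main() {
        let pipeline = ["GET key_a", "GET key_b", "SET key_a 1"];

        // 1. Split: group commands by owning node, remembering each command's
        //    original position in the pipeline.
        let mut sub_pipelines: HashMap<String, Vec<(usize, &str)>> = HashMap::new();
        for (index, cmd) in pipeline.iter().copied().enumerate() {
            sub_pipelines.entry(slot_owner(cmd)).or_default().push((index, cmd));
        }

        // 2. Execute each sub-pipeline (fabricated replies, tagged with the node address).
        // 3. Aggregate: write every reply back into its original slot in the result vector.
        let mut responses: Vec<Option<(String, String)>> = vec![None; pipeline.len()];
        for (address, commands) in sub_pipelines {
            for (index, cmd) in commands {
                responses[index] = Some((format!("reply to `{cmd}`"), address.clone()));
            }
        }

        for (index, reply) in responses.iter().enumerate() {
            println!("{index}: {reply:?}");
        }
    }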

Summary

This PR introduces the Pipeline request type, enabling non-atomic batch command execution in Glide.

Issue link

This Pull Request is linked to issue (URL): [REPLACE ME]

Checklist

Before submitting the PR make sure the following are checked:

  • This Pull Request is related to one issue.
  • Commit message has a detailed description of what changed and why.
  • Tests are added or updated.
  • CHANGELOG.md and documentation files are updated.
  • Destination branch is correct - main or release
  • Create merge commit if merging release branch into main, squash otherwise.

shohamazon and others added 4 commits January 15, 2025 14:04
@ikolomi ikolomi self-requested a review January 16, 2025 10:53
@shohamazon shohamazon added the Rust core (redis-rs/glide-core matter) and Core changes (marks a PR with significant changes that should trigger the full matrix tests) labels on Jan 16, 2025
@shohamazon shohamazon marked this pull request as ready for review January 16, 2025 10:55
@shohamazon shohamazon requested a review from a team as a code owner January 16, 2025 10:55
pipeline: &'a redis::Pipeline,
) -> redis::RedisFuture<'a, Value> {
let command_count = pipeline.cmd_iter().count();
let _offset = command_count + 1; //TODO: check
Collaborator

resolve TODO

.push((index, inner_index));
}

async fn routes_pipeline_commands(
Collaborator

find a better name and make it shorter

pipeline: &crate::Pipeline,
core: Core<C>,
) -> RedisResult<(
HashMap<String, (Pipeline, C, Vec<(usize, Option<usize>)>)>,
Collaborator

encapsulate in struct

match cluster_routing::RoutingInfo::for_routable(cmd) {
Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
| None => {
if pipelines_by_connection.is_empty() {
Collaborator

comment

} else {
// since the map is not empty, add the command to a random connection within the map.
let mut rng = rand::thread_rng();
let keys: Vec<_> = pipelines_by_connection.keys().cloned().collect();
Collaborator

rename to addresses

Collaborator

think about a way not to clone the addresses

for (index, routing_info, response_policy) in response_policies {
#[allow(clippy::type_complexity)]
// Safely access `values_and_addresses` for the current index
let response_receivers: Vec<(
Collaborator

use structs for complex types


// Collect final responses
for mut value in values_and_addresses.into_iter() {
assert_eq!(value.len(), 1);
Collaborator

don't use asserts in prod code

Collaborator

Use index 0 instead of popping and unwrapping

.map_err(|err| (OperationTarget::FanOut, err))?;

// Update `values_and_addresses` for the current index
values_and_addresses[index] = vec![(aggregated_response, "".to_string())];
Collaborator

use index 0 for storing aggregated_response

Collaborator

Let's try to use Pipeline for both transactions and pipelines, differentiating by an is_atomic flag

@@ -501,6 +501,10 @@ message Transaction {
repeated Command commands = 1;
}

message Pipeline {
Collaborator

Let's remove Transaction and use Pipeline + an is_atomic flag

Collaborator

Please also add comments describing things there

Collaborator

From the protocol's point of view, the only difference between a pipeline and a transaction is two extra commands: MULTI and EXEC
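
A minimal sketch of that point using the plain redis-rs builder (the command streams in the comments are illustrative; this is not the packing code used in glide-core):

    fn main() {
        // Non-atomic pipeline: the commands go out back to back, exactly as queued:
        //   SET k1 v1
        //   SET k2 v2
        let mut pipeline = redis::pipe();
        pipeline.cmd("SET").arg("k1").arg("v1").cmd("SET").arg("k2").arg("v2");

        // Transaction: the same commands, framed by the two extra commands:
        //   MULTI
        //   SET k1 v1
        //   SET k2 v2
        //   EXEC
        let mut transaction = redis::pipe();
        transaction.atomic().cmd("SET").arg("k1").arg("v1").cmd("SET").arg("k2").arg("v2");
    }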

Collaborator Author

also being atomic + whether multiple slots are allowed

Collaborator

@barshaul barshaul left a comment

Some initial notes (note to self - got up to handle_single_node_route)

)
})
} else {
// Pipeline is not atomic, so we can have commands with different slots.
Collaborator

Please add documentation to the function and state that for non-atomic pipelines it returns None

Comment on lines 1129 to 1131
let addresses: Vec<_> = pipeline_map.keys().cloned().collect();
let random_address = addresses.choose(&mut rng).unwrap();
let context = pipeline_map.get_mut(random_address).unwrap();
Collaborator

@barshaul barshaul Jan 19, 2025

  1. Don't use unwrap in production code. If a bug caused pipeline_map.keys() to be empty, it would crash the whole client. Instead, change this function to return a Result and return a ClientError if no random address is found.
  2. Copying all addresses is redundant; you can achieve the same with:
    // requires `use rand::seq::IteratorRandom;` for `choose` on iterators
    let mut rng = rand::thread_rng();
    if let Some(node_context) = pipeline_map
        .values_mut()
        .choose(&mut rng) {
            node_context.add_command(cmd, index, None);
            Ok(())
    } else {
        // e.g. return a client error instead of panicking
        Err((ErrorKind::ClientError, "No connection found for pipeline command").into())
    }

Comment on lines 2207 to 2208
// This function handles commands with routing info of MultiSlot (like MSET or MGET), creates sub-commands for the matching slots and adds them to the correct pipeline
async fn handle_multi_slot_routing(
Collaborator

The name of the function should declare that it's only relevant for pipelines; the current name is misleading

};
if let Some((address, conn)) = conn {
let new_cmd =
crate::cluster_routing::command_for_multi_slot_indices(cmd, indices.iter());
Collaborator

add use crate::cluster_routing::command_for_multi_slot_indices and remove the prefix

}
}

fn determine_internal_routing(
Collaborator

Missing description; it isn't clear when and why it would be used

Comment on lines 2256 to 2257
// This function handles commands with routing info of SingleNode
async fn handle_single_node_route(
Collaborator

same issue - all of these functions are placed under ClusterConnInner even though they are only relevant for pipelines. I'm not sure there is a good reason for placing them here. Why not move all of the pipeline logic into a separate file (e.g. pipeline, pipeline_routing) under the async_cluster folder?

@BoazBD
Collaborator

BoazBD commented Jan 21, 2025

Ignore the review request ^, added by accident. 🙂

Collaborator

@barshaul barshaul left a comment

This PR is long lol. Most comments refer to code readability, overly complex types, and redundant 'pub' declarations; please fix in all required places, not only where I commented.
Will continue tomorrow.

@@ -2125,7 +2092,7 @@ where
.map_err(|err| (address.into(), err))
}

async fn try_pipeline_request(
pub async fn try_pipeline_request(
Collaborator

Why pub? pub refers to user-facing APIs

Collaborator

see if it can be removed or if pub(crate) is enough

@@ -2180,7 +2233,7 @@ where
}
}

async fn get_connection(
pub async fn get_connection(
Collaborator

same - function shouldn't be pub

@@ -2139,6 +2106,38 @@ where
.map_err(|err| (OperationTarget::Node { address }, err))
}

/// Aggregates responses for multi-node commands and updates the `values_and_addresses` vector.
Collaborator

Suggested change
/// Aggregates responses for multi-node commands and updates the `values_and_addresses` vector.
/// Aggregates pipeline responses for multi-node commands and updates the `values_and_addresses` vector.

/// - It collects responses and their source node addresses from the corresponding entry in `values_and_addresses`.
/// - Uses the routing information and optional response policy to aggregate the responses into a single result.
///
/// The aggregated result replaces the existing entries in `values_and_addresses` for the given command index.
Collaborator

I read the description 3 times and it's still unclear to me what this function does :(
maybe a simple example would help.

/// - It collects responses and their source node addresses from the corresponding entry in `values_and_addresses`.
/// - Uses the routing information and optional response policy to aggregate the responses into a single result.
///
/// The aggregated result replaces the existing entries in `values_and_addresses` for the given command index.
Collaborator

What do you mean by "replaces the existing entries in values_and_addresses for the given command index"? Do you mean that it sorts the entries in values_and_addresses by the command indices calculated in this function? Try to make it clearer
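
As a concrete, simplified illustration of what that doc comment is describing (strings stand in for redis::Value, and simple concatenation stands in for the real response-policy aggregation):

    fn main() {
        type NodeResponse = (String, String); // (response, source node address)

        // values_and_addresses[i] holds every (response, address) pair collected for
        // pipeline command i: one pair for a single-node command, several for a
        // multi-node command such as an MGET that was split across nodes.
        let mut values_and_addresses: Vec<Vec<NodeResponse>> = vec![
            vec![("OK".into(), "node-a:6379".into())], // command 0: single node
            vec![
                ("v1,v2".into(), "node-a:6379".into()), // command 1: MGET part from node a
                ("v3".into(), "node-b:6379".into()),    // command 1: MGET part from node b
            ],
        ];

        // Aggregate command 1: combine the partial replies into one value
        // (the real code uses the routing info and response policy instead).
        let index = 1;
        let aggregated = values_and_addresses[index]
            .iter()
            .map(|(value, _address)| value.as_str())
            .collect::<Vec<_>>()
            .join(",");

        // The aggregated result replaces the per-node entries at that command index.
        values_and_addresses[index] = vec![(aggregated, String::new())];

        assert_eq!(
            values_and_addresses[1],
            vec![("v1,v2,v3".to_string(), String::new())]
        );
    }

In other words, after aggregation the entry at that index shrinks from several per-node responses to the single combined response, stored with a placeholder empty address as in the PR.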

pub async fn execute_pipeline_on_node<C>(
address: String,
node_context: NodePipelineContext<C>,
) -> Result<(Vec<(usize, Option<usize>)>, Vec<Value>, String), (OperationTarget, RedisError)>
Collaborator

Please create type aliases for the return type (Vec<(usize, Option<usize>)>, Vec<Value>, String); it isn't readable

// might produce multiple responses, each from a different node. By storing the responses with their
// respective node addresses, we ensure that we have all the information needed to aggregate the results later.
// This structure is essential for handling scenarios where responses from multiple nodes must be combined.
let mut values_and_addresses = vec![Vec::new(); pipeline.len()];
Collaborator

we can make this complex type a bit clearer with type aliases:

// define on top
type NodeResponse = (Value, String); // A response with its source node address.
type PipelineResponses = Vec<Vec<NodeResponse>>; // Outer Vec: pipeline commands, Inner Vec: (response, address).
...

let mut values_and_addresses: PipelineResponses = vec![Vec::new(); pipeline.len()];

and it can also be used elsewhere (e.g. aggregate_pipeline_multi_node_commands)

#[allow(clippy::type_complexity)]
pub async fn collect_pipeline_tasks(
join_set: &mut tokio::task::JoinSet<
Result<(Vec<(usize, Option<usize>)>, Vec<Value>, String), (OperationTarget, RedisError)>,
Collaborator

same - use a type alias

Collaborator

then remove #[allow(clippy::type_complexity)]

Comment on lines +339 to +340
/// - `Ok(Some((OperationTarget, RedisError)))`: If one or more tasks encountered an error, returns the first error.
/// - `Ok(None)`: If all tasks completed successfully.
Collaborator

@barshaul barshaul Jan 21, 2025

Since returning Ok with Some(err) is confusing, you can make it more readable with an enum representing the return values, something like:

enum MultiPipelineResult {
    /// All tasks completed successfully.
    AllSuccessful,

    /// Some tasks failed, returning the first encountered error and the associated operation target.
    FirstError {
        target: OperationTarget,
        error: RedisError,
    },
}
