Core: Add new CommandRequest - Pipeline #2954
base: main
Conversation
Signed-off-by: Shoham Elias <[email protected]>
pipeline: &'a redis::Pipeline,
) -> redis::RedisFuture<'a, Value> {
    let command_count = pipeline.cmd_iter().count();
    let _offset = command_count + 1; //TODO: check
resolve TODO
.push((index, inner_index));
}

async fn routes_pipeline_commands(
find a better name and make it shorter
pipeline: &crate::Pipeline,
core: Core<C>,
) -> RedisResult<(
    HashMap<String, (Pipeline, C, Vec<(usize, Option<usize>)>)>,
encapsulate in struct
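As a sketch of the suggested encapsulation, the tuple value could become a named struct. This is a minimal, self-contained sketch with stand-in types; the struct and field names (echoing the NodePipelineContext that appears later in the PR) are illustrative:

```rust
use std::collections::HashMap;

// Stand-ins for redis::Pipeline and the generic connection type C.
#[derive(Default)]
struct Pipeline(Vec<String>);
struct Connection(String);

// Wraps the (Pipeline, C, Vec<(usize, Option<usize>)>) tuple in a named type.
struct NodePipelineContext {
    pipeline: Pipeline,
    connection: Connection,
    // (command index in the original pipeline, optional sub-command index)
    command_indices: Vec<(usize, Option<usize>)>,
}

fn build_map() -> HashMap<String, NodePipelineContext> {
    let mut by_address = HashMap::new();
    by_address.insert(
        "node-a:6379".to_string(),
        NodePipelineContext {
            pipeline: Pipeline(vec!["GET key1".to_string()]),
            connection: Connection("conn-a".to_string()),
            command_indices: vec![(0, None)],
        },
    );
    by_address
}

fn main() {
    let map = build_map();
    assert_eq!(map["node-a:6379"].command_indices, vec![(0, None)]);
}
```

Named fields make call sites self-documenting compared with `.0`/`.1` tuple access.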
match cluster_routing::RoutingInfo::for_routable(cmd) {
    Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
    | None => {
        if pipelines_by_connection.is_empty() {
Add a comment here.
} else {
    // since the map is not empty, add the command to a random connection within the map.
    let mut rng = rand::thread_rng();
    let keys: Vec<_> = pipelines_by_connection.keys().cloned().collect();
rename to addresses
Think about a way not to clone the addresses.
for (index, routing_info, response_policy) in response_policies {
    #[allow(clippy::type_complexity)]
    // Safely access `values_and_addresses` for the current index
    let response_receivers: Vec<(
use structs for complex types

// Collect final responses
for mut value in values_and_addresses.into_iter() {
    assert_eq!(value.len(), 1);
Don't use asserts in prod code.
Use index 0 instead of popping and unwrapping.
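The two comments above (no asserts in production code, and taking element 0 instead of pop+unwrap) can be sketched together with simplified stand-in types; `take_single` and the error message are hypothetical:

```rust
// (value, source node address) stand-in for the real (Value, String) pairs.
type NodeResponse = (i64, String);

// Validate the length and take element 0 explicitly, returning an error
// instead of panicking via assert_eq! + pop() + unwrap().
fn take_single(mut responses: Vec<NodeResponse>) -> Result<NodeResponse, String> {
    if responses.len() != 1 {
        return Err(format!("expected exactly one response, got {}", responses.len()));
    }
    Ok(responses.remove(0))
}

fn main() {
    assert_eq!(take_single(vec![(7, "node-a".to_string())]), Ok((7, "node-a".to_string())));
    assert!(take_single(Vec::new()).is_err());
}
```

Returning a `Result` keeps a routing bug from crashing the whole client, matching the reviewer's later comment about `unwrap`.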
.map_err(|err| (OperationTarget::FanOut, err))?;

// Update `values_and_addresses` for the current index
values_and_addresses[index] = vec![(aggregated_response, "".to_string())];
use index 0 for storing aggregated_response
glide-core/src/socket_listener.rs
Let's try to use Pipeline for both transactions and pipelines, differentiating by is_atomic.
@@ -501,6 +501,10 @@ message Transaction {
   repeated Command commands = 1;
 }

+message Pipeline {
Let's remove Transaction and use Pipeline + an is_atomic flag.
Please also add comments there describing things.
From the protocol's view, the only difference between a pipeline and a transaction is two extra commands: MULTI and EXEC.
also being atomic + multi slots enabled
Some initial notes (note to self - got up to handle_single_node_route)
)
})
} else {
    // Pipeline is not atomic, so we can have commands with different slots.
Please add documentation to the function and state that for non-atomic pipelines it returns None.
let addresses: Vec<_> = pipeline_map.keys().cloned().collect();
let random_address = addresses.choose(&mut rng).unwrap();
let context = pipeline_map.get_mut(random_address).unwrap();
- Don't use unwrap in production code. If a bug caused pipeline_map.keys() to be empty, it would crash the whole client. Instead, change this function to return a Result and return a ClientError if no random address is found.
- Copying all the addresses is redundant; you can achieve the same with:
use rand::seq::IteratorRandom; // needed for choose() on iterators

let mut rng = rand::thread_rng();
if let Some(node_context) = pipeline_map.values_mut().choose(&mut rng) {
    node_context.add_command(cmd, index, None);
    Ok(())
} else {
    // return error
}
// This function handles commands with routing info of MultiSlot (like MSET or MGET), creates sub-commands for the matching slots and adds them to the correct pipeline
async fn handle_multi_slot_routing(
The name of the function should declare that it's only relevant for pipelines, the current name is misleading
};
if let Some((address, conn)) = conn {
    let new_cmd =
        crate::cluster_routing::command_for_multi_slot_indices(cmd, indices.iter());
Add use crate::cluster_routing::command_for_multi_slot_indices and remove the prefix.
}
}

fn determine_internal_routing(
Missing description; it isn't clear when and why it would be used.
// This function handles commands with routing info of SingleNode
async fn handle_single_node_route(
Same issue - all of these functions are placed under ClusterConnInner though they are only relevant for pipelines. I'm not sure there is a good reason to place them here. Why not move all of the pipeline logic into a separate file (e.g. pipeline, pipeline_routing) under the async_cluster folder?
Ignore the review request ^, added by accident. 🙂
This PR is long lol. Most comments refer to readability of the code, overly complex types, and redundant pub declarations; please fix in all required places, not only where I commented.
Will continue tomorrow.
@@ -2125,7 +2092,7 @@ where
         .map_err(|err| (address.into(), err))
     }

-    async fn try_pipeline_request(
+    pub async fn try_pipeline_request(
Why pub? pub refers to user-facing APIs.
see if it can be removed or if pub(crate) is enough
@@ -2180,7 +2233,7 @@ where
         }
     }

-    async fn get_connection(
+    pub async fn get_connection(
same - function shouldn't be pub
@@ -2139,6 +2106,38 @@ where
         .map_err(|err| (OperationTarget::Node { address }, err))
     }

+    /// Aggregates responses for multi-node commands and updates the `values_and_addresses` vector.
Suggested change:
-/// Aggregates responses for multi-node commands and updates the `values_and_addresses` vector.
+/// Aggregates pipeline responses for multi-node commands and updates the `values_and_addresses` vector.
/// - It collects responses and their source node addresses from the corresponding entry in `values_and_addresses`.
/// - Uses the routing information and optional response policy to aggregate the responses into a single result.
///
/// The aggregated result replaces the existing entries in `values_and_addresses` for the given command index.
I read the description three times and it's still unclear to me what this function does :( Maybe a simple example would help.
/// - It collects responses and their source node addresses from the corresponding entry in `values_and_addresses`.
/// - Uses the routing information and optional response policy to aggregate the responses into a single result.
///
/// The aggregated result replaces the existing entries in `values_and_addresses` for the given command index.
What do you mean by "replaces the existing entries in values_and_addresses for the given command index"? Do you mean that it sorts the entries in values_and_addresses by the command indices calculated in this function? Try to make it clearer.
pub async fn execute_pipeline_on_node<C>(
    address: String,
    node_context: NodePipelineContext<C>,
) -> Result<(Vec<(usize, Option<usize>)>, Vec<Value>, String), (OperationTarget, RedisError)>
Please create type aliases for the return type (Vec<(usize, Option<usize>)>, Vec<Value>, String); it isn't readable.
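A sketch of what such aliases could look like, using simplified stand-in types for `Value`, `RedisError`, and `OperationTarget`; all names are illustrative, not the PR's actual choices:

```rust
// Stand-ins for the real redis types, only the alias shapes matter here.
#[derive(Debug, PartialEq)]
struct Value(i64);
#[derive(Debug)]
struct RedisError(String);
#[derive(Debug)]
enum OperationTarget {
    FanOut,
}

/// (command index, optional sub-command index) pairs.
type CommandIndices = Vec<(usize, Option<usize>)>;
/// Everything one node's execution yields: indices, values, node address.
type NodeExecutionOutput = (CommandIndices, Vec<Value>, String);
/// The full result type of an execute-on-node style function.
type NodeExecutionResult = Result<NodeExecutionOutput, (OperationTarget, RedisError)>;

fn sample() -> NodeExecutionResult {
    Ok((vec![(0, None)], vec![Value(1)], "node-a:6379".to_string()))
}

fn main() {
    let (indices, values, address) = sample().unwrap();
    assert_eq!(indices, vec![(0, None)]);
    assert_eq!(values, vec![Value(1)]);
    assert_eq!(address, "node-a:6379");
}
```

Aliases also remove the need for `#[allow(clippy::type_complexity)]` at each use site.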
// might produce multiple responses, each from a different node. By storing the responses with their
// respective node addresses, we ensure that we have all the information needed to aggregate the results later.
// This structure is essential for handling scenarios where responses from multiple nodes must be combined.
let mut values_and_addresses = vec![Vec::new(); pipeline.len()];
we can make this complex type a bit clearer with type aliases:
// define on top
type NodeResponse = (Value, String); // A response with its source node address.
type PipelineResponses = Vec<Vec<NodeResponse>>; // Outer Vec: pipeline commands, Inner Vec: (response, address).
...
let mut values_and_addresses: PipelineResponses = vec![Vec::new(); pipeline.len()];
and it can also be used elsewhere (e.g. aggregate_pipeline_multi_node_commands)
#[allow(clippy::type_complexity)]
pub async fn collect_pipeline_tasks(
    join_set: &mut tokio::task::JoinSet<
        Result<(Vec<(usize, Option<usize>)>, Vec<Value>, String), (OperationTarget, RedisError)>,
same - type alias
then remove #[allow(clippy::type_complexity)]
/// - `Ok(Some((OperationTarget, RedisError)))`: If one or more tasks encountered an error, returns the first error.
/// - `Ok(None)`: If all tasks completed successfully.
since returning Ok with Some(err) is confusing, you can make it more readable with some enum representing the return values, something like
enum MultiPipelineResult {
/// All tasks completed successfully.
AllSuccessful,
/// Some tasks failed, returning the first encountered error and the associated operation target.
FirstError {
target: OperationTarget,
error: RedisError,
},
}
This PR introduces a new CommandRequest type: Pipeline. A Pipeline represents a batch of commands that are sent to Valkey for execution, similar to a Transaction. However, unlike a transaction, a pipeline has the following distinguishing characteristics:
- Transactions in Valkey are atomic. Pipelines, in contrast, do not provide such a guarantee.
- Transactions are limited to a single Valkey node because all commands within a transaction must belong to the same slot in cluster mode. Pipelines, however, can span multiple nodes, allowing commands to target different slots or involve multi-node commands (e.g., PING, or an MSET that spans multiple keys in different slots). (In a transaction, such multi-node commands would just be routed to a single node.)
Implementation Details
To support the execution of pipelines in cluster mode, this PR introduces several changes:
1. Pipeline Splitting:
Since a pipeline can include commands targeting different slots, it needs to be split into sub-pipelines, each grouped by the node responsible for the relevant slots.
The process involves mapping each command in the pipeline to its corresponding node(s) based on the cluster's slot allocation. This mapping is handled by routing logic, which determines whether a command targets a single node or multiple nodes.
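The splitting step described above can be sketched as follows. `node_for_key` is a toy stand-in for the real CRC16-slot-based routing, and `split_pipeline` is a hypothetical helper; only the grouping shape mirrors the PR:

```rust
use std::collections::HashMap;

// Toy stand-in for the cluster slot table: the real code hashes the key with
// CRC16 and looks up the owning node; here key length picks one of two nodes.
fn node_for_key(key: &str) -> String {
    if key.len() % 2 == 0 {
        "node-a:6379".to_string()
    } else {
        "node-b:6379".to_string()
    }
}

// Group (command, key) pairs into per-node sub-pipelines, keeping each
// command's original index so responses can be reassembled in order later.
fn split_pipeline(commands: &[(&str, &str)]) -> HashMap<String, Vec<(usize, String)>> {
    let mut sub_pipelines: HashMap<String, Vec<(usize, String)>> = HashMap::new();
    for (index, (cmd, key)) in commands.iter().enumerate() {
        sub_pipelines
            .entry(node_for_key(key))
            .or_default()
            .push((index, format!("{cmd} {key}")));
    }
    sub_pipelines
}

fn main() {
    let subs = split_pipeline(&[("GET", "ab"), ("SET", "abc")]);
    assert_eq!(subs["node-a:6379"], vec![(0, "GET ab".to_string())]);
    assert_eq!(subs["node-b:6379"], vec![(1, "SET abc".to_string())]);
}
```

The stored original indices are what later allow the per-node responses to be placed back into the caller's expected order.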
2. Node Communication:
Once the pipeline is split, each sub-pipeline is sent to its respective node for execution.
For commands that span multiple nodes, the implementation ensures the responses are tracked and aggregated to form a cohesive result.
3. Response Aggregation:
To handle multi-node commands, the responses from each node are stored along with the node's address. This allows for proper aggregation and processing of results, particularly when commands require combining responses (e.g., for commands like MGET).
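A minimal sketch of this storage-and-aggregation shape, mirroring the `Vec<Vec<(Value, String)>>` layout (outer index = pipeline command, inner entries = responses with source addresses). The summing policy is purely illustrative; the real aggregation depends on each command's response policy:

```rust
// Illustrative aggregation policy: sum numeric replies from all nodes.
fn aggregate_sum(responses: &[(i64, String)]) -> i64 {
    responses.iter().map(|(value, _address)| value).sum()
}

fn main() {
    // One outer slot per pipeline command; inner entries come from different nodes.
    let mut values_and_addresses: Vec<Vec<(i64, String)>> = vec![Vec::new(); 2];
    // A multi-node command at index 0 received replies from two nodes.
    values_and_addresses[0].push((2, "node-a:6379".to_string()));
    values_and_addresses[0].push((3, "node-b:6379".to_string()));
    let aggregated = aggregate_sum(&values_and_addresses[0]);
    // The aggregated value replaces the per-node entries for that command index.
    values_and_addresses[0] = vec![(aggregated, String::new())];
    assert_eq!(values_and_addresses[0], vec![(5, String::new())]);
}
```

Keeping the address alongside each value is what makes policy-dependent aggregation (sum, logical AND, concatenation, etc.) possible after the fact.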
Summary
This PR introduces the Pipeline request type, enabling non-atomic batch command execution in Glide.
Issue link
This Pull Request is linked to issue (URL): [REPLACE ME]
Checklist
Before submitting the PR, make sure the following are checked: