Skip to content

Commit

Permalink
Merge branch 'Ladicek-fix-cluster-slots-retrieval'
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Oct 11, 2023
2 parents 0f79564 + c0edd0e commit df02b83
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 64 deletions.
9 changes: 7 additions & 2 deletions src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,19 @@ To work with cluster the connection creation is quite similar:
{@link examples.RedisExamples#example6}
----

In this case the configuration requires one of more members of the cluster to be known.
In this case the configuration requires one or more members of the cluster to be known.
This list will be used to ask the cluster for the current configuration, which means if any of the listed members is not available it will be skipped.

In cluster mode a connection is established to each node and special care is needed when executing commands.
It is recommended to read redis manual in order to understand how clustering works.
It is recommended to read the Redis manual in order to understand how clustering works.
The client operating in this mode will do a best effort to identify which slot is used by the executed command in order to execute it on the right node.
There could be cases where this isn't possible to identify and in that case as a best effort the command will be run on a random node.

To know which Redis node holds which slots, the clustered Redis client holds a cache of the hash slot assignment.
When the cache is empty, the first attempt to acquire a `RedisClusterConnection` will execute `CLUSTER SLOTS`.
The cache has a configurable TTL (time to live), which defaults to 1 second.
The cache is also cleared whenever any command executed by the client receives the MOVED redirection.

== Replication Mode

Working with replication is transparent to the client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setUseReplicas(io.vertx.redis.client.RedisReplicas.valueOf((String)member.getValue()));
}
break;
case "hashSlotCacheTTL":
if (member.getValue() instanceof Number) {
obj.setHashSlotCacheTTL(((Number)member.getValue()).longValue());
}
break;
}
}
}
Expand All @@ -37,5 +42,6 @@ public static void toJson(RedisClusterConnectOptions obj, java.util.Map<String,
if (obj.getUseReplicas() != null) {
json.put("useReplicas", obj.getUseReplicas().name());
}
json.put("hashSlotCacheTTL", obj.getHashSlotCacheTTL());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setPoolName((String)member.getValue());
}
break;
case "hashSlotCacheTTL":
if (member.getValue() instanceof Number) {
obj.setHashSlotCacheTTL(((Number)member.getValue()).longValue());
}
break;
}
}
}
Expand Down Expand Up @@ -171,5 +176,6 @@ public static void toJson(RedisOptions obj, java.util.Map<String, Object> json)
if (obj.getPoolName() != null) {
json.put("poolName", obj.getPoolName());
}
json.put("hashSlotCacheTTL", obj.getHashSlotCacheTTL());
}
}
1 change: 1 addition & 0 deletions src/main/java/examples/RedisExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void example5(Vertx vertx) {

public void example6() {
final RedisOptions options = new RedisOptions()
.setType(RedisClientType.CLUSTER)
.addConnectionString("redis://127.0.0.1:7000")
.addConnectionString("redis://127.0.0.1:7001")
.addConnectionString("redis://127.0.0.1:7002")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@
public class RedisClusterConnectOptions extends RedisConnectOptions {

private RedisReplicas useReplicas;
private long hashSlotCacheTTL;

public RedisClusterConnectOptions(RedisOptions options) {
super(options);
setUseReplicas(options.getUseReplicas());
setHashSlotCacheTTL(options.getHashSlotCacheTTL());
}

public RedisClusterConnectOptions() {
useReplicas = RedisReplicas.NEVER;
hashSlotCacheTTL = 1000;
}

public RedisClusterConnectOptions(RedisClusterConnectOptions other) {
this.useReplicas = other.useReplicas;
this.hashSlotCacheTTL = other.hashSlotCacheTTL;
}

public RedisClusterConnectOptions(JsonObject json) {
Expand All @@ -58,11 +62,36 @@ public RedisReplicas getUseReplicas() {
* @param useReplicas the cluster replica use mode.
* @return fluent self.
*/
public RedisConnectOptions setUseReplicas(RedisReplicas useReplicas) {
public RedisClusterConnectOptions setUseReplicas(RedisReplicas useReplicas) {
this.useReplicas = useReplicas;
return this;
}

/**
* Returns the TTL of the hash slot cache. This is only meaningful in case of
* a {@linkplain RedisClientType#CLUSTER clustered} Redis client.
* <p>
* The TTL is expressed in milliseconds. Defaults to 1000 millis (1 second).
*
* @return the TTL of the hash slot cache
*/
public long getHashSlotCacheTTL() {
return hashSlotCacheTTL;
}

/**
* Sets the TTL of the hash slot cache. This is only meaningful in case of
* a {@linkplain RedisClientType#CLUSTER clustered} Redis client.
* <p>
* The TTL is expressed in milliseconds. Defaults to 1000 millis (1 second).
*
* @param hashSlotCacheTTL the TTL of the hash slot cache, in millis
*/
public RedisClusterConnectOptions setHashSlotCacheTTL(long hashSlotCacheTTL) {
this.hashSlotCacheTTL = hashSlotCacheTTL;
return this;
}

@Override
public RedisClusterConnectOptions setMaxNestedArrays(int maxNestedArrays) {
return (RedisClusterConnectOptions) super.setMaxNestedArrays(maxNestedArrays);
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/io/vertx/redis/client/RedisOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class RedisOptions {
private RedisReplicas useReplicas;
private volatile String password;
private boolean protocolNegotiation;
private long hashSlotCacheTTL;
private TracingPolicy tracingPolicy;

/**
Expand All @@ -66,6 +67,7 @@ public RedisOptions() {
maxNestedArrays = 32;
protocolNegotiation = true;
maxWaitingHandlers = 2048;
hashSlotCacheTTL = 1000;
}

/**
Expand All @@ -84,6 +86,7 @@ public RedisOptions(RedisOptions other) {
this.useReplicas = other.useReplicas;
this.password = other.password;
this.protocolNegotiation = other.protocolNegotiation;
this.hashSlotCacheTTL = other.hashSlotCacheTTL;
this.maxWaitingHandlers = other.maxWaitingHandlers;
this.tracingPolicy = other.tracingPolicy;
}
Expand Down Expand Up @@ -539,6 +542,31 @@ public String getPoolName() {
return poolOptions.getName();
}

/**
* Returns the TTL of the hash slot cache. This is only meaningful in case of
* a {@linkplain RedisClientType#CLUSTER clustered} Redis client.
* <p>
* The TTL is expressed in milliseconds. Defaults to 1000 millis (1 second).
*
* @return the TTL of the hash slot cache
*/
public long getHashSlotCacheTTL() {
return hashSlotCacheTTL;
}

/**
* Sets the TTL of the hash slot cache. This is only meaningful in case of
* a {@linkplain RedisClientType#CLUSTER clustered} Redis client.
* <p>
* The TTL is expressed in milliseconds. Defaults to 1000 millis (1 second).
*
* @param hashSlotCacheTTL the TTL of the hash slot cache, in millis
*/
public RedisOptions setHashSlotCacheTTL(long hashSlotCacheTTL) {
this.hashSlotCacheTTL = hashSlotCacheTTL;
return this;
}

/**
* Converts this object to JSON notation.
*
Expand Down
147 changes: 88 additions & 59 deletions src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.vertx.redis.client.impl;

import io.vertx.core.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClientOptions;
Expand All @@ -30,6 +31,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static io.vertx.redis.client.Command.*;
Expand Down Expand Up @@ -124,6 +126,8 @@ public static void addMasterOnlyCommand(Command command) {
private final RedisClusterConnectOptions connectOptions;
private final PoolOptions poolOptions;

private final AtomicReference<Future<Slots>> slots = new AtomicReference<>();

public RedisClusterClient(Vertx vertx, NetClientOptions tcpOptions, PoolOptions poolOptions, RedisClusterConnectOptions connectOptions, TracingPolicy tracingPolicy) {
super(vertx, tcpOptions, poolOptions, connectOptions, tracingPolicy);
this.connectOptions = connectOptions;
Expand All @@ -139,70 +143,47 @@ public RedisClusterClient(Vertx vertx, NetClientOptions tcpOptions, PoolOptions
@Override
public Future<RedisConnection> connect() {
final Promise<RedisConnection> promise = vertx.promise();
// attempt to load the slots from the first good endpoint
connect(connectOptions.getEndpoints(), 0, promise);
getSlots(vertx.getOrCreateContext())
.onSuccess(slots -> connect(slots, promise))
.onFailure(promise::fail);
return promise.future();
}

private void connect(List<String> endpoints, int index, Handler<AsyncResult<RedisConnection>> onConnect) {
if (index >= endpoints.size()) {
// stop condition
onConnect.handle(Future.failedFuture("Cannot connect to any of the provided endpoints"));
private void connect(Slots slots, Handler<AsyncResult<RedisConnection>> onConnected) {
// validate if the pool config is valid
final int totalUniqueEndpoints = slots.endpoints().length;
if (poolOptions.getMaxSize() < totalUniqueEndpoints) {
// this isn't a valid setup, the connection pool will not accommodate all the required connections
onConnected.handle(Future.failedFuture("RedisOptions maxPoolSize < Cluster size(" + totalUniqueEndpoints + "): The pool is not able to hold all required connections!"));
return;
}

connectionManager.getConnection(endpoints.get(index), RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
.onFailure(err -> {
// failed try with the next endpoint
connect(endpoints, index + 1, onConnect);
})
.onSuccess(conn -> {
// fetch slots from the cluster immediately to ensure slots are correct
getSlots(endpoints.get(index), conn)
.onFailure(err -> {
// the slots command failed.
conn.close().onFailure(LOG::warn);
// try with the next one
connect(endpoints, index + 1, onConnect);
})
.onSuccess(slots -> {
// slots are loaded (this connection isn't needed anymore)
conn.close().onFailure(LOG::warn);
// create a cluster connection
final AtomicBoolean failed = new AtomicBoolean(false);
final AtomicInteger counter = new AtomicInteger();
final Map<String, PooledRedisConnection> connections = new HashMap<>();

// validate if the pool config is valid
final int totalUniqueEndpoints = slots.endpoints().length;
if (poolOptions.getMaxSize() < totalUniqueEndpoints) {
// this isn't a valid setup, the connection pool will not accommodate all the required connections
onConnect.handle(Future.failedFuture("RedisOptions maxPoolSize < Cluster size(" + totalUniqueEndpoints + "): The pool is not able to hold all required connections!"));
return;
}

for (String endpoint : slots.endpoints()) {
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
.onFailure(err -> {
// failed try with the next endpoint
failed.set(true);
connectionComplete(counter, slots, connections, failed, onConnect);
})
.onSuccess(cconn -> {
// there can be concurrent access to the connection map
// since this is a one time operation we can pay the penalty of
// synchronizing on each write (hopefully is only a few writes)
synchronized (connections) {
connections.put(endpoint, cconn);
}
connectionComplete(counter, slots, connections, failed, onConnect);
});
}
});
});
// create a cluster connection
final AtomicBoolean failed = new AtomicBoolean(false);
final AtomicInteger counter = new AtomicInteger();
final Map<String, PooledRedisConnection> connections = new HashMap<>();

for (String endpoint : slots.endpoints()) {
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
.onFailure(err -> {
// failed try with the next endpoint
failed.set(true);
connectionComplete(counter, slots, connections, failed, onConnected);
})
.onSuccess(cconn -> {
// there can be concurrent access to the connection map
// since this is a one time operation we can pay the penalty of
// synchronizing on each write (hopefully is only a few writes)
synchronized (connections) {
connections.put(endpoint, cconn);
}
connectionComplete(counter, slots, connections, failed, onConnected);
});
}
}

private void connectionComplete(AtomicInteger counter, Slots slots, Map<String, PooledRedisConnection> connections, AtomicBoolean failed, Handler<AsyncResult<RedisConnection>> onConnect) {
private void connectionComplete(AtomicInteger counter, Slots slots, Map<String, PooledRedisConnection> connections,
AtomicBoolean failed, Handler<AsyncResult<RedisConnection>> onConnected) {
if (counter.incrementAndGet() == slots.endpoints().length) {
// end condition
if (failed.get()) {
Expand All @@ -218,15 +199,63 @@ private void connectionComplete(AtomicInteger counter, Slots slots, Map<String,
}
}
// return
onConnect.handle(Future.failedFuture("Failed to connect to all nodes of the cluster"));
onConnected.handle(Future.failedFuture("Failed to connect to all nodes of the cluster"));
} else {
onConnect.handle(Future.succeededFuture(new RedisClusterConnection(vertx, connectOptions, slots, connections)));
onConnected.handle(Future.succeededFuture(new RedisClusterConnection(vertx, connectOptions, slots,
() -> this.slots.set(null), connections)));
}
}
}

private Future<Slots> getSlots(String endpoint, RedisConnection conn) {
private Future<Slots> getSlots(ContextInternal context) {
while (true) {
Future<Slots> slots = this.slots.get();
if (slots != null) {
return slots;
}

Promise<Slots> promise = context.promise();
Future<Slots> future = promise.future();
if (this.slots.compareAndSet(null, future)) {
LOG.debug("Obtaining hash slot assignment");
// attempt to load the slots from the first good endpoint
getSlots(connectOptions.getEndpoints(), 0, promise);
return future;
}
}
}

private void getSlots(List<String> endpoints, int index, Handler<AsyncResult<Slots>> onGotSlots) {
if (index >= endpoints.size()) {
// stop condition
onGotSlots.handle(Future.failedFuture("Cannot connect to any of the provided endpoints"));
return;
}

connectionManager.getConnection(endpoints.get(index), RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
.onFailure(err -> {
// try with the next endpoint
getSlots(endpoints, index + 1, onGotSlots);
})
.onSuccess(conn -> {
getSlots(endpoints.get(index), conn).onComplete(result -> {
// the connection is not needed anymore, regardless of success or failure
// (on success, we just finish, on failure, we'll try another endpoint)
conn.close().onFailure(LOG::warn);

if (result.failed()) {
// the slots command failed, try with next endpoint
getSlots(endpoints, index + 1, onGotSlots);
} else {
Slots slots = result.result();
onGotSlots.handle(Future.succeededFuture(slots));
vertx.setTimer(connectOptions.getHashSlotCacheTTL(), ignored -> this.slots.set(null));
}
});
});
}

private Future<Slots> getSlots(String endpoint, RedisConnection conn) {
return conn
.send(cmd(CLUSTER).arg("SLOTS"))
.compose(reply -> {
Expand Down
Loading

0 comments on commit df02b83

Please sign in to comment.