Skip to content

Commit

Permalink
CLUSTER FAILOVER replicaid node-id
Browse files Browse the repository at this point in the history
Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Jan 23, 2025
1 parent 0ccc4e4 commit c9bfd69
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 16 deletions.
1 change: 0 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
Expand Down
47 changes: 33 additions & 14 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1233,25 +1233,30 @@ void clusterInitLast(void) {

/* Called when a cluster node receives SHUTDOWN. */
void clusterHandleServerShutdown(void) {
if (server.auto_failover_on_shutdown) {
if (nodeIsPrimary(myself) && server.auto_failover_on_shutdown) {
/* Find the first best replica, that is, the replica with the largest offset. */
client *best_replica = NULL;
listIter replicas_iter;
listNode *replicas_list_node;
listRewind(server.replicas, &replicas_iter);
while ((replicas_list_node = listNext(&replicas_iter)) != NULL) {
client *replica = listNodeValue(replicas_list_node);
/* This is done only when the replica offset is caught up, to avoid data loss */
if (replica->repl_state == REPLICA_STATE_ONLINE && replica->repl_ack_off == server.primary_repl_offset) {
/* This is done only when the replica offset is caught up, to avoid data loss.
* And 0x800ff is 8.0.255, we only support new versions for this feature. */
if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE &&
// replica->repl_data->replica_version > 0x800ff &&
replica->name && sdslen(replica->name->ptr) == CLUSTER_NAMELEN &&
replica->repl_data->repl_ack_off == server.primary_repl_offset) {
best_replica = replica;
break;
}
}

if (best_replica) {
/* Send a CLUSTER FAILOVER FORCE to the best replica. */
const char *buf = "*3\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n";
if (connWrite(best_replica->conn, buf, strlen(buf)) == (int)strlen(buf)) {
char buf[128];
size_t buflen = snprintf(buf, sizeof(buf), "*5\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n$9\r\nreplicaid\r\n$%d\r\n%s\r\n", CLUSTER_NAMELEN, (char *)best_replica->name->ptr);
if (connWrite(best_replica->conn, buf, buflen) == (int)strlen(buf)) {
serverLog(LL_NOTICE, "Sending CLUSTER FAILOVER FORCE to replica %s succeeded.",
replicationGetReplicaName(best_replica));
} else {
Expand Down Expand Up @@ -7027,32 +7032,46 @@ int clusterCommandSpecial(client *c) {
} else {
addReplyLongLong(c, clusterNodeFailureReportsCount(n));
}
} else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc == 2 || c->argc == 3)) {
/* CLUSTER FAILOVER [FORCE|TAKEOVER] */
} else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc >= 2)) {
/* CLUSTER FAILOVER [FORCE|TAKEOVER] [replicaid <node id>] */
int force = 0, takeover = 0;
robj *replicaid = NULL;

if (c->argc == 3) {
if (!strcasecmp(c->argv[2]->ptr, "force")) {
for (int j = 2; j < c->argc; j++) {
int moreargs = (c->argc - 1) - j;
if (!strcasecmp(c->argv[j]->ptr, "force")) {
force = 1;
} else if (!strcasecmp(c->argv[2]->ptr, "takeover")) {
} else if (!strcasecmp(c->argv[j]->ptr, "takeover")) {
takeover = 1;
force = 1; /* Takeover also implies force. */
} else if (!strcasecmp(c->argv[j]->ptr, "replicaid") && moreargs) {
j++;
replicaid = c->argv[j];
} else {
addReplyErrorObject(c, shared.syntaxerr);
return 1;
}
}

/* Check if it should be executed by myself. */
if (replicaid != NULL) {
clusterNode *n = clusterLookupNode(replicaid->ptr, sdslen(replicaid->ptr));
if (n != myself) {
/* Ignore this command, including the sanity check and the process. */
addReply(c, shared.ok);
return 1;
}
}

/* Check preconditions. */
if (clusterNodeIsPrimary(myself)) {
addReplyError(c, "You should send CLUSTER FAILOVER to a replica");
if (replicaid == NULL) addReplyError(c, "You should send CLUSTER FAILOVER to a replica");
return 1;
} else if (myself->replicaof == NULL) {
addReplyError(c, "I'm a replica but my master is unknown to me");
if (replicaid == NULL) addReplyError(c, "I'm a replica but my master is unknown to me");
return 1;
} else if (!force && (nodeFailed(myself->replicaof) || myself->replicaof->link == NULL)) {
addReplyError(c, "Master is down or failed, "
"please use CLUSTER FAILOVER FORCE");
if (replicaid == NULL) addReplyError(c, "Master is down or failed, please use CLUSTER FAILOVER FORCE");
return 1;
}
resetManualFailover();
Expand Down
26 changes: 26 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3594,6 +3594,14 @@ void syncWithPrimary(connection *conn) {
err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL);
if (err) goto write_error;

/* Inform the primary of our (replica) node name. */
if (server.cluster_enabled) {
char *argv[] = {"CLIENT", "SETNAME", server.cluster->myself->name};
size_t lens[] = {6, 7, CLUSTER_NAMELEN};
err = sendCommandArgv(conn, 3, argv, lens);
if (err) goto write_error;
}

server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
return;
}
Expand Down Expand Up @@ -3684,6 +3692,24 @@ void syncWithPrimary(connection *conn) {
}
sdsfree(err);
err = NULL;
if (server.cluster_enabled) {
server.repl_state = REPL_STATE_RECEIVE_SETNAME_REPLY;
return;
} else {
server.repl_state = REPL_STATE_SEND_PSYNC;
}
}

/* Receive CLIENT SETNAME reply. */
if (server.repl_state == REPL_STATE_RECEIVE_SETNAME_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any. 8.1 introduced this logic and we don't care if it failed. */
if (err[0] == '-') {
serverLog(LL_NOTICE, "(Non critical) Primary does not understand CLIENT SETNAME: %s", err);
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC;
}

Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ typedef enum {
REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_SETNAME_REPLY, /* Wait for CLIENT SETNAME reply */
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
/* --- End of handshake states --- */
Expand Down
2 changes: 1 addition & 1 deletion tests/support/util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ proc check_replica_acked_ofs {primary replica_ip replica_port} {
proc wait_replica_acked_ofs {primary replica replica_ip replica_port} {
$primary config set repl-ping-replica-period 3600
$replica config set hz 500
wait_for_condition 100 100 {
wait_for_condition 1000 50 {
[check_replica_acked_ofs $primary $replica_ip $replica_port] eq 1
} else {
puts "INFO REPLICATION: [$primary info replication]"
Expand Down

0 comments on commit c9bfd69

Please sign in to comment.