Skip to content

Commit

Permalink
code review v1
Browse files Browse the repository at this point in the history
Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Jan 24, 2025
1 parent c9bfd69 commit c8037a1
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 22 deletions.
43 changes: 26 additions & 17 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1253,15 +1253,23 @@ void clusterHandleServerShutdown(void) {
}

if (best_replica) {
/* Send a CLUSTER FAILOVER FORCE to the best replica. */
/* Send CLUSTER FAILOVER FORCE REPLICAID <node-id> to all replicas: the
* replication buffer is shared, so the command goes to every replica, but
* only the replica whose node-id matches will execute it. The caller will call
* flushReplicasOutputBuffers, so delivery here is best effort. */
char buf[128];
size_t buflen = snprintf(buf, sizeof(buf), "*5\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n$9\r\nreplicaid\r\n$%d\r\n%s\r\n", CLUSTER_NAMELEN, (char *)best_replica->name->ptr);
if (connWrite(best_replica->conn, buf, buflen) == (int)strlen(buf)) {
serverLog(LL_NOTICE, "Sending CLUSTER FAILOVER FORCE to replica %s succeeded.",
replicationGetReplicaName(best_replica));
} else {
serverLog(LL_WARNING, "Failed to send CLUSTER FAILOVER FORCE to replica: %s", strerror(errno));
}
size_t buflen = snprintf(buf, sizeof(buf),
"*5\r\n$7\r\nCLUSTER\r\n"
"$8\r\nFAILOVER\r\n"
"$5\r\nFORCE\r\n"
"$9\r\nREPLICAID\r\n"
"$%d\r\n%s\r\n",
CLUSTER_NAMELEN,
(char *)best_replica->name->ptr);
/* The write handler must be installed for all replicas before feeding
* the replication stream. */
prepareReplicasToWrite();
feedReplicationBuffer(buf, buflen);
} else {
serverLog(LL_NOTICE, "Unable to find a replica to perform an auto failover on shutdown.");
}
Expand Down Expand Up @@ -7033,7 +7041,9 @@ int clusterCommandSpecial(client *c) {
addReplyLongLong(c, clusterNodeFailureReportsCount(n));
}
} else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc >= 2)) {
/* CLUSTER FAILOVER [FORCE|TAKEOVER] [replicaid <node id>] */
/* CLUSTER FAILOVER [FORCE|TAKEOVER] [REPLICAID <NODE ID>]
* REPLICAID is currently for internal use only, so we don't put it
* into the JSON file. */
int force = 0, takeover = 0;
robj *replicaid = NULL;

Expand All @@ -7044,7 +7054,8 @@ int clusterCommandSpecial(client *c) {
} else if (!strcasecmp(c->argv[j]->ptr, "takeover")) {
takeover = 1;
force = 1; /* Takeover also implies force. */
} else if (!strcasecmp(c->argv[j]->ptr, "replicaid") && moreargs) {
} else if (c == server.primary && !strcasecmp(c->argv[j]->ptr, "replicaid") && moreargs) {
/* This option is currently accepted only when sent by our primary. */
j++;
replicaid = c->argv[j];
} else {
Expand All @@ -7054,13 +7065,11 @@ int clusterCommandSpecial(client *c) {
}

/* Check if it should be executed by myself. */
if (replicaid != NULL) {
clusterNode *n = clusterLookupNode(replicaid->ptr, sdslen(replicaid->ptr));
if (n != myself) {
/* Ignore this command, including the sanity check and the process. */
addReply(c, shared.ok);
return 1;
}
if (replicaid != NULL && memcmp(replicaid->ptr, myself->name, CLUSTER_NAMELEN) != 0) {
/* Ignore this command, including the sanity check and the process. */
serverLog(LL_NOTICE, "return ok");
addReply(c, shared.ok);
return 1;
}

/* Check preconditions. */
Expand Down
2 changes: 1 addition & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3704,7 +3704,7 @@ void syncWithPrimary(connection *conn) {
if (server.repl_state == REPL_STATE_RECEIVE_SETNAME_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any. 8.1 introduced this logic and we don't care if it failed. */
/* Ignore the error, if any: this is best effort, so we don't care if it failed. */
if (err[0] == '-') {
serverLog(LL_NOTICE, "(Non critical) Primary does not understand CLIENT SETNAME: %s", err);
}
Expand Down
6 changes: 3 additions & 3 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -4585,16 +4585,16 @@ int finishShutdown(void) {
unlink(server.pidfile);
}

/* Handle cluster-related matters on shutdown. */
if (server.cluster_enabled) clusterHandleServerShutdown();

/* Best effort flush of replica output buffers, so that we hopefully
* send them pending writes. */
flushReplicasOutputBuffers();

/* Close the listening sockets. Apparently this allows faster restarts. */
closeListeningSockets(1);

/* Handle cluster-related matters on shutdown. */
if (server.cluster_enabled) clusterHandleServerShutdown();

serverLog(LL_WARNING, "%s is now ready to exit, bye bye...", server.sentinel_mode ? "Sentinel" : "Valkey");
return C_OK;

Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2891,6 +2891,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);

/* Replication */
int prepareReplicasToWrite(void);
void replicationFeedReplicas(int dictid, robj **argv, int argc);
void replicationFeedStreamFromPrimaryStream(char *buf, size_t buflen);
void resetReplicationBuffer(void);
Expand Down
1 change: 0 additions & 1 deletion tests/unit/cluster/auto-failover-on-shutdown.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ proc test_main {how shutdown_timeout} {
}

# Make sure that the expected logs are printed.
verify_log_message 0 "*Sending CLUSTER FAILOVER FORCE to replica*" 0
verify_log_message -6 "*Forced failover primary request accepted*" 0

resume_process $replica1_pid
Expand Down

0 comments on commit c8037a1

Please sign in to comment.