From 8239112cab1d35602e4bf1132a4fba47fab85885 Mon Sep 17 00:00:00 2001 From: Binbin Date: Wed, 20 Nov 2024 11:54:01 +0800 Subject: [PATCH] Move the handler logic to replication --- src/cluster.h | 1 + src/cluster_legacy.c | 24 +- src/cluster_legacy.h | 53 +-- src/cluster_slotsync.c | 634 ++++++----------------------- src/commands.def | 2 +- src/commands/cluster-slotsync.json | 2 +- src/db.c | 2 +- src/expire.c | 1 + src/rdb.c | 131 ++---- src/replication.c | 598 +++++++++++++++++++-------- src/server.c | 8 + src/server.h | 15 +- tests/test_slot_sync.py | 57 +-- 13 files changed, 683 insertions(+), 845 deletions(-) diff --git a/src/cluster.h b/src/cluster.h index 31a00474fb..f1f07503cf 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -149,5 +149,6 @@ void clusterSlotPendingDelete(void); int testInjectError(const char *error); char *getInjectOptionValue(const char *option); list *createSlotRangeList(void); +void resetSlotSyncLinkForReconnect(void *link); #endif /* __CLUSTER_H */ diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 6d412e5a79..746351b48d 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -131,7 +131,7 @@ int isSlotInClusterSlotSyncLinkList(int slot); int isSlotInPendingDelete(int slot); clusterSlotSyncLink *createSlotSyncLink(void); void initSlotSyncLink(clusterSlotSyncLink *link, clusterNode *node, list *slot_ranges); -const char *slotSyncStateToString(slotSyncState state); +const char *slotSyncStateToString(int repl_state); sds formatSlotSyncImportingSlots(void); void clusterCommandSlotLinkList(client *c); void clusterCommandSlotLinkKill(client *c, const char *linkname); @@ -6995,13 +6995,13 @@ int clusterCommandSpecial(client *c) { /* CLUSTER SLOTSYNC [ ...] * * This command is sent to the target node, which must be a primary node, and - * it will try to synchronize slot data from the slot owner. */ + * it will try to synchronize slot data from the slot primary. */ if (!nodeIsPrimary(myself)) { addReplyError(c, "Myself should be a primary."); return 1; } - clusterNode *n = NULL; + clusterNode *source_node = NULL; /* Build the slot range list by the command arguments. */ list *slot_ranges = createSlotRangeList(); @@ -7028,15 +7028,15 @@ int clusterCommandSpecial(client *c) { listRelease(slot_ranges); return 1; } - if (!n) { - n = server.cluster->slots[j]; - } else if (n != server.cluster->slots[j]) { + if (!source_node) { + source_node = server.cluster->slots[j]; + } else if (source_node != server.cluster->slots[j]) { addReplyErrorFormat(c, "The slot ranges can not cross nodes, please check slot: %d.", j); listRelease(slot_ranges); return 1; } - if (n == myself) { - addReplyErrorFormat(c, "Slot %d is served by myself.", j); + if (source_node == myself) { + addReplyErrorFormat(c, "Slot %d is already served by myself.", j); listRelease(slot_ranges); return 1; } @@ -7057,13 +7057,13 @@ int clusterCommandSpecial(client *c) { new_range->end_slot = endslot; listAddNodeTail(slot_ranges, new_range); serverLog(LL_NOTICE, "Syncing slot range [%d-%d] from node %.40s (%s)", - startslot, endslot, n->name, n->human_nodename); + startslot, endslot, source_node->name, source_node->human_nodename); } - serverAssert(n); + serverAssert(source_node && source_node != myself); /* Create and initialize the slot sync link. */ clusterSlotSyncLink *link = createSlotSyncLink(); - initSlotSyncLink(link, n, slot_ranges); + initSlotSyncLink(link, source_node, slot_ranges); listAddNodeTail(server.cluster->slotsync_links, link); addReply(c, shared.ok); } else if (!strcasecmp(c->argv[1]->ptr, "slotfailover") && (c->argc == 2 || c->argc == 3)) { @@ -7091,7 +7091,7 @@ int clusterCommandSpecial(client *c) { listRewind(server.cluster->slotsync_links, &li); while ((ln = listNext(&li)) != NULL) { clusterSlotSyncLink *link = ln->value; - if (link->sync_state != CLUSTER_SLOTSYNC_STATE_CONNECTED) { + if (link->sync_state != REPL_STATE_CONNECTED) { addReplyErrorFormat(c, "Slot sync link %.40s is not connected, link status: %s", link->linkname, slotSyncStateToString(link->sync_state)); return 1; diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index d2fb067f70..d49d7cab09 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -372,49 +372,28 @@ typedef struct slotRange { int end_slot; } slotRange; -/* Slot sync state. Used in clusterSlotSyncLink struct sync_state for links to remember - * what to do next. */ -typedef enum { - CLUSTER_SLOTSYNC_STATE_NONE = 0, - CLUSTER_SLOTSYNC_STATE_TOCONNECT, /* Need to reconnect with the slot owner */ - CLUSTER_SLOTSYNC_STATE_CONNECTING, /* In connecting with the slot owner */ - /* --- Handshake states, must be ordered --- */ - CLUSTER_SLOTSYNC_STATE_SEND_AUTH, /* Need to send AUTH */ - CLUSTER_SLOTSYNC_STATE_RECV_AUTH, /* Wait for AUTH reply */ - CLUSTER_SLOTSYNC_STATE_SEND_CAPA, /* Need to send REPLCONF capa */ - CLUSTER_SLOTSYNC_STATE_RECV_CAPA, /* Wait for REPLCONF reply */ - CLUSTER_SLOTSYNC_STATE_WAIT_SCHED, /* Wait for schedule to avod oncurrency bug */ - CLUSTER_SLOTSYNC_STATE_SEND_SYNC, /* Need to send SYNC */ - /* --- End of handshake states --- */ - CLUSTER_SLOTSYNC_STATE_RECV_RDB, /* Receiving the filtered rdb */ - CLUSTER_SLOTSYNC_STATE_LOADING_RDB, /* Loading the RDB in bio. */ - CLUSTER_SLOTSYNC_STATE_DONE_LOADING,/* Done loading the RDB in bio. */ - CLUSTER_SLOTSYNC_STATE_CONNECTED, /* Synced with the slot owner */ - CLUSTER_SLOTSYNC_STATE_LOADING_FAIL,/* Loading fail. */ - CLUSTER_SLOTSYNC_STATE_FAILED, /* Meet unexpected error and retry will not work */ -} slotSyncState; - /* Encapsulate everything needed to talk with the slot sync source node. */ typedef struct clusterSlotSyncLink { - mstime_t ctime; /* Link creation time */ - connection *sync_conn; /* Connection to slot sync source node */ - client* client; - char linkname[CLUSTER_NAMELEN]; /* Name of this link */ - char nodename[CLUSTER_NAMELEN]; /* Name of the slot sync source node */ + mstime_t ctime; /* Link object creation time. */ + char linkname[CLUSTER_NAMELEN]; /* Name of this link, hex string, sha1-size. */ + char nodename[CLUSTER_NAMELEN]; /* Name of the slot sync source node, hex string, sha1-size. */ - serverDb *db; - functionsLibCtx *functions_lib_ctx; + /* Temporary resources during slot synchronization. */ + serverDb *temp_db; /* Temp db stores the keys during the sync process. */ + functionsLibCtx *temp_func_ctx; /* Temp function ctx stores functions during the sync process. */ - slotSyncState sync_state; /* Sync state */ - list* slot_ranges; /* List of the slot ranges we want to sync */ + client *client; /* Client to slot sync source node. */ + connection *sync_conn; /* Connection to slot sync source node. */ + int sync_state; /* State of the slot sync link during slot sync. */ + list *slot_ranges; /* List of the slot ranges we want to sync. */ /* The following fields are used by slot sync RDB transfer. */ - int transfer_tmpfile_fd; /* Descriptor of the tmpfile to store slot sync RDB */ - char* transfer_tmpfile_name; /* Name of the tmpfile to store slot sync RDB */ - int64_t transfer_total_size; /* Total size of the slot sync RDB file */ - int64_t transfer_read_size; /* Amount of read from the slot sync RDB file */ - off_t transfer_last_fsync_off; /* Offset when we fsync-ed last time */ - time_t transfer_lastio; /* Unix time of the latest read, for timeout */ + int repl_transfer_fd; /* Descriptor of the tmpfile to store slot sync RDB */ + char *repl_transfer_tmpfile; /* Name of the tmpfile to store slot sync RDB */ + int64_t repl_transfer_size; /* Total size of the slot sync RDB file */ + int64_t repl_transfer_read; /* Amount of read from the slot sync RDB file */ + off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time */ + time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ /* The following fields are used by slot failover. */ int slot_mf_ready; /* If is ready to do slot manual failover */ diff --git a/src/cluster_slotsync.c b/src/cluster_slotsync.c index 509cffe2ae..1e240da40f 100644 --- a/src/cluster_slotsync.c +++ b/src/cluster_slotsync.c @@ -6,9 +6,7 @@ /* The following functions are declared here as they will be used in this file * but they are defined in other files. */ -char *sendCommand(connection *conn, ...); -char *receiveSynchronousResponse(connection *conn); -int useDisklessLoad(void); +void syncWithSlotSyncPrimary(connection *conn); void delkeysNotOwnedByMySelf(list *slot_ranges); void clusterUpdateState(void); void clusterSaveConfigOrDie(int do_fsync); @@ -22,9 +20,6 @@ unsigned int delKeysInSlotWithTimeLimit(unsigned int hashslot, ustime_t *limit); * before the definition, we will define them in this file later. */ void setSlotSyncImporting(list *slot_ranges, clusterNode *node); void clearSlotSyncImporting(list *slot_ranges); -void syncWithSlotOwner(connection *conn); -void continueSlotSync(clusterSlotSyncLink *link); -void readSlotSyncBulkPayload(connection *conn); clusterNode *getClusterNodeBySlotList(list *slot_ranges, int *cross_node); void notifyClientsCloseSlotSyncLink(void); void clusterDoBeforeSleep(int flags); @@ -219,38 +214,55 @@ void freeSlotSyncConn(clusterSlotSyncLink *link) { } } -void initSlotSyncLink(clusterSlotSyncLink *link, clusterNode *node, list *slot_ranges) { +int connectWithSlotOwner(clusterSlotSyncLink *link, clusterNode *source_node) { + link->sync_conn = connCreate(connTypeOfCluster()); + if (connConnect(link->sync_conn, source_node->ip, getNodeDefaultClientPort(source_node), server.bind_source_addr, + syncWithSlotSyncPrimary) == C_ERR) { + serverLog(LL_WARNING, "Unable to connect to slot owner %.40s (%s): %s", source_node->name, + source_node->human_nodename, connGetLastError(link->sync_conn)); + connClose(link->sync_conn); + link->sync_conn = NULL; + return C_ERR; + } + + connSetPrivateData(link->sync_conn, link); + link->repl_transfer_lastio = server.unixtime; + link->sync_state = REPL_STATE_CONNECTING; + return C_OK; +} + +/* Initialize a slot sync link, pass in the source node and the slot ranges that need to be synced. */ +void initSlotSyncLink(clusterSlotSyncLink *link, clusterNode *source_node, list *slot_ranges) { link->ctime = mstime(); - strncpy(link->nodename, node->name, CLUSTER_NAMELEN); getRandomHexChars(link->linkname, sizeof(link->linkname)); + memcpy(link->nodename, source_node->name, CLUSTER_NAMELEN); + + link->temp_db = NULL; + link->temp_func_ctx = NULL; link->client = NULL; - link->transfer_total_size = -1; - link->transfer_read_size = 0; - link->transfer_last_fsync_off = 0; - link->transfer_lastio = server.unixtime; - link->transfer_tmpfile_fd = -1; - link->transfer_tmpfile_name = NULL; + link->sync_conn = NULL; + link->sync_state = REPL_STATE_NONE; if (slot_ranges) link->slot_ranges = slot_ranges; + + link->repl_transfer_fd = -1; + link->repl_transfer_tmpfile = NULL; + link->repl_transfer_size = -1; + link->repl_transfer_read = 0; + link->repl_transfer_last_fsync_off = 0; + link->slot_mf_ready = 0; link->slot_mf_end = 0; link->slot_mf_lag = 0; - link->sync_state = CLUSTER_SLOTSYNC_STATE_CONNECTING; - link->sync_conn = connCreate(connTypeOfCluster()); - if (connConnect(link->sync_conn, node->ip, getNodeDefaultClientPort(node), server.bind_source_addr, - syncWithSlotOwner) == C_ERR) { - serverLog(LL_WARNING, "Unable to connect to slot owner %.40s (%s): %s", node->name, node->human_nodename, - connGetLastError(link->sync_conn)); - freeSlotSyncConn(link); - return; + serverLog(LL_NOTICE, "Connecting to slot owner %.40s (%s) %s:%d", source_node->name, source_node->human_nodename, + source_node->ip, getNodeDefaultClientPort(source_node)); + if (connectWithSlotOwner(link, source_node) == C_OK) { + // todo check + setSlotSyncImporting(link->slot_ranges, source_node); + serverLog(LL_NOTICE, "Init slot sync link %.40s from node %.40s (%s).", link->linkname, source_node->name, + source_node->human_nodename); } - - connSetReadHandler(link->sync_conn, syncWithSlotOwner); - connSetPrivateData(link->sync_conn, link); - setSlotSyncImporting(link->slot_ranges, node); - serverLog(LL_NOTICE, "Init slot sync link %.40s from node %.40s (%s).", link->linkname, node->name, - node->human_nodename); } void resetSlotSyncLink(clusterSlotSyncLink *link, int reconn) { @@ -260,45 +272,43 @@ void resetSlotSyncLink(clusterSlotSyncLink *link, int reconn) { freeSlotSyncConn(link); } - if (link->transfer_tmpfile_fd > 0) { - close(link->transfer_tmpfile_fd); - link->transfer_tmpfile_fd = -1; + if (link->repl_transfer_fd != -1) { + close(link->repl_transfer_fd); + bg_unlink(link->repl_transfer_tmpfile); + zfree(link->repl_transfer_tmpfile); + link->repl_transfer_fd = -1; + link->repl_transfer_tmpfile = NULL; } - if (link->transfer_tmpfile_name) { - zfree(link->transfer_tmpfile_name); - link->transfer_tmpfile_name = NULL; + if (link->temp_db) { + discardTempDb(link->temp_db); + link->temp_db = NULL; } - if (link->db) { - serverLog(LL_NOTICE, "have link->db"); - discardTempDb(link->db, NULL); - serverLog(LL_NOTICE, "after have link->db"); - - } - - if (link->functions_lib_ctx) { - functionsLibCtxFree(link->functions_lib_ctx); + if (link->temp_func_ctx) { + functionsLibCtxFree(link->temp_func_ctx); + link->temp_func_ctx = NULL; } if (reconn) { - /* Set the state to TOCONNECT, so the cron will retry start next time. */ - link->sync_state = CLUSTER_SLOTSYNC_STATE_TOCONNECT; + /* Set the state to CONNECT, so the cron will retry start next time. */ + link->sync_state = REPL_STATE_CONNECT; link->client = NULL; - link->transfer_total_size = -1; - link->transfer_read_size = 0; - link->transfer_last_fsync_off = 0; - link->transfer_lastio = server.unixtime; + link->repl_transfer_size = -1; + link->repl_transfer_read = 0; + link->repl_transfer_last_fsync_off = 0; + link->repl_transfer_lastio = server.unixtime; link->slot_mf_ready = 0; link->slot_mf_end = 0; link->slot_mf_lag = 0; link->sync_conn = NULL; } else { + clearSlotSyncImporting(link->slot_ranges); listRelease(link->slot_ranges); } } -void resetSlotSyncLinkForReconnect(clusterSlotSyncLink *link) { +void resetSlotSyncLinkForReconnect(void *link) { resetSlotSyncLink(link, 1); } @@ -348,23 +358,28 @@ int isSlotInClusterSlotSyncLinkList(int slot) { return 0; } -const char *slotSyncStateToString(slotSyncState state) { - switch (state) { - case CLUSTER_SLOTSYNC_STATE_NONE: return "none"; - case CLUSTER_SLOTSYNC_STATE_TOCONNECT: return "to_connect"; - case CLUSTER_SLOTSYNC_STATE_CONNECTING: return "connecting"; - case CLUSTER_SLOTSYNC_STATE_SEND_AUTH: return "send_auth"; - case CLUSTER_SLOTSYNC_STATE_RECV_AUTH: return "recv_auth"; - case CLUSTER_SLOTSYNC_STATE_SEND_CAPA: return "send_capa"; - case CLUSTER_SLOTSYNC_STATE_RECV_CAPA: return "recv_capa"; - case CLUSTER_SLOTSYNC_STATE_WAIT_SCHED: return "wait_sched"; - case CLUSTER_SLOTSYNC_STATE_SEND_SYNC: return "send_sync"; - case CLUSTER_SLOTSYNC_STATE_RECV_RDB: return "recv_rdb"; - case CLUSTER_SLOTSYNC_STATE_LOADING_RDB: return "loading_rdb"; - case CLUSTER_SLOTSYNC_STATE_DONE_LOADING: return "done_loading"; - case CLUSTER_SLOTSYNC_STATE_CONNECTED: return "connected"; - case CLUSTER_SLOTSYNC_STATE_FAILED: return "failed"; - default: serverPanic("Unknown slot sync state."); +// todo This may need to be removed, or not exposed so much +const char *slotSyncStateToString(int repl_state) { + switch (repl_state) { + case REPL_STATE_NONE: return "none"; + case REPL_STATE_CONNECT: return "connect"; + case REPL_STATE_CONNECTING: return "connecting"; + case REPL_STATE_RECEIVE_PING_REPLY: return "recv_ping_reply"; + case REPL_STATE_SEND_HANDSHAKE: return "send_handshake"; + case REPL_STATE_RECEIVE_AUTH_REPLY: return "recv_auth_reply"; + case REPL_STATE_RECEIVE_PORT_REPLY: return "recv_port_reply"; + case REPL_STATE_RECEIVE_IP_REPLY: return "recv_ip_reply"; + case REPL_STATE_RECEIVE_CAPA_REPLY: return "recv_capa_reply"; + case REPL_STATE_RECEIVE_VERSION_REPLY: return "recv_version_reply"; + case REPL_STATE_WAIT_SCHED: return "wait_sched"; + case REPL_STATE_SEND_PSYNC: return "send_psync"; + case REPL_STATE_RECEIVE_PSYNC_REPLY: return "recv_psync_reply"; + case REPL_STATE_TRANSFER: return "transfer"; + case REPL_STATE_LOADING: return "loading"; + case REPL_STATE_CONNECTED: return "connected"; + case REPL_STATE_LOADED: return "loaded"; + case REPL_STATE_LOAD_FAIL: return "load_fail"; + default: return "unknown"; } } @@ -433,7 +448,7 @@ int clusterGetSlotSyncLinkRank(clusterSlotSyncLink *in) { clusterSlotSyncLink *link = ln->value; /* Skip connected links. */ - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_CONNECTED) { + if (link->sync_state == REPL_STATE_CONNECTED) { continue; } @@ -443,8 +458,7 @@ int clusterGetSlotSyncLinkRank(clusterSlotSyncLink *in) { } /* Current link is in progress, the input link should rank after it. */ - if (link->sync_state > CLUSTER_SLOTSYNC_STATE_WAIT_SCHED && - link->sync_state < CLUSTER_SLOTSYNC_STATE_CONNECTED) { + if (link->sync_state > REPL_STATE_WAIT_SCHED && link->sync_state < REPL_STATE_CONNECTED) { rank++; continue; } @@ -516,424 +530,6 @@ sds formatSlotSyncImportingSlots(void) { return ci; } -/* ----------------------------------------------------------------------------- - * Cluster functions related to slot sync handshake and rdb transfer. - * -------------------------------------------------------------------------- */ - -void syncWithSlotOwner(connection *conn) { - clusterSlotSyncLink *link = connGetPrivateData(conn); - char *err = NULL; - - /* Simulate IO error. */ - if (testInjectError("crs-io-error-beforce-slot-sync")) { - serverLog(LL_WARNING, "inject crs-io-error-beforce-slot-sync"); - goto error; - } - - /* Check for errors in the socket: after a non blocking connect() we - * may find that the socket is in error state. */ - if (connGetState(conn) != CONN_STATE_CONNECTED) { - serverLog(LL_WARNING, "Error condition on socket for slot sync: %s", connGetLastError(conn)); - goto error; - } - - /* CLUSTER_SLOTSYNC_STATE: CONNECTING ==> SEND_AUTH|SEND_CAPA - * - * Set the read/write event handler. */ - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_CONNECTING) { - serverLog(LL_NOTICE, "Non blocking connect for slot sync fired the event."); - connSetWriteHandler(conn, NULL); - connSetReadHandler(conn, syncWithSlotOwner); - if (server.primary_auth) { - link->sync_state = CLUSTER_SLOTSYNC_STATE_SEND_AUTH; - } else { - link->sync_state = CLUSTER_SLOTSYNC_STATE_SEND_CAPA; - } - } - - /* Simulate IO error. */ - if (testInjectError("crs-io-error-send-auth")) { - serverLog(LL_WARNING, "inject crs-io-error-send-auth"); - goto error; - } - - /* CLUSTER_SLOTSYNC_STATE: SEND_AUTH ==> RECV_AUTH - * - * AUTH with the slot owner if needed. */ - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_SEND_AUTH) { - err = sendCommand(conn, "AUTH", server.primary_auth, NULL); - if (err) goto write_error; - link->sync_state = CLUSTER_SLOTSYNC_STATE_RECV_AUTH; - return; - } - - /* CLUSTER_SLOTSYNC_STATE: RECV_AUTH ==> SEND_CAPA - * - * Receive AUTH reply. */ - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_RECV_AUTH) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - if (err[0] == '-') { - serverLog(LL_WARNING, "Unable to AUTH to slot owner: %s", err); - sdsfree(err); - err = NULL; - connClose(conn); - link->sync_state = CLUSTER_SLOTSYNC_STATE_FAILED; - return; - } - sdsfree(err); - err = NULL; - link->sync_state = CLUSTER_SLOTSYNC_STATE_SEND_CAPA; - } - - // todo may need to confirm whether the slot owner supports slot migration. - // like support SYNC xxx xxx - /* CLUSTER_SLOTSYNC_STATE: SEND_CAPA ==> RECV_CAPA - * - * Inform the slot owner of our capabilities. */ - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_SEND_CAPA) { - sds portstr = getReplicaPortString(); - err = sendCommand(conn, "REPLCONF", "capa", "eof", "listening-port", portstr, NULL); - sdsfree(portstr); - if (err) goto write_error; - sdsfree(err); - err = NULL; - link->sync_state = CLUSTER_SLOTSYNC_STATE_RECV_CAPA; - return; - } - - /* CLUSTER_SLOTSYNC_STATE: RECV_CAPA ==> WAIT_SCHED - * - * Receive CAPA reply. */ - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_RECV_CAPA) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any, not all the versions support REPLCONF capa. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, "(Non critical) Slot owner does not understand REPLCONF capa: %s", err); - } - sdsfree(err); - err = NULL; - link->sync_state = CLUSTER_SLOTSYNC_STATE_WAIT_SCHED; - } - - /* CLUSTER_SLOTSYNC_STATE: WAIT_SCHED ==> SEND_SYNC - * - * Wait other slotsync links. */ - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_WAIT_SCHED) { - if (clusterGetSlotSyncLinkRank(link) == 0) { - link->sync_state = CLUSTER_SLOTSYNC_STATE_SEND_SYNC; - } else { - return; - } - } - continueSlotSync(link); - return; - -no_response_error: /* Handle receiveSynchronousResponse() error when slot owner has no reply. */ - serverLog(LL_WARNING, "Slot owner did not respond to command during slotsync handshake."); - /* Fall through to regular error handling */ - -error: - freeSlotSyncConn(link); - - /* Set the state to TOCONNECT, so the cron will retry start next time. */ - link->sync_state = CLUSTER_SLOTSYNC_STATE_TOCONNECT; - return; - -write_error: /* Handle sendCommand() errors. */ - serverLog(LL_WARNING,"Sending command to target handshake: %s", err); - sdsfree(err); - err = NULL; - goto error; -} - -void continueSlotSync(clusterSlotSyncLink *link) { - char tmpfile[256]; - int tmpfd = -1; - int maxtries = 5; - - /* CLUSTER_SLOTSYNC_STATE: SEND_SYNC ==> RECV_RDB - * - * Send the special SYNC command to the slots owner. */ - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_SEND_SYNC) { - sds sync_cmd = sdscatprintf(sdsempty(), "SYNC "); - sds slot_ranges = reprSlotRangeListWithBlank(link->slot_ranges); - sync_cmd = sdscatsds(sync_cmd, slot_ranges); - sync_cmd = sdscatprintf(sync_cmd, "\r\n"); - - /* Simulate IO error. */ - if (testInjectError("crs-io-error-beforce-send-sync")) { - serverLog(LL_WARNING, "inject crs-io-error-beforce-send-sync"); - sdsfree(slot_ranges); - sdsfree(sync_cmd); - goto error; - } - - if (connSyncWrite(link->sync_conn, sync_cmd, sdslen(sync_cmd), server.repl_syncio_timeout * 1000) == -1) { - serverLog(LL_WARNING, "I/O error writing to slot owner: %s", connGetLastError(link->sync_conn)); - sdsfree(slot_ranges); - sdsfree(sync_cmd); - goto error; - } - sdsfree(slot_ranges); - sdsfree(sync_cmd); - link->sync_state = CLUSTER_SLOTSYNC_STATE_RECV_RDB; - } - - /* Prepare a suitable temp file for slot rdb transfer. */ - while (maxtries--) { - snprintf(tmpfile,256, "temp-%d.%ld.rdb", (int)server.unixtime, (long int)getpid()); - tmpfd = open(tmpfile, O_CREAT | O_WRONLY | O_EXCL, 0644); - if (tmpfd != -1) break; - sleep(1); - } - if (tmpfd == -1) { - serverLog(LL_WARNING, "Opening the temp file needed for slot synchronization: %s", strerror(errno)); - goto error; - } - - /* Change the read event handler from syncWithSlotOwner() to readSlotSyncBulkPayload(). */ - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_RECV_RDB) { - if (connSetReadHandler(link->sync_conn, readSlotSyncBulkPayload) == C_ERR) { - char conninfo[CONN_INFO_LEN]; - serverLog(LL_WARNING, "Can't create readable event for slot sync: %s (%s)", strerror(errno), - connGetInfo(link->sync_conn, conninfo, sizeof(conninfo))); - goto error; - } - } - - /* Store the name and fd of the rdb file to the clusterSlotSyncLink. */ - link->transfer_tmpfile_fd = tmpfd; - link->transfer_tmpfile_name = zstrdup(tmpfile); - return; - -error: - if (tmpfd != -1) close(tmpfd); - connClose(link->sync_conn); - - /* Set the state to TOCONNECT, so the cron will retry start next time. */ - link->sync_state = CLUSTER_SLOTSYNC_STATE_TOCONNECT; -} - -#define SLOTSYNC_MAX_WRITTEN_BEFORE_FSYNC (8<<20) /* 8MB */ -void readSlotSyncBulkPayload(connection *conn) { - clusterSlotSyncLink *link = connGetPrivateData(conn); - char buf[PROTO_IOBUF_LEN]; - ssize_t nread, readlen, nwritten; - int use_diskless_load = useDisklessLoad(); - off_t left; - - if (server.loading == 1) { - serverLog(LL_NOTICE, "Waiting prev loading finish"); - return; - } - - /* Static vars used to hold the EOF mark, and the last bytes received - * from the server: when they match, we reached the end of the transfer. */ - static char eofmark[RDB_EOF_MARK_SIZE]; - static char lastbytes[RDB_EOF_MARK_SIZE]; - static int usemark = 0; - - /* If transfer_total_size == -1, we still have to read the bulk length - * from the slot owner reply. */ - if (link->transfer_total_size == -1) { - if (connSyncReadLine(conn, buf, 1024, server.repl_syncio_timeout * 1000) == -1) { - serverLog(LL_WARNING, "I/O error reading bulk count from slot owner: %s", connGetLastError(conn)); - goto error; - } - - if (buf[0] == '-') { - serverLog(LL_WARNING, "Slot owner aborted replication with an error: %s", buf + 1); - goto error; - } else if (buf[0] == '\0') { - /* At this stage just a newline works as a PING in order to take - * the connection live. So we refresh our last interaction - * timestamp. */ - link->transfer_lastio = server.unixtime; - return; - } else if (buf[0] != '$') { - serverLog(LL_WARNING,"Bad protocol from slot owner, the first byte is not '$' " - "(we received '%s'), are you sure the host and port are right?", buf); - goto error; - } - - /* There are two possible forms for the bulk payload. One is the - * usual $ bulk format. The other is used for diskless transfers - * when the master does not know beforehand the size of the file to - * transfer. In the latter case, the following format is used: - * - * $EOF:<40 bytes delimiter> - * - * At the end of the file the announced delimiter is transmitted. The - * delimiter is long and random enough that the probability of a - * collision with the actual file content can be ignored. */ - if (strncmp(buf+1, "EOF:", 4) == 0 && strlen(buf + 5) >= RDB_EOF_MARK_SIZE) { - usemark = 1; - memcpy(eofmark, buf + 5, RDB_EOF_MARK_SIZE); - memset(lastbytes, 0, RDB_EOF_MARK_SIZE); - /* Set any transfer_total_size to avoid entering this code path - * at the next call. */ - link->transfer_total_size = 0; - serverLog(LL_NOTICE, "Cluster slot sync: receiving streamed RDB from slot owner with EOF %s", - use_diskless_load ? "to parser": "to disk"); - } else { - usemark = 0; - link->transfer_total_size = strtol(buf+1,NULL,10); - serverLog(LL_NOTICE, "Cluster slot sync: receiving %lld bytes from slot owner %s", - (long long)link->transfer_total_size, use_diskless_load ? "to parser" : "to disk"); - } - return; - } - - if (!use_diskless_load) { - /* Read the data from the socket, store it to a file and search - * for the EOF. */ - if (usemark) { - readlen = sizeof(buf); - } else { - left = link->transfer_total_size - link->transfer_read_size; - readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); - } - - if (testInjectError("crs-io-error-recv-rdb")) { - serverLog(LL_WARNING, "inject crs-io-error-recv-rdb"); - goto error; - } - - nread = connRead(conn,buf,readlen); - if (nread <= 0) { - if (connGetState(conn) == CONN_STATE_CONNECTED) { - /* equivalent to EAGAIN */ - return; - } - serverLog(LL_WARNING, "I/O error trying to sync with slot owner: %s", - (nread == -1) ? connGetLastError(conn) : "connection lost"); - goto error; - } - // check stat_net_repl_input_bytes? - server.stat_net_input_bytes += nread; - - /* When a mark is used, we want to detect EOF asap in order to avoid - * writing the EOF mark into the file... */ - int eof_reached = 0; - - if (usemark) { - /* Update the last bytes array, and check if it matches our - * delimiter. */ - if (nread >= RDB_EOF_MARK_SIZE) { - memcpy(lastbytes, buf + nread - RDB_EOF_MARK_SIZE, RDB_EOF_MARK_SIZE); - } else { - int rem = RDB_EOF_MARK_SIZE - nread; - memmove(lastbytes, lastbytes + nread, rem); - memcpy(lastbytes + rem, buf, nread); - } - if (memcmp(lastbytes, eofmark, RDB_EOF_MARK_SIZE) == 0) - eof_reached = 1; - } - - if (testInjectError("crs-io-error-write-rdb")) { - serverLog(LL_WARNING, "inject crs-io-error-write-rdb"); - goto error; - } - - /* Update the last I/O time for the replication transfer (used in - * order to detect timeouts during replication), and write what we - * got from the socket to the dump file on disk. */ - link->transfer_lastio = server.unixtime; - if ((nwritten = write(link->transfer_tmpfile_fd,buf,nread)) != nread) { - serverLog(LL_WARNING, - "Write error or short write writing to the DB dump file " - "needed for cluster slot synchronization: %s", - (nwritten == -1) ? strerror(errno) : "short write"); - goto error; - } - link->transfer_read_size += nread; - - if (testInjectError("crs-io-error-truncate-rdb")) { - serverLog(LL_WARNING, "inject crs-io-error-truncate-rdb"); - goto error; - } - - /* Delete the last 40 bytes from the file if we reached EOF. */ - if (usemark && eof_reached) { - if (ftruncate(link->transfer_tmpfile_fd, link->transfer_read_size - RDB_EOF_MARK_SIZE) == -1) { - serverLog(LL_WARNING, - "Error truncating the RDB file received from the slot owner " - "for SYNC: %s", strerror(errno)); - goto error; - } - } - - /* Sync data on disk from time to time, otherwise at the end of the - * transfer we may suffer a big delay as the memory buffers are copied - * into the actual disk. */ - if (link->transfer_read_size >= link->transfer_last_fsync_off + SLOTSYNC_MAX_WRITTEN_BEFORE_FSYNC) { - off_t sync_size = link->transfer_read_size - link->transfer_last_fsync_off; - rdb_fsync_range(link->transfer_tmpfile_fd, link->transfer_last_fsync_off, sync_size); - link->transfer_last_fsync_off += sync_size; - } - - /* Check if the transfer is now complete */ - if (!usemark) { - if (link->transfer_read_size == link->transfer_total_size) eof_reached = 1; - } - - /* If the transfer is yet not complete, we need to read more, so - * return ASAP and wait for the handler to be called again. */ - if (!eof_reached) return; - } - - /* We reach this point in one of the following cases: - * - * 1. The replica is using diskless replication, that is, it reads data - * directly from the socket to the Redis memory, without using - * a temporary RDB file on disk. In that case we just block and - * read everything from the socket. - * - * 2. Or when we are done reading from the socket to the RDB file, in - * such case we want just to read the RDB file in memory. */ - - /* We need to stop any AOF rewriting child before flusing and parsing - * the RDB, otherwise we'll create a copy-on-write disaster. */ - if (server.aof_state != AOF_OFF) stopAppendOnly(); - - /* Before loading the DB into memory we need to delete the readable - * handler, otherwise it will get called recursively since - * rdbLoad() will call the event loop to process events from time to - * time for non blocking loading. - * - * And we are also using a bio to do the load, we need to make sure - * we won't enter this function again, make sure the connection won't - * be closed during the loading. */ - connSetReadHandler(conn, NULL); - - link->sync_state = CLUSTER_SLOTSYNC_STATE_RECV_RDB; - - serverLog(LL_NOTICE, "Cluster slot sync: Loading slot DB in memory"); - - /* Doing a bio RDB loading */ - rdbLoadJob *job = zmalloc(sizeof(rdbLoadJob)); - job->rdbflags = RDBFLAGS_REPLICATION | RDBFLAGS_SLOT_SYNC; - job->use_diskless_load = use_diskless_load; - job->usemark = usemark; - if (usemark) job->eofmark = sdsnew(eofmark); - job->link = link; - - link->db = initTempDb(); - link->functions_lib_ctx = functionsLibCtxCreate(); - link->sync_state = CLUSTER_SLOTSYNC_STATE_LOADING_RDB; - - bioCreateRdbLoadJob(bioRdbLoad, 1, job); - - return; - -error: - /* Reset the link state to TOCONNECT, the cron will retry start next time. */ - resetSlotSyncLinkForReconnect(link); - return; -} - /* ----------------------------------------------------------------------------- * Cluster functions related to slot sync messages exchange. * -------------------------------------------------------------------------- */ @@ -1151,7 +747,7 @@ void slotSyncMergeTempResources(clusterSlotSyncLink *link) { while ((ln2 = listNext(&li2)) != NULL) { slotRange *range = ln2->value; for (int j = 0; j < server.dbnum; j++) { - serverDb *src_db = link->db + j; + serverDb *src_db = link->temp_db + j; serverDb *dst_db = server.db + j; if (kvstoreSize(src_db->keys) == 0) continue; @@ -1163,11 +759,11 @@ void slotSyncMergeTempResources(clusterSlotSyncLink *link) { } /* Merge the function. */ - if (functionsLibCtxFunctionsLen(link->functions_lib_ctx)) { + if (functionsLibCtxFunctionsLen(link->temp_func_ctx)) { sds err = NULL; - if (libraryJoin(functionsLibCtxGetCurrent(), link->functions_lib_ctx, 1, &err) != C_OK) { - serverLog(LL_WARNING, "Discarding the merge of functions, an error occurred while merging functions" - " from the slot RDB, error: %s", err); + if (libraryJoin(functionsLibCtxGetCurrent(), link->temp_func_ctx, 1, &err) != C_OK) { + serverLog(LL_WARNING, "Discarding the merge of functions, an error occurred while merging functions " + "from the slot RDB, error: %s", err); } } } @@ -1184,7 +780,7 @@ void clusterSlotSyncCron(void) { while ((ln = listNext(&li)) != NULL) { link = ln->value; - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_TOCONNECT) { + if (link->sync_state == REPL_STATE_CONNECT) { /* Firstly we delete the keys in the slots to avoid key corrupt. */ delkeysNotOwnedByMySelf(link->slot_ranges); @@ -1211,14 +807,14 @@ void clusterSlotSyncCron(void) { /* Really do the reconnect for this link. */ initSlotSyncLink(link, n, NULL); - } else if (link->sync_state == CLUSTER_SLOTSYNC_STATE_WAIT_SCHED) { + } else if (link->sync_state == REPL_STATE_WAIT_SCHED) { if (clusterGetSlotSyncLinkRank(link) == 0) { - link->sync_state = CLUSTER_SLOTSYNC_STATE_SEND_SYNC; - continueSlotSync(link); + link->sync_state = REPL_STATE_SEND_PSYNC; + syncWithSlotSyncPrimary(link->sync_conn); } - } else if (link->sync_state == CLUSTER_SLOTSYNC_STATE_LOADING_RDB) { + } else if (link->sync_state == REPL_STATE_LOADING) { /* Check the bio loading result. */ - } else if (link->sync_state == CLUSTER_SLOTSYNC_STATE_DONE_LOADING) { + } else if (link->sync_state == REPL_STATE_LOADED) { /* Create a client, here we don't mark the client as a primary for some reasons. * This client is used to receive the subsequent slot replication buffer, and * we set reply_off indicates that it does not need reply. */ @@ -1234,17 +830,41 @@ void clusterSlotSyncCron(void) { /* Merge the temp resources from link. */ serverLog(LL_NOTICE, "Slot RDB loading completed, merging the temp resources."); slotSyncMergeTempResources(link); - - /* After loading a slot RDB, drop replicas if exist. */ + discardTempDb(link->temp_db); + link->temp_db = NULL; + freeFunctionsAsync(link->temp_func_ctx); + link->temp_func_ctx = NULL; + + /* We are done loading a slot RDB and we are start a new replication + * history, we must discard the cached primary structure and force + * resync of sub-replicas. */ serverLog(LL_NOTICE, "Slot RDB loading completed, dropping the replicas if exist."); - disconnectReplicas(); - freeReplicationBacklog(); + replicationAttachToNewPrimary(); + changeReplicationId(); + clearReplicationId2(); + if (server.repl_backlog == NULL) createReplicationBacklog(); + + /* Restart the AOF subsystem now that we finished the sync. This + * will trigger an AOF rewrite, and when done will start appending + * to the new file. */ + if (server.aof_enabled) restartAOFAfterSYNC(); + + stopSlotLoading(1); + + link->sync_state = REPL_STATE_CONNECTED; - link->sync_state = CLUSTER_SLOTSYNC_STATE_CONNECTED; - } else if (link->sync_state == CLUSTER_SLOTSYNC_STATE_LOADING_FAIL) { + connNonBlock(link->sync_conn); + connRecvTimeout(link->sync_conn, 0); + + } else if (link->sync_state == REPL_STATE_LOAD_FAIL) { /* Check the bio loading result. */ resetSlotSyncLinkForReconnect(link); - } else if (link->sync_state == CLUSTER_SLOTSYNC_STATE_CONNECTED) { + + stopSlotLoading(1); + + connNonBlock(link->sync_conn); + connRecvTimeout(link->sync_conn, 0); + } else if (link->sync_state == REPL_STATE_CONNECTED) { if (link->client && link->slot_mf_end == 0) { slotLinkSendOnline(link->client); } @@ -1278,7 +898,7 @@ void clusterSlotSyncCron(void) { } /* Count the links that are ready to do slot failover. */ - if (link->sync_state == CLUSTER_SLOTSYNC_STATE_CONNECTED) { + if (link->sync_state == REPL_STATE_CONNECTED) { /* Keep to send ack until this link marked slot ready. */ if (!link->slot_mf_ready) { slotLinkSendAck(link->client); diff --git a/src/commands.def b/src/commands.def index 4a3140ab36..909e959592 100644 --- a/src/commands.def +++ b/src/commands.def @@ -1131,7 +1131,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("slotfailover","Trigger a slot failover for the hash slots that in the slot links","O(N) where N is the total number of the slot sync links.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTFAILOVER_History,0,CLUSTER_SLOTFAILOVER_Tips,0,clusterCommand,-2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SLOTFAILOVER_Keyspecs,0,NULL,1),.args=CLUSTER_SLOTFAILOVER_Args}, {MAKE_CMD("slotlink","Manage the slot sync links","O(N) where N is the total number of the slot sync links.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTLINK_History,0,CLUSTER_SLOTLINK_Tips,0,clusterCommand,-3,CMD_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SLOTLINK_Keyspecs,0,NULL,1),.args=CLUSTER_SLOTLINK_Args}, {MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)}, -{MAKE_CMD("slotsync","Trigger a slot sync task for the specified hash slots","O(N) where N is the total number of the slots between the start slot and end slot arguments.","4.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTSYNC_History,0,CLUSTER_SLOTSYNC_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SLOTSYNC_Keyspecs,0,NULL,1),.args=CLUSTER_SLOTSYNC_Args}, +{MAKE_CMD("slotsync","Trigger a slot sync task for the specified hash slots","O(N) where N is the total number of the slots between the start slot and end slot arguments.","8.1.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTSYNC_History,0,CLUSTER_SLOTSYNC_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SLOTSYNC_Keyspecs,0,NULL,1),.args=CLUSTER_SLOTSYNC_Args}, {0} }; diff --git a/src/commands/cluster-slotsync.json b/src/commands/cluster-slotsync.json index fa712c6db2..abbf700cae 100644 --- a/src/commands/cluster-slotsync.json +++ b/src/commands/cluster-slotsync.json @@ -3,7 +3,7 @@ "summary": "Trigger a slot sync task for the specified hash slots", "complexity": "O(N) where N is the total number of the slots between the start slot and end slot arguments.", "group": "cluster", - "since": "4.0.0", + "since": "8.1.0", "arity": -4, "container": "CLUSTER", "function": "clusterCommand", diff --git a/src/db.c b/src/db.c index dbfd896da5..a9ac309305 100644 --- a/src/db.c +++ b/src/db.c @@ -1853,7 +1853,7 @@ keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int di if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID; /* When replicating commands from the slot sync source node, * keys are never considered expired. */ - if (server.current_client->slotsync_link) return 0; + if (server.current_client && server.current_client->flag.slot_sync_primary) return KEY_VALID; if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; } else if (server.import_mode) { /* If we are running in the import mode on a primary, instead of diff --git a/src/expire.c b/src/expire.c index c22df1ef86..a6f5790935 100644 --- a/src/expire.c +++ b/src/expire.c @@ -524,6 +524,7 @@ int checkAlreadyExpired(long long when) { * * If the server is a primary and in the import mode, we also add the already * expired key and wait for an explicit DEL from the import source. */ + if (server.current_client && server.current_client->flag.slot_sync_primary) return 0; return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.import_mode); } diff --git a/src/rdb.c b/src/rdb.c index abcba6b914..7b07ef149c 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2921,6 +2921,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. */ void startLoading(size_t size, int rdbflags, int async) { + if (rdbflags & RDBFLAGS_SLOT_SYNC) { + server.slot_loading = 1; + server.slot_loading_start_time = time(NULL); + server.slot_loading_loaded_bytes = 0; + server.slot_loading_total_bytes = size; + server.slot_loading_rdb_used_mem = 0; + return; + } + /* Load the DB */ server.loading = 1; if (async == 1) server.async_loading = 1; @@ -2931,16 +2940,6 @@ void startLoading(size_t size, int rdbflags, int async) { server.rdb_last_load_keys_expired = 0; server.rdb_last_load_keys_loaded = 0; blockingOperationStarts(); - - // todo, check if we need to reset other vars. - /* When doing a slot RDB loading, we don't set loading flag so that - * the target node can still process the requests. */ - if (rdbflags & RDBFLAGS_SLOT_SYNC) { - server.loading = 0; - } else { - server.loading = 1; - } - /* Fire the loading modules start event. */ int subevent; if (rdbflags & RDBFLAGS_AOF_PREAMBLE) @@ -2989,6 +2988,11 @@ void stopLoading(int success) { success ? VALKEYMODULE_SUBEVENT_LOADING_ENDED : VALKEYMODULE_SUBEVENT_LOADING_FAILED, NULL); } +void stopSlotLoading(int success) { + UNUSED(success); + server.slot_loading = 0; +} + void startSaving(int rdbflags) { /* Fire the persistence modules start event. */ int subevent; @@ -3393,7 +3397,8 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin sdsfree(key); goto eoferr; } - } else if (iAmPrimary() && !isSlotSyncInProgress() && !(rdbflags & RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) { + } else if (iAmPrimary() && !(rdbflags & RDBFLAGS_SLOT_SYNC) && !(rdbflags & RDBFLAGS_AOF_PREAMBLE) && + expiretime != -1 && expiretime < now) { if (rdbflags & RDBFLAGS_FEED_REPL) { /* Caller should have created replication backlog, * and now this path only works when rebooting, @@ -3417,7 +3422,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin int added = dbAddRDBLoad(db, key, val); server.rdb_last_load_keys_loaded++; if (!added) { - if (rdbflags & RDBFLAGS_ALLOW_DUP) { + if (rdbflags & RDBFLAGS_ALLOW_DUP || rdbflags & RDBFLAGS_SLOT_SYNC) { /* This flag is useful for DEBUG RELOAD special modes. * When it's set we allow new keys to replace the current * keys with the same name. */ @@ -3558,8 +3563,8 @@ void bioRdbLoad(void *args[]) { int use_diskless_load = job->use_diskless_load; int usemark = job->usemark; sds eofmark = job->eofmark; - serverDb *db = job->link->db; - functionsLibCtx *functions_lib_ctx = job->link->functions_lib_ctx; + serverDb *db = job->link->temp_db; + functionsLibCtx *functions_lib_ctx = job->link->temp_func_ctx; clusterSlotSyncLink *link = job->link; connection *conn = link->sync_conn; @@ -3572,116 +3577,52 @@ void bioRdbLoad(void *args[]) { if (use_diskless_load) { rio rdb; - int async = 0; /* Do not use async loading. */ - rioInitWithConn(&rdb, conn, link->transfer_total_size); - - /* Put the socket in blocking mode to simplify RDB transfer. - * We'll restore it when the RDB is received. */ - connBlock(conn); - connRecvTimeout(conn, server.repl_timeout * 1000); - startLoading(link->transfer_total_size, rdbflags, async); - + rioInitWithConn(&rdb, conn, link->repl_transfer_size); if (rdbLoadRioWithLoadingCtx(&rdb, rdbflags, &rsi, &loadingCtx) != C_OK) { /* RDB loading failed. */ - stopLoading(0); serverLog(LL_WARNING, "Failed trying to load the slot owner synchronization DB from socket"); rioFreeConn(&rdb, NULL); - goto error; - } - - /* Verify the end mark is correct. */ - if (usemark) { + goto load_error; + } else if (usemark) { + /* Verify the end mark is correct. */ if (!rioRead(&rdb, buf, RDB_EOF_MARK_SIZE) || memcmp(buf, eofmark, RDB_EOF_MARK_SIZE) != 0) { - stopLoading(0); serverLog(LL_WARNING, "Replication stream EOF marker is broken"); rioFreeConn(&rdb, NULL); - goto error; + goto load_error; } } - - stopLoading(1); - - /* Cleanup and restore the socket to the original state to continue - * with the normal replication. */ rioFreeConn(&rdb, NULL); - connNonBlock(conn); - connRecvTimeout(conn, 0); } else { - /* Make sure the new file (also used for persistence) is fully synced - * (not covered by earlier calls to rdb_fsync_range). */ - if (fsync(link->transfer_tmpfile_fd) == -1) { - serverLog(LL_WARNING, - "Failed trying to sync the temp DB to disk in " - "Cluster slot synchronization: %s", - strerror(errno)); - goto error; - } - - if (testInjectError("crs-io-error-rename-rdb")) { - serverLog(LL_WARNING, "inject crs-io-error-rename-rdb"); - goto error; - } - - /* Rename slot rdb like renaming rewrite aof asynchronously. */ sprintf(rdbpath, "%s_slot", server.rdb_filename); - int old_rdb_fd = open(rdbpath, O_RDONLY | O_NONBLOCK); - if (rename(link->transfer_tmpfile_name, rdbpath) == -1) { - serverLog(LL_WARNING, - "Failed trying to rename the temp slot DB into %s in " - "cluster slot synchronization: %s", - rdbpath, strerror(errno)); - if (old_rdb_fd != -1) close(old_rdb_fd); - goto error; - } - /* Close old rdb asynchronously. */ - if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0, 1); - int load_result = rdbLoadWithLoadingCtx((char *)rdbpath, &rsi, rdbflags, &loadingCtx); + bg_unlink(rdbpath); + if (load_result != RDB_OK) { serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization " "slot DB from disk, check server logs."); + goto load_error; } - if (server.rdb_del_sync_files && allPersistenceDisabled()) { - serverLog(LL_NOTICE, "Removing the slot RDB file obtained from " - "the slot owner. This replica has persistence " - "disabled"); - bg_unlink(rdbpath); - } - - if (load_result != RDB_OK) { - goto error; - } - - zfree(link->transfer_tmpfile_name); - close(link->transfer_tmpfile_fd); - link->transfer_tmpfile_fd = -1; - link->transfer_tmpfile_name = NULL; + zfree(link->repl_transfer_tmpfile); + close(link->repl_transfer_fd); + link->repl_transfer_fd = -1; + link->repl_transfer_tmpfile = NULL; } - /* Set to a value large enough after first init. */ link->slot_mf_lag = SLOTSYNC_DEFAULT_LAG; - // todo - /* Restart the AOF subsystem now that we finished the sync. This - * will trigger an AOF rewrite, and when done will start appending - * to the new file. */ -// if (server.aof_enabled) restartAOFAfterSYNC(); - + /* Mark the synchronization has done. */ + serverLog(LL_WARNING, "CLUSTER_SLOTSYNC_STATE_DONE_LOADING"); if (job->usemark) sdsfree(job->eofmark); - + link->sync_state = REPL_STATE_LOADED; zfree(job); - - /* Mark the synchronization has done. */ - link->sync_state = CLUSTER_SLOTSYNC_STATE_DONE_LOADING; - return; -error: +load_error: if (job->usemark) sdsfree(job->eofmark); - link->sync_state = CLUSTER_SLOTSYNC_STATE_LOADING_FAIL; + link->sync_state = REPL_STATE_LOAD_FAIL; zfree(job); } diff --git a/src/replication.c b/src/replication.c index 18cc32c2af..194d2f4fdc 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2237,11 +2237,12 @@ void restartAOFAfterSYNC(void) { } } -int useDisklessLoad(void) { +int useDisklessLoad(int slot_sync) { /* compute boolean decision to use diskless load */ int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || server.repl_diskless_load == REPL_DISKLESS_LOAD_FLUSH_BEFORE_LOAD || - (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount() == 0); + (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && + (dbTotalServerKeyCount() == 0 || slot_sync)); if (enabled) { /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */ @@ -2299,17 +2300,23 @@ void replicationAttachToNewPrimary(void) { freeReplicationBacklog(); /* Don't allow our chained replicas to PSYNC. */ } -/* Asynchronously read the SYNC payload we receive from a primary */ +/* Asynchronously read the SYNC/PSYNC payload we receive from a node. */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024 * 1024 * 8) /* 8 MB */ -void readSyncBulkPayload(connection *conn) { +void readSyncBulkPayloadImpl(connection *conn, int slot_sync) { char buf[PROTO_IOBUF_LEN]; ssize_t nread, readlen, nwritten; - int use_diskless_load = useDisklessLoad(); + int use_diskless_load = useDisklessLoad(slot_sync); serverDb *diskless_load_tempDb = NULL; functionsLibCtx *temp_functions_lib_ctx = NULL; int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; off_t left; + clusterSlotSyncLink *link = !slot_sync ? NULL : connGetPrivateData(conn); + serverDb *slotsync_load_tempDb = NULL; + functionsLibCtx *slotsync_temp_functions_lib_ctx = NULL; + const char *node_primary = !slot_sync ? "PRIMARY" : "SLOT PRIMARY"; + const char *node_replica = !slot_sync ? "REPLICA" : "SLOT REPLICA"; + /* Static vars used to hold the EOF mark, and the last bytes received * from the server: when they match, we reached the end of the transfer. */ static char eofmark[RDB_EOF_MARK_SIZE]; @@ -2318,10 +2325,10 @@ void readSyncBulkPayload(connection *conn) { /* If repl_transfer_size == -1 we still have to read the bulk length * from the primary reply. */ - if (server.repl_transfer_size == -1) { + if ((!slot_sync && server.repl_transfer_size == -1) || (slot_sync && link->repl_transfer_size == -1)) { nread = connSyncReadLine(conn, buf, 1024, server.repl_syncio_timeout * 1000); if (nread == -1) { - serverLog(LL_WARNING, "I/O error reading bulk count from PRIMARY: %s", connGetLastError(conn)); + serverLog(LL_WARNING, "I/O error reading bulk count from %s: %s", node_primary, connGetLastError(conn)); goto error; } else { /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and @@ -2330,18 +2337,23 @@ void readSyncBulkPayload(connection *conn) { } if (buf[0] == '-') { - serverLog(LL_WARNING, "PRIMARY aborted replication with an error: %s", buf + 1); + serverLog(LL_WARNING, "%s aborted replication with an error: %s", node_primary, buf + 1); goto error; } else if (buf[0] == '\0') { /* At this stage just a newline works as a PING in order to take * the connection live. So we refresh our last interaction * timestamp. */ - server.repl_transfer_lastio = server.unixtime; + if (!slot_sync) { + server.repl_transfer_lastio = server.unixtime; + } else { + link->repl_transfer_lastio = server.unixtime; + } return; } else if (buf[0] != '$') { serverLog(LL_WARNING, - "Bad protocol from PRIMARY, the first byte is not '$' (we received '%s'), are you sure the host " + "Bad protocol from %s, the first byte is not '$' (we received '%s'), are you sure the host " "and port are right?", + node_primary, buf); goto error; } @@ -2362,14 +2374,25 @@ void readSyncBulkPayload(connection *conn) { memset(lastbytes, 0, RDB_EOF_MARK_SIZE); /* Set any repl_transfer_size to avoid entering this code path * at the next call. */ - server.repl_transfer_size = 0; - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving streamed RDB from primary with EOF %s", + if (!slot_sync) { + server.repl_transfer_size = 0; + } else { + link->repl_transfer_size = 0; + } + serverLog(LL_NOTICE, "%s <-> %s sync: receiving streamed RDB from primary with EOF %s", + node_primary, node_replica, use_diskless_load ? "to parser" : "to disk"); } else { usemark = 0; - server.repl_transfer_size = strtol(buf + 1, NULL, 10); - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary %s", - (long long)server.repl_transfer_size, use_diskless_load ? "to parser" : "to disk"); + off_t transfer_size = strtol(buf + 1, NULL, 10); + if (!slot_sync) { + server.repl_transfer_size = transfer_size; + } else { + link->repl_transfer_size = transfer_size; + } + serverLog(LL_NOTICE, "%s <-> %s sync: receiving %lld bytes from primary %s", + node_primary, node_replica, + (long long)transfer_size, use_diskless_load ? "to parser" : "to disk"); } return; } @@ -2380,17 +2403,27 @@ void readSyncBulkPayload(connection *conn) { if (usemark) { readlen = sizeof(buf); } else { - left = server.repl_transfer_size - server.repl_transfer_read; + if (!slot_sync) { + left = server.repl_transfer_size - server.repl_transfer_read; + } else { + left = link->repl_transfer_size - link->repl_transfer_read; + } readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); } + if (testInjectError("crs-io-error-recv-rdb")) { + serverLog(LL_WARNING, "inject crs-io-error-recv-rdb"); + goto error; + } + nread = connRead(conn, buf, readlen); if (nread <= 0) { if (connGetState(conn) == CONN_STATE_CONNECTED) { /* equivalent to EAGAIN */ return; } - serverLog(LL_WARNING, "I/O error trying to sync with PRIMARY: %s", + serverLog(LL_WARNING, "I/O error trying to sync with %s: %s", + node_primary, (nread == -1) ? connGetLastError(conn) : "connection lost"); goto error; } @@ -2413,25 +2446,54 @@ void readSyncBulkPayload(connection *conn) { if (memcmp(lastbytes, eofmark, RDB_EOF_MARK_SIZE) == 0) eof_reached = 1; } + if (testInjectError("crs-io-error-write-rdb")) { + serverLog(LL_WARNING, "inject crs-io-error-write-rdb"); + goto error; + } + /* Update the last I/O time for the replication transfer (used in * order to detect timeouts during replication), and write what we * got from the socket to the dump file on disk. */ - server.repl_transfer_lastio = server.unixtime; - if ((nwritten = write(server.repl_transfer_fd, buf, nread)) != nread) { + int transfer_fd; + if (!slot_sync) { + transfer_fd = server.repl_transfer_fd; + server.repl_transfer_lastio = server.unixtime; + } else { + transfer_fd = link->repl_transfer_fd; + link->repl_transfer_lastio = server.unixtime; + } + if ((nwritten = (write(transfer_fd, buf, nread))) != nread) { serverLog(LL_WARNING, "Write error or short write writing to the DB dump file " - "needed for PRIMARY <-> REPLICA synchronization: %s", + "needed for %s <-> %s synchronization: %s", + node_primary, node_replica, (nwritten == -1) ? strerror(errno) : "short write"); goto error; } - server.repl_transfer_read += nread; + if (!slot_sync) { + server.repl_transfer_read += nread; + } else { + link->repl_transfer_read += nread; + } + + if (testInjectError("crs-io-error-truncate-rdb")) { + serverLog(LL_WARNING, "inject crs-io-error-truncate-rdb"); + goto error; + } /* Delete the last 40 bytes from the file if we reached EOF. */ if (usemark && eof_reached) { - if (ftruncate(server.repl_transfer_fd, server.repl_transfer_read - RDB_EOF_MARK_SIZE) == -1) { + off_t transfer_read; + if (!slot_sync) { + transfer_read = server.repl_transfer_read; + } else { + transfer_read = link->repl_transfer_read; + } + if (ftruncate(transfer_fd, transfer_read - RDB_EOF_MARK_SIZE) == -1) { serverLog(LL_WARNING, - "Error truncating the RDB file received from the primary " + "Error truncating the RDB file received from the %s " "for SYNC: %s", + node_primary, strerror(errno)); goto error; } @@ -2440,15 +2502,27 @@ void readSyncBulkPayload(connection *conn) { /* Sync data on disk from time to time, otherwise at the end of the * transfer we may suffer a big delay as the memory buffers are copied * into the actual disk. */ - if (server.repl_transfer_read >= server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { - off_t sync_size = server.repl_transfer_read - server.repl_transfer_last_fsync_off; - rdb_fsync_range(server.repl_transfer_fd, server.repl_transfer_last_fsync_off, sync_size); - server.repl_transfer_last_fsync_off += sync_size; + if (!slot_sync) { + if (server.repl_transfer_read >= server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { + off_t sync_size = server.repl_transfer_read - server.repl_transfer_last_fsync_off; + rdb_fsync_range(server.repl_transfer_fd, server.repl_transfer_last_fsync_off, sync_size); + server.repl_transfer_last_fsync_off += sync_size; + } + } else { + if (link->repl_transfer_read >= link->repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { + off_t sync_size = link->repl_transfer_read - link->repl_transfer_last_fsync_off; + rdb_fsync_range(link->repl_transfer_fd, link->repl_transfer_last_fsync_off, sync_size); + link->repl_transfer_last_fsync_off += sync_size; + } } /* Check if the transfer is now complete */ if (!usemark) { - if (server.repl_transfer_read == server.repl_transfer_size) eof_reached = 1; + if (!slot_sync) { + if (server.repl_transfer_read == server.repl_transfer_size) eof_reached = 1; + } else { + if (link->repl_transfer_read == link->repl_transfer_size) eof_reached = 1; + } } /* If the transfer is yet not complete, we need to read more, so @@ -2475,21 +2549,26 @@ void readSyncBulkPayload(connection *conn) { if (server.child_type == CHILD_TYPE_RDB) { if (!use_diskless_load) { serverLog(LL_NOTICE, - "Replica is about to load the RDB file received from the " - "primary, but there is a pending RDB child running. " + "%s is about to load the RDB file received from the " + "%s, but there is a pending RDB child running. " "Killing process %ld and removing its temp file to avoid " "any race", + node_replica, node_primary, (long)server.child_pid); } killRDBChild(); } - if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* Initialize empty tempDb dictionaries. */ + if (!slot_sync && use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* Initialize empty tempDb and temp function ctx. */ diskless_load_tempDb = disklessLoadInitTempDb(); temp_functions_lib_ctx = disklessLoadFunctionsLibCtxCreate(); moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL); + } else if (slot_sync) { + /* Initialize empty tempDb and temp function ctx. */ + slotsync_load_tempDb = initTempDb(); + slotsync_temp_functions_lib_ctx = functionsLibCtxCreate(); } /* Before loading the DB into memory we need to delete the readable @@ -2497,9 +2576,8 @@ void readSyncBulkPayload(connection *conn) { * rdbLoad() will call the event loop to process events from time to * time for non blocking loading. */ connSetReadHandler(conn, NULL); - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; - if (use_diskless_load) { + if (!slot_sync && use_diskless_load) { rio rdb; serverDb *dbarray; functionsLibCtx *functions_lib_ctx; @@ -2541,7 +2619,7 @@ void readSyncBulkPayload(connection *conn) { int loadingFailed = 0; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; - if (rdbLoadRioWithLoadingCtx(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) { + if (rdbLoadRioWithLoadingCtx(&rdb, RDBFLAGS_REPLICATION | RDBFLAGS_SLOT_SYNC, &rsi, &loadingCtx) != C_OK) { /* RDB loading failed. */ serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB " "from socket, check server logs."); @@ -2549,7 +2627,7 @@ void readSyncBulkPayload(connection *conn) { } else if (usemark) { /* Verify the end mark is correct. */ if (!rioRead(&rdb, buf, RDB_EOF_MARK_SIZE) || memcmp(buf, eofmark, RDB_EOF_MARK_SIZE) != 0) { - serverLog(LL_WARNING, "Replication stream EOF marker is broken"); + serverLog(LL_WARNING, "Replication stream EOF marker from PRIMARY is broken"); loadingFailed = 1; } } @@ -2609,24 +2687,65 @@ void readSyncBulkPayload(connection *conn) { rioFreeConn(&rdb, NULL); connNonBlock(conn); connRecvTimeout(conn, 0); + } else if (slot_sync && use_diskless_load) { + /* Put the socket in blocking mode to simplify RDB transfer. + * We'll restore it when the RDB is received. */ + connBlock(conn); + connRecvTimeout(conn, server.repl_timeout * 1000); + + /* Before loading the DB into memory we need to delete the readable + * handler since we are doing a bio load. */ + connSetReadHandler(conn, NULL); + + startLoading(link->repl_transfer_size, RDBFLAGS_REPLICATION | RDBFLAGS_SLOT_SYNC, 0); + + link->temp_db = slotsync_load_tempDb; + link->temp_func_ctx = slotsync_temp_functions_lib_ctx; + link->sync_state = REPL_STATE_LOADING; + + /* Doing a bio RDB loading */ + rdbLoadJob *job = zmalloc(sizeof(rdbLoadJob)); + job->rdbflags = RDBFLAGS_REPLICATION | RDBFLAGS_SLOT_SYNC; + job->use_diskless_load = use_diskless_load; + job->usemark = usemark; + job->eofmark = usemark ? sdsnew(eofmark) : NULL; + job->link = link; + bioCreateRdbLoadJob(bioRdbLoad, 1, job); } else { /* Make sure the new file (also used for persistence) is fully synced * (not covered by earlier calls to rdb_fsync_range). */ - if (fsync(server.repl_transfer_fd) == -1) { + int transfer_fd = !slot_sync ? server.repl_transfer_fd : link->repl_transfer_fd; + if (fsync(transfer_fd) == -1) { serverLog(LL_WARNING, "Failed trying to sync the temp DB to disk in " - "PRIMARY <-> REPLICA synchronization: %s", + "%s <-> %s synchronization: %s", + node_primary, node_replica, strerror(errno)); goto error; } + if (testInjectError("crs-io-error-rename-rdb")) { + serverLog(LL_WARNING, "inject crs-io-error-rename-rdb"); + goto error; + } + /* Rename rdb like renaming rewrite aof asynchronously. */ - int old_rdb_fd = open(server.rdb_filename, O_RDONLY | O_NONBLOCK); - if (rename(server.repl_transfer_tmpfile, server.rdb_filename) == -1) { + int old_rdb_fd; + int rename_res; + char rdbpath[1024]; + if (!slot_sync) { + old_rdb_fd = open(server.rdb_filename, O_RDONLY | O_NONBLOCK); + rename_res = rename(server.repl_transfer_tmpfile, server.rdb_filename); + } else { + sprintf(rdbpath, "%s_slot", server.rdb_filename); + old_rdb_fd = open(rdbpath, O_RDONLY | O_NONBLOCK); + rename_res = rename(link->repl_transfer_tmpfile, rdbpath); + } + if (rename_res == -1) { serverLog(LL_WARNING, "Failed trying to rename the temp DB into %s in " - "PRIMARY <-> REPLICA synchronization: %s", - server.rdb_filename, strerror(errno)); + "%s <-> %s synchronization: %s", + server.rdb_filename, node_primary, node_replica, strerror(errno)); if (old_rdb_fd != -1) close(old_rdb_fd); goto error; } @@ -2634,96 +2753,120 @@ void readSyncBulkPayload(connection *conn) { if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0, 0); /* Sync the directory to ensure rename is persisted */ - if (fsyncFileDir(server.rdb_filename) == -1) { + if ((!slot_sync && fsyncFileDir(server.rdb_filename) == -1) || (slot_sync && fsyncFileDir(rdbpath) == -1)) { serverLog(LL_WARNING, "Failed trying to sync DB directory %s in " - "PRIMARY <-> REPLICA synchronization: %s", + "%s <-> %s synchronization: %s", + node_primary, node_replica, server.rdb_filename, strerror(errno)); goto error; } - /* We will soon start loading the RDB from disk, the replication history is changed, - * we must discard the cached primary structure and force resync of sub-replicas. */ - replicationAttachToNewPrimary(); + if (!slot_sync) { + /* We will soon start loading the RDB from disk, the replication history is changed, + * we must discard the cached primary structure and force resync of sub-replicas. */ + replicationAttachToNewPrimary(); + + /* Empty the databases only after the RDB file is ok, that is, before the RDB file + * is actually loaded, in case we encounter an error and drop the replication stream + * and leave an empty database. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); - /* Empty the databases only after the RDB file is ok, that is, before the RDB file - * is actually loaded, in case we encounter an error and drop the replication stream - * and leave an empty database. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); - if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) { - serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization " - "DB from disk, check server logs."); - if (server.rdb_del_sync_files && allPersistenceDisabled()) { - serverLog(LL_NOTICE, "Removing the RDB file obtained from " - "the primary. This replica has persistence " - "disabled"); - bg_unlink(server.rdb_filename); + if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) { + serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization " + "DB from disk, check server logs."); + if (server.rdb_del_sync_files && allPersistenceDisabled()) { + serverLog(LL_NOTICE, "Removing the RDB file obtained from " + "the primary. This replica has persistence " + "disabled"); + bg_unlink(server.rdb_filename); + } + + /* If disk-based RDB loading fails, remove the half-loaded dataset. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + + /* Note that there's no point in restarting the AOF on sync failure, + it'll be restarted when sync succeeds or replica promoted. */ + goto error; } + } else { + startLoading(link->repl_transfer_size, RDBFLAGS_REPLICATION | RDBFLAGS_SLOT_SYNC, 0); - /* If disk-based RDB loading fails, remove the half-loaded dataset. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + link->temp_db = slotsync_load_tempDb; + link->temp_func_ctx = slotsync_temp_functions_lib_ctx; + link->sync_state = REPL_STATE_LOADING; - /* Note that there's no point in restarting the AOF on sync failure, - it'll be restarted when sync succeeds or replica promoted. */ - goto error; + /* Doing a bio RDB loading */ + rdbLoadJob *job = zmalloc(sizeof(rdbLoadJob)); + job->rdbflags = RDBFLAGS_REPLICATION | RDBFLAGS_SLOT_SYNC; + job->use_diskless_load = 0; + job->usemark = 0; + job->eofmark = NULL; + job->link = link; + bioCreateRdbLoadJob(bioRdbLoad, 1, job); } /* Cleanup. */ - if (server.rdb_del_sync_files && allPersistenceDisabled()) { + if (!slot_sync && server.rdb_del_sync_files && allPersistenceDisabled()) { serverLog(LL_NOTICE, "Removing the RDB file obtained from " "the primary. This replica has persistence " "disabled"); bg_unlink(server.rdb_filename); } - zfree(server.repl_transfer_tmpfile); - close(server.repl_transfer_fd); - server.repl_transfer_fd = -1; - server.repl_transfer_tmpfile = NULL; + if (!slot_sync) { + zfree(server.repl_transfer_tmpfile); + close(server.repl_transfer_fd); + server.repl_transfer_fd = -1; + server.repl_transfer_tmpfile = NULL; + } } /* Final setup of the connected replica <- primary link */ if (conn == server.repl_rdb_transfer_s) { dualChannelSyncHandleRdbLoadCompletion(); - } else { + } else if (!slot_sync) { replicationCreatePrimaryClient(server.repl_transfer_s, rsi.repl_stream_db); server.repl_state = REPL_STATE_CONNECTED; /* Send the initial ACK immediately to put this replica in online state. */ replicationSendAck(); } - server.repl_down_since = 0; - /* Fire the primary link modules event. */ - moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_UP, NULL); - if (server.repl_state == REPL_STATE_CONNECTED) { - /* After a full resynchronization we use the replication ID and - * offset of the primary. The secondary ID / offset are cleared since - * we are starting a new history. */ - memcpy(server.replid, server.primary->replid, sizeof(server.replid)); - server.primary_repl_offset = server.primary->reploff; - } - clearReplicationId2(); - - /* Let's create the replication backlog if needed. Replicas need to - * accumulate the backlog regardless of the fact they have sub-replicas - * or not, in order to behave correctly if they are promoted to - * primaries after a failover. */ - if (server.repl_backlog == NULL) createReplicationBacklog(); - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Finished with success"); + if (!slot_sync) { + server.repl_down_since = 0; - if (server.supervised_mode == SUPERVISED_SYSTEMD) { - serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Finished with success. Ready to accept connections " - "in read-write mode.\n"); - } + /* Fire the primary link modules event. */ + moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_UP, NULL); + if (server.repl_state == REPL_STATE_CONNECTED) { + /* After a full resynchronization we use the replication ID and + * offset of the primary. The secondary ID / offset are cleared since + * we are starting a new history. */ + memcpy(server.replid, server.primary->replid, sizeof(server.replid)); + server.primary_repl_offset = server.primary->reploff; + } + clearReplicationId2(); - /* Restart the AOF subsystem now that we finished the sync. This - * will trigger an AOF rewrite, and when done will start appending - * to the new file. */ - if (server.aof_enabled) restartAOFAfterSYNC(); + /* Let's create the replication backlog if needed. Replicas need to + * accumulate the backlog regardless of the fact they have sub-replicas + * or not, in order to behave correctly if they are promoted to + * primaries after a failover. */ + if (server.repl_backlog == NULL) createReplicationBacklog(); + serverLog(LL_NOTICE, "%s <-> %s sync: Finished with success", node_primary, node_replica); + + if (server.supervised_mode == SUPERVISED_SYSTEMD) { + serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Finished with success. Ready to accept connections " + "in read-write mode.\n"); + } + + /* Restart the AOF subsystem now that we finished the sync. This + * will trigger an AOF rewrite, and when done will start appending + * to the new file. */ + if (server.aof_enabled) restartAOFAfterSYNC(); + } /* In case of dual channel replication sync we want to close the RDB connection * once the connection is established */ @@ -2734,18 +2877,43 @@ void readSyncBulkPayload(connection *conn) { return; error: + if (slot_sync) goto slot_sync_error; + cancelReplicationHandshake(1); return; + +slot_sync_error: + if (slotsync_load_tempDb) discardTempDb(slotsync_load_tempDb); + if (slotsync_temp_functions_lib_ctx) freeFunctionsAsync(slotsync_temp_functions_lib_ctx); + + /* Reset the link state to CONNECT, the cron will retry start next time. */ + resetSlotSyncLinkForReconnect(link); + return; +} + +/* Asynchronously read the SYNC/PSYNC payload we receive from a primary. */ +void readSyncBulkPayload(connection *conn) { + readSyncBulkPayloadImpl(conn, 0); } -char *receiveSynchronousResponse(connection *conn) { +/* Asynchronously read the SYNC/PSYNC payload we receive from a slow owner. */ +void readSlotSyncBulkPayload(connection *conn) { + readSyncBulkPayloadImpl(conn, 1); +} + +char *receiveSynchronousResponse(connection *conn, int slot_sync) { char buf[256]; /* Read the reply from the server. */ if (connSyncReadLine(conn, buf, sizeof(buf), server.repl_syncio_timeout * 1000) == -1) { serverLog(LL_WARNING, "Failed to read response from the server: %s", connGetLastError(conn)); return NULL; } - server.repl_transfer_lastio = server.unixtime; + if (!slot_sync) { + server.repl_transfer_lastio = server.unixtime; + } else { + clusterSlotSyncLink *link = connGetPrivateData(conn); + link->repl_transfer_lastio = server.unixtime; + } return sdsnew(buf); } @@ -2933,7 +3101,7 @@ static int dualChannelReplHandleHandshake(connection *conn, sds *err) { } static int dualChannelReplHandleAuthReply(connection *conn, sds *err) { - *err = receiveSynchronousResponse(conn); + *err = receiveSynchronousResponse(conn, 0); if (*err == NULL) { dualChannelServerLog(LL_WARNING, "Primary did not respond to auth command during SYNC handshake"); return C_ERR; @@ -2947,7 +3115,7 @@ static int dualChannelReplHandleAuthReply(connection *conn, sds *err) { } static int dualChannelReplHandleReplconfReply(connection *conn, sds *err) { - *err = receiveSynchronousResponse(conn); + *err = receiveSynchronousResponse(conn, 0); if (*err == NULL) { dualChannelServerLog(LL_WARNING, "Primary did not respond to replconf command during SYNC handshake"); return C_ERR; @@ -2967,7 +3135,7 @@ static int dualChannelReplHandleReplconfReply(connection *conn, sds *err) { static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) { uint64_t rdb_client_id; - *err = receiveSynchronousResponse(conn); + *err = receiveSynchronousResponse(conn, 0); if (*err == NULL) { return C_ERR; } @@ -3338,13 +3506,40 @@ void dualChannelSyncHandleRdbLoadCompletion(void) { #define PSYNC_NOT_SUPPORTED 4 #define PSYNC_TRY_LATER 5 #define PSYNC_FULLRESYNC_DUAL_CHANNEL 6 -int replicaTryPartialResynchronization(connection *conn, int read_reply) { +#define PSYNC_FOR_SLOT_SYNC 7 +int replicaTryPartialResynchronization(connection *conn, int read_reply, int slot_sync) { char *psync_replid; char psync_offset[32]; sds reply; /* Writing half */ if (!read_reply) { + /* If this is a slot sync, in the writing half, we will send SYNC to the slow primary. */ + if (slot_sync) { + /* Simulate IO error. */ + if (testInjectError("crs-io-error-beforce-send-sync")) { + serverLog(LL_WARNING, "inject crs-io-error-beforce-send-sync"); + return PSYNC_WRITE_ERROR; + } + + clusterSlotSyncLink *link = connGetPrivateData(conn); + sds sync_cmd = sdscatprintf(sdsempty(), "SYNC "); + sds slot_ranges = reprSlotRangeListWithBlank(link->slot_ranges); + sync_cmd = sdscatsds(sync_cmd, slot_ranges); + sync_cmd = sdscatprintf(sync_cmd, "\r\n"); + reply = sendCommandRaw(conn, sync_cmd); + sdsfree(slot_ranges); + sdsfree(sync_cmd); + + if (reply != NULL) { + serverLog(LL_WARNING, "Unable to send SYNC to slot primary: %s", reply); + sdsfree(reply); + connSetReadHandler(conn, NULL); + return PSYNC_WRITE_ERROR; + } + return PSYNC_WAIT_REPLY; + } + /* Initially set primary_initial_offset to -1 to mark the current * primary replid and offset as not valid. Later if we'll be able to do * a FULL resync using the PSYNC command we'll set the offset at the @@ -3388,7 +3583,12 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { } /* Reading half */ - reply = receiveSynchronousResponse(conn); + if (slot_sync) { + /* If this is a slot sync, in the reading half, we just return PSYNC_FOR_SLOT_SYNC. + * We may need a +FULLSLOTSYNC interaction. */ + return PSYNC_FOR_SLOT_SYNC; + } + reply = receiveSynchronousResponse(conn, slot_sync); /* Primary did not reply to PSYNC */ if (reply == NULL) { connSetReadHandler(conn, NULL); @@ -3533,6 +3733,7 @@ sds getTryPsyncString(int result) { case PSYNC_NOT_SUPPORTED: return sdsnew("PSYNC_NOT_SUPPORTED"); case PSYNC_TRY_LATER: return sdsnew("PSYNC_TRY_LATER"); case PSYNC_FULLRESYNC_DUAL_CHANNEL: return sdsnew("PSYNC_FULLRESYNC_DUAL_CHANNEL"); + case PSYNC_FOR_SLOT_SYNC: return sdsnew("PSYNC_FOR_SLOT_SYNC"); default: return sdsnew("Unknown result"); } } @@ -3546,7 +3747,7 @@ int dualChannelReplMainConnSendHandshake(connection *conn, sds *err) { } int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) { - *err = receiveSynchronousResponse(conn); + *err = receiveSynchronousResponse(conn, 0); if (*err == NULL) return C_ERR; if ((*err)[0] == '-') { dualChannelServerLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", *err); @@ -3557,7 +3758,7 @@ int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) { int dualChannelReplMainConnSendPsync(connection *conn, sds *err) { if (server.debug_pause_after_fork) debugPauseProcess(); - if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { + if (replicaTryPartialResynchronization(conn, 0, 0) == PSYNC_WRITE_ERROR) { dualChannelServerLog(LL_WARNING, "Aborting dual channel sync. Write error."); *err = sdsnew(connGetLastError(conn)); return C_ERR; @@ -3566,7 +3767,7 @@ int dualChannelReplMainConnSendPsync(connection *conn, sds *err) { } int dualChannelReplMainConnRecvPsyncReply(connection *conn, sds *err) { - int psync_result = replicaTryPartialResynchronization(conn, 1); + int psync_result = replicaTryPartialResynchronization(conn, 1, 0); if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */ if (psync_result == PSYNC_CONTINUE) { @@ -3701,15 +3902,20 @@ void dualChannelSetupMainConnForPsync(connection *conn) { * │ │ * └─────────────────────────────────────────────────┘ */ -/* This handler fires when the non blocking connect was able to - * establish a connection with the primary. */ -void syncWithPrimary(connection *conn) { +void syncWithPrimary(connection *conn); +void syncWithSlotSyncPrimary(connection *conn); + +void syncWithPrimaryStateMachine(connection *conn, int *repl_state, int slot_sync) { char tmpfile[256], *err = NULL; int psync_result; + clusterSlotSyncLink *link = !slot_sync ? NULL : connGetPrivateData(conn); + const char *node_primary = !slot_sync ? "PRIMARY" : "SLOT PRIMARY"; + const char *node_replica = !slot_sync ? "REPLICA" : "SLOT REPLICA"; /* If this event fired after the user turned the instance into a primary * with REPLICAOF NO ONE we must just return ASAP. */ - if (server.repl_state == REPL_STATE_NONE) { + if (*repl_state == REPL_STATE_NONE) { + serverAssert(!slot_sync); connClose(conn); return; } @@ -3722,13 +3928,17 @@ void syncWithPrimary(connection *conn) { } /* Send a PING to check the primary is able to reply without errors. */ - if (server.repl_state == REPL_STATE_CONNECTING) { + if (*repl_state == REPL_STATE_CONNECTING) { serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. */ - connSetReadHandler(conn, syncWithPrimary); + if (!slot_sync) { + connSetReadHandler(conn, syncWithPrimary); + } else { + connSetReadHandler(conn, syncWithSlotSyncPrimary); + } connSetWriteHandler(conn, NULL); - server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; + *repl_state = REPL_STATE_RECEIVE_PING_REPLY; /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. */ err = sendCommand(conn, "PING", NULL); @@ -3737,8 +3947,8 @@ void syncWithPrimary(connection *conn) { } /* Receive the PONG command. */ - if (server.repl_state == REPL_STATE_RECEIVE_PING_REPLY) { - err = receiveSynchronousResponse(conn); + if (*repl_state == REPL_STATE_RECEIVE_PING_REPLY) { + err = receiveSynchronousResponse(conn, slot_sync); /* The primary did not reply */ if (err == NULL) goto no_response_error; @@ -3750,18 +3960,18 @@ void syncWithPrimary(connection *conn) { * both. */ if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && strncmp(err, "-ERR operation not permitted", 28) != 0) { - serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", err); + serverLog(LL_WARNING, "Error reply to PING from %s: '%s'", node_primary, err); sdsfree(err); goto error; } else { - serverLog(LL_NOTICE, "Primary replied to PING, replication can continue..."); + serverLog(LL_NOTICE, "%s replied to PING, replication can continue", node_primary); } sdsfree(err); err = NULL; - server.repl_state = REPL_STATE_SEND_HANDSHAKE; + *repl_state = REPL_STATE_SEND_HANDSHAKE; } - if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { + if (*repl_state == REPL_STATE_SEND_HANDSHAKE) { /* AUTH with the primary if required. */ if (server.primary_auth) { char *args[3] = {"AUTH", NULL, NULL}; @@ -3811,97 +4021,103 @@ void syncWithPrimary(connection *conn) { err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL); if (err) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; + *repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; return; } - if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.primary_auth) - server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; + if (*repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.primary_auth) + *repl_state = REPL_STATE_RECEIVE_PORT_REPLY; /* Receive AUTH reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) { - err = receiveSynchronousResponse(conn); + if (*repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) { + err = receiveSynchronousResponse(conn, slot_sync); if (err == NULL) goto no_response_error; if (err[0] == '-') { - serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", err); + serverLog(LL_WARNING, "Unable to AUTH to %s: %s", node_primary, err); sdsfree(err); goto error; } sdsfree(err); err = NULL; - server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; + *repl_state = REPL_STATE_RECEIVE_PORT_REPLY; return; } /* Receive REPLCONF listening-port reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_PORT_REPLY) { - err = receiveSynchronousResponse(conn); + if (*repl_state == REPL_STATE_RECEIVE_PORT_REPLY) { + err = receiveSynchronousResponse(conn, slot_sync); if (err == NULL) goto no_response_error; /* Ignore the error if any, not all the Redis OSS versions support * REPLCONF listening-port. */ if (err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " + "(Non critical) %s does not understand " "REPLCONF listening-port: %s", + node_primary, err); } sdsfree(err); - server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; + err = NULL; + *repl_state = REPL_STATE_RECEIVE_IP_REPLY; return; } - if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.replica_announce_ip) - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + if (*repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.replica_announce_ip) + *repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; /* Receive REPLCONF ip-address reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) { - err = receiveSynchronousResponse(conn); + if (*repl_state == REPL_STATE_RECEIVE_IP_REPLY) { + err = receiveSynchronousResponse(conn, slot_sync); if (err == NULL) goto no_response_error; /* Ignore the error if any, not all the Redis OSS versions support * REPLCONF ip-address. */ if (err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " + "(Non critical) %s does not understand " "REPLCONF ip-address: %s", + node_primary, err); } sdsfree(err); - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + err = NULL; + *repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; return; } /* Receive CAPA reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) { - err = receiveSynchronousResponse(conn); + if (*repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) { + err = receiveSynchronousResponse(conn, slot_sync); if (err == NULL) goto no_response_error; /* Ignore the error if any, not all the Redis OSS versions support * REPLCONF capa. */ if (err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " + "(Non critical) %s does not understand " "REPLCONF capa: %s", + node_primary, err); } sdsfree(err); err = NULL; - server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; + *repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; return; } /* Receive VERSION reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_VERSION_REPLY) { - err = receiveSynchronousResponse(conn); + if (*repl_state == REPL_STATE_RECEIVE_VERSION_REPLY) { + err = receiveSynchronousResponse(conn, slot_sync); if (err == NULL) goto no_response_error; /* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */ if (err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " + "(Non critical) %s does not understand " "REPLCONF VERSION: %s", + node_primary, err); } sdsfree(err); err = NULL; - server.repl_state = REPL_STATE_SEND_PSYNC; + *repl_state = REPL_STATE_SEND_PSYNC; } /* Try a partial resynchronization. If we don't have a cached primary @@ -3909,32 +4125,32 @@ void syncWithPrimary(connection *conn) { * to start a full resynchronization so that we get the primary replid * and the global offset, to try a partial resync at the next * reconnection attempt. */ - if (server.repl_state == REPL_STATE_SEND_PSYNC) { - if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { + if (*repl_state == REPL_STATE_SEND_PSYNC) { + if (replicaTryPartialResynchronization(conn, 0, slot_sync) == PSYNC_WRITE_ERROR) { err = sdsnew("Write error sending the PSYNC command."); abortFailover("Write error to failover target"); goto write_error; } - server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; + *repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; return; } /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ - if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { + if (*repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { serverLog(LL_WARNING, - "syncWithPrimary(): state machine error, " + "syncWithPrimaryStateMachine(): state machine error, " "state should be RECEIVE_PSYNC but is %d", - server.repl_state); + *repl_state); goto error; } - psync_result = replicaTryPartialResynchronization(conn, 1); + psync_result = replicaTryPartialResynchronization(conn, 1, slot_sync); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* Check the status of the planned failover. We expect PSYNC_CONTINUE, * but there is nothing technically wrong with a full resync which * could happen in edge cases. */ - if (server.failover_state == FAILOVER_IN_PROGRESS) { + if (!slot_sync && server.failover_state == FAILOVER_IN_PROGRESS) { if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) { clearFailoverState(); } else { @@ -3973,10 +4189,10 @@ void syncWithPrimary(connection *conn) { } /* Prepare a suitable temp file for bulk transfer */ - if (!useDisklessLoad()) { + if (!useDisklessLoad(slot_sync)) { int dfd = -1, maxtries = 5; while (maxtries--) { - snprintf(tmpfile, 256, "temp-%d.%ld.rdb", (int)server.unixtime, (long int)getpid()); + snprintf(tmpfile, 256, "temp-%d.%ld.rdb", (int)(mstime()/1000), (long int)getpid()); dfd = open(tmpfile, O_CREAT | O_WRONLY | O_EXCL, 0644); if (dfd != -1) break; /* We save the errno of open to prevent some systems from modifying it after @@ -3986,12 +4202,19 @@ void syncWithPrimary(connection *conn) { errno = saved_errno; } if (dfd == -1) { - serverLog(LL_WARNING, "Opening the temp file needed for PRIMARY <-> REPLICA synchronization: %s", + serverLog(LL_WARNING, "Opening the temp file needed for %s <-> %s synchronization: %s", + node_primary, node_replica, strerror(errno)); goto error; } - server.repl_transfer_tmpfile = zstrdup(tmpfile); - server.repl_transfer_fd = dfd; + if (!slot_sync) { + server.repl_transfer_tmpfile = zstrdup(tmpfile); + server.repl_transfer_fd = dfd; + } else { + /* Store the name and fd of the rdb file to the clusterSlotSyncLink. */ + link->repl_transfer_tmpfile = zstrdup(tmpfile); + link->repl_transfer_fd = dfd; + } } /* Using dual-channel-replication, the primary responded +DUALCHANNELSYNC. We need to @@ -4015,6 +4238,23 @@ void syncWithPrimary(connection *conn) { server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE; return; } + + if (psync_result == PSYNC_FOR_SLOT_SYNC) { + /* Setup the non blocking download of the bulk file. */ + if (connSetReadHandler(link->sync_conn, readSlotSyncBulkPayload) == C_ERR) { + char conninfo[CONN_INFO_LEN]; + serverLog(LL_WARNING, "Can't create readable event for slot SYNC: %s (%s)", strerror(errno), + connGetInfo(link->sync_conn, conninfo, sizeof(conninfo))); + goto error; + } + *repl_state = REPL_STATE_TRANSFER; + link->repl_transfer_size = -1; + link->repl_transfer_read = 0; + link->repl_transfer_last_fsync_off = 0; + link->repl_transfer_lastio = server.unixtime; + return; + } + /* Setup the non blocking download of the bulk file. */ if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { char conninfo[CONN_INFO_LEN]; @@ -4023,7 +4263,7 @@ void syncWithPrimary(connection *conn) { goto error; } - server.repl_state = REPL_STATE_TRANSFER; + *repl_state = REPL_STATE_TRANSFER; server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; @@ -4031,10 +4271,12 @@ void syncWithPrimary(connection *conn) { return; no_response_error: /* Handle receiveSynchronousResponse() error when primary has no reply */ - serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake"); + serverLog(LL_WARNING, "%s did not respond to command during SYNC handshake", node_primary); /* Fall through to regular error handling */ error: + if (slot_sync) goto slot_sync_error; + connClose(conn); server.repl_transfer_s = NULL; if (server.repl_rdb_transfer_s) { @@ -4048,12 +4290,39 @@ void syncWithPrimary(connection *conn) { server.repl_state = REPL_STATE_CONNECT; return; +slot_sync_error: + connClose(conn); + link->sync_conn = NULL; + if (link->repl_transfer_fd != -1) { + close(link->repl_transfer_fd); + bg_unlink(link->repl_transfer_tmpfile); + zfree(link->repl_transfer_tmpfile); + link->repl_transfer_fd = -1; + link->repl_transfer_tmpfile = NULL; + } + *repl_state = REPL_STATE_CONNECT; + return; + write_error: /* Handle sendCommand() errors. */ - serverLog(LL_WARNING, "Sending command to primary in replication handshake: %s", err); + serverLog(LL_WARNING, "Error sending command to server in replication handshake: %s", err); sdsfree(err); goto error; } +/* This handler fires when the non blocking connect was able to + * establish a connection with the primary. */ +void syncWithPrimary(connection *conn) { + syncWithPrimaryStateMachine(conn, &server.repl_state, 0); +} + +/* This handler fires when the non blocking connect was able to + * establish a connection with the slot sync primary. */ +void syncWithSlotSyncPrimary(connection *conn) { + clusterSlotSyncLink *link = connGetPrivateData(conn); + + syncWithPrimaryStateMachine(conn, &link->sync_state, 1); +} + int connectWithPrimary(void) { server.repl_transfer_s = connCreate(connTypeOfReplication()); if (connConnect(server.repl_transfer_s, server.primary_host, server.primary_port, server.bind_source_addr, @@ -4064,7 +4333,6 @@ int connectWithPrimary(void) { return C_ERR; } - server.repl_transfer_lastio = server.unixtime; server.repl_state = REPL_STATE_CONNECTING; serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync started"); @@ -4351,7 +4619,7 @@ void roleCommand(client *c) { } setDeferredArrayLen(c, mbcount, replicas); } else { - char *replica_state = NULL; + const char *replica_state = NULL; addReplyArrayLen(c, 5); addReplyBulkCBuffer(c, "slave", 5); diff --git a/src/server.c b/src/server.c index c676ad8334..cca6667d57 100644 --- a/src/server.c +++ b/src/server.c @@ -2104,7 +2104,15 @@ void initServerConfig(void) { server.skip_checksum_validation = 0; server.loading = 0; server.async_loading = 0; + server.loading_total_bytes = 0; server.loading_rdb_used_mem = 0; + server.loading_loaded_bytes = 0; + server.loading_start_time = 0; + server.slot_loading = 0; + server.slot_loading_total_bytes = 0; + server.slot_loading_rdb_used_mem = 0; + server.slot_loading_loaded_bytes = 0; + server.slot_loading_start_time = 0; server.aof_state = AOF_OFF; server.aof_rewrite_base_size = 0; server.aof_rewrite_scheduled = 0; diff --git a/src/server.h b/src/server.h index cbd6834d5e..0c522b45ce 100644 --- a/src/server.h +++ b/src/server.h @@ -389,11 +389,15 @@ typedef enum { REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_WAIT_SCHED, /* Wait for schedule to avoid concurrency bug */ REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from primary */ + REPL_STATE_LOADING, /* Loading the RDB in bio. */ REPL_STATE_CONNECTED, /* Connected to primary */ + REPL_STATE_LOADED, /* Done loading the RDB in bio. */ + REPL_STATE_LOAD_FAIL, /* Loading fail. */ } repl_state; /* Replica rdb-channel replication state. Used in server.repl_rdb_channel_state for @@ -1784,12 +1788,19 @@ struct valkeyServer { int enable_debug_assert; /* Enable debug asserts */ /* RDB / AOF loading information */ - volatile sig_atomic_t loading; /* We are loading data from disk if true */ + volatile sig_atomic_t loading; /* We are loading data if true */ volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */ off_t loading_total_bytes; off_t loading_rdb_used_mem; off_t loading_loaded_bytes; time_t loading_start_time; + /* Slot RDB loading information */ + volatile sig_atomic_t slot_loading; /* We are loading slot data if true */ + off_t slot_loading_total_bytes; + off_t slot_loading_rdb_used_mem; + off_t slot_loading_loaded_bytes; + time_t slot_loading_start_time; + /* Loading information */ off_t loading_process_events_interval_bytes; time_t loading_process_events_interval_ms; /* Fields used only for stats */ @@ -3087,6 +3098,7 @@ void changeReplicationId(void); void clearReplicationId2(void); void createReplicationBacklog(void); void freeReplicationBacklog(void); +void replicationAttachToNewPrimary(void); void replicationCachePrimaryUsingMyself(void); void feedReplicationBacklog(void *ptr, size_t len); void incrementalTrimReplicationBacklog(size_t blocks); @@ -3110,6 +3122,7 @@ void startLoading(size_t size, int rdbflags, int async); void loadingAbsProgress(off_t pos); void loadingIncrProgress(off_t size); void stopLoading(int success); +void stopSlotLoading(int success); void updateLoadingFileName(char *filename); void startSaving(int rdbflags); void stopSaving(int success); diff --git a/tests/test_slot_sync.py b/tests/test_slot_sync.py index e92dd7e77c..b8d3cf5967 100644 --- a/tests/test_slot_sync.py +++ b/tests/test_slot_sync.py @@ -1,3 +1,7 @@ +""" +python3 tests/test_slot_sync.py +""" + import redis import rediscluster import time @@ -137,7 +141,7 @@ def test_case01(self): try: nodes = conn.cluster('slotsync 0 0') except Exception as e: - assert "Slot 0 is served by myself" in str(e) + assert "Slot 0 is already served by myself" in str(e) # 6. slotlink 命令语法检查 try: conn.cluster('slotlink xxx') @@ -783,6 +787,7 @@ def test_case18(self): wait_slotlink_connected(conn, 1, 10) # 8. 检查数据同步是否准确无误 keys = conn.keys() + print(keys) assert len(keys) == 3 util.StopAllRedis() time.sleep(1) @@ -1230,6 +1235,8 @@ def my_test_case27(self, diskless=False, repl_diskless_load="", dual_channel=Fal # 12. slotfailover 之前确保键数据都正常,目标节点在 slotfailover 之前可以看到 slot RDB 的数据 assert len(conn_9000.keys()) == 3 # ['{b}1', '{b}2', '{b}new'] + print("========") + print(conn_9006.keys()) assert len(conn_9006.keys()) == 3 # ['{b}1', '{b}2', '{b}new'] assert len(conn_9007.keys()) == 3 # ['{b}1', '{b}2', '{b}new'] assert len(conn_9008.keys()) == 3 # ['{b}1', '{b}2', '{b}new'] @@ -1301,30 +1308,30 @@ def tearDown(self): if __name__ == "__main__": suite = unittest.TestSuite() test_cases = [ - TestSlotSync("test_case01"), - TestSlotSync("test_case02"), - TestSlotSync("test_case05"), - TestSlotSync("test_case06"), - TestSlotSync("test_case07"), - TestSlotSync("test_case08"), - TestSlotSync("test_case09"), - TestSlotSync("test_case10"), - TestSlotSync("test_case11"), - TestSlotSync("test_case12"), - TestSlotSync("test_case13"), - TestSlotSync("test_case14"), - TestSlotSync("test_case15"), - TestSlotSync("test_case16"), - TestSlotSync("test_case17"), - TestSlotSync("test_case18"), - TestSlotSync("test_case19"), - TestSlotSync("test_case20"), - TestSlotSync("test_case21"), - TestSlotSync("test_case22"), - TestSlotSync("test_case23"), - TestSlotSync("test_case24"), - TestSlotSync("test_case25"), - TestSlotSync("test_case26"), + # TestSlotSync("test_case01"), + # TestSlotSync("test_case02"), + # TestSlotSync("test_case05"), + # TestSlotSync("test_case06"), + # TestSlotSync("test_case07"), + # TestSlotSync("test_case08"), + # TestSlotSync("test_case09"), + # TestSlotSync("test_case10"), + # TestSlotSync("test_case11"), + # TestSlotSync("test_case12"), + # TestSlotSync("test_case13"), + # TestSlotSync("test_case14"), + # TestSlotSync("test_case15"), + # TestSlotSync("test_case16"), + # TestSlotSync("test_case17"), + # TestSlotSync("test_case18"), + # TestSlotSync("test_case19"), + # TestSlotSync("test_case20"), + # TestSlotSync("test_case21"), + # TestSlotSync("test_case22"), + # TestSlotSync("test_case23"), + # TestSlotSync("test_case24"), + # TestSlotSync("test_case25"), + # TestSlotSync("test_case26"), TestSlotSync("test_case27"), TestSlotSync("test_case29"), TestSlotSync("test_case31"),