diff --git a/src/aof.c b/src/aof.c index 024cdb2771..8befd2d8a1 100644 --- a/src/aof.c +++ b/src/aof.c @@ -32,6 +32,7 @@ #include "rio.h" #include "functions.h" #include "module.h" +#include "cluster.h" #include #include @@ -2190,14 +2191,20 @@ static int rewriteFunctions(rio *aof) { return 0; } -int rewriteAppendOnlyFileRio(rio *aof) { +int slotFilterPredicate(int slot, void *privdata) { + if (privdata == NULL) return 1; + unsigned char *slot_bitmap = (unsigned char *)privdata; + return bitmapTestBit(slot_bitmap, slot); +} + +int rewriteAppendOnlyFileRio(rio *aof, slotBitmap slot_bitmap) { int j; long key_count = 0; long long updated_time = 0; kvstoreIterator *kvs_it = NULL; /* Record timestamp at the beginning of rewriting AOF. */ - if (server.aof_timestamp_enabled) { + if (server.aof_timestamp_enabled && (slot_bitmap == NULL || isSlotBitmapEmpty(slot_bitmap))) { sds ts = genAofTimestampAnnotationIfNeeded(1); if (rioWrite(aof, ts, sdslen(ts)) == 0) { sdsfree(ts); @@ -2217,7 +2224,11 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr; if (rioWriteBulkLongLong(aof, j) == 0) goto werr; - kvs_it = kvstoreIteratorInit(db->keys); + if (slot_bitmap == NULL || isSlotBitmapEmpty(slot_bitmap)) { + kvs_it = kvstoreIteratorInit(db->keys); + } else { + kvs_it = kvstoreFilteredIteratorInit(db->keys, &slotFilterPredicate, slot_bitmap); + } /* Iterate this DB writing every entry */ void *next; while (kvstoreIteratorNext(kvs_it, &next)) { @@ -2330,7 +2341,7 @@ int rewriteAppendOnlyFile(char *filename) { goto werr; } } else { - if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr; + if (rewriteAppendOnlyFileRio(&aof, NULL) == C_ERR) goto werr; } /* Make sure data will not remain on the OS's output buffers */ diff --git a/src/blocked.c b/src/blocked.c index d2d6a5d314..9bdab5be8e 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -100,8 +100,8 @@ void freeClientBlockingState(client *c) { * flag is set client query buffer 
is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ void blockClient(client *c, int btype) { - /* Primary client should never be blocked unless pause or module */ - serverAssert(!(c->flag.primary && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE)); + /* Replication clients should never be blocked unless pause or module */ + serverAssert(!(c->flag.replicated && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE)); initClientBlockingState(c); diff --git a/src/cluster.c b/src/cluster.c index 309279e0be..f650d979f7 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1002,7 +1002,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int /* We handle all the cases as if they were EXEC commands, so we have * a common code path for everything */ - if (cmd->proc == execCommand) { + if (c && cmd->proc == execCommand) { /* If CLIENT_MULTI flag is not set EXEC is just going to return an * error. */ if (!c->flag.multi) return myself; @@ -1019,11 +1019,11 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int mc.cmd = cmd; } - uint64_t cmd_flags = getCommandFlags(c); + uint64_t cmd_flags = c ? getCommandFlags(c) : cmd->flags; /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */ int pubsubshard_included = - (cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_PUBSUB)); + (cmd_flags & CMD_PUBSUB) || (c && c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_PUBSUB)); /* Check that all the keys are in the same hash slot, and obtain this * slot and the node associated. */ @@ -1068,7 +1068,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int * can safely serve the request, otherwise we return a TRYAGAIN * error). To do so we set the importing/migrating state and * increment a counter for every missing key. 
*/ - if (clusterNodeIsPrimary(myself) || c->flag.readonly) { + if (clusterNodeIsPrimary(myself) || (c && c->flag.readonly)) { if (n == clusterNodeGetPrimary(myself) && getMigratingSlotDest(slot) != NULL) { migrating_slot = 1; } else if (getImportingSlotSource(slot) != NULL) { @@ -1163,7 +1163,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int * request as "ASKING", we can serve the request. However if the request * involves multiple keys and we don't have them all, the only option is * to send a TRYAGAIN error. */ - if (importing_slot && (c->flag.asking || cmd_flags & CMD_ASKING)) { + if (importing_slot && (c && (c->flag.asking || cmd_flags & CMD_ASKING))) { if (multiple_keys && missing_keys) { if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE; return NULL; @@ -1176,8 +1176,8 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int * node is a replica and the request is about a hash slot our primary * is serving, we can reply without redirection. */ int is_write_command = - (cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_WRITE)); - if ((c->flag.readonly || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) && + (cmd_flags & CMD_WRITE) || (c && c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_WRITE)); + if (((c && c->flag.readonly) || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) && clusterNodeGetPrimary(myself) == n) { return myself; } diff --git a/src/cluster.h b/src/cluster.h index 142f2d70b3..41b6263bd4 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -5,8 +5,6 @@ * Cluster exported API. *----------------------------------------------------------------------------*/ -#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */ -#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. 
*/ #define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */ #define CLUSTER_OK 0 /* Everything looks ok */ #define CLUSTER_FAIL 1 /* The cluster can't work */ @@ -116,7 +114,17 @@ client *createCachedResponseClient(int resp); void deleteCachedResponseClient(client *recording_client); void clearCachedClusterSlotsResponse(void); unsigned int countKeysInSlot(unsigned int hashslot); +void bitmapToSlotRanges(unsigned char *bitmap, slotBitmap slot_bitmap_out); +int bitmapTestBit(unsigned char *bitmap, int pos); +void bitmapSetBit(unsigned char *bitmap, int pos); +void bitmapClearBit(unsigned char *bitmap, int pos); +void bitmapSetAllBits(unsigned char *bitmap, int len); +int slotBitmapCompare(slotBitmap bitmap, slotBitmap other); +int isSlotBitmapEmpty(slotBitmap bitmap); int getSlotOrReply(client *c, robj *o); +void clusterSlotMigrationHandleClientClose(client *c); +void clusterFeedSlotMigration(int dbid, robj **argv, int argc); +int clusterShouldWriteToSlotMigrationTarget(void); /* functions with shared implementations */ int clusterNodeIsMyself(clusterNode *n); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 5c4bb65aae..04a4297b02 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -72,9 +72,6 @@ int clusterNodeSetSlotBit(clusterNode *n, int slot); static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required); void clusterHandleReplicaFailover(void); void clusterHandleReplicaMigration(int max_replicas); -int bitmapTestBit(unsigned char *bitmap, int pos); -void bitmapSetBit(unsigned char *bitmap, int pos); -void bitmapClearBit(unsigned char *bitmap, int pos); void clusterDoBeforeSleep(int flags); void clusterSendUpdate(clusterLink *link, clusterNode *node); void resetManualFailover(void); @@ -86,6 +83,8 @@ sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_cou void clusterFreeNodesSlotsInfo(clusterNode *n); uint64_t clusterGetMaxEpoch(void); 
int clusterBumpConfigEpochWithoutConsensus(void); +slotImport *clusterGetCurrentSlotImport(void); +slotExport *clusterGetCurrentSlotExport(void); void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, @@ -95,6 +94,7 @@ const char *clusterGetMessageTypeString(int type); void removeChannelsInSlot(unsigned int slot); unsigned int countChannelsInSlot(unsigned int hashslot); unsigned int delKeysInSlot(unsigned int hashslot); +unsigned int delKeysInSlotBitmap(slotBitmap bitmap); void clusterAddNodeToShard(const char *shard_id, clusterNode *node); list *clusterLookupNodeListByShardId(const char *shard_id); void clusterRemoveNodeFromShard(clusterNode *node); @@ -123,6 +123,7 @@ int verifyClusterNodeId(const char *name, int length); sds clusterEncodeOpenSlotsAuxField(int rdbflags); int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s); static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now); +void clusterProceedWithSlotImport(void); /* Only primaries that own slots have voting rights. * Returns 1 if the node has voting rights, otherwise returns 0. */ @@ -1134,6 +1135,8 @@ void clusterInit(void) { server.cluster->failover_auth_epoch = 0; server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; server.cluster->lastVoteEpoch = 0; + server.cluster->slot_import_jobs = listCreate(); + server.cluster->slot_export_jobs = listCreate(); /* Initialize stats */ for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) { @@ -2502,7 +2505,7 @@ void clusterSetNodeAsPrimary(clusterNode *n) { * The 'sender' is the node for which we received a configuration update. * Sometimes it is not actually the "Sender" of the information, like in the * case we receive the info via an UPDATE packet. 
*/ -void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) { +void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, slotBitmap slots) { int j; clusterNode *cur_primary = NULL, *new_primary = NULL; /* The dirty slots list is a list of slots for which we lose the ownership @@ -2570,6 +2573,19 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc migrated_our_slots++; } + /* Was this slot mine and it was in a paused state for slot + * migration? If so, mark the move as done. */ + slotExport *curr_export = clusterGetCurrentSlotExport(); + if (server.cluster->slots[j] == myself && curr_export && bitmapTestBit(curr_export->slot_bitmap, j)) { + bitmapClearBit(curr_export->slot_bitmap, j); + if (isSlotBitmapEmpty(curr_export->slot_bitmap)) { + serverLog(LL_NOTICE, "Slot migration has finished. Unpausing myself."); + unpauseActions(PAUSE_DURING_SLOT_MIGRATION); + curr_export->state = SLOT_EXPORT_FINISH; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION); + } + } + /* If the sender who claims this slot is not in the same shard, * it must be a result of deliberate operator actions. Therefore, * we should honor it and clear the outstanding migrating_slots_to @@ -2609,6 +2625,19 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc } } + /* Handle the case that we are importing the slot via atomic + * slot migration and the ownership changes. */ + slotImport *curr_import = clusterGetCurrentSlotImport(); + if (curr_import != NULL && bitmapTestBit(curr_import->slot_bitmap, j) && curr_import->source_node != sender && curr_import->state != SLOT_IMPORT_FAILED) { + if (areInSameShard(sender, curr_import->source_node)) { + serverLog(LL_WARNING, "Failover occurred during slot migration from %.40s (%s). 
Cancelling the migration.", curr_import->source_node->name, curr_import->source_node->human_nodename); + } else { + serverLog(LL_WARNING, "Slot %d has been moved to a different shard than that of %.40s (%s). Cancelling the migration.", j, curr_import->source_node->name, curr_import->source_node->human_nodename); + } + curr_import->state = SLOT_IMPORT_FAILED; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION); + } + clusterDelSlot(j); clusterAddSlot(sender, j); bitmapClearBit(server.cluster->owner_not_claiming_slot, j); @@ -4395,6 +4424,412 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { clusterMsgSendBlockDecrRefCount(msgblock_light); } +/* ----------------------------------------------------------------------------- + * Slot Migration functions + * -------------------------------------------------------------------------- */ + +slotImport *clusterCreateSlotImportJob(clusterNode *source, slotBitmap slots) { + slotImport *result = (slotImport *)zcalloc(sizeof(slotImport)); + memcpy(result->slot_bitmap, slots, sizeof(slotBitmap)); + result->source_node = source; + result->state = SLOT_IMPORT_QUEUED; + result->paused_at_offset = -1; + return result; +} + +slotExport *clusterCreateSlotExportJob(client *c, slotBitmap slots) { + slotExport *result = (slotExport *)zcalloc(sizeof(slotExport)); + memcpy(result->slot_bitmap, slots, sizeof(slotBitmap)); + result->state = SLOT_EXPORT_QUEUED; + result->client = c; + return result; +} + +void clusterFreeSlotImportJob(slotImport *slot_import) { + if (slot_import->client) { + freeClient(slot_import->client); + } else if (slot_import->conn) { + connClose(slot_import->conn); + } + zfree(slot_import); +} + +void clusterFreeSlotExportJob(slotExport *slot_export) { + if (slot_export->client) { + freeClient(slot_export->client); + } + zfree(slot_export); +} + +slotImport *clusterGetCurrentSlotImport(void) { + if (listLength(server.cluster->slot_import_jobs) == 0) return NULL; + return (slotImport 
*)listFirst(server.cluster->slot_import_jobs)->value; +} + +slotExport *clusterGetCurrentSlotExport(void) { + if (listLength(server.cluster->slot_export_jobs) == 0) return NULL; + return (slotExport *)listFirst(server.cluster->slot_export_jobs)->value; +} + +void clusterFeedSlotMigration(int dbid, robj **argv, int argc) { + UNUSED(dbid); + int i, error_code; + int slot = -1; + slotExport *curr_export = clusterGetCurrentSlotExport(); + if (curr_export == NULL || curr_export->state < SLOT_EXPORT_SNAPSHOTTING) { + return; + } + + /* Check the slot this command belongs to. Note that it is not a guarantee + * that the slot of the replicated command is the same as the slot of the + * executed command, for example in the case of module VM_Replicate APIs. + * Because of this case, we need to recomplete the slot lookup completely + * at this time. */ + struct serverCommand *cmd = lookupCommand(argv, argc); + getNodeByQuery(server.current_client, cmd, argv, argc, &slot, &error_code); + if (error_code != CLUSTER_REDIR_NONE || slot == -1) { + /* A couple cases where this could happen: + * - The replicated command is a command without a slot. + * - The replicated command is written by VM_Replicate module APIs + * and is a cross-slot command, or a slot that is not owned by + * this node. + * + * In any case, our best solution is to not replicate this to the + * target node. 
*/ + return; + } + if (!bitmapTestBit(curr_export->slot_bitmap, slot)) return; + + addReplyArrayLen(curr_export->client, argc); + /* '*' + argc + '\r\n' */ + curr_export->streamed_repl_offset += 1 + digits10(argc) + 2; + for (i = 0; i < argc; i++) { + addReplyBulk(curr_export->client, argv[i]); + /* '$' + len(argv[i]) + '\r\n' + argv[i] + '\r\n' */ + curr_export->streamed_repl_offset += 1 + digits10(stringObjectLen(argv[i])) + 2 + stringObjectLen(argv[i]) + 2; + } +} + +int clusterShouldWriteToSlotMigrationTarget(void) { + slotExport *curr_export = clusterGetCurrentSlotExport(); + return curr_export && (curr_export->state == SLOT_EXPORT_PAUSE_AND_REPLY || curr_export->state == SLOT_EXPORT_PAUSED); +} + +void clusterSlotMigrationHandleClientClose(client *c) { + if (c->flag.slot_migration_source) { + serverLog(LL_NOTICE, "Connection with slot migration source lost."); + slotImport *import = clusterGetCurrentSlotImport(); + if (import == NULL || import->client != c) return; + import->client = NULL; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION); + } else if (c->flag.slot_migration_target) { + serverLog(LL_NOTICE, "Connection with slot export target lost."); + slotExport *export = clusterGetCurrentSlotExport(); + if (export == NULL || export->client != c) return; + export->client = NULL; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION); + } +} + +void clusterImportHandler(connection *conn) { + UNUSED(conn); + clusterProceedWithSlotImport(); +} + +void clusterProceedWithSlotImport(void) { + char *err; + mstime_t now; + while (clusterGetCurrentSlotImport() != NULL) { + now = mstime(); + listNode *curr_node = listFirst(server.cluster->slot_import_jobs); + slotImport *curr_import = (slotImport *)curr_node->value; + if (curr_import->state != SLOT_IMPORT_FAILED) { + if (curr_import->end_time && curr_import->end_time < now) { + serverLog(LL_WARNING, + "Timed out for slot import from source node %.40s", curr_import->source_node->name); + curr_import->state 
= SLOT_IMPORT_FAILED; + } else if (curr_import->state > SLOT_IMPORT_CONNECTING && curr_import->client == NULL) { + serverLog(LL_WARNING, "Client for slot import from source node %.40s (%s) has been closed. Cancelling the migration.", curr_import->source_node->name, curr_import->source_node->human_nodename); + curr_import->state = SLOT_IMPORT_FAILED; + } else if (nodeIsReplica(curr_import->source_node)) { + serverLog(LL_WARNING, "Source node %.40s (%s) has been demote to replica. Cancelling the migration.", curr_import->source_node->name, curr_import->source_node->human_nodename); + curr_import->state = SLOT_IMPORT_FAILED; + } else if (curr_import->pause_end && curr_import->pause_end < now) { + /* If the owner ever unpauses, we have to move back in the state machine and retry. */ + serverLog(LL_WARNING, "Reinitiating pause on the node owning the slot range..."); + curr_import->state = SLOT_IMPORT_PAUSE_OWNER; + } + } + switch (curr_import->state) { + case SLOT_IMPORT_QUEUED: + serverLog(LL_NOTICE, "Starting slot import from source node %.40s", curr_import->source_node->name); + curr_import->end_time = now + CLUSTER_SLOT_IMPORT_TIMEOUT; + curr_import->conn = connCreate(server.tls_replication ? connectionTypeTls() : connectionTypeTcp()); + if (connConnect(curr_import->conn, curr_import->source_node->ip, getNodeDefaultReplicationPort(curr_import->source_node), server.bind_source_addr, clusterImportHandler) == C_ERR) { + serverLog(LL_WARNING, + "Failed to connect to slot import source node %.40s (%s). Cancelling the migration.", curr_import->source_node->name, curr_import->source_node->human_nodename); + curr_import->state = SLOT_IMPORT_FAILED; + continue; + } + curr_import->state = SLOT_IMPORT_CONNECTING; + continue; + case SLOT_IMPORT_CONNECTING: + if (curr_import->conn->state == CONN_STATE_CONNECTING) { + /* Nothing to do, waiting for connection to be established. 
*/ + return; + } else if (curr_import->conn->state != CONN_STATE_CONNECTED) { + serverLog(LL_WARNING, "Failed during connection to slot import source node %.40s (%s): %s. Cancelling the migration.", curr_import->source_node->name, curr_import->source_node->human_nodename, connGetLastError(curr_import->conn)); + curr_import->state = SLOT_IMPORT_FAILED; + continue; + } + serverLog(LL_NOTICE, "Connected to slot import source node %.40s (%s)", curr_import->source_node->name, curr_import->source_node->human_nodename); + connSetReadHandler(curr_import->conn, NULL); + client *c = createClient(curr_import->conn); + curr_import->client = c; + c->flag.replicated = 1; + c->flag.slot_migration_source = 1; + c->flag.authenticated = 1; + c->user = NULL; /* This client can do everything. */ + c->querybuf = sdsempty(); /* Similar to primary, we use a dedicated query buf. */ + initClientReplicationData(c); + + curr_import->state = SLOT_IMPORT_SEND_AUTH; + continue; + case SLOT_IMPORT_SEND_AUTH: + if (!server.primary_auth) { + curr_import->state = SLOT_IMPORT_SEND_SYNCSLOTS; + continue; + } + char *auth_args[3] = {"AUTH", NULL, NULL}; + size_t auth_lens[3] = {4, 0, 0}; + int argc = 1; + if (server.primary_user) { + auth_args[argc] = server.primary_user; + auth_lens[argc] = strlen(server.primary_user); + argc++; + } + auth_args[argc] = server.primary_auth; + auth_lens[argc] = sdslen(server.primary_auth); + argc++; + err = sendCommandArgv(curr_import->conn, argc, auth_args, auth_lens); + if (err) { + serverLog(LL_WARNING, "Failed to write AUTH to slot migration source %.40s (%s): %s. 
Cancelling the migration.", curr_import->source_node->name, curr_import->source_node->human_nodename, err); + sdsfree(err); + curr_import->state = SLOT_IMPORT_FAILED; + continue; + } + curr_import->state = SLOT_IMPORT_RECEIVE_AUTH; + continue; + case SLOT_IMPORT_RECEIVE_AUTH: + err = receiveSynchronousResponse(curr_import->conn); + if (err == NULL) { + serverLog(LL_WARNING, "Slot migration source %.40s (%s) did not respond to AUTH command. Cancelling the migration.", curr_import->source_node->name, curr_import->source_node->human_nodename); + curr_import->state = SLOT_IMPORT_FAILED; + continue; + } + if (err[0] == '-') { + serverLog(LL_WARNING, "Unable to AUTH to slot migration source %.40s (%s): %s", curr_import->source_node->name, curr_import->source_node->human_nodename, err); + sdsfree(err); + curr_import->state = SLOT_IMPORT_FAILED; + continue; + } + sdsfree(err); + err = NULL; + serverLog(LL_NOTICE, "Successfully authenticated to slot migration source %.40s (%s)", curr_import->source_node->name, curr_import->source_node->human_nodename); + curr_import->state = SLOT_IMPORT_SEND_SYNCSLOTS; + continue; + case SLOT_IMPORT_SEND_SYNCSLOTS: + /* Ensure we have a clean state for the SYNC. 
*/ + delKeysInSlotBitmap(curr_import->slot_bitmap); + + serverLog(LL_NOTICE, "Sending CLUSTER SYNCSLOTS START request to source node %.40s (%s).", curr_import->source_node->name, curr_import->source_node->human_nodename); + char *syncslots_args[4] = {"CLUSTER", "SYNCSLOTS", "START", (char *)curr_import->slot_bitmap}; + size_t syncslots_lens[4] = {7, 9, 5, sizeof(slotBitmap)}; + err = sendCommandArgv(curr_import->conn, 4, syncslots_args, syncslots_lens); + if (err) { + serverLog(LL_WARNING, "Failed to write SYNCSLOTS START to slot migration source %.40s (%s): %s", curr_import->source_node->name, curr_import->source_node->human_nodename, err); + sdsfree(err); + curr_import->state = SLOT_IMPORT_FAILED; + continue; + } + + /* Our result will be received in AOF format, so we can pipe it + * straight to readQueryFromClient. */ + connSetReadHandler(curr_import->client->conn, readQueryFromClient); + curr_import->state = SLOT_IMPORT_RECEIVE_SYNCSLOTS; + continue; + case SLOT_IMPORT_RECEIVE_SYNCSLOTS: + /* Nothing to do in this state. Waiting for CLUSTER SYNCSLOTS ENDSNAPSHOT to be processed. */ + return; + case SLOT_IMPORT_PAUSE_OWNER: + curr_import->client->flag.replication_force_reply = 1; + addReplyArrayLen(curr_import->client, 3); + addReplyBulkCBuffer(curr_import->client, "CLUSTER", 7); + addReplyBulkCBuffer(curr_import->client, "SYNCSLOTS", 9); + addReplyBulkCBuffer(curr_import->client, "PAUSE", 5); + curr_import->client->flag.replication_force_reply = 0; + + serverLog(LL_NOTICE, "SYNCSLOTS from slot migration source %.40s (%s) has been performed, received offset %lld. 
Pausing source node and waiting to continue", curr_import->source_node->name, curr_import->source_node->human_nodename, curr_import->client->repl_data->reploff); + curr_import->paused_at_offset = -1; + curr_import->pause_end = now + CLUSTER_MF_TIMEOUT; + curr_import->state = SLOT_IMPORT_WAITING_FOR_OFFSET; + continue; + case SLOT_IMPORT_WAITING_FOR_OFFSET: + return; + case SLOT_IMPORT_SYNCING_TO_OFFSET: + if (curr_import->client->repl_data->reploff >= curr_import->paused_at_offset) { + serverLog(LL_NOTICE, "SYNCSLOTS of slot range has caught up to paused slot owner %.40s (%s): my offset %lld, source offset %lld, slot migration can start.", curr_import->source_node->name, curr_import->source_node->human_nodename, curr_import->client->repl_data->reploff, curr_import->paused_at_offset); + curr_import->state = SLOT_IMPORT_FINISH; + continue; + } + /* Need to wait for the sync to progress further */ + return; + case SLOT_IMPORT_FINISH: + serverLog(LL_NOTICE, "Setting myself to owner of migrating slots and broadcasting."); + for (int i = 0; i < CLUSTER_SLOTS; i++) { + if (bitmapTestBit(curr_import->slot_bitmap, i)) { + clusterDelSlot(i); + clusterAddSlot(myself, i); + } + } + clusterUpdateState(); + clusterSaveConfigOrDie(1); + if (clusterBumpConfigEpochWithoutConsensus() == C_OK) { + serverLog(LL_NOTICE, "Epoch bumped after importing slots. 
New epoch %llu", (unsigned long long)server.cluster->currentEpoch); + } + clusterFreeSlotImportJob(curr_import); + clusterBroadcastPong(CLUSTER_BROADCAST_ALL); + listDelNode(server.cluster->slot_import_jobs, curr_node); + continue; + case SLOT_IMPORT_FAILED: + listDelNode(server.cluster->slot_import_jobs, curr_node); + delKeysInSlotBitmap(curr_import->slot_bitmap); + clusterFreeSlotImportJob(curr_import); + continue; + } + } +} + +int childSnapshotForSyncSlot(int req, rio *rdb, void *privdata) { + UNUSED(req); + int retval = rewriteAppendOnlyFileRio(rdb, (unsigned char *)privdata); + rioWrite(rdb, "*3\r\n", 4); + rioWriteBulkString(rdb, "CLUSTER", 7); + rioWriteBulkString(rdb, "SYNCSLOTS", 9); + rioWriteBulkString(rdb, "ENDSNAPSHOT", 11); + return retval; +} + +void clusterProceedWithSlotExport(void) { + mstime_t now; + while (clusterGetCurrentSlotExport() != NULL) { + now = mstime(); + listNode *curr_node = listFirst(server.cluster->slot_export_jobs); + slotExport *curr_export = (slotExport *)curr_node->value; + if (curr_export->client == NULL) { + serverLog(LL_WARNING, "Client for slot export has been closed"); + curr_export->state = SLOT_EXPORT_FAILED; + } + switch (curr_export->state) { + case SLOT_EXPORT_QUEUED: + if (hasActiveChildProcess()) { + /* We need to wait for the child to die, then we can + * proceed. */ + return; + } + connection **conns = zmalloc(sizeof(connection *)); + *conns = curr_export->client->conn; + serverLog(LL_NOTICE, "Initiating snapshot to conn with fd %d", curr_export->client->conn->fd); + if (saveSnapshotToConnectionSockets(conns, 1, 1, 0, childSnapshotForSyncSlot, curr_export->slot_bitmap) != C_OK) { + serverLog(LL_WARNING, "Failed to start slot export to target"); + curr_export->state = SLOT_EXPORT_FAILED; + continue; + } + curr_export->state = SLOT_EXPORT_SNAPSHOTTING; + continue; + case SLOT_EXPORT_SNAPSHOTTING: + /* During this time, we are waiting for SYNCSLOTS PAUSE to + * start flushing the accumulated backlog. 
*/ + return; + case SLOT_EXPORT_PAUSE_AND_REPLY: + addReplyArrayLen(curr_export->client, 4); + curr_export->streamed_repl_offset += 4; /* '*4\r\n' */ + addReplyBulkCBuffer(curr_export->client, "CLUSTER", 7); + curr_export->streamed_repl_offset += 13; /* '$7\r\nCLUSTER\r\n' */ + addReplyBulkCBuffer(curr_export->client, "SYNCSLOTS", 9); + curr_export->streamed_repl_offset += 15; /* '$9\r\nSYNCSLOTS\r\n' */ + addReplyBulkCBuffer(curr_export->client, "PAUSEOFFSET", 11); + curr_export->streamed_repl_offset += 18; /* '$11\r\nPAUSEOFFSET\r\n' */ + + /* We add the length of the offset reply to the offset itself. */ + uint32_t offset_len = digits10(curr_export->streamed_repl_offset + curr_export->client->repl_data->repldbsize); + uint32_t offset_len_len = digits10(offset_len); + curr_export->streamed_repl_offset += 1 + offset_len_len + 2 + offset_len + 2; + uint32_t offset_len2 = digits10(curr_export->streamed_repl_offset + curr_export->client->repl_data->repldbsize); + if (offset_len2 > offset_len) { + /* Adding the offset will add at most one more digit, since + * it's length will be <=10 (uint32_t max) */ + serverAssert(offset_len2 == offset_len + 1); + curr_export->streamed_repl_offset++; + uint32_t offset_len_len2 = digits10(digits10(curr_export->streamed_repl_offset)); + if (offset_len_len2 > offset_len_len) { + /* If offset_len was really close to another digit, we + * have to handle that too. 
*/ + serverAssert(offset_len_len2 == offset_len_len + 1); + curr_export->streamed_repl_offset++; + } + } + serverLog(LL_NOTICE, "At time of pause slot migration AOF snapshot size: %llu, " + "slot migration streaming offset: %llu, total " + "offset (with AOF snapshot): %llu", + (unsigned long long)curr_export->client->repl_data->repldbsize, + curr_export->streamed_repl_offset, + curr_export->streamed_repl_offset + curr_export->client->repl_data->repldbsize); + addReplyBulkLongLong(curr_export->client, curr_export->streamed_repl_offset + curr_export->client->repl_data->repldbsize); + + /* Even though we just added replies, it's possible that, due to + * existing pending data, the client is not in the pending write + * queue. We enqueue it explicitly to work around this. */ + putClientInPendingWriteQueue(curr_export->client); + + curr_export->pause_end = now + (CLUSTER_MF_TIMEOUT * CLUSTER_MF_PAUSE_MULT); + pauseActions(PAUSE_DURING_SLOT_MIGRATION, curr_export->pause_end, PAUSE_ACTIONS_CLIENT_WRITE_SET); + + curr_export->state = SLOT_EXPORT_PAUSED; + continue; + case SLOT_EXPORT_PAUSED: + /* While paused, we simply want to check if we should unpause. */ + if (curr_export->pause_end <= now) { + /* Every CLUSTER_MF_TIMEOUT, the source node should + * re-attempt the pause. If we reach this point, it hasn't + * attempted the pause in that time, we can assume it is + * dead and fail the migration.*/ + serverLog(LL_WARNING, "During slot export, unpausing self and cancelling export due to timeout."); + unpauseActions(PAUSE_DURING_SLOT_MIGRATION); + curr_export->state = SLOT_EXPORT_FAILED; + continue; + } + return; + case SLOT_EXPORT_FINISH: + case SLOT_EXPORT_FAILED: + listDelNode(server.cluster->slot_export_jobs, curr_node); + clusterFreeSlotExportJob(curr_export); + continue; + } + } +} + + +/* This is the main state machine for the slot migration workflow. Slot + * migration is mostly driven by the new owner of the slot (target node). 
These + * functions will do as much work as possible synchronously, processing the + * enqueued slot migrations and only returning once we are waiting on some IO. */ +void clusterProceedWithSlotMigration(void) { + server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_SLOTMIGRATION; + clusterProceedWithSlotImport(); + clusterProceedWithSlotExport(); +} + /* ----------------------------------------------------------------------------- * REPLICA node specific functions * -------------------------------------------------------------------------- */ @@ -5353,6 +5788,8 @@ void clusterCron(void) { } if (update_state || server.cluster->state == CLUSTER_FAIL) clusterUpdateState(); + + clusterProceedWithSlotMigration(); } /* This function is called before the event handler returns to sleep for @@ -5378,6 +5815,9 @@ void clusterBeforeSleep(void) { /* Handle failover, this is needed when it is likely that there is already * the quorum from primaries in order to react fast. */ clusterHandleReplicaFailover(); + } else if (flags & CLUSTER_TODO_HANDLE_SLOTMIGRATION) { + /* Continue with slot migration (e.g. if import offset is updated) */ + clusterProceedWithSlotMigration(); } /* Update the cluster state. */ @@ -5430,6 +5870,16 @@ void bitmapClearBit(unsigned char *bitmap, int pos) { bitmap[byte] &= ~(1 << bit); } +int slotBitmapCompare(slotBitmap bitmap, slotBitmap otherbitmap) { + return memcmp(bitmap, otherbitmap, sizeof(slotBitmap)); +} + +int isSlotBitmapEmpty(slotBitmap bitmap) { + slotBitmap empty; + memset(empty, 0, sizeof(slotBitmap)); + return slotBitmapCompare(bitmap, empty) == 0; +} + /* Return non-zero if there is at least one primary with replicas in the cluster. * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the * MIGRATE_TO flag the when a primary gets the first slot. 
*/ @@ -6328,6 +6778,16 @@ void removeChannelsInSlot(unsigned int slot) { pubsubShardUnsubscribeAllChannelsInSlot(slot); } +unsigned int delKeysInSlotBitmap(slotBitmap bitmap) { + unsigned int res = 0; + for (int i = 0; i < CLUSTER_SLOTS; i++) { + if (bitmapTestBit(bitmap, i)) { + res += delKeysInSlot(i); + } + } + return res; +} + /* Remove all the keys in the specified hash slot. * The number of removed items is returned. */ unsigned int delKeysInSlot(unsigned int hashslot) { @@ -7108,6 +7568,160 @@ int clusterCommandSpecial(client *c) { } else if (!strcasecmp(c->argv[1]->ptr, "links") && c->argc == 2) { /* CLUSTER LINKS */ addReplyClusterLinksDescription(c); + } else if (!strcasecmp(c->argv[1]->ptr, "import")) { + /* CLUSTER IMPORT SLOTSRANGE [ ] */ + if (nodeIsReplica(myself)) { + addReplyError(c, "Only primaries can import slots"); + return 1; + } + if (c->argc < 5 || strcasecmp(c->argv[2]->ptr, "slotsrange")) { + addReplyError(c, "CLUSTER IMPORT command requires at least one slot range"); + return 1; + } + if (c->argc % 2 == 0) { + addReplyError(c, "Invalid SLOTSRANGE, missing end slot"); + return 1; + } + slotBitmap requested_slots; + memset(requested_slots, 0, sizeof(slotBitmap)); + int i; + clusterNode *curr_owner = NULL; + for (i = 3; i + 1 < c->argc; i += 2) { + int start = getSlotOrReply(c, c->argv[i]); + if (start < 0) { + return 1; + } + int end = getSlotOrReply(c, c->argv[i + 1]); + if (end < 0) { + return 1; + } + if (end < start) { + addReplyErrorFormat(c, "Invalid SLOTSRANGE, start slot %d is greater than end slot %d", start, end); + return 1; + } + for (int j = start; j <= end; j++) { + if (bitmapTestBit(requested_slots, j)) { + addReplyError(c, "Invalid SLOTSRANGE, slot ranges overlap"); + return 1; + } + if (curr_owner == NULL) { + curr_owner = server.cluster->slots[j]; + } else { + if (curr_owner != server.cluster->slots[j]) { + addReplyError(c, "Invalid SLOTSRANGE, slot ranges are not all owned by the same shard"); + return 1; + } + } + if 
(curr_owner == myself) { + addReplyErrorFormat(c, "I'm already the owner of hash slot %u", j); + return 1; + } + if (curr_owner == NULL || nodeFailed(curr_owner)) { + addReplyErrorFormat(c, "Slot %u is unowned or its primary is currently failing. Please try again once there is a healthy primary", j); + return 1; + } + bitmapSetBit(requested_slots, j); + } + } + + slotImport *to_enqueue = clusterCreateSlotImportJob(curr_owner, requested_slots); + listAddNodeTail(server.cluster->slot_import_jobs, to_enqueue); + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION); + addReply(c, shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr, "syncslots")) { + if (c->argc < 3) { + addReplyError(c, "SYNCSLOTS command requires a subcommand to be provided."); + return 1; + } + if (!strcasecmp(c->argv[2]->ptr, "start")) { + /* CLUSTER SYNCSLOTS START */ + if (c->argc != 4) { + addReplyError(c, "CLUSTER SYNCSLOTS START command requires exactly one argument"); + return 1; + } + if (sdslen(c->argv[3]->ptr) != sizeof(slotBitmap)) { + addReplyError(c, "Invalid slot bitmap length"); + return 1; + } + c->flag.slot_migration_target = 1; + initClientReplicationData(c); + slotExport *job = clusterCreateSlotExportJob(c, c->argv[3]->ptr); + listAddNodeTail(server.cluster->slot_export_jobs, job); + clusterProceedWithSlotMigration(); + } else if (!strcasecmp(c->argv[2]->ptr, "endsnapshot")) { + /* CLUSTER SYNCSLOTS ENDSNAPSHOT */ + if (c->argc != 3) { + addReplyError(c, "CLUSTER SYNCSLOTS ENDSNAPSHOT does not expect any arguments."); + return 1; + } + if (c->flag.primary) { + /* Due to the proxying nature of replication from the source + * node through the target node to the target node's replicas, + * this message should simply be ignored.
*/ + return 1; + } + slotImport *curr_import = clusterGetCurrentSlotImport(); + if (!curr_import || (curr_import->state != SLOT_IMPORT_RECEIVE_SYNCSLOTS)) { + addReplyError(c, "No ongoing snapshot to end."); + return 1; + } + if (curr_import->client != c) { + addReplyError(c, "This client is not the one that initiated the ongoing CLUSTER SYNCSLOTS."); + return 1; + } + curr_import->state = SLOT_IMPORT_PAUSE_OWNER; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION); + addReply(c, shared.ok); + } else if (!strcasecmp(c->argv[2]->ptr, "pause")) { + /* CLUSTER SYNCSLOTS PAUSE */ + if (c->argc != 3) { + addReplyError(c, "CLUSTER SYNCSLOTS PAUSE does not expect any arguments."); + return 1; + } + slotExport *slot_export = clusterGetCurrentSlotExport(); + if (!slot_export) { + addReplyError(c, "No ongoing CLUSTER SYNCSLOTS to pause."); + return 1; + } + if (slot_export->state == SLOT_EXPORT_PAUSED) { + serverLog(LL_NOTICE, "Pause retriggered by target during slot migration."); + } else if (slot_export->state != SLOT_EXPORT_SNAPSHOTTING) { + addReplyError(c, "SYNCSLOTS is not in the correct state for this command."); + return 1; + } + serverLog(LL_NOTICE, "Pause received by target during slot migration. Pausing and initiating stream of commands."); + + slot_export->state = SLOT_EXPORT_PAUSE_AND_REPLY; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION); + } else if (!strcasecmp(c->argv[2]->ptr, "pauseoffset")) { + /* CLUSTER SYNCSLOTS PAUSEOFFSET */ + if (c->argc != 4) { + addReplyError(c, "CLUSTER SYNCSLOTS PAUSEOFFSET command requires exactly one argument."); + return 1; + } + if (c->flag.primary) { + /* Due to the proxying nature of replication from the source + * node through the target node to the target node's replicas, + * this message should simply be ignored. 
*/ + return 1; + } + slotImport *slot_import = clusterGetCurrentSlotImport(); + if (!slot_import || slot_import->state != SLOT_IMPORT_WAITING_FOR_OFFSET) { + addReplyError(c, "No CLUSTER SYNCSLOTS is waiting for a PAUSEOFFSET response."); + return 1; + } + long long offset; + if (getLongLongFromObject(c->argv[3], &offset) != C_OK) { + addReplyError(c, "Failed to parse PAUSEOFFSET offset."); + return 1; + } + serverLog(LL_NOTICE, "Received paused offset for slot migration from %.40s (%s). My offset: %lld, source offset: %lld", slot_import->source_node->name, slot_import->source_node->human_nodename, slot_import->client->repl_data->reploff, offset); + slot_import->paused_at_offset = offset; + slot_import->state = SLOT_IMPORT_SYNCING_TO_OFFSET; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION); + } else { + addReplyError(c, "Unknown subcommand for CLUSTER SYNCSLOTS."); + } } else { return 0; } @@ -7150,6 +7764,14 @@ const char **clusterCommandExtendedHelp(void) { "LINKS", " Return information about all network links between this node and its peers.", " Output format is an array where each array element is a map containing attributes of a link", + "MIGRATE SLOTSRANGE [ ...]", + " Initiate server driven slot migration of all slot ranges to the designated shard.", + "SYNCSLOTS [START |ENDSNAPSHOT|PAUSE|PAUSEOFFSET ]", + " Internal command. SYNCSLOTS START initiates send of an AOF formatted snapshot containing the", + " provided slot bitmap. SYNCSLOTS ENDSNAPSHOT terminates the AOF formatted snapshot, and after this", + " SYNCSLOTS PAUSE signals for this node to be paused and for a continuous stream of commands" + " for the slots to be replicated. SYNCSLOTS PAUSEOFFSET will be replied with the offset of remaining" + " commands.", NULL}; return help; @@ -7208,6 +7830,9 @@ int clusterAllowFailoverCmd(client *c) { void clusterPromoteSelfToPrimary(void) { replicationUnsetPrimary(); + /* verifyClusterConfigWithData will delete keys in unowned slots. 
This + * could happen in the case of failover during a slot migration. */ + serverAssert(verifyClusterConfigWithData() == C_OK); } int detectAndUpdateCachedNodeHealth(void) { diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 226842c5dc..471dbdb950 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -10,6 +10,7 @@ #define CLUSTER_MF_TIMEOUT 5000 /* Milliseconds to do a manual failover. */ #define CLUSTER_MF_PAUSE_MULT 2 /* Primary pause manual failover mult. */ #define CLUSTER_REPLICA_MIGRATION_DELAY 5000 /* Delay for replica migration. */ +#define CLUSTER_SLOT_IMPORT_TIMEOUT 30000 /* Milliseconds to do a slot import. */ /* Reasons why a replica is not able to failover. */ #define CLUSTER_CANT_FAILOVER_NONE 0 @@ -26,6 +27,7 @@ #define CLUSTER_TODO_FSYNC_CONFIG (1 << 3) #define CLUSTER_TODO_HANDLE_MANUALFAILOVER (1 << 4) #define CLUSTER_TODO_BROADCAST_ALL (1 << 5) +#define CLUSTER_TODO_HANDLE_SLOTMIGRATION (1 << 6) /* clusterLink encapsulates everything needed to talk with a remote node. */ typedef struct clusterLink { @@ -130,9 +132,9 @@ typedef struct { } clusterMsgDataPublish; typedef struct { - uint64_t configEpoch; /* Config epoch of the specified instance. */ - char nodename[CLUSTER_NAMELEN]; /* Name of the slots owner. */ - unsigned char slots[CLUSTER_SLOTS / 8]; /* Slots bitmap. */ + uint64_t configEpoch; /* Config epoch of the specified instance. */ + char nodename[CLUSTER_NAMELEN]; /* Name of the slots owner. */ + slotBitmap slots; /* Slots bitmap. */ } clusterMsgDataUpdate; typedef struct { @@ -246,7 +248,7 @@ typedef struct { uint64_t offset; /* Primary replication offset if node is a primary or processed replication offset if node is a replica. */ char sender[CLUSTER_NAMELEN]; /* Name of the sender node */ - unsigned char myslots[CLUSTER_SLOTS / 8]; + slotBitmap myslots; char replicaof[CLUSTER_NAMELEN]; char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. 
*/ uint16_t extensions; /* Number of extensions sent along with this packet. */ @@ -326,7 +328,7 @@ struct _clusterNode { char shard_id[CLUSTER_NAMELEN]; /* shard id, hex string, sha1-size */ int flags; /* CLUSTER_NODE_... */ uint64_t configEpoch; /* Last configEpoch observed for this node */ - unsigned char slots[CLUSTER_SLOTS / 8]; /* slots handled by this node */ + slotBitmap slots; /* slots handled by this node */ uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */ int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */ int numslots; /* Number of slots handled by this node */ @@ -362,6 +364,50 @@ struct _clusterNode { Update with updateAndCountChangedNodeHealth(). */ }; +typedef enum slotImportState { + SLOT_IMPORT_QUEUED, + SLOT_IMPORT_CONNECTING, + SLOT_IMPORT_SEND_AUTH, + SLOT_IMPORT_RECEIVE_AUTH, + SLOT_IMPORT_SEND_SYNCSLOTS, + SLOT_IMPORT_RECEIVE_SYNCSLOTS, + SLOT_IMPORT_PAUSE_OWNER, + SLOT_IMPORT_WAITING_FOR_OFFSET, + SLOT_IMPORT_SYNCING_TO_OFFSET, + SLOT_IMPORT_FINISH, + SLOT_IMPORT_FAILED, +} slotImportState; + +typedef struct slotImport { + slotBitmap slot_bitmap; + slotImportState state; + clusterNode *source_node; + mstime_t end_time; /* Slot migration time limit (ms unixtime). + If not yet in progress (e.g. queued), will be zero. */ + connection *conn; + client *client; + mstime_t pause_end; + long long syncslots_offset; + long long paused_at_offset; +} slotImport; + +typedef enum slotExportState { + SLOT_EXPORT_QUEUED, + SLOT_EXPORT_SNAPSHOTTING, + SLOT_EXPORT_PAUSE_AND_REPLY, + SLOT_EXPORT_PAUSED, + SLOT_EXPORT_FINISH, + SLOT_EXPORT_FAILED, +} slotExportState; + +typedef struct slotExport { + slotBitmap slot_bitmap; + slotExportState state; + client *client; /* Client for replication */ + unsigned long long streamed_repl_offset; /* Offset for just the streamed part of the syncslots command.*/ + mstime_t pause_end; +} slotExport; + /* Struct used for storing slot statistics. 
*/ typedef struct slotStat { uint64_t cpu_usec; @@ -417,9 +463,11 @@ struct clusterState { * the ownership transfer. Set the bit corresponding to the slot when a node * stops claiming the slot. This prevents spreading incorrect information (that * source still owns the slot) using UPDATE messages. */ - unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8]; + slotBitmap owner_not_claiming_slot; /* Struct used for storing slot statistics, for all slots owned by the current shard. */ slotStat slot_stats[CLUSTER_SLOTS]; + list *slot_import_jobs; /* Queue of ongoing slot imports (we are the target). */ + list *slot_export_jobs; /* Queue of ongoing slot exports (we are the source). */ }; #endif // CLUSTER_LEGACY_H diff --git a/src/commands.def b/src/commands.def index c5d766e3f8..fc910f96bd 100644 --- a/src/commands.def +++ b/src/commands.def @@ -599,6 +599,25 @@ struct COMMAND_ARG CLUSTER_GETKEYSINSLOT_Args[] = { #define CLUSTER_HELP_Keyspecs NULL #endif +/********** CLUSTER IMPORT ********************/ + +#ifndef SKIP_CMD_HISTORY_TABLE +/* CLUSTER IMPORT history */ +#define CLUSTER_IMPORT_History NULL +#endif + +#ifndef SKIP_CMD_TIPS_TABLE +/* CLUSTER IMPORT tips */ +const char *CLUSTER_IMPORT_Tips[] = { +"nondeterministic_output", +}; +#endif + +#ifndef SKIP_CMD_KEY_SPECS_TABLE +/* CLUSTER IMPORT key specs */ +#define CLUSTER_IMPORT_Keyspecs NULL +#endif + /********** CLUSTER INFO ********************/ #ifndef SKIP_CMD_HISTORY_TABLE @@ -1002,6 +1021,23 @@ const char *CLUSTER_SLOTS_Tips[] = { #define CLUSTER_SLOTS_Keyspecs NULL #endif +/********** CLUSTER SYNCSLOTS ********************/ + +#ifndef SKIP_CMD_HISTORY_TABLE +/* CLUSTER SYNCSLOTS history */ +#define CLUSTER_SYNCSLOTS_History NULL +#endif + +#ifndef SKIP_CMD_TIPS_TABLE +/* CLUSTER SYNCSLOTS tips */ +#define CLUSTER_SYNCSLOTS_Tips NULL +#endif + +#ifndef SKIP_CMD_KEY_SPECS_TABLE +/* CLUSTER SYNCSLOTS key specs */ +#define CLUSTER_SYNCSLOTS_Keyspecs NULL +#endif + /* CLUSTER command table */ struct 
COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("addslots","Assigns new hash slots to a node.","O(N) where N is the total number of hash slot arguments","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_ADDSLOTS_History,0,CLUSTER_ADDSLOTS_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_ADDSLOTS_Keyspecs,0,NULL,1),.args=CLUSTER_ADDSLOTS_Args}, @@ -1016,6 +1052,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("forget","Removes a node from the nodes table.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FORGET_History,0,CLUSTER_FORGET_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FORGET_Keyspecs,0,NULL,1),.args=CLUSTER_FORGET_Args}, {MAKE_CMD("getkeysinslot","Returns the key names in a hash slot.","O(N) where N is the number of requested keys","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_GETKEYSINSLOT_History,0,CLUSTER_GETKEYSINSLOT_Tips,1,clusterCommand,4,CMD_STALE,0,CLUSTER_GETKEYSINSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_GETKEYSINSLOT_Args}, {MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_HELP_History,0,CLUSTER_HELP_Tips,0,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_HELP_Keyspecs,0,NULL,0)}, +{MAKE_CMD("import","Initiates server driven hash slot migration, importing the given slot to this shard.","O(N) where N is the total number of hash slot arguments","8.1.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_IMPORT_History,0,CLUSTER_IMPORT_Tips,1,clusterCommand,-2,CMD_ADMIN|CMD_STALE,0,CLUSTER_IMPORT_Keyspecs,0,NULL,0)}, {MAKE_CMD("info","Returns information about the state of a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_INFO_History,0,CLUSTER_INFO_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_INFO_Keyspecs,0,NULL,0)}, 
{MAKE_CMD("keyslot","Returns the hash slot for a key.","O(N) where N is the number of bytes in the key","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_KEYSLOT_History,0,CLUSTER_KEYSLOT_Tips,0,clusterCommand,3,CMD_STALE,0,CLUSTER_KEYSLOT_Keyspecs,0,NULL,1),.args=CLUSTER_KEYSLOT_Args}, {MAKE_CMD("links","Returns a list of all TCP links to and from peer nodes.","O(N) where N is the total number of Cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_LINKS_History,0,CLUSTER_LINKS_Tips,1,clusterCommand,2,CMD_STALE,0,CLUSTER_LINKS_Keyspecs,0,NULL,0)}, @@ -1033,6 +1070,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args}, {MAKE_CMD("slot-stats","Return an array of slot usage statistics for slots assigned to the current node.","O(N) where N is the total number of slots based on arguments. 
O(N*log(N)) with ORDERBY subcommand.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args}, {MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)}, +{MAKE_CMD("syncslots","An internal command used in slot migration.",NULL,"8.1.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SYNCSLOTS_History,0,CLUSTER_SYNCSLOTS_Tips,0,clusterCommand,-2,CMD_ADMIN|CMD_STALE,0,CLUSTER_SYNCSLOTS_Keyspecs,0,NULL,0)}, {0} }; diff --git a/src/commands/cluster-migrate.json b/src/commands/cluster-migrate.json new file mode 100644 index 0000000000..e7b34be508 --- /dev/null +++ b/src/commands/cluster-migrate.json @@ -0,0 +1,18 @@ +{ + "IMPORT": { + "summary": "Initiates server driven hash slot migration, importing the given slot to this shard.", + "complexity": "O(N) where N is the total number of hash slot arguments", + "group": "cluster", + "since": "8.1.0", + "arity": -2, + "container": "CLUSTER", + "function": "clusterCommand", + "command_flags": [ + "ADMIN", + "STALE" + ], + "command_tips": [ + "NONDETERMINISTIC_OUTPUT" + ] + } +} diff --git a/src/commands/cluster-syncslots.json b/src/commands/cluster-syncslots.json new file mode 100644 index 0000000000..2d23903ac4 --- /dev/null +++ b/src/commands/cluster-syncslots.json @@ -0,0 +1,14 @@ +{ + "SYNCSLOTS": { + "summary": "An internal command used in slot migration.", + "group": "cluster", + "since": "8.1.0", + "arity": -2, + "container": "CLUSTER", + "function": "clusterCommand", + "command_flags": [ + "ADMIN", + "STALE" + ] + } +} diff --git a/src/db.c b/src/db.c index 94074bf668..905f5c6120 
100644 --- a/src/db.c +++ b/src/db.c @@ -258,7 +258,7 @@ int getKeySlot(sds key) { * so we must always recompute the slot for commands coming from the primary. */ if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command && - !server.current_client->flag.primary) { + !server.current_client->flag.replicated) { debugServerAssertWithInfo(server.current_client, NULL, (int)keyHashSlot(key, (int)sdslen(key)) == server.current_client->slot); return server.current_client->slot; @@ -267,7 +267,7 @@ int getKeySlot(sds key) { /* For the case of replicated commands from primary, getNodeByQuery() never gets called, * and thus c->slot never gets populated. That said, if this command ends up accessing a key, * we are able to backfill c->slot here, where the key's hash calculation is made. */ - if (server.current_client && server.current_client->flag.primary) { + if (server.current_client && server.current_client->flag.replicated) { server.current_client->slot = slot; } return slot; @@ -432,6 +432,7 @@ void setKey(client *c, serverDb *db, robj *key, robj **valref, int flags) { * If there are no keys, NULL is returned. * * The function makes sure to return keys not already expired. */ +// TODO murphyjacob4 need to exclude the loading slots from this robj *dbRandomKey(serverDb *db) { int maxtries = 100; int allvolatile = kvstoreSize(db->keys) == kvstoreSize(db->expires); diff --git a/src/io_threads.c b/src/io_threads.c index 66ef4948b6..93fe56fdb9 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -345,7 +345,7 @@ int trySendReadToIOThreads(client *c) { c->cur_tid = tid; c->read_flags = canParseCommand(c) ? 0 : READ_FLAGS_DONT_PARSE; c->read_flags |= authRequired(c) ? READ_FLAGS_AUTH_REQUIRED : 0; - c->read_flags |= c->flag.primary ? READ_FLAGS_PRIMARY : 0; + c->read_flags |= c->flag.replicated ? 
READ_FLAGS_REPLICATED : 0; c->io_read_state = CLIENT_PENDING_IO; connSetPostponeUpdateState(c->conn, 1); diff --git a/src/kvstore.c b/src/kvstore.c index d6db4d3fe1..b84ec5e8df 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -74,6 +74,8 @@ struct _kvstoreIterator { kvstore *kvs; long long didx; long long next_didx; + kvstoreIteratorPredicate *predicate; + void *filter_privdata; hashtableIterator di; }; @@ -581,6 +583,23 @@ kvstoreIterator *kvstoreIteratorInit(kvstore *kvs) { kvs_it->kvs = kvs; kvs_it->didx = -1; kvs_it->next_didx = kvstoreGetFirstNonEmptyHashtableIndex(kvs_it->kvs); /* Finds first non-empty hashtable index. */ + kvs_it->predicate = NULL; + kvs_it->filter_privdata = NULL; + hashtableInitSafeIterator(&kvs_it->di, NULL); + return kvs_it; +} + +/* Returns kvstore iterator that filters out hash tables based on the predicate.*/ +kvstoreIterator *kvstoreFilteredIteratorInit(kvstore *kvs, kvstoreIteratorPredicate *predicate, void *privdata) { + kvstoreIterator *kvs_it = zmalloc(sizeof(*kvs_it)); + kvs_it->kvs = kvs; + kvs_it->didx = -1; + kvs_it->next_didx = kvstoreGetFirstNonEmptyHashtableIndex(kvs_it->kvs); + while (kvs_it->next_didx != -1 && predicate && !predicate(kvs_it->next_didx, privdata)) { + kvs_it->next_didx = kvstoreGetNextNonEmptyHashtableIndex(kvs_it->kvs, kvs_it->next_didx); + } + kvs_it->predicate = predicate; + kvs_it->filter_privdata = privdata; hashtableInitSafeIterator(&kvs_it->di, NULL); return kvs_it; } @@ -608,7 +627,9 @@ static hashtable *kvstoreIteratorNextHashtable(kvstoreIterator *kvs_it) { } kvs_it->didx = kvs_it->next_didx; - kvs_it->next_didx = kvstoreGetNextNonEmptyHashtableIndex(kvs_it->kvs, kvs_it->didx); + do { + kvs_it->next_didx = kvstoreGetNextNonEmptyHashtableIndex(kvs_it->kvs, kvs_it->next_didx); + } while (kvs_it->next_didx != -1 && kvs_it->predicate && !kvs_it->predicate(kvs_it->next_didx, kvs_it->filter_privdata)); return kvs_it->kvs->hashtables[kvs_it->didx]; } diff --git a/src/kvstore.h b/src/kvstore.h index
1a8c74a6b9..fee8d71dbd 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -10,6 +10,7 @@ typedef struct _kvstoreHashtableIterator kvstoreHashtableIterator; typedef int(kvstoreScanShouldSkipHashtable)(hashtable *d); typedef int(kvstoreExpandShouldSkipHashtableIndex)(int didx); +typedef int(kvstoreIteratorPredicate)(int didx, void *privdata); #define KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND (1 << 0) #define KVSTORE_FREE_EMPTY_HASHTABLES (1 << 1) @@ -44,6 +45,7 @@ size_t kvstoreHashtableMetadataSize(void); /* kvstore iterator specific functions */ kvstoreIterator *kvstoreIteratorInit(kvstore *kvs); +kvstoreIterator *kvstoreFilteredIteratorInit(kvstore *kvs, kvstoreIteratorPredicate *filter, void *privdata); void kvstoreIteratorRelease(kvstoreIterator *kvs_it); int kvstoreIteratorGetCurrentHashtableIndex(kvstoreIterator *kvs_it); int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next); diff --git a/src/module.c b/src/module.c index fa60335837..01c9962e90 100644 --- a/src/module.c +++ b/src/module.c @@ -3949,6 +3949,7 @@ int VM_GetContextFlags(ValkeyModuleCtx *ctx) { if (ctx->client->flag.deny_blocking) flags |= VALKEYMODULE_CTX_FLAGS_DENY_BLOCKING; /* Module command received from PRIMARY, is replicated. 
*/ if (ctx->client->flag.primary) flags |= VALKEYMODULE_CTX_FLAGS_REPLICATED; + if (ctx->client->flag.slot_migration_source) flags |= VALKEYMODULE_CTX_FLAGS_IMPORTING_SLOT; if (ctx->client->resp == 3) { flags |= VALKEYMODULE_CTX_FLAGS_RESP3; } diff --git a/src/networking.c b/src/networking.c index 48e397e6f4..cad0e86de0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -242,7 +242,8 @@ void putClientInPendingWriteQueue(client *c) { if (!c->flag.pending_write && (!c->repl_data || c->repl_data->repl_state == REPL_STATE_NONE || - (isReplicaReadyForReplData(c) && !c->repl_data->repl_start_cmd_stream_on_ack))) { + (isReplicaReadyForReplData(c) && !c->repl_data->repl_start_cmd_stream_on_ack)) && + (!c->flag.slot_migration_target || clusterShouldWriteToSlotMigrationTarget())) { /* Here instead of installing the write handler, we just flag the * client and put it into a list of clients that have something * to write to the socket. This way before re-entering the event @@ -288,9 +289,9 @@ int prepareClientToWrite(client *c) { * CLIENT_PUSHING handling: disables the reply silencing flags. */ if ((c->flag.reply_off || c->flag.reply_skip) && !c->flag.pushing) return C_ERR; - /* Primaries don't receive replies, unless CLIENT_PRIMARY_FORCE_REPLY flag + /* Replication sources don't receive replies, unless force reply flag * is set. */ - if (c->flag.primary && !c->flag.primary_force_reply) return C_ERR; + if ((c->flag.replicated) && !c->flag.replication_force_reply) return C_ERR; /* Skip the fake client, such as the fake client for AOF loading. * But CLIENT_ID_CACHED_RESPONSE is allowed since it is a fake client @@ -581,7 +582,7 @@ void afterErrorReply(client *c, const char *s, size_t len, int flags) { * the commands sent by the primary. However it is useful to log such events since * they are rare and may hint at errors in a script or a bug in the server. 
*/ int ctype = getClientType(c); - if (ctype == CLIENT_TYPE_PRIMARY || ctype == CLIENT_TYPE_REPLICA || c->id == CLIENT_ID_AOF) { + if (ctype == CLIENT_TYPE_PRIMARY || ctype == CLIENT_TYPE_REPLICA || c->id == CLIENT_ID_AOF || ctype == CLIENT_TYPE_SLOT_MIGRATION) { char *to, *from; if (c->id == CLIENT_ID_AOF) { @@ -590,9 +591,12 @@ void afterErrorReply(client *c, const char *s, size_t len, int flags) { } else if (ctype == CLIENT_TYPE_PRIMARY) { to = "primary"; from = "replica"; - } else { + } else if (ctype == CLIENT_TYPE_REPLICA) { to = "replica"; from = "primary"; + } else { + to = "slot-migration-source"; + from = "slot-migration-target"; } if (len > 4096) len = 4096; @@ -1599,7 +1603,7 @@ void clearClientConnectionState(client *c) { c->flag.replica = 0; } - serverAssert(!(c->flag.replica || c->flag.primary)); + serverAssert(!(c->flag.replica || c->flag.replicated)); if (c->flag.tracking) disableTracking(c); selectDb(c, 0); @@ -1678,6 +1682,10 @@ void freeClient(client *c) { } } + if (c->flag.slot_migration_source || c->flag.slot_migration_target) { + clusterSlotMigrationHandleClientClose(c); + } + /* Log link disconnection with replica */ if (getClientType(c) == CLIENT_TYPE_REPLICA) { if (c->flag.repl_rdb_channel) @@ -1818,15 +1826,15 @@ void beforeNextClient(client *c) { * blocked client as well */ /* Trim the query buffer to the current position. */ - if (c->flag.primary) { - /* If the client is a primary, trim the querybuf to repl_applied, - * since primary client is very special, its querybuf not only + if (c->flag.replicated) { + /* If the client is a replication source, trim the querybuf to repl_applied, + * since replication clients are very special, its querybuf not only * used to parse command, but also proxy to sub-replicas. * * Here are some scenarios we cannot trim to qb_pos: - * 1. we don't receive complete command from primary - * 2. primary client blocked cause of client pause - * 3. 
io threads operate read, primary client flagged with CLIENT_PENDING_COMMAND + * 1. we don't receive complete command from replication + * 2. replication client blocked cause of client pause + * 3. io threads operate read, replication client flagged with CLIENT_PENDING_COMMAND * * In these scenarios, qb_pos points to the part of the current command * or the beginning of next command, and the current command is not applied yet, @@ -2132,7 +2140,11 @@ int postWriteToClient(client *c) { if (getClientType(c) != CLIENT_TYPE_REPLICA) { _postWriteToClient(c); } else { - server.stat_net_repl_output_bytes += c->nwritten > 0 ? c->nwritten : 0; + if (c->flag.slot_migration_target) { + server.stat_net_slot_migration_output_bytes += c->nwritten > 0 ? c->nwritten : 0; + } else { + server.stat_net_repl_output_bytes += c->nwritten > 0 ? c->nwritten : 0; + } } if (c->write_flags & WRITE_FLAGS_WRITE_ERROR) { @@ -2236,9 +2248,13 @@ int handleReadResult(client *c) { c->last_interaction = server.unixtime; c->net_input_bytes += c->nread; - if (c->flag.primary) { + if (c->flag.replicated) { c->repl_data->read_reploff += c->nread; - server.stat_net_repl_input_bytes += c->nread; + if (c->flag.primary) { + server.stat_net_repl_input_bytes += c->nread; + } else if (c->flag.slot_migration_source) { + server.stat_net_slot_migration_input_bytes += c->nread; + } } else { server.stat_net_input_bytes += c->nread; } @@ -2281,7 +2297,7 @@ void handleParseError(client *c) { } else if (flags & READ_FLAGS_ERROR_UNBALANCED_QUOTES) { addReplyError(c, "Protocol error: unbalanced quotes in request"); setProtocolError("unbalanced quotes in inline request", c); - } else if (flags & READ_FLAGS_ERROR_UNEXPECTED_INLINE_FROM_PRIMARY) { + } else if (flags & READ_FLAGS_ERROR_UNEXPECTED_INLINE_FROM_REPLICATION_SOURCE) { serverLog(LL_WARNING, "WARNING: Receiving inline protocol from primary, primary stream corruption? 
Closing the " "primary connection and discarding the cached primary."); setProtocolError("Master using the inline protocol. Desync?", c); @@ -2295,7 +2311,7 @@ int isParsingError(client *c) { READ_FLAGS_ERROR_INVALID_MULTIBULK_LEN | READ_FLAGS_ERROR_UNAUTHENTICATED_MULTIBULK_LEN | READ_FLAGS_ERROR_UNAUTHENTICATED_BULK_LEN | READ_FLAGS_ERROR_MBULK_INVALID_BULK_LEN | READ_FLAGS_ERROR_BIG_BULK_COUNT | READ_FLAGS_ERROR_MBULK_UNEXPECTED_CHARACTER | - READ_FLAGS_ERROR_UNEXPECTED_INLINE_FROM_PRIMARY | READ_FLAGS_ERROR_UNBALANCED_QUOTES); + READ_FLAGS_ERROR_UNEXPECTED_INLINE_FROM_REPLICATION_SOURCE | READ_FLAGS_ERROR_UNBALANCED_QUOTES); } /* This function is called after the query-buffer was parsed. @@ -2556,7 +2572,7 @@ void processInlineBuffer(client *c) { int argc, j, linefeed_chars = 1; sds *argv, aux; size_t querylen; - int is_primary = c->read_flags & READ_FLAGS_PRIMARY; + int is_replicated = c->read_flags & READ_FLAGS_REPLICATED; /* Search for end of line */ newline = strchr(c->querybuf + c->qb_pos, '\n'); @@ -2593,9 +2609,9 @@ void processInlineBuffer(client *c) { * * However there is an exception: primaries may send us just a newline * to keep the connection active. */ - if (querylen != 0 && is_primary) { + if (querylen != 0 && is_replicated) { sdsfreesplitres(argv, argc); - c->read_flags |= READ_FLAGS_ERROR_UNEXPECTED_INLINE_FROM_PRIMARY; + c->read_flags |= READ_FLAGS_ERROR_UNEXPECTED_INLINE_FROM_REPLICATION_SOURCE; return; } @@ -2642,7 +2658,7 @@ void processInlineBuffer(client *c) { * CLIENT_PROTOCOL_ERROR. */ #define PROTO_DUMP_LEN 128 static void setProtocolError(const char *errstr, client *c) { - if (server.verbosity <= LL_VERBOSE || c->flag.primary) { + if (server.verbosity <= LL_VERBOSE || c->flag.replicated) { sds client = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log); /* Sample some protocol to given an idea about what was inside. 
*/ @@ -2664,7 +2680,7 @@ static void setProtocolError(const char *errstr, client *c) { } /* Log all the client and protocol info. */ - int loglevel = (c->flag.primary) ? LL_WARNING : LL_VERBOSE; + int loglevel = (c->flag.replicated) ? LL_WARNING : LL_VERBOSE; serverLog(loglevel, "Protocol error (%s) from client: %s. %s", errstr, client, buf); sdsfree(client); } @@ -2683,7 +2699,7 @@ void processMultibulkBuffer(client *c) { char *newline = NULL; int ok; long long ll; - int is_primary = c->read_flags & READ_FLAGS_PRIMARY; + int is_replicated = c->read_flags & READ_FLAGS_REPLICATED; int auth_required = c->read_flags & READ_FLAGS_AUTH_REQUIRED; if (c->multibulklen == 0) { @@ -2787,7 +2803,7 @@ void processMultibulkBuffer(client *c) { size_t bulklen_slen = newline - (c->querybuf + c->qb_pos + 1); ok = string2ll(c->querybuf + c->qb_pos + 1, bulklen_slen, &ll); - if (!ok || ll < 0 || (!(is_primary) && ll > server.proto_max_bulk_len)) { + if (!ok || ll < 0 || (!(is_replicated) && ll > server.proto_max_bulk_len)) { c->read_flags |= READ_FLAGS_ERROR_MBULK_INVALID_BULK_LEN; return; } else if (ll > 16384 && auth_required) { @@ -2796,7 +2812,7 @@ void processMultibulkBuffer(client *c) { } c->qb_pos = newline - c->querybuf + 2; - if (!(is_primary) && ll >= PROTO_MBULK_BIG_ARG) { + if (!(is_replicated) && ll >= PROTO_MBULK_BIG_ARG) { /* When the client is not a primary client (because primary * client's querybuf can only be trimmed after data applied * and sent to replicas). @@ -2845,7 +2861,7 @@ void processMultibulkBuffer(client *c) { /* Optimization: if a non-primary client's buffer contains JUST our bulk element * instead of creating a new object by *copying* the sds we * just use the current sds string. 
*/ - if (!is_primary && c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && + if (!is_replicated && c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen + 2)) { c->argv[c->argc++] = createObject(OBJ_STRING, c->querybuf); c->argv_len_sum += c->bulklen; @@ -2895,18 +2911,18 @@ void commandProcessed(client *c) { if (!c->repl_data) return; long long prev_offset = c->repl_data->reploff; - if (c->flag.primary && !c->flag.multi) { - /* Update the applied replication offset of our primary. */ + if (!c->flag.multi && c->flag.replicated) { + /* Update the applied replication offset of our source. */ c->repl_data->reploff = c->repl_data->read_reploff - sdslen(c->querybuf) + c->qb_pos; } - /* If the client is a primary we need to compute the difference + /* If the client is a replication source we need to compute the difference * between the applied offset before and after processing the buffer, * to understand how much of the replication stream was actually - * applied to the primary state: this quantity, and its corresponding + * applied to the replication state: this quantity, and its corresponding * part of the replication stream, will be propagated to the * sub-replicas and to the replication backlog. */ - if (c->flag.primary) { + if (c->flag.replicated) { long long applied = c->repl_data->reploff - prev_offset; if (applied) { replicationFeedStreamFromPrimaryStream(c->querybuf + c->repl_data->repl_applied, applied); @@ -3010,11 +3026,11 @@ int canParseCommand(client *c) { * commands to execute in c->argv. */ if (c->flag.pending_command) return 0; - /* Don't process input from the primary while there is a busy script + /* Don't process input from replication while there is a busy script * condition on the replica. We want just to accumulate the replication * stream (instead of replying -BUSY like we do with other clients) and * later resume the processing. 
*/ - if (isInsideYieldingLongCommand() && c->flag.primary) return 0; + if (isInsideYieldingLongCommand() && c->flag.replicated) return 0; /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is * written to the client. Make sure to not let the reply grow after @@ -3033,7 +3049,7 @@ int processInputBuffer(client *c) { break; } - c->read_flags = c->flag.primary ? READ_FLAGS_PRIMARY : 0; + c->read_flags = c->flag.replicated ? READ_FLAGS_REPLICATED : 0; c->read_flags |= authRequired(c) ? READ_FLAGS_AUTH_REQUIRED : 0; parseCommand(c); @@ -3076,7 +3092,7 @@ void readToQueryBuf(client *c) { /* If the replica RDB client is marked as closed ASAP, do not try to read from it */ if (c->flag.close_asap) return; - int is_primary = c->read_flags & READ_FLAGS_PRIMARY; + int is_replicated = c->read_flags & READ_FLAGS_REPLICATED; readlen = PROTO_IOBUF_LEN; qblen = c->querybuf ? sdslen(c->querybuf) : 0; @@ -3097,7 +3113,7 @@ void readToQueryBuf(client *c) { /* Primary client needs expand the readlen when meet BIG_ARG(see #9100), * but doesn't need align to the next arg, we can read more data. */ - if (c->flag.primary && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; + if (c->flag.replicated && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; } if (c->querybuf == NULL) { @@ -3110,7 +3126,7 @@ void readToQueryBuf(client *c) { * Although we have ensured that c->querybuf will not be expanded in the current * thread_shared_qb, we still add this check for code robustness. */ int use_thread_shared_qb = (c->querybuf == thread_shared_qb) ? 1 : 0; - if (!is_primary && // primary client's querybuf can grow greedy. + if (!is_replicated && /* replication client's querybuf can grow greedy. 
*/ (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) { /* When reading a BIG_ARG we won't be reading more than that one arg * into the query buffer, so we don't need to pre-allocate more than we @@ -3137,7 +3153,7 @@ void readToQueryBuf(client *c) { sdsIncrLen(c->querybuf, c->nread); qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; - if (!is_primary) { + if (!is_replicated) { /* The commands cached in the MULTI/EXEC queue have not been executed yet, * so they are also considered a part of the query buffer in a broader sense. * @@ -3458,7 +3474,7 @@ void resetCommand(client *c) { flags.replica = 0; } - if (flags.replica || flags.primary || flags.module) { + if (flags.replica || flags.replicated || flags.module) { addReplyError(c, "can only reset normal client connections"); return; } @@ -4361,6 +4377,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { * CLIENT_TYPE_REPLICA -> replica * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels * CLIENT_TYPE_PRIMARY -> The client representing our replication primary. + * CLIENT_TYPE_SLOT_MIGRATION -> The client representing a slot migration. */ int getClientType(client *c) { if (c->flag.primary) return CLIENT_TYPE_PRIMARY; @@ -4368,6 +4385,7 @@ int getClientType(client *c) { * want the expose them as normal clients. 
*/ if (c->flag.replica && !c->flag.monitor) return CLIENT_TYPE_REPLICA; if (c->flag.pubsub) return CLIENT_TYPE_PUBSUB; + if (c->flag.slot_migration_source) return CLIENT_TYPE_SLOT_MIGRATION; return CLIENT_TYPE_NORMAL; } @@ -4382,6 +4400,8 @@ int getClientTypeByName(char *name) { return CLIENT_TYPE_PUBSUB; else if (!strcasecmp(name, "master") || !strcasecmp(name, "primary")) return CLIENT_TYPE_PRIMARY; + else if (!strcasecmp(name, "slot-migration")) + return CLIENT_TYPE_SLOT_MIGRATION; else return -1; } @@ -4392,6 +4412,7 @@ char *getClientTypeName(int class) { case CLIENT_TYPE_REPLICA: return "slave"; case CLIENT_TYPE_PUBSUB: return "pubsub"; case CLIENT_TYPE_PRIMARY: return "master"; + case CLIENT_TYPE_SLOT_MIGRATION: return "slot-migration"; default: return NULL; } } @@ -4407,9 +4428,9 @@ int checkClientOutputBufferLimits(client *c) { unsigned long used_mem = getClientOutputBufferMemoryUsage(c); class = getClientType(c); - /* For the purpose of output buffer limiting, primaries are handled - * like normal clients. */ - if (class == CLIENT_TYPE_PRIMARY) class = CLIENT_TYPE_NORMAL; + /* For the purpose of output buffer limiting, primaries and slot migrations + * are handled like normal clients. 
*/ + if (class == CLIENT_TYPE_PRIMARY || class == CLIENT_TYPE_SLOT_MIGRATION) class = CLIENT_TYPE_NORMAL; /* Note that it doesn't make sense to set the replica clients output buffer * limit lower than the repl-backlog-size config (partial sync will succeed @@ -4892,7 +4913,7 @@ void ioThreadReadQueryFromClient(void *data) { done: /* Only trim query buffer for non-primary clients * Primary client's buffer is handled by main thread using repl_applied position */ - if (!(c->read_flags & READ_FLAGS_PRIMARY)) { + if (!(c->read_flags & READ_FLAGS_REPLICATED)) { trimClientQueryBuffer(c); } atomic_thread_fence(memory_order_release); diff --git a/src/object.c b/src/object.c index b8200dd815..a9c701964a 100644 --- a/src/object.c +++ b/src/object.c @@ -1337,7 +1337,8 @@ struct serverMemOverhead *getMemoryOverheadData(void) { * updateClientMemoryUsage(). */ mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_PRIMARY] + server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] + - server.stat_clients_type_memory[CLIENT_TYPE_NORMAL]; + server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] + + server.stat_clients_type_memory[CLIENT_TYPE_SLOT_MIGRATION]; mem_total += mh->clients_normal; mh->cluster_links = server.stat_cluster_links_memory; diff --git a/src/rdb.c b/src/rdb.c index 0bb5d7d45d..36ae825670 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -38,6 +38,7 @@ #include "bio.h" #include "zmalloc.h" #include "module.h" +#include "cluster.h" #include #include @@ -1868,8 +1869,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { int deep_integrity_validation = server.sanitize_dump_payload == SANITIZE_DUMP_YES; if (server.sanitize_dump_payload == SANITIZE_DUMP_CLIENTS) { /* Skip sanitization when loading (an RDB), or getting a RESTORE command - * from either the primary or a client using an ACL user with the skip-sanitize-payload flag. 
*/ - int skip = server.loading || (server.current_client && (server.current_client->flag.primary)); + * from either a replication source or a client using an ACL user with the skip-sanitize-payload flag. */ + int skip = server.loading || (server.current_client && (server.current_client->flag.replicated)); if (!skip && server.current_client && server.current_client->user) skip = !!(server.current_client->user->flags & USER_FLAG_SANITIZE_PAYLOAD_SKIP); deep_integrity_validation = !skip; @@ -3524,15 +3525,14 @@ void killRDBChild(void) { * - rdbRemoveTempFile */ } -/* Spawn an RDB child that writes the RDB to the sockets of the replicas - * that are currently in REPLICA_STATE_WAIT_BGSAVE_START state. */ -int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { - listNode *ln; - listIter li; +/* Save snapshot to the provided connections, spawning a child process and + * running the provided function. + * + * Connections array provided will be freed after the save is completed, and + * should not be freed by the caller. */ +int saveSnapshotToConnectionSockets(connection **conns, int connsnum, int use_pipe, int req, ChildSnapshotFunc snapshot_func, void *privdata) { pid_t childpid; int pipefds[2], rdb_pipe_write = 0, safe_to_exit_pipe = 0; - int dual_channel = (req & REPLICA_REQ_RDB_CHANNEL); - if (hasActiveChildProcess()) return C_ERR; serverAssert(server.rdb_pipe_read == -1 && server.rdb_child_exit_pipe == -1); @@ -3540,7 +3540,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { * drained the pipe. 
*/ if (server.rdb_pipe_conns) return C_ERR; - if (!dual_channel) { + if (use_pipe) { /* Before to fork, create a pipe that is used to transfer the rdb bytes to * the parent, we can't let it write directly to the sockets, since in case * of TLS we must let the parent handle a continuous TLS state when the @@ -3559,47 +3559,18 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { safe_to_exit_pipe = pipefds[0]; /* read end */ server.rdb_child_exit_pipe = pipefds[1]; /* write end */ } - /* Collect the connections of the replicas we want to transfer - * the RDB to, which are i WAIT_BGSAVE_START state. */ - int connsnum = 0; - connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas)); server.rdb_pipe_conns = NULL; - if (!dual_channel) { + if (use_pipe) { server.rdb_pipe_conns = conns; - server.rdb_pipe_numconns = 0; + server.rdb_pipe_numconns = connsnum; server.rdb_pipe_numconns_writing = 0; } - /* Filter replica connections pending full sync (ie. in WAIT_BGSAVE_START state). */ - listRewind(server.replicas, &li); - while ((ln = listNext(&li))) { - client *replica = ln->value; - if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { - /* Check replica has the exact requirements */ - if (replica->repl_data->replica_req != req) continue; - - conns[connsnum++] = replica->conn; - if (dual_channel) { - connSendTimeout(replica->conn, server.repl_timeout * 1000); - /* This replica uses diskless dual channel sync, hence we need - * to inform it with the save end offset.*/ - sendCurrentOffsetToReplica(replica); - /* Make sure repl traffic is appended to the replication backlog */ - addRdbReplicaToPsyncWait(replica); - /* Put the socket in blocking mode to simplify RDB transfer. */ - connBlock(replica->conn); - } else { - server.rdb_pipe_numconns++; - } - replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset()); - } - } - /* Create the child process. 
*/ if ((childpid = serverFork(CHILD_TYPE_RDB)) == 0) { /* Child */ int retval, dummy; rio rdb; - if (dual_channel) { + if (!use_pipe) { rioInitWithConnset(&rdb, conns, connsnum); } else { rioInitWithFd(&rdb, rdb_pipe_write); @@ -3607,7 +3578,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { /* Close the reading part, so that if the parent crashes, the child will * get a write error and exit. */ - if (!dual_channel) close(server.rdb_pipe_read); + if (use_pipe) close(server.rdb_pipe_read); if (strstr(server.exec_argv[0], "redis-server") != NULL) { serverSetProcTitle("redis-rdb-to-slaves"); } else { @@ -3615,13 +3586,13 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } serverSetCpuAffinity(server.bgsave_cpulist); - retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi); + retval = snapshot_func(req, &rdb, privdata); if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR; if (retval == C_OK) { sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB"); } - if (dual_channel) { + if (!use_pipe) { rioFreeConnset(&rdb); } else { rioFreeFd(&rdb); @@ -3632,7 +3603,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { zfree(conns); /* hold exit until the parent tells us it's safe. we're not expecting * to read anything, just get the error when the pipe is closed. */ - if (!dual_channel) dummy = read(safe_to_exit_pipe, pipefds, 1); + if (use_pipe) dummy = read(safe_to_exit_pipe, pipefds, 1); UNUSED(dummy); exitFromChild((retval == C_OK) ? 0 : 1); } else { @@ -3640,23 +3611,13 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { if (childpid == -1) { serverLog(LL_WARNING, "Can't save in background: fork: %s", strerror(errno)); - /* Undo the state change. 
The caller will perform cleanup on - * all the replicas in BGSAVE_START state, but an early call to - * replicationSetupReplicaForFullResync() turned it into BGSAVE_END */ - listRewind(server.replicas, &li); - while ((ln = listNext(&li))) { - client *replica = ln->value; - if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) { - replica->repl_data->repl_state = REPLICA_STATE_WAIT_BGSAVE_START; - } - } - if (!dual_channel) { + if (use_pipe) { close(rdb_pipe_write); close(server.rdb_pipe_read); close(server.rdb_child_exit_pipe); } zfree(conns); - if (dual_channel) { + if (!use_pipe) { closeChildInfoPipe(); } else { server.rdb_pipe_conns = NULL; @@ -3665,10 +3626,10 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } } else { serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid, - dual_channel ? "direct socket to replica" : "pipe through parent process"); + !use_pipe ? "direct socket to replica" : "pipe through parent process"); server.rdb_save_time_start = time(NULL); server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; - if (dual_channel) { + if (!use_pipe) { /* For dual channel sync, the main process no longer requires these RDB connections. */ zfree(conns); } else { @@ -3679,12 +3640,70 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } } } - if (!dual_channel) close(safe_to_exit_pipe); + if (use_pipe) close(safe_to_exit_pipe); return (childpid == -1) ? C_ERR : C_OK; } return C_OK; /* Unreached. */ } +int childSnapshotUsingRDB(int req, rio *rdb, void *privdata) { + return rdbSaveRioWithEOFMark(req, rdb, NULL, (rdbSaveInfo *)privdata); +} + +/* Spawn an RDB child that writes the RDB to the sockets of the replicas + * that are currently in REPLICA_STATE_WAIT_BGSAVE_START state. 
*/
+int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
+    listNode *ln;
+    listIter li;
+    int dual_channel = (req & REPLICA_REQ_RDB_CHANNEL);
+
+    /* Collect the connections of the replicas we want to transfer
+     * the RDB to, which are in WAIT_BGSAVE_START state. */
+    int connsnum = 0;
+    connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
+
+    /* Filter replica connections pending full sync (ie. in WAIT_BGSAVE_START state). */
+    listRewind(server.replicas, &li);
+    while ((ln = listNext(&li))) {
+        client *replica = ln->value;
+        if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) {
+            /* Check replica has the exact requirements */
+            if (replica->repl_data->replica_req != req) continue;
+
+            conns[connsnum++] = replica->conn;
+            if (dual_channel) {
+                connSendTimeout(replica->conn, server.repl_timeout * 1000);
+                /* This replica uses diskless dual channel sync, hence we need
+                 * to inform it with the save end offset.*/
+                sendCurrentOffsetToReplica(replica);
+                /* Make sure repl traffic is appended to the replication backlog */
+                addRdbReplicaToPsyncWait(replica);
+                /* Put the socket in blocking mode to simplify RDB transfer. */
+                connBlock(replica->conn);
+            }
+            replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
+        }
+    }
+
+    int retval = saveSnapshotToConnectionSockets(conns, connsnum, !dual_channel, req, childSnapshotUsingRDB, (void *)rsi);
+
+    if (retval != C_OK) {
+        serverLog(LL_WARNING, "Can't save in background: fork: %s", strerror(errno));
+
+        /* Undo the state change. 
The caller will perform cleanup on + * all the replicas in BGSAVE_START state, but an early call to + * replicationSetupReplicaForFullResync() turned it into BGSAVE_END */ + listRewind(server.replicas, &li); + while ((ln = listNext(&li))) { + client *replica = ln->value; + if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) { + replica->repl_data->repl_state = REPLICA_STATE_WAIT_BGSAVE_START; + } + } + } + return retval; +} + void saveCommand(client *c) { if (server.child_type == CHILD_TYPE_RDB) { addReplyError(c, "Background save already in progress"); diff --git a/src/replication.c b/src/replication.c index 9913d64d65..c13a0edb3f 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1700,6 +1700,7 @@ void rdbPipeWriteHandler(struct connection *conn) { } else { replica->repl_data->repldboff += nwritten; server.stat_net_repl_output_bytes += nwritten; + replica->repl_data->repldbsize += nwritten; if (replica->repl_data->repldboff < server.rdb_pipe_bufflen) { replica->repl_data->repl_last_partial_write = server.unixtime; return; /* more data to write.. */ @@ -1774,6 +1775,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, * of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */ replica->repl_data->repldboff = nwritten; server.stat_net_repl_output_bytes += nwritten; + replica->repl_data->repldbsize += nwritten; } /* If we were unable to write all the data to one of the replicas, * setup write handler (and disable pipe read handler, below) */ @@ -1983,6 +1985,7 @@ void replicationCreatePrimaryClientWithHandler(connection *conn, int dbid, Conne * connection. */ server.primary->flag.primary = 1; server.primary->flag.authenticated = 1; + server.primary->flag.replicated = 1; /* Allocate a private query buffer for the primary client instead of using the shared query buffer. * This is done because the primary's query buffer data needs to be preserved for my sub-replicas to use. 
*/ @@ -4165,7 +4168,7 @@ void replicationSendAck(void) { if (c != NULL) { int send_fack = server.fsynced_reploff != -1; - c->flag.primary_force_reply = 1; + c->flag.replication_force_reply = 1; addReplyArrayLen(c, send_fack ? 5 : 3); addReplyBulkCString(c, "REPLCONF"); addReplyBulkCString(c, "ACK"); @@ -4174,7 +4177,7 @@ void replicationSendAck(void) { addReplyBulkCString(c, "FACK"); addReplyBulkLongLong(c, server.fsynced_reploff); } - c->flag.primary_force_reply = 0; + c->flag.replication_force_reply = 0; /* Accumulation from above replies must be reset back to 0 manually, * as this subroutine does not invoke resetClient(). */ diff --git a/src/server.c b/src/server.c index 8255b57e25..8654c89df0 100644 --- a/src/server.c +++ b/src/server.c @@ -900,8 +900,8 @@ int clientsCronResizeQueryBuffer(client *c) { if (idletime > 2) { /* 1) Query is idle for a long time. */ size_t remaining = sdslen(c->querybuf) - c->qb_pos; - if (!c->flag.primary && !remaining) { - /* If the client is not a primary and no data is pending, + if (!c->flag.replicated && !remaining) { + /* If the client is not for replication and no data is pending, * The client can safely use the shared query buffer in the next read - free the client's querybuf. */ sdsfree(c->querybuf); /* By setting the querybuf to NULL, the client will use the shared query buffer in the next read. 
@@ -1451,7 +1451,7 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa monotime current_time = getMonotonicUs(); long long factor = 1000000; // us trackInstantaneousMetric(STATS_METRIC_COMMAND, server.stat_numcommands, current_time, factor); - trackInstantaneousMetric(STATS_METRIC_NET_INPUT, server.stat_net_input_bytes + server.stat_net_repl_input_bytes, + trackInstantaneousMetric(STATS_METRIC_NET_INPUT, server.stat_net_input_bytes + server.stat_net_repl_input_bytes + server.stat_net_slot_migration_input_bytes, current_time, factor); trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, server.stat_net_output_bytes + server.stat_net_repl_output_bytes, current_time, @@ -1464,6 +1464,8 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa factor); trackInstantaneousMetric(STATS_METRIC_EL_DURATION, server.duration_stats[EL_DURATION_TYPE_EL].sum, server.duration_stats[EL_DURATION_TYPE_EL].cnt, 1); + trackInstantaneousMetric(STATS_METRIC_NET_INPUT_SLOT_MIGRATION, server.stat_net_slot_migration_input_bytes, + current_time, factor); } /* We have just LRU_BITS bits per object for LRU information. @@ -2684,6 +2686,7 @@ void resetServerStats(void) { server.stat_net_input_bytes = 0; server.stat_net_output_bytes = 0; server.stat_net_repl_input_bytes = 0; + server.stat_net_slot_migration_input_bytes = 0; server.stat_net_repl_output_bytes = 0; server.stat_unexpected_error_replies = 0; server.stat_total_error_replies = 0; @@ -3357,9 +3360,9 @@ struct serverCommand *lookupCommandOrOriginal(robj **argv, int argc) { return cmd; } -/* Commands arriving from the primary client or AOF client, should never be rejected. */ +/* Commands arriving from a replication source or AOF client, should never be rejected. 
*/ int mustObeyClient(client *c) { - return c->id == CLIENT_ID_AOF || c->flag.primary; + return c->id == CLIENT_ID_AOF || c->flag.replicated; } static int shouldPropagate(int target) { @@ -3418,7 +3421,12 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) { server.server_del_keys_in_slot); if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF) feedAppendOnlyFile(dbid, argv, argc); - if (target & PROPAGATE_REPL) replicationFeedReplicas(dbid, argv, argc); + if (target & PROPAGATE_REPL) { + replicationFeedReplicas(dbid, argv, argc); + if (server.cluster_enabled) { + clusterFeedSlotMigration(dbid, argv, argc); + } + } } /* Used inside commands to schedule the propagation of additional commands @@ -4297,8 +4305,7 @@ int processCommand(client *c) { /* If the server is paused, block the client until * the pause has ended. Replicas are never paused. */ - if (!c->flag.replica && ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) || - ((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) { + if (!c->flag.replica && !c->flag.slot_migration_target && ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) || ((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) { blockPostponeClient(c); return C_OK; } @@ -5903,8 +5910,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "total_connections_received:%lld\r\n", server.stat_numconnections, "total_commands_processed:%lld\r\n", server.stat_numcommands, "instantaneous_ops_per_sec:%lld\r\n", getInstantaneousMetric(STATS_METRIC_COMMAND), - "total_net_input_bytes:%lld\r\n", server.stat_net_input_bytes + server.stat_net_repl_input_bytes, - "total_net_output_bytes:%lld\r\n", server.stat_net_output_bytes + server.stat_net_repl_output_bytes, + "total_net_input_bytes:%lld\r\n", server.stat_net_input_bytes + server.stat_net_repl_input_bytes + server.stat_net_slot_migration_input_bytes, + "total_net_output_bytes:%lld\r\n", server.stat_net_output_bytes + 
server.stat_net_repl_output_bytes + server.stat_net_slot_migration_output_bytes, "total_net_repl_input_bytes:%lld\r\n", server.stat_net_repl_input_bytes, "total_net_repl_output_bytes:%lld\r\n", server.stat_net_repl_output_bytes, "instantaneous_input_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT) / 1024, @@ -5962,7 +5969,11 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "eventloop_duration_sum:%llu\r\n", server.duration_stats[EL_DURATION_TYPE_EL].sum, "eventloop_duration_cmd_sum:%llu\r\n", server.duration_stats[EL_DURATION_TYPE_CMD].sum, "instantaneous_eventloop_cycles_per_sec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_CYCLE), - "instantaneous_eventloop_duration_usec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_DURATION))); + "instantaneous_eventloop_duration_usec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_DURATION), + "total_net_slot_migration_input_bytes:%lld\r\n", server.stat_net_slot_migration_input_bytes, + "total_net_slot_migration_output_bytes:%lld\r\n", server.stat_net_slot_migration_output_bytes, + "instantaneous_slot_migration_input_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_SLOT_MIGRATION) / 1024, + "instantaneous_slot_migration_output_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_SLOT_MIGRATION) / 1024)); info = genValkeyInfoStringACLStats(info); } diff --git a/src/server.h b/src/server.h index d186d16c73..6269461ed5 100644 --- a/src/server.h +++ b/src/server.h @@ -182,15 +182,17 @@ struct hdr_histogram; #define RIO_CONNSET_WRITE_MAX_CHUNK_SIZE 16384 /* Instantaneous metrics tracking. */ -#define STATS_METRIC_SAMPLES 16 /* Number of samples per metric. */ -#define STATS_METRIC_COMMAND 0 /* Number of commands executed. */ -#define STATS_METRIC_NET_INPUT 1 /* Bytes read to network. */ -#define STATS_METRIC_NET_OUTPUT 2 /* Bytes written to network. 
*/
-#define STATS_METRIC_NET_INPUT_REPLICATION 3 /* Bytes read to network during replication. */
-#define STATS_METRIC_NET_OUTPUT_REPLICATION 4 /* Bytes written to network during replication. */
-#define STATS_METRIC_EL_CYCLE 5 /* Number of eventloop cycled. */
-#define STATS_METRIC_EL_DURATION 6 /* Eventloop duration. */
-#define STATS_METRIC_COUNT 7
+#define STATS_METRIC_SAMPLES 16                  /* Number of samples per metric. */
+#define STATS_METRIC_COMMAND 0                   /* Number of commands executed. */
+#define STATS_METRIC_NET_INPUT 1                 /* Bytes read to network. */
+#define STATS_METRIC_NET_OUTPUT 2                /* Bytes written to network. */
+#define STATS_METRIC_NET_INPUT_REPLICATION 3     /* Bytes read to network during replication. */
+#define STATS_METRIC_NET_OUTPUT_REPLICATION 4    /* Bytes written to network during replication. */
+#define STATS_METRIC_EL_CYCLE 5                  /* Number of eventloop cycled. */
+#define STATS_METRIC_EL_DURATION 6               /* Eventloop duration. */
+#define STATS_METRIC_NET_INPUT_SLOT_MIGRATION 7  /* Bytes read to network during slot migration. */
+#define STATS_METRIC_NET_OUTPUT_SLOT_MIGRATION 8 /* Bytes written to network during slot migration. */
+#define STATS_METRIC_COUNT 9
 
 /* Protocol and I/O related defines */
 #define PROTO_IOBUF_LEN (1024 * 16) /* Generic I/O buffer size */
@@ -373,14 +375,15 @@ typedef enum blocking_type {
 
 /* Client classes for client limits, currently used only for
  * the max-client-output-buffer limit implementation. */
-#define CLIENT_TYPE_NORMAL 0 /* Normal req-reply clients + MONITORs */
-#define CLIENT_TYPE_REPLICA 1 /* Replicas. */
-#define CLIENT_TYPE_PUBSUB 2 /* Clients subscribed to PubSub channels. */
-#define CLIENT_TYPE_PRIMARY 3 /* Primary. */
-#define CLIENT_TYPE_COUNT 4 /* Total number of client types. */
-#define CLIENT_TYPE_OBUF_COUNT 3 /* Number of clients to expose to output \
- buffer configuration. Just the first \
- three: normal, replica, pubsub. 
*/ +#define CLIENT_TYPE_NORMAL 0 /* Normal req-reply clients + MONITORs */ +#define CLIENT_TYPE_REPLICA 1 /* Replicas. */ +#define CLIENT_TYPE_PUBSUB 2 /* Clients subscribed to PubSub channels. */ +#define CLIENT_TYPE_PRIMARY 3 /* Primary. */ +#define CLIENT_TYPE_SLOT_MIGRATION 4 /* Slot migration client. */ +#define CLIENT_TYPE_COUNT 5 /* Total number of client types. */ +#define CLIENT_TYPE_OBUF_COUNT 3 /* Number of clients to expose to output \ + buffer configuration. Just the first \ + three: normal, replica, pubsub. */ /* Replica replication state. Used in server.repl_state for replicas to remember * what to do next. */ @@ -595,6 +598,7 @@ typedef enum { PAUSE_BY_CLIENT_COMMAND = 0, PAUSE_DURING_SHUTDOWN, PAUSE_DURING_FAILOVER, + PAUSE_DURING_SLOT_MIGRATION, NUM_PAUSE_PURPOSES /* This value is the number of purposes above. */ } pause_purpose; @@ -1024,7 +1028,7 @@ typedef struct ClientFlags { uint64_t close_asap : 1; /* Close this client ASAP */ uint64_t unix_socket : 1; /* Client connected via Unix domain socket */ uint64_t dirty_exec : 1; /* EXEC will fail for errors while queueing */ - uint64_t primary_force_reply : 1; /* Queue replies even if is primary */ + uint64_t replication_force_reply : 1; /* Queue replies even if is primary */ uint64_t force_aof : 1; /* Force AOF propagation of current cmd. */ uint64_t force_repl : 1; /* Force replication of current cmd. */ uint64_t pre_psync : 1; /* Instance don't understand PSYNC. */ @@ -1087,7 +1091,10 @@ typedef struct ClientFlags { * flag, we won't cache the primary in freeClient. */ uint64_t fake : 1; /* This is a fake client without a real connection. */ uint64_t import_source : 1; /* This client is importing data to server and can visit expired key. */ - uint64_t reserved : 4; /* Reserved for future use */ + uint64_t replicated : 1; /* This client is a replication source (i.e. primary or slot migration). */ + uint64_t slot_migration_source : 1; /* This client is a slot migration source. 
*/ + uint64_t slot_migration_target : 1; /* This client is a slot migration target. */ + uint64_t reserved : 1; /* Reserved for future use */ } ClientFlags; typedef struct ClientPubSubData { @@ -1103,6 +1110,11 @@ typedef struct ClientPubSubData { context of client side caching. */ } ClientPubSubData; +#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */ +#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */ + +typedef unsigned char slotBitmap[CLUSTER_SLOTS / 8]; + typedef struct ClientReplicationData { int repl_state; /* Replication state if this is a replica. */ int repl_start_cmd_stream_on_ack; /* Install replica write handler on first ACK. */ @@ -1130,9 +1142,9 @@ typedef struct ClientReplicationData { uint64_t associated_rdb_client_id; /* The client id of this replica's rdb connection */ time_t rdb_client_disconnect_time; /* Time of the first freeClient call on this client. Used for delaying free. */ listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks, - see the definition of replBufBlock. */ + see the definition of replBufBlock. */ size_t ref_block_pos; /* Access position of referenced buffer block, - i.e. the next offset to send. */ + i.e. the next offset to send. */ } ClientReplicationData; typedef struct ClientModuleData { @@ -1698,6 +1710,8 @@ struct valkeyServer { long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */ /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */ long long stat_net_repl_output_bytes; + long long stat_net_slot_migration_input_bytes; /* Bytes read during slot migration, added to stat_net_input_bytes in 'info'. */ + long long stat_net_slot_migration_output_bytes; /* Bytes written during slot migration, added to stat_net_output_bytes in 'info'. */ size_t stat_current_cow_peak; /* Peak size of copy on write bytes. 
*/ size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */ monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */ @@ -2603,12 +2617,12 @@ void dictVanillaFree(void *val); #define READ_FLAGS_ERROR_BIG_BULK_COUNT (1 << 6) #define READ_FLAGS_ERROR_MBULK_UNEXPECTED_CHARACTER (1 << 7) #define READ_FLAGS_ERROR_MBULK_INVALID_BULK_LEN (1 << 8) -#define READ_FLAGS_ERROR_UNEXPECTED_INLINE_FROM_PRIMARY (1 << 9) +#define READ_FLAGS_ERROR_UNEXPECTED_INLINE_FROM_REPLICATION_SOURCE (1 << 9) #define READ_FLAGS_ERROR_UNBALANCED_QUOTES (1 << 10) #define READ_FLAGS_INLINE_ZERO_QUERY_LEN (1 << 11) #define READ_FLAGS_PARSING_NEGATIVE_MBULK_LEN (1 << 12) #define READ_FLAGS_PARSING_COMPLETED (1 << 13) -#define READ_FLAGS_PRIMARY (1 << 14) +#define READ_FLAGS_REPLICATED (1 << 14) #define READ_FLAGS_DONT_PARSE (1 << 15) #define READ_FLAGS_AUTH_REQUIRED (1 << 16) @@ -2935,6 +2949,8 @@ int sendCurrentOffsetToReplica(client *replica); void addRdbReplicaToPsyncWait(client *replica); void initClientReplicationData(client *c); void freeClientReplicationData(client *c); +char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens); +char *receiveSynchronousResponse(connection *conn); /* Generic persistence functions */ void startLoadingFile(size_t size, char *filename, int rdbflags); @@ -2946,6 +2962,8 @@ void updateLoadingFileName(char *filename); void startSaving(int rdbflags); void stopSaving(int success); int allPersistenceDisabled(void); +typedef int (*ChildSnapshotFunc)(int req, rio *rdb, void *privdata); +int saveSnapshotToConnectionSockets(connection **conns, int connsnum, int use_pipe, int req, ChildSnapshotFunc snapshot_func, void *privdata); #define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */ #define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. 
*/ @@ -2974,6 +2992,7 @@ void aofOpenIfNeededOnServerStart(void); void aofManifestFree(aofManifest *am); int aofDelHistoryFiles(void); int aofRewriteLimited(void); +int rewriteAppendOnlyFileRio(rio *aof, slotBitmap slot_bitmap); /* Child info */ void openChildInfoPipe(void); diff --git a/src/valkeymodule.h b/src/valkeymodule.h index 1d99d2ff7a..8a2090fcca 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -221,11 +221,13 @@ typedef struct ValkeyModuleStreamID { #define VALKEYMODULE_CTX_FLAGS_ASYNC_LOADING (1 << 23) /* Valkey is starting. */ #define VALKEYMODULE_CTX_FLAGS_SERVER_STARTUP (1 << 24) +/* The command was sent via slot migration link. */ +#define VALKEYMODULE_CTX_FLAGS_IMPORTING_SLOT (1 << 25) /* Next context flag, must be updated when adding new flags above! This flag should not be used directly by the module. * Use ValkeyModule_GetContextFlagsAll instead. */ -#define _VALKEYMODULE_CTX_FLAGS_NEXT (1 << 25) +#define _VALKEYMODULE_CTX_FLAGS_NEXT (1 << 26) /* Keyspace changes notification classes. Every class is associated with a * character for configuration purposes. 
diff --git a/tests/unit/slot-migration.tcl b/tests/unit/slot-migration.tcl new file mode 100644 index 0000000000..96048dad60 --- /dev/null +++ b/tests/unit/slot-migration.tcl @@ -0,0 +1,28 @@ + + +# TEST CASES +# ---- General ---- +# - Only migrating slots are synced +# - Changes in non-migrating slots are not sent to target +# - Parsing test +# - Slot must have available primary +# +# ---- Error handling ---- +# - Target gives up if primary is unavailable +# - Source unpauses itself if replica is unavailable +# - Client is closed by target during migration +# - Client is closed by source during migration +# +# ---- Importing slot is not exposed ---- +# - KEYS command on importing node +# - RANDOMKEY on importing node +# +# ---- Replication ---- +# - Replica receives updates through target primary +# - Time out results in replica dropping slots +# - Failover during migration cleans up slots +# - Full sync with pending migration includes pending slots, is cleaned up if migration fails +# +# ---- Loading ---- +# - Partial slot migration is cleaned up after AOF load +# - Partial slot migration is cleaned up after RDB load \ No newline at end of file