Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce atomic slot migration #1591

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "rio.h"
#include "functions.h"
#include "module.h"
#include "cluster.h"

#include <signal.h>
#include <fcntl.h>
Expand Down Expand Up @@ -2190,14 +2191,20 @@ static int rewriteFunctions(rio *aof) {
return 0;
}

int rewriteAppendOnlyFileRio(rio *aof) {
int slotFilterPredicate(int slot, void *privdata) {
if (privdata == NULL) return 1;
unsigned char *slot_bitmap = (unsigned char *)privdata;
return bitmapTestBit(slot_bitmap, slot);
}

int rewriteAppendOnlyFileRio(rio *aof, slotBitmap slot_bitmap) {
int j;
long key_count = 0;
long long updated_time = 0;
kvstoreIterator *kvs_it = NULL;

/* Record timestamp at the beginning of rewriting AOF. */
if (server.aof_timestamp_enabled) {
if (server.aof_timestamp_enabled && (slot_bitmap == NULL || isSlotBitmapEmpty(slot_bitmap))) {
sds ts = genAofTimestampAnnotationIfNeeded(1);
if (rioWrite(aof, ts, sdslen(ts)) == 0) {
sdsfree(ts);
Expand All @@ -2217,7 +2224,11 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr;
if (rioWriteBulkLongLong(aof, j) == 0) goto werr;

kvs_it = kvstoreIteratorInit(db->keys);
if (slot_bitmap == NULL || isSlotBitmapEmpty(slot_bitmap)) {
kvs_it = kvstoreIteratorInit(db->keys);
} else {
kvs_it = kvstoreFilteredIteratorInit(db->keys, &slotFilterPredicate, slot_bitmap);
}
/* Iterate this DB writing every entry */
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
Expand Down Expand Up @@ -2330,7 +2341,7 @@ int rewriteAppendOnlyFile(char *filename) {
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
if (rewriteAppendOnlyFileRio(&aof, NULL) == C_ERR) goto werr;
}

/* Make sure data will not remain on the OS's output buffers */
Expand Down
4 changes: 2 additions & 2 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ void freeClientBlockingState(client *c) {
* flag is set client query buffer is not longer processed, but accumulated,
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
/* Primary client should never be blocked unless pause or module */
serverAssert(!(c->flag.primary && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
/* Replication clients should never be blocked unless pause or module */
serverAssert(!(c->flag.replicated && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));

initClientBlockingState(c);

Expand Down
14 changes: 7 additions & 7 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int

/* We handle all the cases as if they were EXEC commands, so we have
* a common code path for everything */
if (cmd->proc == execCommand) {
if (c && cmd->proc == execCommand) {
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
* error. */
if (!c->flag.multi) return myself;
Expand All @@ -1019,11 +1019,11 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
mc.cmd = cmd;
}

uint64_t cmd_flags = getCommandFlags(c);
uint64_t cmd_flags = c ? getCommandFlags(c) : cmd->flags;

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
int pubsubshard_included =
(cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_PUBSUB));
(cmd_flags & CMD_PUBSUB) || (c && c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_PUBSUB));

/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
Expand Down Expand Up @@ -1068,7 +1068,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* can safely serve the request, otherwise we return a TRYAGAIN
* error). To do so we set the importing/migrating state and
* increment a counter for every missing key. */
if (clusterNodeIsPrimary(myself) || c->flag.readonly) {
if (clusterNodeIsPrimary(myself) || (c && c->flag.readonly)) {
if (n == clusterNodeGetPrimary(myself) && getMigratingSlotDest(slot) != NULL) {
migrating_slot = 1;
} else if (getImportingSlotSource(slot) != NULL) {
Expand Down Expand Up @@ -1163,7 +1163,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* request as "ASKING", we can serve the request. However if the request
* involves multiple keys and we don't have them all, the only option is
* to send a TRYAGAIN error. */
if (importing_slot && (c->flag.asking || cmd_flags & CMD_ASKING)) {
if (importing_slot && (c && (c->flag.asking || cmd_flags & CMD_ASKING))) {
if (multiple_keys && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
Expand All @@ -1176,8 +1176,8 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* node is a replica and the request is about a hash slot our primary
* is serving, we can reply without redirection. */
int is_write_command =
(cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_WRITE));
if ((c->flag.readonly || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) &&
(cmd_flags & CMD_WRITE) || (c && c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_WRITE));
if (((c && c->flag.readonly) || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) &&
clusterNodeGetPrimary(myself) == n) {
return myself;
}
Expand Down
12 changes: 10 additions & 2 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
* Cluster exported API.
*----------------------------------------------------------------------------*/

#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
Expand Down Expand Up @@ -116,7 +114,17 @@ client *createCachedResponseClient(int resp);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);
unsigned int countKeysInSlot(unsigned int hashslot);
void bitmapToSlotRanges(unsigned char *bitmap, slotBitmap slot_bitmap_out);
int bitmapTestBit(unsigned char *bitmap, int pos);
void bitmapSetBit(unsigned char *bitmap, int pos);
void bitmapClearBit(unsigned char *bitmap, int pos);
void bitmapSetAllBits(unsigned char *bitmap, int len);
int slotBitmapCompare(slotBitmap bitmap, slotBitmap other);
int isSlotBitmapEmpty(slotBitmap bitmap);
int getSlotOrReply(client *c, robj *o);
void clusterSlotMigrationHandleClientClose(client *c);
void clusterFeedSlotMigration(int dbid, robj **argv, int argc);
int clusterShouldWriteToSlotMigrationTarget(void);

/* functions with shared implementations */
int clusterNodeIsMyself(clusterNode *n);
Expand Down
Loading
Loading