Skip to content

Commit

Permalink
Fix typo and format and build
Browse files Browse the repository at this point in the history
Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Jan 23, 2025
1 parent 9284630 commit ace9caf
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 85 deletions.
4 changes: 2 additions & 2 deletions src/bio.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ typedef union bio_job {

struct {
int type;
rdb_load_fn *load_fn; /* Function that will call the provided arguments */
void *load_args[]; /* List of arguments to be passed to the load function */
rdb_load_fn *load_fn; /* Function that will call the provided arguments */
void *load_args[]; /* List of arguments to be passed to the load function */
} load_args;
} bio_job;

Expand Down
4 changes: 2 additions & 2 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2804,7 +2804,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
if (!exist) {
i = server.cluster->pending_del_slot_count++;
server.cluster->pending_del_slots[i] = dirty_slots[j];
serverLog(LL_WARNING,"Add dirty slot %d on node %.40s (%s) in shard %.40s to pending delete queue.",
serverLog(LL_WARNING, "Add dirty slot %d on node %.40s (%s) in shard %.40s to pending delete queue.",
dirty_slots[j], myself->name, myself->human_nodename, myself->shard_id);
}
}
Expand Down Expand Up @@ -6394,7 +6394,7 @@ void removeChannelsInSlot(unsigned int slot) {
void delkeysNotOwnedByMySelf(list *slot_ranges) {
listNode *ln;
listIter li;
listRewind(slot_ranges,&li);
listRewind(slot_ranges, &li);
while ((ln = listNext(&li)) != NULL) {
slotRange *range = ln->value;
for (int i = range->start_slot; i <= range->end_slot; i++) {
Expand Down
30 changes: 15 additions & 15 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,23 +385,23 @@ typedef struct clusterSlotSyncLink {
serverDb *temp_db; /* Temp db stores the keys during the sync process. */
functionsLibCtx *temp_func_ctx; /* Temp function ctx stores functions during the sync process. */

client *client; /* Client to slot sync source node. */
connection *sync_conn; /* Connection to slot sync source node. */
int sync_state; /* State of the slot sync link during slot sync. */
list *slot_ranges; /* List of the slot ranges we want to sync. */
client *client; /* Client to slot sync source node. */
connection *sync_conn; /* Connection to slot sync source node. */
int sync_state; /* State of the slot sync link during slot sync. */
list *slot_ranges; /* List of the slot ranges we want to sync. */

/* The following fields are used by slot sync RDB transfer. */
int repl_transfer_fd; /* Descriptor of the tmpfile to store slot sync RDB */
char *repl_transfer_tmpfile; /* Name of the tmpfile to store slot sync RDB */
int64_t repl_transfer_size; /* Total size of the slot sync RDB file */
int64_t repl_transfer_read; /* Amount of read from the slot sync RDB file */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time */
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
int repl_transfer_fd; /* Descriptor of the tmpfile to store slot sync RDB */
char *repl_transfer_tmpfile; /* Name of the tmpfile to store slot sync RDB */
int64_t repl_transfer_size; /* Total sixze of the slot sync RDB file */
int64_t repl_transfer_read; /* Amount of read from the slot sync RDB file */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time */
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */

/* The following fields are used by slot failover. */
int slot_mf_ready; /* If is ready to do slot manual failover */
mstime_t slot_mf_end; /* Slot manual failover time limit (ms unixtime) */
long long slot_mf_lag; /* Lag bytes with the slot sync source node */
int slot_mf_ready; /* If is ready to do slot manual failover */
mstime_t slot_mf_end; /* Slot manual failover time limit (ms unixtime) */
long long slot_mf_lag; /* Lag bytes with the slot sync source node */
} clusterSlotSyncLink;

typedef struct rdbLoadJob {
Expand All @@ -426,7 +426,7 @@ struct clusterState {
clusterNode *slots[CLUSTER_SLOTS];
int16_t pending_del_slots[CLUSTER_SLOTS];
int16_t pending_del_slot_count;
list *slotsync_links; /* The linked list stores all slot sync links. */
list *slotsync_links; /* The linked list stores all slot sync links. */
/* The following fields are used to take the replica state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
Expand All @@ -447,7 +447,7 @@ struct clusterState {
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting primary vote. */
/* Slot manual failover state. */
mstime_t slot_mf_end; /* Slot failover time limit (ms unixtime) */
mstime_t slot_mf_end; /* Slot failover time limit (ms unixtime) */
/* The following fields are used by primaries to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
Expand Down
60 changes: 31 additions & 29 deletions src/cluster_slotsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ int isSlotRangeListSame(list *lx, list *ly) {
while (len--) {
lnx = listNext(&lix);
lny = listNext(&liy);
slotRange *range_x = (slotRange*)lnx->value;
slotRange *range_y = (slotRange*)lny->value;
slotRange *range_x = (slotRange *)lnx->value;
slotRange *range_y = (slotRange *)lny->value;
if (range_x->start_slot != range_y->start_slot || range_x->end_slot != range_y->end_slot) {
return 0;
}
Expand Down Expand Up @@ -124,7 +124,7 @@ int isSlotInSlotRangeList(int slot, list *slot_ranges) {
int isKeyInSlotRanges(robj *key, list *slot_ranges) {
// maybe this can optimize.
/* Get the slot of this key and check if the slot in the specified range. */
int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
int slot = keyHashSlot((char *)key->ptr, sdslen(key->ptr));
return isSlotInSlotRangeList(slot, slot_ranges);
}

Expand All @@ -145,7 +145,7 @@ int isCommandInSlotRanges(int argc, robj **argv, list *slot_ranges) {
robj **new_argv = NULL;
for (int i = 0; i < argc; i++) {
if (!sdsEncodedObject(argv[i])) {
new_argv = zmalloc(sizeof(robj*) * (argc));
new_argv = zmalloc(sizeof(robj *) * (argc));
break;
}
}
Expand Down Expand Up @@ -175,15 +175,15 @@ int isCommandInSlotRanges(int argc, robj **argv, list *slot_ranges) {
int slot = -1;
for (int j = 0; j < numkeys; j++) {
robj *thiskey = argv[keyindex[j].pos];
int thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));
int thisslot = keyHashSlot((char *)thiskey->ptr, sdslen(thiskey->ptr));

if (firstkey == NULL) {
firstkey = thiskey;
slot = thisslot;
} else {
if (slot != thisslot) {
getKeysFreeResult(&result);
serverLog(LL_WARNING, "Cross slot '%s' '%s' ", (char*)(argv[0]->ptr), (char*)(argv[j]->ptr));
serverLog(LL_WARNING, "Cross slot '%s' '%s' ", (char *)(argv[0]->ptr), (char *)(argv[j]->ptr));
return 0;
}
}
Expand Down Expand Up @@ -483,7 +483,7 @@ void setSlotSyncImporting(list *slot_ranges, clusterNode *node) {
listRewind(slot_ranges, &li);
while ((ln = listNext(&li)) != NULL) {
slotRange *range = ln->value;
for (int i = range->start_slot; i <= range->end_slot; i++){
for (int i = range->start_slot; i <= range->end_slot; i++) {
server.cluster->importing_slots_from[i] = node;
}
}
Expand All @@ -508,21 +508,21 @@ sds formatSlotSyncImportingSlots(void) {
if (server.cluster->importing_slots_from[j]) {
importing_node = server.cluster->importing_slots_from[j]->name;
if (strncmp(importing_node, link_node, CLUSTER_NAMELEN) == 0) {
bit =1;
bit = 1;
}
}

if (bit && start == -1) {
start = j;
}

if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
if (bit && j == CLUSTER_SLOTS-1) j++;
if (start != -1 && (!bit || j == CLUSTER_SLOTS - 1)) {
if (bit && j == CLUSTER_SLOTS - 1) j++;

if (start == j-1) {
ci = sdscatprintf(ci," [%d-<-%.40s]", start, server.cluster->importing_slots_from[start]->name);
ci = sdscatprintf(ci, " [%d-<-%.40s]", start, server.cluster->importing_slots_from[start]->name);
} else {
ci = sdscatprintf(ci," [%d-%d<-%.40s]", start, j-1, server.cluster->importing_slots_from[start]->name);
ci = sdscatprintf(ci, " [%d-%d<-%.40s]", start, j-1, server.cluster->importing_slots_from[start]->name);
}
start = -1;
}
Expand Down Expand Up @@ -557,15 +557,15 @@ void slotLinkSendMessage(client *c, const char *option, long long value) {
* Replica (slotsync client) send this message to inform the primary (slotsync server)
* the amount of replication stream that it has processed so far in incremental
* propagation stage. */
void slotLinkSendOnline(client* c) {
void slotLinkSendOnline(client *c) {
slotLinkSendMessage(c, "SLOTONLINE", c->slotsync_recv_bytes);
}

/* Replica --> Primary: REPLCONF SLOTFAILOVER 0
*
* Replica (slotsync client) send this message to inform the primary (slotsync server)
* to pause clients for slot failover. */
void slotLinkSendFailover(client* c) {
void slotLinkSendFailover(client *c) {
slotLinkSendMessage(c, "SLOTFAILOVER", 0);
}

Expand All @@ -574,25 +574,25 @@ void slotLinkSendFailover(client* c) {
* Replica (slotsync client) send this message to inform the primary (slotsync server)
* the amount of replication stream that it has processed so far in slot failover
* stage. */
void slotLinkSendAck(client* c) {
void slotLinkSendAck(client *c) {
slotLinkSendMessage(c, "SLOTACK", c->slotsync_recv_bytes);
}

void replyToSlotSyncReplica(client* c, sds reply) {
void replyToSlotSyncReplica(client *c, sds reply) {
if (!c) return;

c->slotsync_sent_bytes += sdslen(reply);
addReplySds(c,reply); /* The sds 'reply' will be freed in addReplySds(). */
addReplySds(c, reply); /* The sds 'reply' will be freed in addReplySds(). */
}

/* Primary --> Replica: REPLCONF SLOTDIFF <diff_bytes>
*
* Primary (slotsync server) send this message to inform the replica (slotsync client)
* the replication stream lag. */
void replySlotOffsetToReplica(client* c, long long offset) {
void replySlotOffsetToReplica(client *c, long long offset) {
sds soffset = sdscatprintf(sdsempty(), "%llu", offset);
sds reply = sdscatprintf(sdsempty(), "*3\r\n$8\r\nREPLCONF\r\n$8\r\nSLOTDIFF\r\n$%lu\r\n%s\r\n",
sdslen(soffset), soffset);
(long unsigned int)sdslen(soffset), soffset);
sdsfree(soffset);
replyToSlotSyncReplica(c, reply);
}
Expand All @@ -602,7 +602,7 @@ void replySlotOffsetToReplica(client* c, long long offset) {
* Primary (slotsync server) send this message to inform the replica (slotsync client)
* the replication stream lag became zero and is ready for the replica to takeover
* the slots now. */
void replySlotReadyToReplica(client* c) {
void replySlotReadyToReplica(client *c) {
sds reply = sdscatprintf(sdsempty(), "*3\r\n$8\r\nREPLCONF\r\n$9\r\nSLOTREADY\r\n$1\r\n0\r\n");
replyToSlotSyncReplica(c, reply);
}
Expand All @@ -611,7 +611,7 @@ void replySlotReadyToReplica(client* c) {
*
* Primary (slotsync server) send this message to inform the replica (slotsync client)
* to close the slotsync link that bound with this client. */
void replyCloseSlotLinkToReplica(client* c) {
void replyCloseSlotLinkToReplica(client *c) {
sds reply = sdscatprintf(sdsempty(), "*2\r\n$7\r\nCLUSTER\r\n$21\r\nINTERNALCLOSESLOTLINK\r\n");
replyToSlotSyncReplica(c, reply);
}
Expand Down Expand Up @@ -669,7 +669,7 @@ void clusterSlotFailoverReplace(void) {
slotRange *range = ln2->value;
for (int i = range->start_slot; i <= range->end_slot; i++) {
clusterDelSlot(i);
clusterAddSlot(server.cluster->myself,i);
clusterAddSlot(server.cluster->myself, i);
}
}
}
Expand Down Expand Up @@ -722,7 +722,7 @@ int getSlotFailoverReplicaIngressCount(void) {
int ret = 0;
listNode *ln;
listIter li;
listRewind(server.replicas,&li);
listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
client *replica = listNodeValue(ln);
if (replica->slotsync_mf_end) {
Expand Down Expand Up @@ -766,8 +766,10 @@ void slotSyncMergeTempResources(clusterSlotSyncLink *link) {
if (functionsLibCtxFunctionsLen(link->temp_func_ctx)) {
sds err = NULL;
if (libraryJoin(functionsLibCtxGetCurrent(), link->temp_func_ctx, 1, &err) != C_OK) {
serverLog(LL_WARNING, "Discarding the merge of functions, an error occurred while merging functions "
"from the slot RDB, error: %s", err);
serverLog(LL_WARNING,
"Discarding the merge of functions, an error occurred while merging functions "
"from the slot RDB, error: %s",
err);
}
}
}
Expand All @@ -794,7 +796,7 @@ void clusterSlotSyncCron(void) {
if (!n) {
/* If cross node, this slot sync will never success. */
if (cross_node) {
serverLog(LL_WARNING,"Slot sync removed: slots cross node.");
serverLog(LL_WARNING, "Slot sync removed: slots cross node.");
clearSlotSyncImporting(link->slot_ranges);
listDelNode(server.cluster->slotsync_links, ln);
}
Expand All @@ -803,7 +805,7 @@ void clusterSlotSyncCron(void) {

/* The target node should not be myself. */
if (n == server.cluster->myself) {
serverLog(LL_WARNING,"Slot sync removed: slot owned by myself.");
serverLog(LL_WARNING, "Slot sync removed: slot owned by myself.");
clearSlotSyncImporting(link->slot_ranges);
listDelNode(server.cluster->slotsync_links, ln);
return;
Expand Down Expand Up @@ -927,7 +929,7 @@ void clusterSlotSyncCron(void) {
} else {
static long long count = 0;
if (count++ % 10 == 0) {
serverLog(LL_NOTICE,"Slot failover status: wait_links=%d, ready_links=%d",
serverLog(LL_NOTICE, "Slot failover status: wait_links=%d, ready_links=%d",
mf_link_cnt, ready_link_cnt);
}
}
Expand All @@ -953,7 +955,7 @@ clusterNode *getClusterNodeBySlotList(list *slot_ranges, int *cross_node) {
clusterNode *n = NULL;
listNode *ln;
listIter li;
listRewind(slot_ranges,&li);
listRewind(slot_ranges, &li);
while ((ln = listNext(&li)) != NULL) {
slotRange *range = ln->value;
for (int i = range->start_slot; i <= range->end_slot; i++) {
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void kvstoreMoveHashtable(kvstore *src, kvstore *dst, int didx) {
src->allocated_hashtables--;
cumulativeKeyCountAdd(src, didx, -(long)hashtableSize(ht));

kvstoreHashtableMetadata *metadata = (kvstoreHashtableMetadata *) hashtableMetadata(ht);
kvstoreHashtableMetadata *metadata = (kvstoreHashtableMetadata *)hashtableMetadata(ht);
if (metadata->rehashing_node) {
listDelNode(metadata->kvs->rehashing, metadata->rehashing_node);

Expand Down
8 changes: 4 additions & 4 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3047,8 +3047,7 @@ void slotRdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.rdb_checksum) rioGenericUpdateChecksum(r, buf, len);
if (server.loading_process_events_interval_bytes &&
(r->processed_bytes + len) / server.loading_process_events_interval_bytes >
r->processed_bytes / server.loading_process_events_interval_bytes) {

r->processed_bytes / server.loading_process_events_interval_bytes) {
// todo
loadingAbsProgress(r->processed_bytes);
processModuleLoadingProgressEvent(0);
Expand Down Expand Up @@ -3579,7 +3578,8 @@ int rdbLoadWithLoadingCtx(char *filename, rdbSaveInfo *rsi, int rdbflags, rdbLoa
return (retval == C_OK) ? RDB_OK : RDB_FAILED;
}

#define SLOTSYNC_DEFAULT_LAG 20000000000 /* Just a value large enough */
// todo remove it
#define SLOTSYNC_DEFAULT_LAG 20000000000 /* Just a value large enough */
void bioRdbLoad(void *args[]) {
rdbLoadJob *job = args[0];
int rdbflags = job->rdbflags;
Expand Down Expand Up @@ -3616,7 +3616,7 @@ void bioRdbLoad(void *args[]) {
}
rioFreeConn(&rdb, NULL);
} else {
sprintf(rdbpath, "%s_slot", server.rdb_filename);
snprintf(rdbpath, sizeof(rdbpath), "%s_slot", server.rdb_filename);
int load_result = rdbLoadWithLoadingCtx((char *)rdbpath, &rsi, rdbflags, &loadingCtx);

bg_unlink(rdbpath);
Expand Down
5 changes: 2 additions & 3 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,8 @@
#define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/
#define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/
#define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */
#define RDBFLAGS_SLOT_SYNC (1 << 5) /* Load/save for SLOT SYNC, this is a hint that
* we are targeting slot rdb. In this case, we
* can load multiple slot RDBs. */
#define RDBFLAGS_SLOT_SYNC (1 << 5) /* Load/save for SLOT SYNC, this is a hint that we are targeting slot rdb. \
* In this case, we can load multiple slot RDBs. */

/* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */
Expand Down
Loading

0 comments on commit ace9caf

Please sign in to comment.