diff --git a/src/bio.c b/src/bio.c index 2f0e5fc425..f5c254a600 100644 --- a/src/bio.c +++ b/src/bio.c @@ -113,8 +113,8 @@ typedef union bio_job { struct { int type; - rdb_load_fn *load_fn; /* Function that will call the provided arguments */ - void *load_args[]; /* List of arguments to be passed to the load function */ + rdb_load_fn *load_fn; /* Function that will call the provided arguments */ + void *load_args[]; /* List of arguments to be passed to the load function */ } load_args; } bio_job; diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 33813cd8d1..d5198f60f0 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -2804,7 +2804,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc if (!exist) { i = server.cluster->pending_del_slot_count++; server.cluster->pending_del_slots[i] = dirty_slots[j]; - serverLog(LL_WARNING,"Add dirty slot %d on node %.40s (%s) in shard %.40s to pending delete queue.", + serverLog(LL_WARNING, "Add dirty slot %d on node %.40s (%s) in shard %.40s to pending delete queue.", dirty_slots[j], myself->name, myself->human_nodename, myself->shard_id); /* Todo, this is for test, some tests is match the "Deleting keys in dirty slot" text. */ serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j], @@ -6397,7 +6397,7 @@ void removeChannelsInSlot(unsigned int slot) { void delkeysNotOwnedByMySelf(list *slot_ranges) { listNode *ln; listIter li; - listRewind(slot_ranges,&li); + listRewind(slot_ranges, &li); while ((ln = listNext(&li)) != NULL) { slotRange *range = ln->value; for (int i = range->start_slot; i <= range->end_slot; i++) { diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 6044b0edf7..3f31451b40 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -385,23 +385,23 @@ typedef struct clusterSlotSyncLink { serverDb *temp_db; /* Temp db stores the keys during the sync process. */ functionsLibCtx *temp_func_ctx; /* Temp function ctx stores functions during the sync process. */ - client *client; /* Client to slot sync source node. */ - connection *sync_conn; /* Connection to slot sync source node. */ - int sync_state; /* State of the slot sync link during slot sync. */ - list *slot_ranges; /* List of the slot ranges we want to sync. */ + client *client; /* Client to slot sync source node. */ + connection *sync_conn; /* Connection to slot sync source node. */ + int sync_state; /* State of the slot sync link during slot sync. */ + list *slot_ranges; /* List of the slot ranges we want to sync. */ /* The following fields are used by slot sync RDB transfer. */ - int repl_transfer_fd; /* Descriptor of the tmpfile to store slot sync RDB */ - char *repl_transfer_tmpfile; /* Name of the tmpfile to store slot sync RDB */ - int64_t repl_transfer_size; /* Total size of the slot sync RDB file */ - int64_t repl_transfer_read; /* Amount of read from the slot sync RDB file */ - off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time */ - time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ + int repl_transfer_fd; /* Descriptor of the tmpfile to store slot sync RDB */ + char *repl_transfer_tmpfile; /* Name of the tmpfile to store slot sync RDB */ + int64_t repl_transfer_size; /* Total sixze of the slot sync RDB file */ + int64_t repl_transfer_read; /* Amount of read from the slot sync RDB file */ + off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time */ + time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ /* The following fields are used by slot failover. */ - int slot_mf_ready; /* If is ready to do slot manual failover */ - mstime_t slot_mf_end; /* Slot manual failover time limit (ms unixtime) */ - long long slot_mf_lag; /* Lag bytes with the slot sync source node */ + int slot_mf_ready; /* If is ready to do slot manual failover */ + mstime_t slot_mf_end; /* Slot manual failover time limit (ms unixtime) */ + long long slot_mf_lag; /* Lag bytes with the slot sync source node */ } clusterSlotSyncLink; typedef struct rdbLoadJob { @@ -426,7 +426,7 @@ struct clusterState { clusterNode *slots[CLUSTER_SLOTS]; int16_t pending_del_slots[CLUSTER_SLOTS]; int16_t pending_del_slot_count; - list *slotsync_links; /* The linked list stores all slot sync links. */ + list *slotsync_links; /* The linked list stores all slot sync links. */ /* The following fields are used to take the replica state on elections. */ mstime_t failover_auth_time; /* Time of previous or next election. */ int failover_auth_count; /* Number of votes received so far. */ @@ -447,7 +447,7 @@ struct clusterState { int mf_can_start; /* If non-zero signal that the manual failover can start requesting primary vote. */ /* Slot manual failover state. */ - mstime_t slot_mf_end; /* Slot failover time limit (ms unixtime) */ + mstime_t slot_mf_end; /* Slot failover time limit (ms unixtime) */ /* The following fields are used by primaries to take state on elections. */ uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */ int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ diff --git a/src/cluster_slotsync.c b/src/cluster_slotsync.c index 6af0e895ce..6c9fa09bf2 100644 --- a/src/cluster_slotsync.c +++ b/src/cluster_slotsync.c @@ -88,8 +88,8 @@ int isSlotRangeListSame(list *lx, list *ly) { while (len--) { lnx = listNext(&lix); lny = listNext(&liy); - slotRange *range_x = (slotRange*)lnx->value; - slotRange *range_y = (slotRange*)lny->value; + slotRange *range_x = (slotRange *)lnx->value; + slotRange *range_y = (slotRange *)lny->value; if (range_x->start_slot != range_y->start_slot || range_x->end_slot != range_y->end_slot) { return 0; } @@ -124,7 +124,7 @@ int isSlotInSlotRangeList(int slot, list *slot_ranges) { int isKeyInSlotRanges(robj *key, list *slot_ranges) { // maybe this can optimize. /* Get the slot of this key and check if the slot in the specified range. */ - int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr)); + int slot = keyHashSlot((char *)key->ptr, sdslen(key->ptr)); return isSlotInSlotRangeList(slot, slot_ranges); } @@ -145,7 +145,7 @@ int isCommandInSlotRanges(int argc, robj **argv, list *slot_ranges) { robj **new_argv = NULL; for (int i = 0; i < argc; i++) { if (!sdsEncodedObject(argv[i])) { - new_argv = zmalloc(sizeof(robj*) * (argc)); + new_argv = zmalloc(sizeof(robj *) * (argc)); break; } } @@ -175,7 +175,7 @@ int isCommandInSlotRanges(int argc, robj **argv, list *slot_ranges) { int slot = -1; for (int j = 0; j < numkeys; j++) { robj *thiskey = argv[keyindex[j].pos]; - int thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr)); + int thisslot = keyHashSlot((char *)thiskey->ptr, sdslen(thiskey->ptr)); if (firstkey == NULL) { firstkey = thiskey; @@ -183,7 +183,7 @@ int isCommandInSlotRanges(int argc, robj **argv, list *slot_ranges) { } else { if (slot != thisslot) { getKeysFreeResult(&result); - serverLog(LL_WARNING, "Cross slot '%s' '%s' ", (char*)(argv[0]->ptr), (char*)(argv[j]->ptr)); + serverLog(LL_WARNING, "Cross slot '%s' '%s' ", (char *)(argv[0]->ptr), (char *)(argv[j]->ptr)); return 0; } } @@ -483,7 +483,7 @@ void setSlotSyncImporting(list *slot_ranges, clusterNode *node) { listRewind(slot_ranges, &li); while ((ln = listNext(&li)) != NULL) { slotRange *range = ln->value; - for (int i = range->start_slot; i <= range->end_slot; i++){ + for (int i = range->start_slot; i <= range->end_slot; i++) { server.cluster->importing_slots_from[i] = node; } } @@ -508,7 +508,7 @@ sds formatSlotSyncImportingSlots(void) { if (server.cluster->importing_slots_from[j]) { importing_node = server.cluster->importing_slots_from[j]->name; if (strncmp(importing_node, link_node, CLUSTER_NAMELEN) == 0) { - bit =1; + bit = 1; } } @@ -516,13 +516,13 @@ sds formatSlotSyncImportingSlots(void) { start = j; } - if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) { - if (bit && j == CLUSTER_SLOTS-1) j++; + if (start != -1 && (!bit || j == CLUSTER_SLOTS - 1)) { + if (bit && j == CLUSTER_SLOTS - 1) j++; - if (start == j-1) { - ci = sdscatprintf(ci," [%d-<-%.40s]", start, server.cluster->importing_slots_from[start]->name); + if (start == j - 1) { + ci = sdscatprintf(ci, " [%d-<-%.40s]", start, server.cluster->importing_slots_from[start]->name); } else { - ci = sdscatprintf(ci," [%d-%d<-%.40s]", start, j-1, server.cluster->importing_slots_from[start]->name); + ci = sdscatprintf(ci, " [%d-%d<-%.40s]", start, j - 1, server.cluster->importing_slots_from[start]->name); } start = -1; } @@ -557,7 +557,7 @@ void slotLinkSendMessage(client *c, const char *option, long long value) { * Replica (slotsync client) send this message to inform the primary (slotsync server) * the amount of replication stream that it has processed so far in incremental * propagation stage. */ -void slotLinkSendOnline(client* c) { +void slotLinkSendOnline(client *c) { slotLinkSendMessage(c, "SLOTONLINE", c->slotsync_recv_bytes); } @@ -565,7 +565,7 @@ void slotLinkSendOnline(client* c) { * * Replica (slotsync client) send this message to inform the primary (slotsync server) * to pause clients for slot failover. */ -void slotLinkSendFailover(client* c) { +void slotLinkSendFailover(client *c) { slotLinkSendMessage(c, "SLOTFAILOVER", 0); } @@ -574,25 +574,25 @@ void slotLinkSendFailover(client* c) { * Replica (slotsync client) send this message to inform the primary (slotsync server) * the amount of replication stream that it has processed so far in slot failover * stage. */ -void slotLinkSendAck(client* c) { +void slotLinkSendAck(client *c) { slotLinkSendMessage(c, "SLOTACK", c->slotsync_recv_bytes); } -void replyToSlotSyncReplica(client* c, sds reply) { +void replyToSlotSyncReplica(client *c, sds reply) { if (!c) return; c->slotsync_sent_bytes += sdslen(reply); - addReplySds(c,reply); /* The sds 'reply' will be freed in addReplySds(). */ + addReplySds(c, reply); /* The sds 'reply' will be freed in addReplySds(). */ } /* Primary --> Replica: REPLCONF SLOTDIFF * * Primary (slotsync server) send this message to inform the replica (slotsync client) * the replication stream lag. */ -void replySlotOffsetToReplica(client* c, long long offset) { +void replySlotOffsetToReplica(client *c, long long offset) { sds soffset = sdscatprintf(sdsempty(), "%llu", offset); sds reply = sdscatprintf(sdsempty(), "*3\r\n$8\r\nREPLCONF\r\n$8\r\nSLOTDIFF\r\n$%lu\r\n%s\r\n", - sdslen(soffset), soffset); + (long unsigned int)sdslen(soffset), soffset); sdsfree(soffset); replyToSlotSyncReplica(c, reply); } @@ -602,7 +602,7 @@ void replySlotOffsetToReplica(client* c, long long offset) { * Primary (slotsync server) send this message to inform the replica (slotsync client) * the replication stream lag became zero and is ready for the replica to takeover * the slots now. */ -void replySlotReadyToReplica(client* c) { +void replySlotReadyToReplica(client *c) { sds reply = sdscatprintf(sdsempty(), "*3\r\n$8\r\nREPLCONF\r\n$9\r\nSLOTREADY\r\n$1\r\n0\r\n"); replyToSlotSyncReplica(c, reply); } @@ -611,7 +611,7 @@ void replySlotReadyToReplica(client* c) { * * Primary (slotsync server) send this message to inform the replica (slotsync client) * to close the slotsync link that bound with this client. */ -void replyCloseSlotLinkToReplica(client* c) { +void replyCloseSlotLinkToReplica(client *c) { sds reply = sdscatprintf(sdsempty(), "*2\r\n$7\r\nCLUSTER\r\n$21\r\nINTERNALCLOSESLOTLINK\r\n"); replyToSlotSyncReplica(c, reply); } @@ -669,7 +669,7 @@ void clusterSlotFailoverReplace(void) { slotRange *range = ln2->value; for (int i = range->start_slot; i <= range->end_slot; i++) { clusterDelSlot(i); - clusterAddSlot(server.cluster->myself,i); + clusterAddSlot(server.cluster->myself, i); } } } @@ -722,7 +722,7 @@ int getSlotFailoverReplicaIngressCount(void) { int ret = 0; listNode *ln; listIter li; - listRewind(server.replicas,&li); + listRewind(server.replicas, &li); while ((ln = listNext(&li))) { client *replica = listNodeValue(ln); if (replica->slotsync_mf_end) { @@ -766,8 +766,10 @@ void slotSyncMergeTempResources(clusterSlotSyncLink *link) { if (functionsLibCtxFunctionsLen(link->temp_func_ctx)) { sds err = NULL; if (libraryJoin(functionsLibCtxGetCurrent(), link->temp_func_ctx, 1, &err) != C_OK) { - serverLog(LL_WARNING, "Discarding the merge of functions, an error occurred while merging functions " - "from the slot RDB, error: %s", err); + serverLog(LL_WARNING, + "Discarding the merge of functions, an error occurred while merging functions " + "from the slot RDB, error: %s", + err); } } } @@ -794,7 +796,7 @@ void clusterSlotSyncCron(void) { if (!n) { /* If cross node, this slot sync will never success. */ if (cross_node) { - serverLog(LL_WARNING,"Slot sync removed: slots cross node."); + serverLog(LL_WARNING, "Slot sync removed: slots cross node."); clearSlotSyncImporting(link->slot_ranges); listDelNode(server.cluster->slotsync_links, ln); } @@ -803,7 +805,7 @@ void clusterSlotSyncCron(void) { /* The target node should not be myself. */ if (n == server.cluster->myself) { - serverLog(LL_WARNING,"Slot sync removed: slot owned by myself."); + serverLog(LL_WARNING, "Slot sync removed: slot owned by myself."); clearSlotSyncImporting(link->slot_ranges); listDelNode(server.cluster->slotsync_links, ln); return; @@ -927,7 +929,7 @@ void clusterSlotSyncCron(void) { } else { static long long count = 0; if (count++ % 10 == 0) { - serverLog(LL_NOTICE,"Slot failover status: wait_links=%d, ready_links=%d", + serverLog(LL_NOTICE, "Slot failover status: wait_links=%d, ready_links=%d", mf_link_cnt, ready_link_cnt); } } @@ -953,7 +955,7 @@ clusterNode *getClusterNodeBySlotList(list *slot_ranges, int *cross_node) { clusterNode *n = NULL; listNode *ln; listIter li; - listRewind(slot_ranges,&li); + listRewind(slot_ranges, &li); while ((ln = listNext(&li)) != NULL) { slotRange *range = ln->value; for (int i = range->start_slot; i <= range->end_slot; i++) { diff --git a/src/kvstore.c b/src/kvstore.c index d96d0562a5..1c79e89fc9 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -116,7 +116,7 @@ void kvstoreMoveHashtable(kvstore *src, kvstore *dst, int didx) { src->allocated_hashtables--; cumulativeKeyCountAdd(src, didx, -(long)hashtableSize(ht)); - kvstoreHashtableMetadata *metadata = (kvstoreHashtableMetadata *) hashtableMetadata(ht); + kvstoreHashtableMetadata *metadata = (kvstoreHashtableMetadata *)hashtableMetadata(ht); if (metadata->rehashing_node) { listDelNode(metadata->kvs->rehashing, metadata->rehashing_node); diff --git a/src/rdb.c b/src/rdb.c index 25caf73e0e..f58fcd8e77 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3047,8 +3047,7 @@ void slotRdbLoadProgressCallback(rio *r, const void *buf, size_t len) { if (server.rdb_checksum) rioGenericUpdateChecksum(r, buf, len); if (server.loading_process_events_interval_bytes && (r->processed_bytes + len) / server.loading_process_events_interval_bytes > - r->processed_bytes / server.loading_process_events_interval_bytes) { - + r->processed_bytes / server.loading_process_events_interval_bytes) { // todo loadingAbsProgress(r->processed_bytes); processModuleLoadingProgressEvent(0); @@ -3579,7 +3578,8 @@ int rdbLoadWithLoadingCtx(char *filename, rdbSaveInfo *rsi, int rdbflags, rdbLoa return (retval == C_OK) ? RDB_OK : RDB_FAILED; } -#define SLOTSYNC_DEFAULT_LAG 20000000000 /* Just a value large enough */ +// todo remove it +#define SLOTSYNC_DEFAULT_LAG 20000000000 /* Just a value large enough */ void bioRdbLoad(void *args[]) { rdbLoadJob *job = args[0]; int rdbflags = job->rdbflags; @@ -3616,7 +3616,7 @@ void bioRdbLoad(void *args[]) { } rioFreeConn(&rdb, NULL); } else { - sprintf(rdbpath, "%s_slot", server.rdb_filename); + snprintf(rdbpath, sizeof(rdbpath), "%s_slot", server.rdb_filename); int load_result = rdbLoadWithLoadingCtx((char *)rdbpath, &rsi, rdbflags, &loadingCtx); bg_unlink(rdbpath); diff --git a/src/rdb.h b/src/rdb.h index 30d77663c5..aa3a2be26a 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -133,9 +133,8 @@ #define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/ #define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/ #define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */ -#define RDBFLAGS_SLOT_SYNC (1 << 5) /* Load/save for SLOT SYNC, this is a hint that - * we are targeting slot rdb. In this case, we - * can load multiple slot RDBs. */ +#define RDBFLAGS_SLOT_SYNC (1 << 5) /* Load/save for SLOT SYNC, this is a hint that we are targeting slot rdb. \ + * In this case, we can load multiple slot RDBs. */ /* When rdbLoadObject() returns NULL, the err flag is * set to hold the type of error that occurred */ diff --git a/src/replication.c b/src/replication.c index aa2f5e89f2..4c54b665c1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -593,7 +593,7 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) { /* Also add it to the output buffer of the slaves which are in the slots * sync mode if any, we should do this as these special replicas will not * use the global replication buffer. */ - listRewind(server.replicas,&li); + listRewind(server.replicas, &li); while ((ln = listNext(&li))) { client *replica = ln->value; @@ -1346,7 +1346,6 @@ void syncCommand(client *c) { * a replica without replication buffer. What's more, we * can't use a replica in slot sync mode either. */ if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_END && - listLength(replica->slotsync_slots) == 0 && (!(replica->flag.repl_rdbonly) || (c->flag.repl_rdbonly)) && !replica->flag.slot_sync_replica) break; @@ -1377,8 +1376,8 @@ void syncCommand(client *c) { /* CASE 3: There is no BGSAVE is in progress. */ } else { - if (server.repl_diskless_sync && (c->repl_data->replica_capa & REPLICA_CAPA_EOF) - && server.repl_diskless_sync_delay && !c->flag.slot_sync_replica) { + if (server.repl_diskless_sync && (c->repl_data->replica_capa & REPLICA_CAPA_EOF) && + server.repl_diskless_sync_delay && !c->flag.slot_sync_replica) { /* Diskless replication RDB child is created inside * replicationCron() since we want to delay its start a * few seconds to wait for more replicas to arrive. */ @@ -1420,8 +1419,6 @@ void initClientReplicationData(client *c) { } void freeClientReplicationData(client *c) { - - if (!c->repl_data) return; freeReplicaReferencedReplBuffer(c); /* Primary/replica cleanup Case 1: @@ -1656,7 +1653,7 @@ void replconfCommand(client *c) { return; } c->repl_data->associated_rdb_client_id = (uint64_t)client_id; - } else if (!strcasecmp(c->argv[j]->ptr,"slotonline")) { + } else if (!strcasecmp(c->argv[j]->ptr, "slotonline")) { /* REPLCONF SLOTONLINE is used by slave(slotsync client) to inform * the master(slotsync server) the amount of replication stream * that it processed so far in incremental propagation stage. */ @@ -1677,14 +1674,14 @@ void replconfCommand(client *c) { sdsfree(slots); } else if (c->repl_data->repl_state == REPLICA_STATE_ONLINE) { long long recv_bytes = 0; - if ((getLongLongFromObject(c->argv[j+1], &recv_bytes) != C_OK)) { + if ((getLongLongFromObject(c->argv[j + 1], &recv_bytes) != C_OK)) { return; } /* Notify offset to the slave. */ replySlotOffsetToReplica(c, c->slotsync_sent_bytes - recv_bytes); } return; - } else if (!strcasecmp(c->argv[j]->ptr,"slotdiff")) { + } else if (!strcasecmp(c->argv[j]->ptr, "slotdiff")) { /* REPLCONF SLOTDIFF is used by master(slotsync server) to inform * the slave(slotsync client) the replication stream lag. */ @@ -1692,7 +1689,7 @@ void replconfCommand(client *c) { if (!server.cluster_enabled) return; long long diffbytes = 0; - if ((getLongLongFromObject(c->argv[j+1], &diffbytes) != C_OK)) { + if ((getLongLongFromObject(c->argv[j + 1], &diffbytes) != C_OK)) { return; } @@ -1705,10 +1702,10 @@ void replconfCommand(client *c) { } if (link->slot_mf_lag >= 0) { - serverLog(LL_WARNING, "Slot sync already failed, link lag is %lld, but it shoud be less than 0", + serverLog(LL_WARNING, "Slot sync already failed, link lag is %lld, but it should be less than 0", link->slot_mf_lag); } - } else if (!strcasecmp(c->argv[j]->ptr,"slotfailover")) { + } else if (!strcasecmp(c->argv[j]->ptr, "slotfailover")) { /* REPLCONF SLOTFAILOVER is used by slave(slotsync client) to inform * the master(slotsync server) to pause clients for slot failover. */ @@ -1738,7 +1735,7 @@ void replconfCommand(client *c) { serverLog(LL_NOTICE, "Recv slotfailover request. slots:%s", slots); sdsfree(slots); return; - } else if (!strcasecmp(c->argv[j]->ptr,"slotack")) { + } else if (!strcasecmp(c->argv[j]->ptr, "slotack")) { /* REPLCONF SLOTACK is used by slave(slotsync client) to inform * the master(slotsync server) the amount of replication stream * that it processed so far in slot failover stage. */ @@ -1753,24 +1750,24 @@ void replconfCommand(client *c) { if (!c->slotsync_mf_end) return; long long recv_bytes = 0; - if ((getLongLongFromObject(c->argv[j+1], &recv_bytes) != C_OK)) { + if ((getLongLongFromObject(c->argv[j + 1], &recv_bytes) != C_OK)) { return; } sds slots = reprSlotRangeListWithHyphen(c->slotsync_slots); if (c->slotsync_sent_bytes == recv_bytes) { - /* All the sent bytes have been recieved by this slotsync slave, + /* All the sent bytes have been received by this slotsync slave, * we can notify it to takeover the slots now. */ - serverLog(LL_WARNING,"Recv slotack success!! %llu=%llu slots:%s", + serverLog(LL_WARNING, "Recv slotack success!! %llu=%llu slots:%s", c->slotsync_sent_bytes, recv_bytes, slots); replySlotReadyToReplica(c); } else { - serverLog(LL_WARNING,"Recv slotack not equal. %llu=%llu slots:%s", + serverLog(LL_WARNING, "Recv slotack not equal. %llu=%llu slots:%s", c->slotsync_sent_bytes, recv_bytes, slots); } sdsfree(slots); return; - } else if (!strcasecmp(c->argv[j]->ptr,"slotready")) { + } else if (!strcasecmp(c->argv[j]->ptr, "slotready")) { /* REPLCONF SLOTREADY is used by master(slotsync server) to inform * the slave(slotsync client) the replication stream lag is zero * and is ready for the slave to takeover the slots now. */ @@ -1783,7 +1780,7 @@ void replconfCommand(client *c) { if (link && link->slot_mf_end && (mstime() < link->slot_mf_end)) { link->slot_mf_ready = 1; sds slots = reprSlotRangeListWithHyphen(c->slotsync_slots); - serverLog(LL_WARNING,"Recv slotfailover ready! slots:%s", slots); + serverLog(LL_NOTICE, "Recv slotfailover ready! slots:%s", slots); sdsfree(slots); } return; @@ -2324,7 +2321,7 @@ int useDisklessLoad(int slot_sync) { int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || server.repl_diskless_load == REPL_DISKLESS_LOAD_FLUSH_BEFORE_LOAD || (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && - (dbTotalServerKeyCount() == 0 || slot_sync)); + (dbTotalServerKeyCount() == 0 || slot_sync)); if (enabled) { /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */ @@ -2819,7 +2816,7 @@ void readSyncBulkPayloadImpl(connection *conn, int slot_sync) { old_rdb_fd = open(server.rdb_filename, O_RDONLY | O_NONBLOCK); rename_res = rename(server.repl_transfer_tmpfile, server.rdb_filename); } else { - sprintf(rdbpath, "%s_slot", server.rdb_filename); + snprintf(rdbpath, sizeof(rdbpath), "%s_slot", server.rdb_filename); old_rdb_fd = open(rdbpath, O_RDONLY | O_NONBLOCK); rename_res = rename(link->repl_transfer_tmpfile, rdbpath); } @@ -4272,7 +4269,7 @@ void syncWithPrimaryStateMachine(connection *conn, int *repl_state, int slot_syn if (!useDisklessLoad(slot_sync)) { int dfd = -1, maxtries = 5; while (maxtries--) { - snprintf(tmpfile, 256, "temp-%d.%ld.rdb", (int)(mstime()/1000), (long int)getpid()); + snprintf(tmpfile, 256, "temp-%d.%ld.rdb", (int)(mstime() / 1000), (long int)getpid()); dfd = open(tmpfile, O_CREAT | O_WRONLY | O_EXCL, 0644); if (dfd != -1) break; /* We save the errno of open to prevent some systems from modifying it after diff --git a/src/server.h b/src/server.h index 0abb3e57a5..29f47e8831 100644 --- a/src/server.h +++ b/src/server.h @@ -1188,12 +1188,12 @@ typedef struct client { multiState *mstate; /* MULTI/EXEC state, lazily initialized when first needed */ blockingState *bstate; /* Blocking state, lazily initialized when first needed */ /* Slotsync data, todo maybe need to move it into ClientReplicationData or a new struct. */ - void *slotsync_link; /* Pointer to the slotsync link. */ - list *slotsync_slots; /* List of slot ranges that the client interested. */ - long long slotsync_sent_bytes; /* todo */ - long long slotsync_recv_bytes; /* todo */ - int slotsync_failed; /* todo */ - mstime_t slotsync_mf_end; /* todo */ + void *slotsync_link; /* Pointer to the slotsync link. */ + list *slotsync_slots; /* List of slot ranges that the client interested. */ + long long slotsync_sent_bytes; /* todo */ + long long slotsync_recv_bytes; /* todo */ + int slotsync_failed; /* todo */ + mstime_t slotsync_mf_end; /* todo */ /* Output buffer and reply handling */ long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */ char *buf; /* Output buffer */ @@ -1668,7 +1668,7 @@ struct valkeyServer { off_t loading_loaded_bytes; time_t loading_start_time; /* Slot RDB loading information */ - volatile sig_atomic_t slot_loading; /* We are loading slot data if true */ + volatile sig_atomic_t slot_loading; /* We are loading slot data if true */ off_t slot_loading_total_bytes; off_t slot_loading_rdb_used_mem; off_t slot_loading_loaded_bytes;