Skip to content

Commit

Permalink
Fix data loss when replica do a failover with a old history repl offs…
Browse files Browse the repository at this point in the history
…et (valkey-io#885)

Our current replica can initiate a failover without restriction when
it detects that the primary node is offline. This is generally not a
problem. However, consider the following scenarios:

1. In slot migration, a primary loses its last slot and then becomes
a replica. When it is fully synchronized with the new primary, the new
primary downs.

2. In CLUSTER REPLICATE command, a replica becomes a replica of another
primary. When it is fully synchronized with the new primary, the new
primary downs.

In the above scenario, case 1 may cause the empty primary to be elected
as the new primary, resulting in primary data loss. Case 2 may cause the
non-empty replica to be elected as the new primary, resulting in data
loss and confusion.

The reason is that we have cached primary logic, which is used for psync.
In the above scenario, when clusterSetPrimary is called, myself will cache
server.primary in server.cached_primary for psync. In replicationGetReplicaOffset,
we get server.cached_primary->reploff for offset, gossip it and rank it,
which causes the replica to use the old historical offset to initiate
failover, and it get a good rank, initiates election first, and then is
elected as the new primary.

The main problem here is that when the replica has not completed full
sync, it may get the historical offset in replicationGetReplicaOffset.

The fix is to clear cached_primary in these places where full sync is
obviously needed, and let the replica use offset == 0 to participate
in the election. In this way, this unhealthy replica has a worse rank
and is not easy to be elected.

Of course, it is possible that it will be elected with offset == 0.
In the future, we may need to prohibit the replica with offset == 0
from having the right to initiate elections.

Another point worth mentioning, in above cases:
1. In the ROLE command, the replica status will be handshake, and the
offset will be -1.
2. Before this PR, in the CLUSTER SHARD command, the replica status will
be online, and the offset will be the old cached value (which is wrong).
3. After this PR, in the CLUSTER SHARD, the replica status will be loading,
and the offset will be 0.

Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Aug 21, 2024
1 parent 829243e commit e1b3629
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 25 deletions.
57 changes: 38 additions & 19 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetPrimary(clusterNode *n, int closeSlots);
static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required);
void clusterHandleReplicaFailover(void);
void clusterHandleReplicaMigration(int max_replicas);
int bitmapTestBit(unsigned char *bitmap, int pos);
Expand Down Expand Up @@ -2370,7 +2370,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg *
/* Check if this is our primary and we have to change the
* replication target as well. */
if (nodeIsReplica(myself) && myself->replicaof == node)
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node));
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 0);
return 1;
}

Expand Down Expand Up @@ -2432,6 +2432,9 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
return;
}

/* Sender and myself in the same shard? */
int are_in_same_shard = areInSameShard(sender, myself);

for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(slots, j)) {
sender_slots++;
Expand Down Expand Up @@ -2474,7 +2477,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* the same shard and we should retain the migrating_slots_to state
* for the slot in question */
if (server.cluster->migrating_slots_to[j] != NULL) {
if (!areInSameShard(sender, myself)) {
if (!are_in_same_shard) {
serverLog(LL_NOTICE, "Slot %d is no longer being migrated to node %.40s (%s) in shard %.40s.",
j, server.cluster->migrating_slots_to[j]->name,
server.cluster->migrating_slots_to[j]->human_nodename,
Expand Down Expand Up @@ -2595,7 +2598,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* the new primary if my current config epoch is lower than the
* sender's. */
if (!new_primary && myself->replicaof != sender && sender_slots == 0 && myself->numslots == 0 &&
nodeEpoch(myself) < senderConfigEpoch && areInSameShard(sender, myself)) {
nodeEpoch(myself) < senderConfigEpoch && are_in_same_shard) {
new_primary = sender;
}

Expand All @@ -2619,16 +2622,18 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* sender. In this case we don't reconfigure ourselves as a replica
* of the sender. */
if (new_primary && cur_primary->numslots == 0) {
if (server.cluster_allow_replica_migration || areInSameShard(sender, myself)) {
if (server.cluster_allow_replica_migration || are_in_same_shard) {
serverLog(LL_NOTICE,
"Configuration change detected. Reconfiguring myself "
"as a replica of node %.40s (%s) in shard %.40s",
sender->name, sender->human_nodename, sender->shard_id);
/* Don't clear the migrating/importing states if this is a replica that
* just gets promoted to the new primary in the shard. */
clusterSetPrimary(sender, !areInSameShard(sender, myself));
* just gets promoted to the new primary in the shard.
*
* If the sender and myself are in the same shard, try psync. */
clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
} else if ((sender_slots >= migrated_our_slots) && !areInSameShard(sender, myself)) {
} else if ((sender_slots >= migrated_our_slots) && !are_in_same_shard) {
/* When all our slots are lost to the sender and the sender belongs to
* a different shard, this is likely due to a client triggered slot
* migration. Don't reconfigure this node to migrate to the new shard
Expand Down Expand Up @@ -3383,12 +3388,19 @@ int clusterProcessPacket(clusterLink *link) {
/* Explicitly check for a replication loop before attempting the replication
* chain folding logic. */
if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
/* Safeguard against sub-replicas. A replica's primary can turn itself
* into a replica if its last slot is removed. If no other node takes
* over the slot, there is nothing else to trigger replica migration. */
/* Safeguard against sub-replicas.
*
* A replica's primary can turn itself into a replica if its last slot
* is removed. If no other node takes over the slot, there is nothing
* else to trigger replica migration. In this case, they are not in the
* same shard, so a full sync is required.
*
* Or a replica's primary can turn itself into a replica of its other
* replica during a failover. In this case, they are in the same shard,
* so we can try a psync. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1);
clusterSetPrimary(myself->replicaof->replicaof, 1, !areInSameShard(myself->replicaof->replicaof, myself));
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand Down Expand Up @@ -4692,7 +4704,9 @@ void clusterHandleReplicaMigration(int max_replicas) {
!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) {
serverLog(LL_NOTICE, "Migrating to orphaned primary %.40s (%s) in shard %.40s", target->name,
target->human_nodename, target->shard_id);
clusterSetPrimary(target, 1);
/* We are migrating to a different shard that has a completely different
* replication history, so a full sync is required. */
clusterSetPrimary(target, 1, 1);
}
}

Expand Down Expand Up @@ -5005,7 +5019,7 @@ void clusterCron(void) {
* enable it if we know the address of our primary and it appears to
* be up. */
if (nodeIsReplica(myself) && server.primary_host == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) {
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof));
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 0);
}

/* Abort a manual failover if the timeout is reached. */
Expand Down Expand Up @@ -5398,7 +5412,7 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) {

/* Set the specified node 'n' as primary for this node.
* If this node is currently a primary, it is turned into a replica. */
void clusterSetPrimary(clusterNode *n, int closeSlots) {
static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required) {
serverAssert(n != myself);
serverAssert(myself->numslots == 0);

Expand All @@ -5412,7 +5426,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) {
myself->replicaof = n;
updateShardId(myself, n->shard_id);
clusterNodeAddReplica(n, myself);
replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n));
replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), full_sync_required);
removeAllNotOwnedShardChannelSubscriptions();
resetManualFailover();

Expand Down Expand Up @@ -6343,7 +6357,9 @@ void clusterCommandSetSlot(client *c) {
"Lost my last slot during slot migration. Reconfiguring myself "
"as a replica of %.40s (%s) in shard %.40s",
n->name, n->human_nodename, n->shard_id);
clusterSetPrimary(n, 1);
/* We are migrating to a different shard that has a completely different
* replication history, so a full sync is required. */
clusterSetPrimary(n, 1, 1);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand Down Expand Up @@ -6548,8 +6564,11 @@ int clusterCommandSpecial(client *c) {
return 1;
}

/* Set the primary. */
clusterSetPrimary(n, 1);
/* Set the primary.
* If the instance is a primary, it is an empty primary.
* If the instance is a replica, it had a totally different replication history.
* In these both cases, myself as a replica has to do a full sync. */
clusterSetPrimary(n, 1, 1);
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
addReply(c, shared.ok);
Expand Down
19 changes: 14 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3726,7 +3726,7 @@ int cancelReplicationHandshake(int reconnect) {
}

/* Set replication to the specified primary address and port. */
void replicationSetPrimary(char *ip, int port) {
void replicationSetPrimary(char *ip, int port, int full_sync_required) {
int was_primary = server.primary_host == NULL;

sdsfree(server.primary_host);
Expand All @@ -3752,13 +3752,22 @@ void replicationSetPrimary(char *ip, int port) {
* sync with new primary. */

cancelReplicationHandshake(0);

/* Before destroying our primary state, create a cached primary using
* our own parameters, to later PSYNC with the new primary. */
if (was_primary) {
if (was_primary && !full_sync_required) {
replicationDiscardCachedPrimary();
replicationCachePrimaryUsingMyself();
}

/* If full sync is required, drop the cached primary. Doing so increases
* this replica node's election rank (delay) and reduces its chance of
* winning the election. If a replica requiring a full sync wins the
* election, it will flush valid data in the shard, causing data loss. */
if (full_sync_required) {
replicationDiscardCachedPrimary();
}

/* Fire the role change modules event. */
moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICATION_ROLE_CHANGED, VALKEYMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
NULL);
Expand Down Expand Up @@ -3896,7 +3905,7 @@ void replicaofCommand(client *c) {
}
/* There was no previous primary or the user specified a different one,
* we can continue. */
replicationSetPrimary(c->argv[1]->ptr, port);
replicationSetPrimary(c->argv[1]->ptr, port, 0);
sds client = catClientInfoString(sdsempty(), c);
serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')", server.primary_host,
server.primary_port, client);
Expand Down Expand Up @@ -4907,7 +4916,7 @@ void updateFailoverStatus(void) {
server.target_replica_port);
server.failover_state = FAILOVER_IN_PROGRESS;
/* If timeout has expired force a failover if requested. */
replicationSetPrimary(server.target_replica_host, server.target_replica_port);
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
return;
} else {
/* Force was not requested, so timeout. */
Expand Down Expand Up @@ -4950,6 +4959,6 @@ void updateFailoverStatus(void) {
serverLog(LL_NOTICE, "Failover target %s:%d is synced, failing over.", server.target_replica_host,
server.target_replica_port);
/* Designated replica is caught up, failover to it. */
replicationSetPrimary(server.target_replica_host, server.target_replica_port);
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
}
}
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3029,7 +3029,7 @@ void replicationStartPendingFork(void);
void replicationHandlePrimaryDisconnection(void);
void replicationCachePrimary(client *c);
void resizeReplicationBacklog(void);
void replicationSetPrimary(char *ip, int port);
void replicationSetPrimary(char *ip, int port, int full_sync_required);
void replicationUnsetPrimary(void);
void refreshGoodReplicasCount(void);
int checkGoodReplicasStatus(void);
Expand Down
159 changes: 159 additions & 0 deletions tests/unit/cluster/replica_migration.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Allocate slot 0 to the last primary and evenly distribute the remaining
# slots to the remaining primaries.
proc my_slot_allocation {masters replicas} {
set avg [expr double(16384) / [expr $masters-1]]
set slot_start 1
for {set j 0} {$j < $masters-1} {incr j} {
set slot_end [expr int(ceil(($j + 1) * $avg) - 1)]
R $j cluster addslotsrange $slot_start $slot_end
set slot_start [expr $slot_end + 1]
}
R [expr $masters-1] cluster addslots 0
}

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test "Migrated replica reports zero repl offset and rank, and fails to win election" {
# Write some data to primary 0, slot 1, make a small repl_offset.
for {set i 0} {$i < 1024} {incr i} {
R 0 incr key_991803
}
assert_equal {1024} [R 0 get key_991803]

# Write some data to primary 3, slot 0, make a big repl_offset.
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]

# 10s, make sure primary 0 will hang in the save.
R 0 config set rdb-key-save-delay 100000000

# Move the slot 0 from primary 3 to primary 0
set addr "[srv 0 host]:[srv 0 port]"
set myid [R 3 CLUSTER MYID]
set code [catch {
exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0
} result]
if {$code != 0} {
fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result"
}

# Validate that shard 3's primary and replica can convert to replicas after
# they lose the last slot.
R 3 config set cluster-replica-validity-factor 0
R 7 config set cluster-replica-validity-factor 0
R 3 config set cluster-allow-replica-migration yes
R 7 config set cluster-allow-replica-migration yes

# Shutdown primary 0.
catch {R 0 shutdown nosave}

# Wait for the replica to become a primary, and make sure
# the other primary become a replica.
wait_for_condition 1000 50 {
[s -4 role] eq {master} &&
[s -3 role] eq {slave} &&
[s -7 role] eq {slave}
} else {
puts "s -4 role: [s -4 role]"
puts "s -3 role: [s -3 role]"
puts "s -7 role: [s -7 role]"
fail "Failover does not happened"
}

# Make sure the offset of server 3 / 7 is 0.
verify_log_message -3 "*Start of election*offset 0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Make sure the right replica gets the higher rank.
verify_log_message -4 "*Start of election*rank #0*" 0

# Wait for the cluster to be ok.
wait_for_condition 1000 50 {
[CI 3 cluster_state] eq "ok" &&
[CI 4 cluster_state] eq "ok" &&
[CI 7 cluster_state] eq "ok"
} else {
puts "R 3: [R 3 cluster info]"
puts "R 4: [R 4 cluster info]"
puts "R 7: [R 7 cluster info]"
fail "Cluster is down"
}

# Make sure the key exists and is consistent.
R 3 readonly
R 7 readonly
wait_for_condition 1000 50 {
[R 3 get key_991803] == 1024 && [R 3 get key_977613] == 10240 &&
[R 4 get key_991803] == 1024 && [R 4 get key_977613] == 10240 &&
[R 7 get key_991803] == 1024 && [R 7 get key_977613] == 10240
} else {
puts "R 3: [R 3 keys *]"
puts "R 4: [R 4 keys *]"
puts "R 7: [R 7 keys *]"
fail "Key not consistent"
}
}
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test "New non-empty replica reports zero repl offset and rank, and fails to win election" {
# Write some data to primary 0, slot 1, make a small repl_offset.
for {set i 0} {$i < 1024} {incr i} {
R 0 incr key_991803
}
assert_equal {1024} [R 0 get key_991803]

# Write some data to primary 3, slot 0, make a big repl_offset.
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]

# 10s, make sure primary 0 will hang in the save.
R 0 config set rdb-key-save-delay 100000000

# Make server 7 a replica of server 0.
R 7 config set cluster-replica-validity-factor 0
R 7 config set cluster-allow-replica-migration yes
R 7 cluster replicate [R 0 cluster myid]

# Shutdown primary 0.
catch {R 0 shutdown nosave}

# Wait for the replica to become a primary.
wait_for_condition 1000 50 {
[s -4 role] eq {master} &&
[s -7 role] eq {slave}
} else {
puts "s -4 role: [s -4 role]"
puts "s -7 role: [s -7 role]"
fail "Failover does not happened"
}

# Make sure server 7 gets the lower rank and it's offset is 0.
verify_log_message -4 "*Start of election*rank #0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Wait for the cluster to be ok.
wait_for_condition 1000 50 {
[CI 4 cluster_state] eq "ok" &&
[CI 7 cluster_state] eq "ok"
} else {
puts "R 4: [R 4 cluster info]"
puts "R 7: [R 7 cluster info]"
fail "Cluster is down"
}

# Make sure the key exists and is consistent.
R 7 readonly
wait_for_condition 1000 50 {
[R 4 get key_991803] == 1024 &&
[R 7 get key_991803] == 1024
} else {
puts "R 4: [R 4 get key_991803]"
puts "R 7: [R 7 get key_991803]"
fail "Key not consistent"
}
}
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

0 comments on commit e1b3629

Please sign in to comment.