Skip to content

Commit

Permalink
Throw when gossip unanimously says token is already owned by a live n…
Browse files Browse the repository at this point in the history
…ode.

C* will throw anyway in these cases and there is some chance that the current owner has yet to be taken offline.
  • Loading branch information
mattl-netflix committed Aug 6, 2020
1 parent 4387a1c commit 0913cb2
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.netflix.priam.identity;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
Expand All @@ -29,6 +30,7 @@
import com.netflix.priam.identity.token.INewTokenRetriever;
import com.netflix.priam.identity.token.IPreGeneratedTokenRetriever;
import com.netflix.priam.identity.token.TokenRetrieverUtils;
import com.netflix.priam.identity.token.TokenRetrieverUtils.GossipParseException;
import com.netflix.priam.utils.ITokenManager;
import com.netflix.priam.utils.RetryableCallable;
import com.netflix.priam.utils.Sleeper;
Expand Down Expand Up @@ -166,26 +168,28 @@ public PriamInstance retriableCall() throws Exception {
// gossip.

// Infer current ownership information from other instances using gossip.
TokenRetrieverUtils.InferredTokenOwnership inferredTokenInformation =
TokenRetrieverUtils.InferredTokenOwnership inferredTokenOwnership =
TokenRetrieverUtils.inferTokenOwnerFromGossip(
aliveInstances, instance.getToken(), instance.getDC());
String inferredIp =
(inferredTokenInformation.getTokenInformation() == null)
? null
: inferredTokenInformation
.getTokenInformation()
.getIpAddress();
// if unreachable rely on token database.
// if mismatch rely on token database.
if (inferredTokenInformation.getTokenInformationStatus()
== TokenRetrieverUtils.InferredTokenOwnership
.TokenInformationStatus.GOOD
&& !inferredIp.equalsIgnoreCase(instance.getHostIP())
&& !inferredTokenInformation.getTokenInformation().isLive()) {
setReplacedIp(inferredIp);
logger.info(
"Priam found that the token is not alive according to Cassandra and we should start Cassandra in replace mode with replace ip: "
+ inferredIp);
if (inferredTokenOwnership.getTokenInformationStatus()
== TokenRetrieverUtils.InferredTokenOwnership.TokenInformationStatus
.GOOD) {
Preconditions.checkNotNull(
inferredTokenOwnership.getTokenInformation());
String inferredIp =
inferredTokenOwnership.getTokenInformation().getIpAddress();
if (!inferredIp.equalsIgnoreCase(instance.getHostIP())) {
if (inferredTokenOwnership.getTokenInformation().isLive()) {
throw new GossipParseException(
"We have been assigned a token that C* thinks is alive. Throwing to buy time in the hopes that Gossip just needs to settle.");
}
setReplacedIp(inferredIp);
logger.info(
"Priam found that the token is not alive according to Cassandra and we should start Cassandra in replace mode with replace ip: "
+ inferredIp);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.netflix.priam.identity.token;

import com.google.common.base.Strings;
import com.google.common.truth.Truth;
import com.netflix.priam.config.IConfiguration;
import com.netflix.priam.identity.IMembership;
import com.netflix.priam.identity.IPriamInstanceFactory;
Expand All @@ -15,9 +17,8 @@
import java.util.stream.IntStream;
import mockit.Expectations;
import mockit.Mocked;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

public class AssignedTokenRetrieverTest {
public static final String APP = "testapp";
Expand Down Expand Up @@ -82,11 +83,11 @@ public void grabAssignedTokenStartDbInBootstrapModeWhenGossipAgreesCurrentInstan
newTokenRetriever,
instanceInfo);

Assert.assertEquals(false, instanceIdentity.isReplace());
Truth.assertThat(instanceIdentity.isReplace()).isFalse();
}

@Test
public void grabAssignedTokenStartDbInReplaceModeWhenGossipAgreesOnPreviousTokenOwner(
public void grabAssignedTokenStartDbInReplaceModeWhenGossipAgreesPreviousTokenOwnerIsNotLive(
@Mocked IPriamInstanceFactory<PriamInstance> factory,
@Mocked IConfiguration config,
@Mocked IMembership membership,
Expand All @@ -101,7 +102,6 @@ public void grabAssignedTokenStartDbInReplaceModeWhenGossipAgreesOnPreviousToken
PriamInstance deadInstance = liveHosts.remove(0);
PriamInstance newInstance =
newMockPriamInstance(
APP,
deadInstance.getDC(),
deadInstance.getRac(),
deadInstance.getId(),
Expand Down Expand Up @@ -158,8 +158,82 @@ public void grabAssignedTokenStartDbInReplaceModeWhenGossipAgreesOnPreviousToken
newTokenRetriever,
instanceInfo);

Assert.assertEquals(deadInstance.getHostIP(), instanceIdentity.getReplacedIp());
Assert.assertEquals(true, instanceIdentity.isReplace());
Truth.assertThat(instanceIdentity.getReplacedIp()).isEqualTo(deadInstance.getHostIP());
Truth.assertThat(instanceIdentity.isReplace()).isTrue();
}

@Test
public void grabAssignedTokenThrowWhenGossipAgreesPreviousTokenOwnerIsLive(
@Mocked IPriamInstanceFactory<PriamInstance> factory,
@Mocked IConfiguration config,
@Mocked IMembership membership,
@Mocked Sleeper sleeper,
@Mocked ITokenManager tokenManager,
@Mocked InstanceInfo instanceInfo,
@Mocked TokenRetrieverUtils retrievalUtils) {
List<PriamInstance> liveHosts = newPriamInstances();
Collections.shuffle(liveHosts);

PriamInstance deadInstance = liveHosts.remove(0);
PriamInstance newInstance =
newMockPriamInstance(
deadInstance.getDC(),
deadInstance.getRac(),
deadInstance.getId(),
String.format("new-fakeInstance-%d", deadInstance.getId()),
String.format("127.1.1.%d", deadInstance.getId() + 100),
String.format("new-fakeHost-%d", deadInstance.getId()),
deadInstance.getToken());

// the case we are trying to test is when Priam restarted after it acquired the
// token. new instance is already registered with token database.
liveHosts.add(newInstance);
TokenRetrieverUtils.InferredTokenOwnership inferredTokenOwnership =
new TokenRetrieverUtils.InferredTokenOwnership();
inferredTokenOwnership.setTokenInformationStatus(
TokenRetrieverUtils.InferredTokenOwnership.TokenInformationStatus.GOOD);
inferredTokenOwnership.setTokenInformation(
new TokenRetrieverUtils.TokenInformation(deadInstance.getHostIP(), true));

new Expectations() {
{
config.getAppName();
result = APP;

factory.getAllIds(DEAD_APP);
result = Collections.singletonList(deadInstance);
factory.getAllIds(APP);
result = liveHosts;

instanceInfo.getInstanceId();
result = newInstance.getInstanceId();

TokenRetrieverUtils.inferTokenOwnerFromGossip(
liveHosts, newInstance.getToken(), newInstance.getDC());
result = inferredTokenOwnership;
}
};

IDeadTokenRetriever deadTokenRetriever =
new DeadTokenRetriever(factory, membership, config, sleeper, instanceInfo);
IPreGeneratedTokenRetriever preGeneratedTokenRetriever =
new PreGeneratedTokenRetriever(factory, membership, config, sleeper, instanceInfo);
INewTokenRetriever newTokenRetriever =
new NewTokenRetriever(
factory, membership, config, sleeper, tokenManager, instanceInfo);
Assertions.assertThrows(
TokenRetrieverUtils.GossipParseException.class,
() ->
new InstanceIdentity(
factory,
membership,
config,
sleeper,
tokenManager,
deadTokenRetriever,
preGeneratedTokenRetriever,
newTokenRetriever,
instanceInfo));
}

@Test
Expand Down Expand Up @@ -220,8 +294,8 @@ public void grabAssignedTokenStartDbInBootstrapModeWhenGossipDisagreesOnPrevious
newTokenRetriever,
instanceInfo);

Assert.assertTrue(StringUtils.isEmpty(instanceIdentity.getReplacedIp()));
Assert.assertEquals(false, instanceIdentity.isReplace());
Truth.assertThat(Strings.isNullOrEmpty(instanceIdentity.getReplacedIp())).isTrue();
Truth.assertThat(instanceIdentity.isReplace()).isFalse();
}

private List<PriamInstance> newPriamInstances() {
Expand All @@ -246,10 +320,9 @@ private List<PriamInstance> newPriamInstances(
String dc, String rack, int seqNo, String ipRanges) {
return IntStream.range(0, 3)
.map(e -> seqNo + (e * 9))
.<PriamInstance>mapToObj(
.mapToObj(
e ->
newMockPriamInstance(
APP,
dc,
rack,
e,
Expand All @@ -261,7 +334,6 @@ private List<PriamInstance> newPriamInstances(
}

private PriamInstance newMockPriamInstance(
String app,
String dc,
String rack,
int id,
Expand All @@ -270,7 +342,7 @@ private PriamInstance newMockPriamInstance(
String hostName,
String token) {
PriamInstance priamInstance = new PriamInstance();
priamInstance.setApp(app);
priamInstance.setApp(APP);
priamInstance.setDC(dc);
priamInstance.setRac(rack);
priamInstance.setId(id);
Expand Down

0 comments on commit 0913cb2

Please sign in to comment.