Skip to content

Commit

Permalink
Merge pull request #894 from arunagrawal84/3.x_inferTokenOwnership
Browse files Browse the repository at this point in the history
Fix the inferTokenOwnership method.
  • Loading branch information
arunagrawal-84 authored Jul 11, 2020
2 parents 8b4e565 + f44a2dc commit 3827a40
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 107 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Changelog
## 2020/07/13 3.1.96
(#894) Fix the inferTokenOwnership information. It now provides all the details to the calling method so that the caller can make the decision, rather than throwing an exception.

## 2020/07/02 3.1.95
(#890) Adding an exception in the replace-ip path when a node attempts to bootstrap to an existing token because of a stale state.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.netflix.priam.identity.token.INewTokenRetriever;
import com.netflix.priam.identity.token.IPreGeneratedTokenRetriever;
import com.netflix.priam.identity.token.TokenRetrieverUtils;
import com.netflix.priam.identity.token.TokenRetrieverUtils.GossipParseException;
import com.netflix.priam.utils.ITokenManager;
import com.netflix.priam.utils.RetryableCallable;
import com.netflix.priam.utils.Sleeper;
Expand All @@ -39,7 +38,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -166,18 +164,28 @@ public PriamInstance retriableCall() throws Exception {
// Priam might have crashed before bootstrapping Cassandra in replace mode.
// So, it is premature to use the assigned token without checking Cassandra
// gossip.
try {
String replaceIp =
TokenRetrieverUtils.inferTokenOwnerFromGossip(
aliveInstances, instance.getToken(), instance.getDC());
if (!StringUtils.isEmpty(replaceIp)
&& !replaceIp.equals(instance.getHostIP())) {
setReplacedIp(replaceIp);
logger.info(
"Priam found that the token is not alive according to Cassandra and we should start Cassandra in replace mode with replace ip: "
+ replaceIp);
}
} catch (GossipParseException e) {

// Infer current ownership information from other instances using gossip.
TokenRetrieverUtils.InferredTokenOwnership inferredTokenInformation =
TokenRetrieverUtils.inferTokenOwnerFromGossip(
aliveInstances, instance.getToken(), instance.getDC());
String inferredIp =
(inferredTokenInformation.getTokenInformation() == null)
? null
: inferredTokenInformation
.getTokenInformation()
.getIpAddress();
// if unreachable rely on token database.
// if mismatch rely on token database.
if (inferredTokenInformation.getTokenInformationStatus()
== TokenRetrieverUtils.InferredTokenOwnership
.TokenInformationStatus.GOOD
&& !inferredIp.equalsIgnoreCase(instance.getHostIP())
&& !inferredTokenInformation.getTokenInformation().isLive()) {
setReplacedIp(inferredIp);
logger.info(
"Priam found that the token is not alive according to Cassandra and we should start Cassandra in replace mode with replace ip: "
+ inferredIp);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,50 @@ public PriamInstance get() throws Exception {
factory.delete(priamInstance);

// find the replaced IP
try {
replacedIp =
TokenRetrieverUtils.inferTokenOwnerFromGossip(
allInstancesWithinCluster,
priamInstance.getToken(),
priamInstance.getDC());

// Lets not replace the instance if gossip info is not merging!!
if (replacedIp == null) return null;
logger.info(
"Will try to replace token: {} with replacedIp (from gossip info): {} instead of ip from Token database: {}",
priamInstance.getToken(),
replacedIp,
priamInstance.getHostIP());
} catch (TokenRetrieverUtils.GossipParseException e) {
// In case of gossip exception, fallback to IP in token database.
this.replacedIp = priamInstance.getHostIP();
logger.info(
"Will try to replace token: {} with replacedIp from Token database: {}",
priamInstance.getToken(),
priamInstance.getHostIP());

// Infer current ownership information from other instances using gossip.
TokenRetrieverUtils.InferredTokenOwnership inferredTokenInformation =
TokenRetrieverUtils.inferTokenOwnerFromGossip(
allInstancesWithinCluster,
priamInstance.getToken(),
priamInstance.getDC());

switch (inferredTokenInformation.getTokenInformationStatus()) {
case GOOD:
if (inferredTokenInformation.getTokenInformation() == null) {
logger.error(
"If you see this message, it should not have happened. We expect token ownership information if all nodes agree. This is a code bounty issue.");
return null;
}
// Everyone agreed to a value. Check if it is live node.
if (inferredTokenInformation.getTokenInformation().isLive()) {
logger.info(
"This token is considered alive unanimously! We will not replace this instance.");
return null;
} else
this.replacedIp =
inferredTokenInformation.getTokenInformation().getIpAddress();
break;
case UNREACHABLE_NODES:
// If we are unable to reach sufficient nodes, fall back to the IP in the token
// database. This could be a genuine case of, say, missing security permissions.
this.replacedIp = priamInstance.getHostIP();
logger.warn(
"Unable to reach sufficient nodes. Please check security group permissions or there might be a network partition.");
logger.info(
"Will try to replace token: {} with replacedIp from Token database: {}",
priamInstance.getToken(),
priamInstance.getHostIP());
break;
case MISMATCH:
// Lets not replace the instance if gossip info is not merging!!
logger.info(
"Mismatch in gossip. We will not replace this instance, until gossip settles down.");
return null;
default:
throw new IllegalStateException(
"Unexpected value: "
+ inferredTokenInformation.getTokenInformationStatus());
}

PriamInstance result;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.netflix.priam.identity.token;

import com.netflix.priam.identity.PriamInstance;
import com.netflix.priam.utils.GsonJsonSerializer;
import com.netflix.priam.utils.SystemUtils;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
Expand Down Expand Up @@ -33,9 +33,8 @@ public class TokenRetrieverUtils {
* @throws GossipParseException when required number of instances are not available to fetch the
* gossip info.
*/
public static String inferTokenOwnerFromGossip(
List<? extends PriamInstance> allIds, String token, String dc)
throws GossipParseException, TokenAliveException {
public static InferredTokenOwnership inferTokenOwnerFromGossip(
List<? extends PriamInstance> allIds, String token, String dc) {

// Avoid using dead instance who we are trying to replace (duh!!)
// Avoid other regions instances to avoid communication over public ip address.
Expand All @@ -59,56 +58,64 @@ public static String inferTokenOwnerFromGossip(
// the IP to be replaced, in large clusters it may affect the startup
// performance. So we pick three random hosts from the ring and see if they all
// agree on the IP to be replaced. If not, we don't replace.
String replaceIp = null;
InferredTokenOwnership inferredTokenOwnership = new InferredTokenOwnership();
int matchedGossipInstances = 0, reachableInstances = 0;
for (PriamInstance instance : eligibleInstances) {
logger.info(
"Calling getIp on hostname[{}] and token[{}]", instance.getHostName(), token);

try {
String ip = getIp(instance.getHostName(), token);
TokenInformation tokenInformation =
getTokenInformation(instance.getHostName(), token);
reachableInstances++;

if (replaceIp == null) {
replaceIp = ip;
if (inferredTokenOwnership.getTokenInformation() == null) {
inferredTokenOwnership.setTokenInformation(tokenInformation);
}

if (StringUtils.isEmpty(ip) || !replaceIp.equals(ip)) {
// If the IP address produced by getIp call is empty it means it was able to
// parse the status information and that token was still alive!!
// We do not want to do anything if token is considered as alive by Cassandra.
if (inferredTokenOwnership.getTokenInformation().equals(tokenInformation)) {
matchedGossipInstances++;
if (matchedGossipInstances == noOfInstancesGossipShouldMatch) {
inferredTokenOwnership.setTokenInformationStatus(
InferredTokenOwnership.TokenInformationStatus.GOOD);
return inferredTokenOwnership;
}
} else {
// Mismatch in the gossip information from Cassandra.
inferredTokenOwnership.setTokenInformationStatus(
InferredTokenOwnership.TokenInformationStatus.MISMATCH);
logger.info(
"Not producing anything in replaceIp as according to C* that token is still alive or "
+ "there is a mismatch in status information per Cassandra. ip: [{}], replaceIp: [{}]",
ip,
replaceIp);
return null;
"There is a mismatch in the status information reported by Cassandra. TokenInformation1: {}, TokenInformation2: {}",
inferredTokenOwnership.getTokenInformation(),
tokenInformation);
inferredTokenOwnership.setTokenInformation(
inferredTokenOwnership.getTokenInformation().isLive
? inferredTokenOwnership.getTokenInformation()
: tokenInformation);
return inferredTokenOwnership;
}

matchedGossipInstances++;
if (matchedGossipInstances == noOfInstancesGossipShouldMatch) {
return replaceIp;
}
} catch (GossipParseException e) {
logger.warn(e.getMessage());
}
}

// Throw exception if we are not able to reach at least minimum required
// instances.
// If we are not able to reach at least minimum required instances.
if (reachableInstances < noOfInstancesGossipShouldMatch) {
throw new GossipParseException(
inferredTokenOwnership.setTokenInformationStatus(
InferredTokenOwnership.TokenInformationStatus.UNREACHABLE_NODES);
logger.info(
String.format(
"Unable to find enough instances where gossip match. Required: [%d]",
noOfInstancesGossipShouldMatch));
}

return null;
return inferredTokenOwnership;
}

// helper method to get the token owner IP from a Cassandra node.
private static String getIp(String host, String token)
throws GossipParseException, TokenAliveException {
private static TokenInformation getTokenInformation(String host, String token)
throws GossipParseException {
String response = null;
try {
response = SystemUtils.getDataFromUrl(String.format(STATUS_URL_FORMAT, host));
Expand All @@ -119,12 +126,8 @@ private static String getIp(String host, String token)
// We intentionally do not use the "unreachable" nodes as it may or may not be the best
// place to start.
// We just verify that the endpoint we provide is not "live".
if (liveNodes.contains(endpointInfo)) {
throw new TokenAliveException(
String.format("The token %s is considered as alive by %s.", token, host));
}

return endpointInfo;
boolean isLive = liveNodes.contains(endpointInfo);
return new TokenInformation(endpointInfo, isLive);
} catch (RuntimeException e) {
throw new GossipParseException(
String.format("Error in reaching out to host: [%s]", host), e);
Expand All @@ -137,40 +140,81 @@ private static String getIp(String host, String token)
}
}

/**
* This exception is thrown either when instances are not available or when they return invalid
* response.
*/
public static class GossipParseException extends Exception {
private static final long serialVersionUID = 1462488371031437486L;
public static class TokenInformation {
private String ipAddress;
private boolean isLive;

public GossipParseException() {
super();
public TokenInformation(String ipAddress, boolean isLive) {
this.ipAddress = ipAddress;
this.isLive = isLive;
}

public GossipParseException(String message) {
super(message);
public boolean isLive() {
return isLive;
}

public GossipParseException(String message, Throwable t) {
super(message, t);
public String getIpAddress() {
return ipAddress;
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || this.getClass() != obj.getClass()) return false;
TokenInformation tokenInformation = (TokenInformation) obj;
return this.ipAddress.equalsIgnoreCase(tokenInformation.getIpAddress())
&& isLive == tokenInformation.isLive;
}

public String toString() {
return GsonJsonSerializer.getGson().toJson(this);
}
}

/** This exception is thrown either when a node is bootstrapping using a token that is alive. */
public static class TokenAliveException extends Exception {
/**
 * Mutable holder pairing a {@link TokenInformationStatus} with the {@link TokenInformation} it
 * refers to.
 *
 * <p>A freshly constructed instance reports {@link TokenInformationStatus#UNREACHABLE_NODES} and
 * carries no token information ({@code null}) until the setters are called.
 */
public static class InferredTokenOwnership {
    /** Possible outcomes recorded alongside the inferred token information. */
    public enum TokenInformationStatus {
        GOOD,
        UNREACHABLE_NODES,
        MISMATCH
    }

    // Starts out as UNREACHABLE_NODES; stays that way unless a caller overrides it.
    private TokenInformationStatus tokenInformationStatus =
            TokenInformationStatus.UNREACHABLE_NODES;

    // Remains null until setTokenInformation is invoked.
    private TokenInformation tokenInformation;

    /** @return the currently recorded status; {@code UNREACHABLE_NODES} if never set. */
    public TokenInformationStatus getTokenInformationStatus() {
        return this.tokenInformationStatus;
    }

    /** @return the currently recorded token information, or {@code null} if never set. */
    public TokenInformation getTokenInformation() {
        return this.tokenInformation;
    }

    /** @param tokenInformationStatus the status to record, replacing the previous one. */
    public void setTokenInformationStatus(TokenInformationStatus tokenInformationStatus) {
        this.tokenInformationStatus = tokenInformationStatus;
    }

    /** @param tokenInformation the token information to record, replacing the previous one. */
    public void setTokenInformation(TokenInformation tokenInformation) {
        this.tokenInformation = tokenInformation;
    }
}

private static final long serialVersionUID = 1038678311186020257L;
/**
* This exception is thrown either when instances are not available or when they return invalid
* response.
*/
public static class GossipParseException extends Exception {
private static final long serialVersionUID = 1462488371031437486L;

public TokenAliveException() {
public GossipParseException() {
super();
}

public TokenAliveException(String message) {
public GossipParseException(String message) {
super(message);
}

public TokenAliveException(String message, Throwable t) {
public GossipParseException(String message, Throwable t) {
super(message, t);
}
}
Expand Down
Loading

0 comments on commit 3827a40

Please sign in to comment.