Skip to content

Commit

Permalink
Store private IPs in the token database. (#913)
Browse files Browse the repository at this point in the history
* CASS-1828 Consolidate PriamInstance fetching into a single interface.

* CASS-1828 Remove DeadTokenRetriever interface.

* CASS-1828 Remove IPreGeneratedTokenRetriever interface

* CASS-1828 Remove INewTokenRetriever interface

* CASS-1828 remove populateRacMap

* CASS-1828 Remove sameHostPredicate

* CASS-1828 Move calls to gossip when finding preassigned token to separate function.

* CASS-1828 Change PriamInstance toString to include IP and shrink a log statement.

* CASS-1828 tighten up grabPreassignedToken.

* CASS-1828 make TokenRetriever use the same logic to get rac instances both when generating a dead token and a pregenerated token.

* CASS-1828 move gossip check from grabDeadToken to function.

* CASS-1828 Move deletion to separate function.

* CASS-1828 move token claiming to separate function

* CASS-1828 Remove redundant comments and log statements, plus some minor rearranging.

* CASS-1828 combine grabDeadToken and grabPreGeneratedToken

* CASS-1828 add tests of new token generation

* CASS-1828 Remove redundant method from ITokenManager interface and tighten up new token generation logic

* CASS-1828 use nullity of replace ip to determine whether to replace.

* CASS-1828 Ensure that pregenerated token is claimed when available and the dead token fails gossip check. This corrects a bug introduced when combining the erstwhile grabDeadToken and grabPregeneratedToken methods.

* CASS-1828 stop marking tokens dead and deleting them, begin updating atomically and reading consistently with SimpleDB.

* CASS-1828 Compare against both IPs when checking Gossip in assigned token case

* CASS-1828 update database when getting preassigned tokens

* CASS-1828 make DoubleRing inject InstanceInfo instead of InstanceIdentity.

* CASS-1828 use private IP when dictated by configuration.

* CASS-1828 remove redundant attachVolumes method from PriamInstanceFactory

* CASS-1828 remove redundant sort method from PriamInstanceFactory

* CASS-1828 remove redundant generics in PriamInstanceFactory

* CASS-1828 Make PriamInstanceFactory return a Set of PriamInstances rather than a List. More generally, use a Set of PriamInstances where applicable.

* CASS-1828 make IMembership return an ImmutableSet of IPs not a list.

* CASS-1828 make UpdateSecuritySettings always add current instance's IP to account for possibility of stale data in instance database. Remove extra call to instance database as well.
  • Loading branch information
mattl-netflix authored Mar 17, 2021
1 parent ed9eb9f commit 21497b0
Show file tree
Hide file tree
Showing 42 changed files with 1,171 additions and 1,438 deletions.
20 changes: 10 additions & 10 deletions priam/src/main/java/com/netflix/priam/aws/AWSMembership.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
import com.amazonaws.services.ec2.model.*;
import com.amazonaws.services.ec2.model.Filter;
import com.google.common.collect.Lists;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.netflix.priam.config.IConfiguration;
Expand Down Expand Up @@ -60,7 +60,7 @@ public AWSMembership(
}

@Override
public List<String> getRacMembership() {
public ImmutableSet<String> getRacMembership() {
AmazonAutoScaling client = null;
try {
List<String> asgNames = new ArrayList<>();
Expand All @@ -73,7 +73,7 @@ public List<String> getRacMembership() {
asgNames.toArray(new String[asgNames.size()]));
DescribeAutoScalingGroupsResult res = client.describeAutoScalingGroups(asgReq);

List<String> instanceIds = Lists.newArrayList();
ImmutableSet.Builder<String> instanceIds = ImmutableSet.builder();
for (AutoScalingGroup asg : res.getAutoScalingGroups()) {
for (Instance ins : asg.getInstances())
if (!(ins.getLifecycleState().equalsIgnoreCase("Terminating")
Expand All @@ -89,7 +89,7 @@ public List<String> getRacMembership() {
StringUtils.join(asgNames, ","),
StringUtils.join(instanceIds, ",")));
}
return instanceIds;
return instanceIds.build();
} finally {
if (client != null) client.shutdown();
}
Expand Down Expand Up @@ -117,7 +117,7 @@ public int getRacMembershipSize() {
}

@Override
public List<String> getCrossAccountRacMembership() {
public ImmutableSet<String> getCrossAccountRacMembership() {
AmazonAutoScaling client = null;
try {
List<String> asgNames = new ArrayList<>();
Expand All @@ -130,7 +130,7 @@ public List<String> getCrossAccountRacMembership() {
asgNames.toArray(new String[asgNames.size()]));
DescribeAutoScalingGroupsResult res = client.describeAutoScalingGroups(asgReq);

List<String> instanceIds = Lists.newArrayList();
ImmutableSet.Builder<String> instanceIds = ImmutableSet.builder();
for (AutoScalingGroup asg : res.getAutoScalingGroups()) {
for (Instance ins : asg.getInstances())
if (!(ins.getLifecycleState().equalsIgnoreCase("Terminating")
Expand All @@ -144,7 +144,7 @@ public List<String> getCrossAccountRacMembership() {
"Querying Amazon returned following instance in the cross-account ASG: %s --> %s",
instanceInfo.getRac(), StringUtils.join(instanceIds, ",")));
}
return instanceIds;
return instanceIds.build();
} finally {
if (client != null) client.shutdown();
}
Expand Down Expand Up @@ -274,11 +274,11 @@ public void removeACL(Collection<String> listIPs, int from, int to) {
}

/** List SG ACL's */
public List<String> listACL(int from, int to) {
public ImmutableSet<String> listACL(int from, int to) {
AmazonEC2 client = null;
try {
client = getEc2Client();
List<String> ipPermissions = new ArrayList<>();
ImmutableSet.Builder<String> ipPermissions = ImmutableSet.builder();

if (isClassic()) {

Expand Down Expand Up @@ -316,7 +316,7 @@ public List<String> listACL(int from, int to) {
logger.debug("Fetch current permissions for vpc env of running instance");
}

return ipPermissions;
return ipPermissions.build();
} finally {
if (client != null) client.shutdown();
}
Expand Down
30 changes: 21 additions & 9 deletions priam/src/main/java/com/netflix/priam/aws/SDBInstanceData.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ public SDBInstanceData(ICredential provider, IConfiguration configuration) {
*/
public PriamInstance getInstance(String app, String dc, int id) {
AmazonSimpleDB simpleDBClient = getSimpleDBClient();
SelectRequest request = new SelectRequest(String.format(INSTANCE_QUERY, app, dc, id));
SelectRequest request =
new SelectRequest(String.format(INSTANCE_QUERY, app, dc, id))
.withConsistentRead(true);
SelectResult result = simpleDBClient.select(request);
if (result.getItems().size() == 0) return null;
return transform(result.getItems().get(0));
Expand All @@ -91,8 +93,10 @@ public Set<PriamInstance> getAllIds(String app) {
Set<PriamInstance> inslist = new HashSet<>();
String nextToken = null;
do {
SelectRequest request = new SelectRequest(String.format(ALL_QUERY, app));
request.setNextToken(nextToken);
SelectRequest request =
new SelectRequest(String.format(ALL_QUERY, app))
.withConsistentRead(true)
.withNextToken(nextToken);
SelectResult result = simpleDBClient.select(request);
nextToken = result.getNextToken();
for (Item item : result.getItems()) {
Expand All @@ -106,15 +110,23 @@ public Set<PriamInstance> getAllIds(String app) {
/**
* Create a new instance entry in SimpleDB
*
* @param instance Instance entry to be created.
* @param orig Original instance used for validation
* @param inst Instance entry to be created.
* @throws AmazonServiceException If unable to write to Simple DB because of any error.
*/
public void createInstance(PriamInstance instance) throws AmazonServiceException {
AmazonSimpleDB simpleDBClient = getSimpleDBClient();
public void updateInstance(PriamInstance orig, PriamInstance inst)
throws AmazonServiceException {
PutAttributesRequest putReq =
new PutAttributesRequest(
DOMAIN, getKey(instance), createAttributesToRegister(instance));
simpleDBClient.putAttributes(putReq);
new PutAttributesRequest(DOMAIN, getKey(inst), createAttributesToRegister(inst))
.withExpected(
new UpdateCondition()
.withName(Attributes.INSTANCE_ID)
.withValue(orig.getInstanceId()))
.withExpected(
new UpdateCondition()
.withName(Attributes.TOKEN)
.withValue(orig.getToken()));
getSimpleDBClient().putAttributes(putReq);
}

/**
Expand Down
42 changes: 12 additions & 30 deletions priam/src/main/java/com/netflix/priam/aws/SDBInstanceFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,40 @@
package com.netflix.priam.aws;

import com.amazonaws.AmazonServiceException;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.priam.config.IConfiguration;
import com.netflix.priam.identity.IPriamInstanceFactory;
import com.netflix.priam.identity.PriamInstance;
import com.netflix.priam.identity.config.InstanceInfo;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* SimpleDB based instance instanceIdentity. Requires 'InstanceIdentity' domain to be created ahead
*/
@Singleton
public class SDBInstanceFactory implements IPriamInstanceFactory<PriamInstance> {
public class SDBInstanceFactory implements IPriamInstanceFactory {
private static final Logger logger = LoggerFactory.getLogger(SDBInstanceFactory.class);

private final IConfiguration config;
private final SDBInstanceData dao;
private final InstanceInfo instanceInfo;

@Inject
public SDBInstanceFactory(
IConfiguration config, SDBInstanceData dao, InstanceInfo instanceInfo) {
this.config = config;
public SDBInstanceFactory(SDBInstanceData dao, InstanceInfo instanceInfo) {
this.dao = dao;
this.instanceInfo = instanceInfo;
}

@Override
public List<PriamInstance> getAllIds(String appName) {
List<PriamInstance> return_ = new ArrayList<>();
return_.addAll(dao.getAllIds(appName));
sort(return_);
return return_;
public ImmutableSet<PriamInstance> getAllIds(String appName) {
return ImmutableSet.copyOf(
dao.getAllIds(appName)
.stream()
.sorted((Comparator.comparingInt(PriamInstance::getId)))
.collect(Collectors.toList()));
}

@Override
Expand Down Expand Up @@ -104,31 +103,14 @@ public void delete(PriamInstance inst) {
}

@Override
public void update(PriamInstance inst) {
public void update(PriamInstance orig, PriamInstance inst) {
try {
dao.createInstance(inst);
dao.updateInstance(orig, inst);
} catch (AmazonServiceException e) {
throw new RuntimeException("Unable to update/create priam instance", e);
}
}

@Override
public void sort(List<PriamInstance> return_) {
Comparator<? super PriamInstance> comparator =
(Comparator<PriamInstance>)
(o1, o2) -> {
Integer c1 = o1.getId();
Integer c2 = o2.getId();
return c1.compareTo(c2);
};
return_.sort(comparator);
}

@Override
public void attachVolumes(PriamInstance instance, String mountPath, String device) {
// TODO Auto-generated method stub
}

private PriamInstance makePriamInstance(
String app,
int id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@
*/
package com.netflix.priam.aws;

import com.google.common.collect.Lists;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.priam.config.IConfiguration;
import com.netflix.priam.identity.IMembership;
import com.netflix.priam.identity.IPriamInstanceFactory;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.identity.PriamInstance;
import com.netflix.priam.identity.config.InstanceInfo;
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.Task;
import com.netflix.priam.scheduler.TaskTimer;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,16 +52,21 @@ public class UpdateSecuritySettings extends Task {

private static final Random ran = new Random();
private final IMembership membership;
private final IPriamInstanceFactory<PriamInstance> factory;
private final IPriamInstanceFactory factory;
private final InstanceInfo instanceInfo;

@Inject
// Note: do not parameterized the generic type variable to an implementation as it confuses
// Guice in the binding.
public UpdateSecuritySettings(
IConfiguration config, IMembership membership, IPriamInstanceFactory factory) {
IConfiguration config,
IMembership membership,
IPriamInstanceFactory factory,
InstanceInfo instanceInfo) {
super(config);
this.membership = membership;
this.factory = factory;
this.instanceInfo = instanceInfo;
}

/**
Expand All @@ -70,37 +75,27 @@ public UpdateSecuritySettings(
*/
@Override
public void execute() {
// if seed dont execute.
int port = config.getSSLStoragePort();
List<String> acls = membership.listACL(port, port);
List<PriamInstance> instances = factory.getAllIds(config.getAppName());

// iterate to add...
Set<String> add = new HashSet<>();
List<PriamInstance> allInstances = factory.getAllIds(config.getAppName());
for (PriamInstance instance : allInstances) {
String range = instance.getHostIP() + "/32";
if (!acls.contains(range)) add.add(range);
}
if (add.size() > 0) {
membership.addACL(add, port, port);
ImmutableSet<String> currentAcl = membership.listACL(port, port);
Set<String> desiredAcl =
factory.getAllIds(config.getAppName())
.stream()
.map(i -> i.getHostIP() + "/32")
.collect(Collectors.toSet());
// Make sure a hole is opened for my instance.
// This accommodates the eventually consistent CassandraInstanceFactory.
// Remove once IPs are all private as there won't be any chance of a discrepancy anymore.
String myIp =
config.usePrivateIP() ? instanceInfo.getPrivateIP() : instanceInfo.getHostIP();
desiredAcl.add(myIp + "/32");
Set<String> aclToAdd = Sets.difference(desiredAcl, currentAcl);
if (!aclToAdd.isEmpty()) {
membership.addACL(aclToAdd, port, port);
firstTimeUpdated = true;
}

// just iterate to generate ranges.
List<String> currentRanges = Lists.newArrayList();
for (PriamInstance instance : instances) {
String range = instance.getHostIP() + "/32";
currentRanges.add(range);
}

// iterate to remove...
List<String> remove = Lists.newArrayList();
for (String acl : acls)
if (!currentRanges.contains(acl)) // if not found then remove....
remove.add(acl);
if (remove.size() > 0) {
membership.removeACL(remove, port, port);
Set<String> aclToRemove = Sets.difference(currentAcl, desiredAcl);
if (!aclToRemove.isEmpty()) {
membership.removeACL(aclToRemove, port, port);
firstTimeUpdated = true;
}
}
Expand Down
13 changes: 6 additions & 7 deletions priam/src/main/java/com/netflix/priam/cli/StaticMembership.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
*/
package com.netflix.priam.cli;

import com.google.common.collect.ImmutableSet;
import com.netflix.priam.identity.IMembership;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.cassandra.io.util.FileUtils;
import org.slf4j.Logger;
Expand All @@ -36,7 +35,7 @@ public class StaticMembership implements IMembership {

private static final Logger logger = LoggerFactory.getLogger(StaticMembership.class);

private List<String> racMembership;
private ImmutableSet<String> racMembership;
private int racCount;

public StaticMembership() throws IOException {
Expand All @@ -57,18 +56,18 @@ public StaticMembership() throws IOException {
if (name.startsWith(INSTANCES_PRE)) {
racCount += 1;
if (name.equals(INSTANCES_PRE + racName))
racMembership = Arrays.asList(config.getProperty(name).split(","));
racMembership = ImmutableSet.copyOf(config.getProperty(name).split(","));
}
}
}

@Override
public List<String> getRacMembership() {
public ImmutableSet<String> getRacMembership() {
return racMembership;
}

@Override
public List<String> getCrossAccountRacMembership() {
public ImmutableSet<String> getCrossAccountRacMembership() {
return null;
}

Expand All @@ -90,7 +89,7 @@ public void addACL(Collection<String> listIPs, int from, int to) {}
public void removeACL(Collection<String> listIPs, int from, int to) {}

@Override
public List<String> listACL(int from, int to) {
public ImmutableSet<String> listACL(int from, int to) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,13 @@ default ImmutableSet<String> getTunablePropertyFiles() {
return ImmutableSet.of();
}

/**
* @return true to use private IPs for seeds and insertion into the Token DB false otherwise.
*/
default boolean usePrivateIP() {
return getSnitch().equals("org.apache.cassandra.locator.GossipingPropertyFileSnitch");
}

/**
* Escape hatch for getting any arbitrary property by key This is useful so we don't have to
* keep adding methods to this interface for every single configuration option ever. Also
Expand Down
Loading

0 comments on commit 21497b0

Please sign in to comment.