Skip to content

Commit

Permalink
Change aggregate cluster handling to correctly handle cluster names a…
Browse files Browse the repository at this point in the history
…ppearing in multiple trees or overlapping with the RDS.virtualHosts.
  • Loading branch information
larry-safran committed Jan 9, 2025
1 parent 445bd42 commit 2f4de7f
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 27 deletions.
97 changes: 70 additions & 27 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import io.grpc.xds.client.XdsResourceType;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
Expand All @@ -50,7 +52,9 @@
*/
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
public static final String TOP_CDS_CONTEXT = toContextStr(CLUSTER_RESOURCE.typeName(), "");
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
public static final String CLUSTER_TYPE = XdsClusterResource.getInstance().typeName();
private final XdsClient xdsClient;
private final XdsConfigWatcher xdsConfigWatcher;
private final SynchronizationContext syncContext;
Expand All @@ -76,6 +80,10 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
}

public static String toContextStr(String typeName, String resourceName) {
return typeName + " resource: " + resourceName;
}

@Override
public Closeable subscribeToCluster(String clusterName) {

Expand All @@ -86,7 +94,7 @@ public Closeable subscribeToCluster(String clusterName) {
Set<ClusterSubscription> localSubscriptions =
clusterSubscriptions.computeIfAbsent(clusterName, k -> new HashSet<>());
localSubscriptions.add(subscription);
addWatcher(new CdsWatcher(clusterName));
addWatcher(new CdsWatcher(clusterName, TOP_CDS_CONTEXT));
});

return subscription;
Expand All @@ -113,13 +121,29 @@ private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
}

private void cancelWatcher(CdsWatcher watcher, String parentContext) {
if (watcher == null) {
return;
}
watcher.parentContexts.remove(parentContext);
if (watcher.parentContexts.isEmpty()) {
cancelWatcher(watcher);
}
}

private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
syncContext.throwIfNotInThisSynchronizationContext();

if (watcher == null) {
return;
}

if (watcher instanceof CdsWatcher) {
CdsWatcher cdsWatcher = (CdsWatcher) watcher;
if (!cdsWatcher.parentContexts.isEmpty()) {
return;
}
}
XdsResourceType<T> type = watcher.type;
String resourceName = watcher.resourceName;

Expand Down Expand Up @@ -165,17 +189,17 @@ private void releaseSubscription(ClusterSubscription subscription) {
clusterSubscriptions.remove(clusterName);
XdsWatcherBase<?> cdsWatcher =
resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
cancelClusterWatcherTree((CdsWatcher) cdsWatcher);
cancelClusterWatcherTree((CdsWatcher) cdsWatcher, TOP_CDS_CONTEXT);
maybePublishConfig();
}
});
}

private void cancelClusterWatcherTree(CdsWatcher root) {
private void cancelClusterWatcherTree(CdsWatcher root, String parentContext) {
checkNotNull(root, "root");
cancelWatcher(root);
cancelWatcher(root, parentContext);

if (root.getData() == null || !root.getData().hasValue()) {
if (root.getData() == null || !root.getData().hasValue() || !root.parentContexts.isEmpty()) {
return;
}

Expand All @@ -192,7 +216,7 @@ private void cancelClusterWatcherTree(CdsWatcher root) {
CdsWatcher clusterWatcher =
(CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(cluster);
if (clusterWatcher != null) {
cancelClusterWatcherTree(clusterWatcher);
cancelClusterWatcherTree(clusterWatcher, root.toContextString());
}
}
break;
Expand Down Expand Up @@ -269,6 +293,7 @@ public String toString() {
}

private static class TypeWatchers<T extends ResourceUpdate> {
// Key is resource name
final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
final XdsResourceType<T> resourceType;

Expand Down Expand Up @@ -337,7 +362,7 @@ protected void handleDoesNotExist(String resourceName) {
checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
data = StatusOr.fromStatus(
Status.UNAVAILABLE
.withDescription("No " + type.typeName() + " resource: " + resourceName));
.withDescription("No " + toContextString()));
transientError = false;
}

Expand Down Expand Up @@ -365,7 +390,7 @@ boolean isTransientError() {
}

String toContextString() {
return type.typeName() + " resource: " + resourceName;
return toContextStr(type.typeName(), resourceName);
}
}

Expand All @@ -388,7 +413,7 @@ public void onChanged(XdsListenerResource.LdsUpdate update) {
}

if (virtualHosts != null) {
updateRoutes(virtualHosts);
updateRoutes(virtualHosts, rdsName);
} else if (changedRdsName) {
this.rdsName = rdsName;
addWatcher(new RdsWatcher(rdsName));
Expand Down Expand Up @@ -432,7 +457,7 @@ public RdsWatcher(String resourceName) {
@Override
public void onChanged(RdsUpdate update) {
setData(update);
updateRoutes(update.virtualHosts);
updateRoutes(update.virtualHosts, resourceName());
maybePublishConfig();
}

Expand All @@ -447,12 +472,24 @@ public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(resourceName);
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
}

List<String> getCdsNames() {
if (data == null || !data.hasValue() || data.getValue().virtualHosts == null) {
return Collections.emptyList();
}

return data.getValue().virtualHosts.stream()
.map(VirtualHost::name)
.collect(Collectors.toList());
}
}

private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
List<String> parentContexts = new ArrayList<>();

CdsWatcher(String resourceName) {
CdsWatcher(String resourceName, String parentContext) {
super(CLUSTER_RESOURCE, resourceName);
this.parentContexts.add(parentContext);
}

@Override
Expand All @@ -472,6 +509,7 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
// no eds needed
break;
case AGGREGATE:
String parentContext = this.toContextString();
if (data != null && data.hasValue()) {
Set<String> oldNames = new HashSet<>(data.getValue().prioritizedClusterNames());
Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
Expand All @@ -480,16 +518,17 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {

Set<String> addedClusters = Sets.difference(newNames, oldNames);
Set<String> deletedClusters = Sets.difference(oldNames, newNames);
addedClusters.forEach((cluster) -> addWatcher(new CdsWatcher(cluster)));
deletedClusters.forEach((cluster) -> cancelClusterWatcherTree(getCluster(cluster)));
addedClusters.forEach((cluster) -> addWatcher(new CdsWatcher(cluster, parentContext)));
deletedClusters.forEach((cluster)
-> cancelClusterWatcherTree(getCluster(cluster), parentContext));

if (!addedClusters.isEmpty()) {
maybePublishConfig();
}
} else {
setData(update);
for (String name : update.prioritizedClusterNames()) {
addWatcher(new CdsWatcher(name));
addWatcher(new CdsWatcher(name, parentContext));
}
}
break;
Expand Down Expand Up @@ -523,7 +562,7 @@ public void onResourceDoesNotExist(String resourceName) {
}
}

private void updateRoutes(List<VirtualHost> virtualHosts) {
private void updateRoutes(List<VirtualHost> virtualHosts, String rdsName) {
String authority = dataPlaneAuthority;

VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority);
Expand Down Expand Up @@ -563,25 +602,29 @@ private void updateRoutes(List<VirtualHost> virtualHosts) {
Set<String> deletedClusters =
oldClusters == null ? Collections.emptySet() : Sets.difference(oldClusters, clusters);

addedClusters.forEach((cluster) -> addWatcher(new CdsWatcher(cluster)));
deletedClusters.forEach(watcher -> cancelClusterWatcherTree(getCluster(watcher)));
String rdsContext =
toContextStr(XdsRouteConfigureResource.getInstance().typeName(), rdsName);
addedClusters.forEach((cluster) -> addWatcher(new CdsWatcher(cluster, rdsContext)));
deletedClusters.forEach(watcher -> cancelClusterWatcherTree(getCluster(watcher), rdsContext));
}

// Must be in SyncContext
private void cleanUpRoutes() {
// Remove RdsWatcher & CDS Watchers
TypeWatchers<?> rdsWatcher = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
if (rdsWatcher != null) {
for (XdsWatcherBase<?> watcher : rdsWatcher.watchers.values()) {
cancelWatcher(watcher);
}
TypeWatchers<?> rdsResourceWatcher = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
if (rdsResourceWatcher == null) {
return;
}
for (XdsWatcherBase<?> watcher : rdsResourceWatcher.watchers.values()) {
cancelWatcher(watcher);

// Remove all CdsWatchers
TypeWatchers<?> cdsWatcher = resourceWatchers.get(CLUSTER_RESOURCE);
if (cdsWatcher != null) {
for (XdsWatcherBase<?> watcher : cdsWatcher.watchers.values()) {
cancelWatcher(watcher);
// Remove CdsWatchers pointed to by the RdsWatcher
RdsWatcher rdsWatcher = (RdsWatcher) watcher;
for (String cName : rdsWatcher.getCdsNames()) {
CdsWatcher cdsWatcher = getCluster(cName);
if (cdsWatcher != null) {
cancelClusterWatcherTree(cdsWatcher, rdsWatcher.toContextString());
}
}
}
}
Expand Down
72 changes: 72 additions & 0 deletions xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import static io.grpc.xds.XdsTestUtils.CLUSTER_NAME;
import static io.grpc.xds.XdsTestUtils.ENDPOINT_HOSTNAME;
import static io.grpc.xds.XdsTestUtils.ENDPOINT_PORT;
import static io.grpc.xds.XdsTestUtils.RDS_NAME;
import static io.grpc.xds.XdsTestUtils.getEdsNameForCluster;
import static io.grpc.xds.client.CommonBootstrapperTestUtils.RDS_RESOURCE;
import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -37,7 +39,10 @@
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.protobuf.Any;
import com.google.protobuf.Message;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
Expand All @@ -60,14 +65,18 @@
import io.grpc.xds.client.XdsClientImpl;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsTransportFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
Expand Down Expand Up @@ -230,6 +239,69 @@ public void verify_simple_aggregate() {
}
}

@Test
public void testComplexRegisteredAggregate() throws IOException {
InOrder inOrder = org.mockito.Mockito.inOrder(xdsConfigWatcher);

// Do initialization
String rootName1 = "root_c";
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName1, childNames);

String rootName2 = "root_2";
List<String> childNames2 = Arrays.asList("clusterA", "clusterX");
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2);

xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());

Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());

Closeable subscription2 = xdsDependencyManager.subscribeToCluster(rootName2);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
testWatcher.verifyStats(3, 0, 0);
Set<String> expectedClusters = new HashSet<>();
expectedClusters.addAll(ImmutableList.of(rootName1, rootName2, CLUSTER_NAME));
expectedClusters.addAll(childNames);
expectedClusters.addAll(childNames2);
assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).isEqualTo(expectedClusters);

// Close 1 subscription shouldn't affect the other or RDS subscriptions
subscription1.close();
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
Set<String> expectedClusters2 = new HashSet<>();
expectedClusters.addAll(ImmutableList.of(rootName2, CLUSTER_NAME));
expectedClusters.addAll(childNames2);
assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).isEqualTo(expectedClusters2);

subscription2.close();
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
}

@Test
public void testDelayedSubscription() {
InOrder inOrder = org.mockito.Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);

String rootName1 = "root_c";
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");

Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).toString()).isEqualTo(
StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"No " + toContextStr(CLUSTER_TYPE_NAME, rootName1))).toString());

XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName1, childNames);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).hasValue()).isTrue();
}

@Test
public void testMissingCdsAndEds() {
// update config so that agg cluster references 2 existing & 1 non-existing cluster
Expand Down
5 changes: 5 additions & 0 deletions xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public void run() {
});
}

ImmutableMap<String, Message> getCurrentConfig(String type) {
HashMap<String, Message> hashMap = xdsResources.get(type);
return (hashMap != null) ? ImmutableMap.copyOf(hashMap) : ImmutableMap.of();
}

@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
Expand Down
Loading

0 comments on commit 2f4de7f

Please sign in to comment.