diff --git a/xds/src/main/java/io/grpc/xds/XdsClusterSubscriptionRegistry.java b/xds/src/main/java/io/grpc/xds/XdsClusterSubscriptionRegistry.java new file mode 100644 index 00000000000..e227e785645 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsClusterSubscriptionRegistry.java @@ -0,0 +1,25 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import java.io.Closeable; + +public interface XdsClusterSubscriptionRegistry { + Closeable subscribeToCluster(String clusterName); + + void refreshDynamicSubscriptions(); +} diff --git a/xds/src/main/java/io/grpc/xds/XdsConfig.java b/xds/src/main/java/io/grpc/xds/XdsConfig.java new file mode 100644 index 00000000000..f2581729b23 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsConfig.java @@ -0,0 +1,46 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import io.grpc.StatusOr; +import java.util.Map; + +public class XdsConfig { + public final XdsListenerResource listener; + public final XdsRouteConfigureResource route; + public final Map> clusters; + + public XdsConfig(XdsListenerResource listener, XdsRouteConfigureResource route, + Map> clusters) { + this.listener = listener; + this.route = route; + this.clusters = clusters; + } + + public static class XdsClusterConfig { + public final String clusterName; + public final XdsClusterResource clusterResource; + public final XdsEndpointResource endpoint; + + public XdsClusterConfig(String clusterName, XdsClusterResource clusterResource, + XdsEndpointResource endpoint) { + this.clusterName = clusterName; + this.clusterResource = clusterResource; + this.endpoint = endpoint; + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java new file mode 100644 index 00000000000..7d7cdedc1c4 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -0,0 +1,271 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.xds.client.XdsClient.ResourceUpdate; +import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG; + +import io.grpc.InternalLogId; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; +import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClient.ResourceWatcher; +import io.grpc.xds.client.XdsLogger; +import io.grpc.xds.client.XdsResourceType; +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * This class acts as a layer of indirection between the XdsClient and the NameResolver. It + * maintains the watchers for the xds resources and when an update is received, it either requests + * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher. + */ +@SuppressWarnings("unused") // TODO remove when changes for A74 are fully implemented +final class XdsDependencyManager implements XdsClusterSubscriptionRegistry { + private final XdsClient xdsClient; + private final XdsConfigWatcher xdsConfigWatcher; + private final SynchronizationContext syncContext; + private final String dataPlaneAuthority; + private final Map> clusterSubscriptions = new HashMap<>(); + + private final InternalLogId logId; + private final XdsLogger logger; + private XdsConfig lastXdsConfig = null; + private Map, TypeWatchers> resourceWatchers = new HashMap<>(); + + XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher, + SynchronizationContext syncContext, String dataPlaneAuthority, + String listenerName) { + logId = InternalLogId.allocate("xds-dependency-manager", listenerName); + logger = XdsLogger.withLogId(logId); + this.xdsClient = xdsClient; + this.xdsConfigWatcher = xdsConfigWatcher; + this.syncContext = syncContext; + this.dataPlaneAuthority = dataPlaneAuthority; + + // start the ball rolling + addWatcher(new LdsWatcher(listenerName)); + } + + @Override + public ClusterSubscription subscribeToCluster(String clusterName) { + checkNotNull(clusterName, "clusterName"); + ClusterSubscription subscription = new ClusterSubscription(clusterName); + + Set localSubscriptions = + clusterSubscriptions.computeIfAbsent(clusterName, k -> new HashSet<>()); + localSubscriptions.add(subscription); + addWatcher(new CdsWatcher(clusterName)); + + return subscription; + } + + @Override + public void refreshDynamicSubscriptions() { + // TODO: implement + } + + @SuppressWarnings("unchecked") + private void addWatcher(XdsWatcherBase watcher) { + XdsResourceType type = watcher.resourceContext.getResourceType(); + String resourceName = watcher.resourceContext.resourceName; + + this.syncContext.executeLater(() -> { + TypeWatchers typeWatchers = (TypeWatchers)resourceWatchers.get(type); + if (typeWatchers == null) { + typeWatchers = new TypeWatchers<>(type); + resourceWatchers.put(type, typeWatchers); + } + + typeWatchers.add(resourceName, watcher); + xdsClient.watchXdsResource(type, resourceName, watcher); + }); + } + + + public void shutdown() { + for (TypeWatchers watchers : resourceWatchers.values()) { + shutdownWatchersForType(watchers); + } + } + + private void shutdownWatchersForType(TypeWatchers watchers) { + for (Map.Entry> watcherEntry : watchers.watchers.entrySet()) { + xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(), + watcherEntry.getValue()); + } + } + + private void releaseSubscription(ClusterSubscription subscription) { + checkNotNull(subscription, "subscription"); + String clusterName = subscription.getClusterName(); + Set subscriptions = clusterSubscriptions.get(clusterName); + if (subscriptions == null) { + logger.log(DEBUG, "Subscription already released for {0}", clusterName); + return; + } + + subscriptions.remove(subscription); + if (subscriptions.isEmpty()) { + clusterSubscriptions.remove(clusterName); + // TODO release XdsClient watches for this cluster and its endpoint + maybePublishConfig(); + } + } + + private void maybePublishConfig() { + // Check if all resources have results and if so generate a new XdsConfig and send it to all + // the watchers + } + + @Override + public String toString() { + return logId.toString(); + } + + private static class TypeWatchers { + Map> watchers; + XdsResourceType resourceType; + + TypeWatchers(XdsResourceType resourceType) { + watchers = new HashMap<>(); + this.resourceType = resourceType; + } + + public void add(String resourceName, ResourceWatcher watcher) { + watchers.put(resourceName, watcher); + } + } + + + public static class ResourceContext { + String resourceName; + XdsResourceType resourceType; + + public ResourceContext(XdsResourceType resourceType, String resourceName) { + this.resourceName = resourceName; + this.resourceType = resourceType; + } + + public XdsResourceType getResourceType() { + return resourceType; + } + } + + public interface XdsConfigWatcher { + + void onUpdate(XdsConfig config); + + void onError(ResourceContext resourceContext, Status status); + + void onResourceDoesNotExist(ResourceContext resourceContext); + } + + private class ClusterSubscription implements Closeable { + String clusterName; + + public ClusterSubscription(String clusterName) { + this.clusterName = clusterName; + } + + public String getClusterName() { + return clusterName; + } + + @Override + public void close() throws IOException { + releaseSubscription(this); + } + } + + private abstract class XdsWatcherBase implements ResourceWatcher { + final ResourceContext resourceContext; + + private XdsWatcherBase(XdsResourceType type, String resourceName) { + this.resourceContext = new ResourceContext<>(type, resourceName); + } + + @Override + public void onError(Status error) { + xdsConfigWatcher.onError(resourceContext, error); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + xdsConfigWatcher.onResourceDoesNotExist(resourceContext); + } + } + + private class LdsWatcher extends XdsWatcherBase { + + private LdsWatcher(String resourceName) { + super(XdsListenerResource.getInstance(), resourceName); + } + + @Override + public void onChanged(XdsListenerResource.LdsUpdate update) { + // TODO: process the update and add an RdsWatcher if needed + // If none needed call maybePublishConfig() + } + } + + private class RdsWatcher extends XdsWatcherBase { + + public RdsWatcher(String resourceName) { + super(XdsRouteConfigureResource.getInstance(), resourceName); + } + + @Override + public void onChanged(RdsUpdate update) { + // TODO: process the update and add CdsWatchers for all virtual hosts as needed + // If none needed call maybePublishConfig() + } + } + + + private class CdsWatcher extends XdsWatcherBase { + + private CdsWatcher(String resourceName) { + super(XdsClusterResource.getInstance(), resourceName); + } + + @Override + public void onChanged(XdsClusterResource.CdsUpdate update) { + // TODO: process the update and add an EdsWatcher if needed + // else call maybePublishConfig() + } + } + + private class EdsWatcher extends XdsWatcherBase { + private EdsWatcher(String resourceName) { + super(XdsEndpointResource.getInstance(), resourceName); + } + + @Override + public void onChanged(XdsEndpointResource.EdsUpdate update) { + // TODO: process the update + maybePublishConfig(); + + } + } +}