From 659de92006efd7279f51a00f0b8b7b8030a3425a Mon Sep 17 00:00:00 2001 From: Mathieu Carbou Date: Fri, 16 Dec 2016 17:56:41 -0500 Subject: [PATCH] :heavy_plus_sign: Support for restartability: clear any entity placeholder states that can have been set when server becomes active --- .../DefaultActiveEntityMonitoringService.java | 12 ++-- .../DefaultClientMonitoringService.java | 18 +++--- .../DefaultConsumerManagementRegistry.java | 6 +- .../monitoring/DefaultDataListener.java | 12 ++-- ...Service.java => DefaultFiringService.java} | 6 +- .../monitoring/DefaultManagementService.java | 14 +++-- .../{EventService.java => FiringService.java} | 2 +- .../monitoring/MonitoringServiceProvider.java | 60 +++++++++++++------ ...stener.java => TopologyEventListener.java} | 5 +- ...java => TopologyEventListenerAdapter.java} | 7 ++- .../service/monitoring/TopologyService.java | 54 +++++++++-------- management/testing/integration-tests/pom.xml | 10 ---- .../integration/tests/AbstractTest.java | 6 +- .../{FailoverTest.java => FailoverIT.java} | 2 +- .../tests/{HATest.java => HAIT.java} | 2 +- .../{StartupTest.java => StartupIT.java} | 2 +- 16 files changed, 126 insertions(+), 92 deletions(-) rename management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/{DefaultEventService.java => DefaultFiringService.java} (93%) rename management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/{EventService.java => FiringService.java} (98%) rename management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/{EntityListener.java => TopologyEventListener.java} (94%) rename management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/{EntityListenerAdapter.java => TopologyEventListenerAdapter.java} (89%) rename management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/{FailoverTest.java => FailoverIT.java} (98%) rename management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/{HATest.java => HAIT.java} (98%) rename management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/{StartupTest.java => StartupIT.java} (99%) diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultActiveEntityMonitoringService.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultActiveEntityMonitoringService.java index 4f7a00e0fe..f64fb5156a 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultActiveEntityMonitoringService.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultActiveEntityMonitoringService.java @@ -34,13 +34,13 @@ public class DefaultActiveEntityMonitoringService extends AbstractEntityMonitoringService implements ActiveEntityMonitoringService { private final TopologyService topologyService; - private final EventService eventService; + private final FiringService firingService; private final String serverName; - DefaultActiveEntityMonitoringService(long consumerId, TopologyService topologyService, EventService eventService) { + DefaultActiveEntityMonitoringService(long consumerId, TopologyService topologyService, FiringService firingService) { super(consumerId); this.topologyService = Objects.requireNonNull(topologyService); - this.eventService = Objects.requireNonNull(eventService); + this.firingService = Objects.requireNonNull(firingService); this.serverName = topologyService.getCurrentServerName(); } @@ -57,7 +57,7 @@ public void pushNotification(ContextualNotification notification) { logger.trace("[{}] pushNotification({})", getConsumerId(), notification); topologyService.getEntityContext(serverName, getConsumerId()).ifPresent(context -> { notification.setContext(notification.getContext().with(context)); - eventService.fireNotification(notification); + firingService.fireNotification(notification); }); } @@ -70,14 +70,14 @@ public void pushStatistics(ContextualStatistics... statistics) { topologyService.getEntityContext(serverName, getConsumerId()) .ifPresent(context -> statistic.setContext(statistic.getContext().with(context))); } - eventService.fireStatistics(statistics); + firingService.fireStatistics(statistics); } } @Override public void answerManagementCall(String managementCallIdentifier, ContextualReturn contextualReturn) { logger.trace("[{}] answerManagementCall({}, executed={}, error={})", getConsumerId(), managementCallIdentifier, contextualReturn.hasExecuted(), contextualReturn.errorThrown()); - eventService.fireManagementCallAnswer(managementCallIdentifier, contextualReturn); + firingService.fireManagementCallAnswer(managementCallIdentifier, contextualReturn); } @Override diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultClientMonitoringService.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultClientMonitoringService.java index 5a0fc42a38..3aaac7e0cd 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultClientMonitoringService.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultClientMonitoringService.java @@ -38,20 +38,20 @@ /** * @author Mathieu Carbou */ -class DefaultClientMonitoringService implements ClientMonitoringService, EntityListener { +class DefaultClientMonitoringService implements ClientMonitoringService, TopologyEventListener { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClientMonitoringService.class); private final long consumerId; - private final EventService eventService; + private final FiringService firingService; private final ClientCommunicator clientCommunicator; private final TopologyService topologyService; private final Map manageableClients = new ConcurrentHashMap<>(); - DefaultClientMonitoringService(long consumerId, TopologyService topologyService, EventService eventService, ClientCommunicator clientCommunicator) { + DefaultClientMonitoringService(long consumerId, TopologyService topologyService, FiringService firingService, ClientCommunicator clientCommunicator) { this.consumerId = consumerId; this.topologyService = Objects.requireNonNull(topologyService); - this.eventService = Objects.requireNonNull(eventService); + this.firingService = Objects.requireNonNull(firingService); this.clientCommunicator = Objects.requireNonNull(clientCommunicator); } @@ -60,7 +60,7 @@ public void pushNotification(ClientDescriptor from, ContextualNotification notif LOGGER.trace("[{}] pushNotification({}, {})", consumerId, from, notification); topologyService.getClientContext(consumerId, from).ifPresent(context -> { notification.setContext(notification.getContext().with(context)); - eventService.fireNotification(notification); + firingService.fireNotification(notification); }); } @@ -72,7 +72,7 @@ public void pushStatistics(ClientDescriptor from, ContextualStatistics... statis for (ContextualStatistics statistic : statistics) { statistic.setContext(statistic.getContext().with(context)); } - eventService.fireStatistics(statistics); + firingService.fireStatistics(statistics); }); } } @@ -96,7 +96,11 @@ public void exposeManagementRegistry(ClientDescriptor from, ContextContainer con @Override public void answerManagementCall(ClientDescriptor caller, String managementCallIdentifier, ContextualReturn contextualReturn) { LOGGER.trace("[{}] answerManagementCall({})", consumerId, managementCallIdentifier); - eventService.fireManagementCallAnswer(managementCallIdentifier, contextualReturn); + firingService.fireManagementCallAnswer(managementCallIdentifier, contextualReturn); + } + + @Override + public void onBecomeActive() { } @Override diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultConsumerManagementRegistry.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultConsumerManagementRegistry.java index db12f9d8f9..590e7dca51 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultConsumerManagementRegistry.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultConsumerManagementRegistry.java @@ -35,7 +35,7 @@ /** * @author Mathieu Carbou */ -class DefaultConsumerManagementRegistry extends DefaultManagementRegistry implements ConsumerManagementRegistry, EntityListener { +class DefaultConsumerManagementRegistry extends DefaultManagementRegistry implements ConsumerManagementRegistry, TopologyEventListener { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultConsumerManagementRegistry.class); @@ -90,6 +90,10 @@ public boolean pushServerEntityNotification(Object managedObjectSource, String t return false; } + @Override + public void onBecomeActive() { + } + @Override public void onFetch(long consumerId, ClientDescriptor clientDescriptor) { } diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultDataListener.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultDataListener.java index 87059c6f31..7682a88d43 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultDataListener.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultDataListener.java @@ -41,12 +41,12 @@ class DefaultDataListener implements DataListener { private final long consumerId; private final TopologyService topologyService; - private final EventService eventService; + private final FiringService firingService; - DefaultDataListener(long consumerId, TopologyService topologyService, EventService eventService) { + DefaultDataListener(long consumerId, TopologyService topologyService, FiringService firingService) { this.consumerId = consumerId; this.topologyService = Objects.requireNonNull(topologyService); - this.eventService = Objects.requireNonNull(eventService); + this.firingService = Objects.requireNonNull(firingService); } // =========================================================================== @@ -68,7 +68,7 @@ public void pushBestEffortsData(long consumerId, PlatformServer sender, String n } topologyService.getEntityContext(sender.getServerName(), consumerId).ifPresent(context -> { notification.setContext(notification.getContext().with(context)); - eventService.fireNotification(notification); + firingService.fireNotification(notification); }); break; } @@ -82,7 +82,7 @@ public void pushBestEffortsData(long consumerId, PlatformServer sender, String n topologyService.getEntityContext(sender.getServerName(), cid) .ifPresent(context -> statistic.setContext(statistic.getContext().with(context))); } - eventService.fireStatistics(statistics); + firingService.fireStatistics(statistics); break; } @@ -106,7 +106,7 @@ public synchronized void setState(long consumerId, PlatformServer sender, String // handles data coming from DefaultMonitoringService.answerManagementCall() String managementCallIdentifier = path[2]; ContextualReturn answer = (ContextualReturn) data; - eventService.fireManagementCallAnswer(managementCallIdentifier, answer); + firingService.fireManagementCallAnswer(managementCallIdentifier, answer); } } diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultEventService.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultFiringService.java similarity index 93% rename from management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultEventService.java rename to management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultFiringService.java index bcb719b620..093008c656 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultEventService.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultFiringService.java @@ -37,9 +37,9 @@ /** * @author Mathieu Carbou */ -class DefaultEventService implements EventService, Closeable { +class DefaultFiringService implements FiringService, Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventService.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFiringService.class); private final SequenceGenerator sequenceGenerator; private final PlatformConfiguration platformConfiguration; @@ -51,7 +51,7 @@ class DefaultEventService implements EventService, Closeable { // temporary there to simulate an entity callback with IEntityMessenger private final ExecutorService executorService = Executors.newSingleThreadExecutor(); - DefaultEventService(SequenceGenerator sequenceGenerator, PlatformConfiguration platformConfiguration, SharedManagementRegistry sharedManagementRegistry, Map managementServices, Map clientMonitoringServices) { + DefaultFiringService(SequenceGenerator sequenceGenerator, PlatformConfiguration platformConfiguration, SharedManagementRegistry sharedManagementRegistry, Map managementServices, Map clientMonitoringServices) { this.sequenceGenerator = Objects.requireNonNull(sequenceGenerator); this.platformConfiguration = Objects.requireNonNull(platformConfiguration); this.sharedManagementRegistry = Objects.requireNonNull(sharedManagementRegistry); diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultManagementService.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultManagementService.java index c038093659..e5ba39e395 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultManagementService.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultManagementService.java @@ -45,12 +45,12 @@ /** * @author Mathieu Carbou */ -class DefaultManagementService implements ManagementService, EntityListener { +class DefaultManagementService implements ManagementService, TopologyEventListener { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultManagementService.class); private final long consumerId; - private final EventService eventService; + private final FiringService firingService; private final ClientCommunicator clientCommunicator; private final SequenceGenerator sequenceGenerator; private final TopologyService topologyService; @@ -59,10 +59,10 @@ class DefaultManagementService implements ManagementService, EntityListener { private volatile ReadWriteBuffer buffer; private volatile ContextualNotification full; - DefaultManagementService(long consumerId, TopologyService topologyService, EventService eventService, ClientCommunicator clientCommunicator, SequenceGenerator sequenceGenerator) { + DefaultManagementService(long consumerId, TopologyService topologyService, FiringService firingService, ClientCommunicator clientCommunicator, SequenceGenerator sequenceGenerator) { this.consumerId = consumerId; this.topologyService = Objects.requireNonNull(topologyService); - this.eventService = Objects.requireNonNull(eventService); + this.firingService = Objects.requireNonNull(firingService); this.clientCommunicator = Objects.requireNonNull(clientCommunicator); this.sequenceGenerator = Objects.requireNonNull(sequenceGenerator); } @@ -116,10 +116,14 @@ public String sendManagementCallRequest(ClientDescriptor caller, final Context c } track(caller, managementCallIdentifier); - eventService.fireManagementCallRequest(managementCallIdentifier, new ContextualCall<>(fullContext, capabilityName, methodName, returnType, parameters)); + firingService.fireManagementCallRequest(managementCallIdentifier, new ContextualCall<>(fullContext, capabilityName, methodName, returnType, parameters)); return managementCallIdentifier; } + @Override + public void onBecomeActive() { + } + @Override public void onFetch(long consumerId, ClientDescriptor clientDescriptor) { } diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/EventService.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/FiringService.java similarity index 98% rename from management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/EventService.java rename to management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/FiringService.java index 6a00f8ad8a..c6f32bfe7b 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/EventService.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/FiringService.java @@ -25,7 +25,7 @@ * * @author Mathieu Carbou */ -interface EventService { +interface FiringService { void fireNotification(ContextualNotification notification); void fireStatistics(ContextualStatistics[] statistics); diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/MonitoringServiceProvider.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/MonitoringServiceProvider.java index 4e3c9feaf8..a2695b5119 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/MonitoringServiceProvider.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/MonitoringServiceProvider.java @@ -38,7 +38,12 @@ import java.util.Arrays; import java.util.Collection; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static java.util.stream.Stream.concat; /** * @author Mathieu Carbou @@ -61,7 +66,8 @@ public class MonitoringServiceProvider implements ServiceProvider, Closeable { private final Map managementServices = new ConcurrentHashMap<>(); private final Map clientMonitoringServices = new ConcurrentHashMap<>(); private final Map consumerManagementRegistries = new ConcurrentHashMap<>(); - private final Map entityMonitoringServices = new ConcurrentHashMap<>(); + private final Map activeEntityMonitoringService = new ConcurrentHashMap<>(); + private final Map passiveEntityMonitoringService = new ConcurrentHashMap<>(); private final TimeSource timeSource = TimeSource.BEST; private final DefaultSharedManagementRegistry sharedManagementRegistry = new DefaultSharedManagementRegistry(consumerManagementRegistries); @@ -69,7 +75,7 @@ public class MonitoringServiceProvider implements ServiceProvider, Closeable { private final StatisticsServiceFactory statisticsServiceFactory = new StatisticsServiceFactory(sharedManagementRegistry, timeSource); private PlatformConfiguration platformConfiguration; - private DefaultEventService eventService; + private DefaultFiringService firingService; private TopologyService topologyService; private IStripeMonitoring platformListenerAdapter; @@ -85,24 +91,40 @@ public void prepareForSynchronization() throws ServiceProviderCleanupException { @Override public boolean initialize(ServiceProviderConfiguration configuration, PlatformConfiguration platformConfiguration) { this.platformConfiguration = platformConfiguration; - this.eventService = new DefaultEventService(sequenceGenerator, platformConfiguration, sharedManagementRegistry, managementServices, clientMonitoringServices); - this.topologyService = new TopologyService(eventService, timeSource, platformConfiguration); + this.firingService = new DefaultFiringService(sequenceGenerator, platformConfiguration, sharedManagementRegistry, managementServices, clientMonitoringServices); + this.topologyService = new TopologyService(firingService, timeSource, platformConfiguration); this.platformListenerAdapter = new IStripeMonitoringPlatformListenerAdapter(topologyService); - this.topologyService.addEntityListener(new EntityListenerAdapter() { + this.topologyService.addTopologyEventListener(new TopologyEventListenerAdapter() { @Override public void onEntityDestroyed(long consumerId) { LOGGER.trace("[{}] onEntityDestroyed()", consumerId); - topologyService.removeEntityListener(managementServices.remove(consumerId)); - topologyService.removeEntityListener(clientMonitoringServices.remove(consumerId)); - topologyService.removeEntityListener(consumerManagementRegistries.remove(consumerId)); - entityMonitoringServices.remove(consumerId); + topologyService.removeTopologyEventListener(managementServices.remove(consumerId)); + topologyService.removeTopologyEventListener(clientMonitoringServices.remove(consumerId)); + topologyService.removeTopologyEventListener(consumerManagementRegistries.remove(consumerId)); + passiveEntityMonitoringService.remove(consumerId); + activeEntityMonitoringService.remove(consumerId); } @Override public void onEntityFailover(long consumerId) { onEntityDestroyed(consumerId); } + + @Override + public void onBecomeActive() { + // clear some states that can have been created by placeholders entities with restartability on before they become active + // platform does not send us any event about that so we do not know when these placeholder entities get destroyed + Set consumerIds = concat(concat(concat(concat( + managementServices.keySet().stream(), + clientMonitoringServices.keySet().stream()), + consumerManagementRegistries.keySet().stream()), + activeEntityMonitoringService.keySet().stream()), + passiveEntityMonitoringService.keySet().stream() + ).collect(Collectors.toCollection(TreeSet::new)); + LOGGER.trace("[0] onBecomeActive({})", consumerIds); + consumerIds.forEach(this::onEntityDestroyed); + } }); return true; } @@ -110,7 +132,7 @@ public void onEntityFailover(long consumerId) { @Override public void close() { this.statisticsServiceFactory.close(); - this.eventService.close(); + this.firingService.close(); } @SuppressWarnings("unchecked") @@ -123,7 +145,7 @@ public T getService(long consumerID, ServiceConfiguration configuration) if (consumerID == PLATFORM_CONSUMER_ID) { return serviceType.cast(platformListenerAdapter); } else { - DataListener dataListener = new DefaultDataListener(consumerID, topologyService, eventService); + DataListener dataListener = new DefaultDataListener(consumerID, topologyService, firingService); return serviceType.cast(new IStripeMonitoringDataListenerAdapter(consumerID, dataListener)); } } @@ -146,7 +168,7 @@ public T getService(long consumerID, ServiceConfiguration configuration) if (consumerManagementRegistryConfiguration.wantsServerManagementProviders()) { addServerManagementProviders(consumerID, consumerManagementRegistry); } - topologyService.addEntityListener(consumerManagementRegistry); + topologyService.addTopologyEventListener(consumerManagementRegistry); return consumerManagementRegistry; })); } else { @@ -165,9 +187,9 @@ public T getService(long consumerID, ServiceConfiguration configuration) DefaultClientMonitoringService clientMonitoringService = new DefaultClientMonitoringService( consumerID, topologyService, - eventService, + firingService, clientMonitoringServiceConfiguration.getClientCommunicator()); - topologyService.addEntityListener(clientMonitoringService); + topologyService.addTopologyEventListener(clientMonitoringService); return clientMonitoringService; })); } else { @@ -186,10 +208,10 @@ public T getService(long consumerID, ServiceConfiguration configuration) DefaultManagementService managementService = new DefaultManagementService( consumerID, topologyService, - eventService, + firingService, managementServiceConfiguration.getClientCommunicator(), sequenceGenerator); - topologyService.addEntityListener(managementService); + topologyService.addTopologyEventListener(managementService); return managementService; })); } else { @@ -203,12 +225,12 @@ public T getService(long consumerID, ServiceConfiguration configuration) if (!topologyService.isCurrentServerActive()) { throw new IllegalStateException("Server " + platformConfiguration.getServerName() + " is not active!"); } - return serviceType.cast(entityMonitoringServices.computeIfAbsent(consumerID, cid -> { + return serviceType.cast(activeEntityMonitoringService.computeIfAbsent(consumerID, cid -> { ActiveEntityMonitoringServiceConfiguration activeEntityMonitoringServiceConfiguration = (ActiveEntityMonitoringServiceConfiguration) configuration; DefaultActiveEntityMonitoringService activeEntityMonitoringService = new DefaultActiveEntityMonitoringService( consumerID, topologyService, - eventService); + firingService); return activeEntityMonitoringService; })); } else { @@ -222,7 +244,7 @@ public T getService(long consumerID, ServiceConfiguration configuration) if (topologyService.isCurrentServerActive()) { throw new IllegalStateException("Server " + platformConfiguration.getServerName() + " is not passive!"); } - return serviceType.cast(entityMonitoringServices.computeIfAbsent(consumerID, cid -> { + return serviceType.cast(passiveEntityMonitoringService.computeIfAbsent(consumerID, cid -> { PassiveEntityMonitoringServiceConfiguration passiveEntityMonitoringServiceConfiguration = (PassiveEntityMonitoringServiceConfiguration) configuration; IMonitoringProducer monitoringProducer = passiveEntityMonitoringServiceConfiguration.getMonitoringProducer(); if (monitoringProducer == null) { diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/EntityListener.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyEventListener.java similarity index 94% rename from management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/EntityListener.java rename to management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyEventListener.java index 3c0ab6ae2d..3b2254fbfb 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/EntityListener.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyEventListener.java @@ -20,7 +20,9 @@ /** * @author Mathieu Carbou */ -interface EntityListener { +interface TopologyEventListener { + + void onBecomeActive(); void onFetch(long consumerId, ClientDescriptor clientDescriptor); @@ -29,4 +31,5 @@ interface EntityListener { void onEntityDestroyed(long consumerId); void onEntityFailover(long consumerId); + } diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/EntityListenerAdapter.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyEventListenerAdapter.java similarity index 89% rename from management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/EntityListenerAdapter.java rename to management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyEventListenerAdapter.java index f5a320eef5..2401c4f209 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/EntityListenerAdapter.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyEventListenerAdapter.java @@ -20,7 +20,7 @@ /** * @author Mathieu Carbou */ -class EntityListenerAdapter implements EntityListener { +class TopologyEventListenerAdapter implements TopologyEventListener { @Override public void onFetch(long consumerId, ClientDescriptor clientDescriptor) { @@ -40,4 +40,9 @@ public void onEntityDestroyed(long consumerId) { public void onEntityFailover(long consumerId) { } + + @Override + public void onBecomeActive() { + + } } diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyService.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyService.java index 0a3d126286..007dc46853 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyService.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyService.java @@ -73,16 +73,16 @@ class TopologyService implements PlatformListener { private final Cluster cluster; private final Stripe stripe; private final Map> fetches = new HashMap<>(); - private final EventService eventService; + private final FiringService firingService; private final TimeSource timeSource; private final PlatformConfiguration platformConfiguration; - private final List entityListeners = new CopyOnWriteArrayList<>(); + private final List topologyEventListeners = new CopyOnWriteArrayList<>(); private final Map failoverEntities = new HashMap<>(); private volatile Server currentActive; - TopologyService(EventService eventService, TimeSource timeSource, PlatformConfiguration platformConfiguration) { - this.eventService = Objects.requireNonNull(eventService); + TopologyService(FiringService firingService, TimeSource timeSource, PlatformConfiguration platformConfiguration) { + this.firingService = Objects.requireNonNull(firingService); this.timeSource = Objects.requireNonNull(timeSource); this.platformConfiguration = platformConfiguration; this.cluster = Cluster.create().addStripe(stripe = Stripe.create("SINGLE")); @@ -102,6 +102,8 @@ public synchronized void serverDidBecomeActive(PlatformServer self) { currentActive = stripe.getServerByName(self.getServerName()).get(); currentActive.setState(Server.State.ACTIVE); currentActive.setActivateTime(timeSource.getTimestamp()); + + topologyEventListeners.forEach(TopologyEventListener::onBecomeActive); } @Override @@ -121,7 +123,7 @@ public synchronized void serverDidJoinStripe(PlatformServer platformServer) { stripe.addServer(server); - eventService.fireNotification(new ContextualNotification(server.getContext(), SERVER_JOINED.name())); + firingService.fireNotification(new ContextualNotification(server.getContext(), SERVER_JOINED.name())); } @Override @@ -134,7 +136,7 @@ public synchronized void serverDidLeaveStripe(PlatformServer platformServer) { Context context = server.getContext(); server.remove(); - eventService.fireNotification(new ContextualNotification(context, SERVER_LEFT.name())); + firingService.fireNotification(new ContextualNotification(context, SERVER_LEFT.name())); } @Override @@ -162,10 +164,10 @@ public synchronized void serverEntityCreated(PlatformServer sender, PlatformEnti fetches.put(platformEntity.consumerID, new HashMap<>()); } - eventService.fireNotification(new ContextualNotification(entity.getContext(), SERVER_ENTITY_CREATED.name())); + firingService.fireNotification(new ContextualNotification(entity.getContext(), SERVER_ENTITY_CREATED.name())); if (failoverEntities.remove(identifier) != null) { - eventService.fireNotification(new ContextualNotification(entity.getContext(), SERVER_ENTITY_FAILOVER_COMPLETED.name())); + firingService.fireNotification(new ContextualNotification(entity.getContext(), SERVER_ENTITY_FAILOVER_COMPLETED.name())); } } @@ -193,10 +195,10 @@ public synchronized void serverEntityDestroyed(PlatformServer sender, PlatformEn if (isCurrentServerActive() && sender.getServerName().equals(currentActive.getServerName())) { fetches.remove(platformEntity.consumerID); - entityListeners.forEach(listener -> listener.onEntityDestroyed(platformEntity.consumerID)); + topologyEventListeners.forEach(listener -> listener.onEntityDestroyed(platformEntity.consumerID)); } - eventService.fireNotification(new ContextualNotification(context, SERVER_ENTITY_DESTROYED.name())); + firingService.fireNotification(new ContextualNotification(context, SERVER_ENTITY_DESTROYED.name())); } @Override @@ -212,7 +214,7 @@ public synchronized void serverEntityFailover(PlatformServer sender, PlatformEnt // so we can keep track of those and send an event after, when they become active failoverEntities.put(ServerEntityIdentifier.create(platformEntity.name, platformEntity.typeName), platformEntity.consumerID); - entityListeners.forEach(listener -> listener.onEntityFailover(platformEntity.consumerID)); + topologyEventListeners.forEach(listener -> listener.onEntityFailover(platformEntity.consumerID)); } @Override @@ -228,7 +230,7 @@ public synchronized void clientConnected(PlatformConnectedClient platformConnect client.addConnection(Connection.create(clientIdentifier.getConnectionUid(), getActiveServer(), endpoint)); - eventService.fireNotification(new ContextualNotification(client.getContext(), CLIENT_CONNECTED.name())); + firingService.fireNotification(new ContextualNotification(client.getContext(), CLIENT_CONNECTED.name())); } @Override @@ -242,7 +244,7 @@ public synchronized void clientDisconnected(PlatformConnectedClient platformConn client.remove(); - eventService.fireNotification(new ContextualNotification(context, CLIENT_DISCONNECTED.name())); + firingService.fireNotification(new ContextualNotification(context, CLIENT_DISCONNECTED.name())); } @Override @@ -267,9 +269,9 @@ public synchronized void clientFetch(PlatformConnectedClient platformConnectedCl fetches.get(platformEntity.consumerID).put(clientDescriptor, clientIdentifier); - entityListeners.forEach(listener -> listener.onFetch(platformEntity.consumerID, clientDescriptor)); + topologyEventListeners.forEach(listener -> listener.onFetch(platformEntity.consumerID, clientDescriptor)); - eventService.fireNotification(new ContextualNotification(entity.getContext(), SERVER_ENTITY_FETCHED.name(), client.getContext())); + firingService.fireNotification(new ContextualNotification(entity.getContext(), SERVER_ENTITY_FETCHED.name(), client.getContext())); } @Override @@ -290,10 +292,10 @@ public synchronized void clientUnfetch(PlatformConnectedClient platformConnected fetches.get(platformEntity.consumerID).remove(clientDescriptor); - entityListeners.forEach(listener -> listener.onUnfetch(platformEntity.consumerID, clientDescriptor)); + topologyEventListeners.forEach(listener -> listener.onUnfetch(platformEntity.consumerID, clientDescriptor)); if (connection.unfetchServerEntity(platformEntity.name, platformEntity.typeName)) { - eventService.fireNotification(new ContextualNotification(entity.getContext(), SERVER_ENTITY_UNFETCHED.name(), client.getContext())); + firingService.fireNotification(new ContextualNotification(entity.getContext(), SERVER_ENTITY_UNFETCHED.name(), client.getContext())); } } @@ -311,7 +313,7 @@ public synchronized void serverStateChanged(PlatformServer sender, ServerState s attrs.put("state", serverState.getState()); attrs.put("activateTime", serverState.getActivate() > 0 ? String.valueOf(serverState.getActivate()) : "0"); - eventService.fireNotification(new ContextualNotification(server.getContext(), SERVER_STATE_CHANGED.name(), attrs)); + firingService.fireNotification(new ContextualNotification(server.getContext(), SERVER_STATE_CHANGED.name(), attrs)); } synchronized void setEntityManagementRegistry(long consumerId, String serverName, ManagementRegistry newRegistry) { @@ -321,7 +323,7 @@ synchronized void setEntityManagementRegistry(long consumerId, String serverName String notif = serverEntity.getManagementRegistry().map(current -> current.equals(newRegistry) ? "" : "ENTITY_REGISTRY_UPDATED").orElse("ENTITY_REGISTRY_AVAILABLE"); if (!notif.isEmpty()) { serverEntity.setManagementRegistry(newRegistry); - eventService.fireNotification(new ContextualNotification(serverEntity.getContext(), notif)); + firingService.fireNotification(new ContextualNotification(serverEntity.getContext(), notif)); } }); } @@ -331,7 +333,7 @@ synchronized void setClientManagementRegistry(long consumerId, ClientDescriptor String notif = client.getManagementRegistry().map(current -> current.equals(newRegistry) ? "" : "CLIENT_REGISTRY_UPDATED").orElse("CLIENT_REGISTRY_AVAILABLE"); if (!notif.isEmpty()) { client.setManagementRegistry(newRegistry); - eventService.fireNotification(new ContextualNotification(client.getContext(), notif)); + firingService.fireNotification(new ContextualNotification(client.getContext(), notif)); } }); } @@ -342,7 +344,7 @@ synchronized void setClientTags(long consumerId, ClientDescriptor clientDescript Set newTags = new HashSet<>(Arrays.asList(tags)); if (!currtags.equals(newTags)) { client.setTags(tags); - eventService.fireNotification(new ContextualNotification(client.getContext(), "CLIENT_TAGS_UPDATED")); + firingService.fireNotification(new ContextualNotification(client.getContext(), "CLIENT_TAGS_UPDATED")); } }); } @@ -406,13 +408,13 @@ String getCurrentServerName() { return platformConfiguration.getServerName(); } - void addEntityListener(EntityListener entityListener) { - entityListeners.add(Objects.requireNonNull(entityListener)); + void addTopologyEventListener(TopologyEventListener topologyEventListener) { + topologyEventListeners.add(Objects.requireNonNull(topologyEventListener)); } - void removeEntityListener(EntityListener entityListener) { - if (entityListener != null) { - entityListeners.remove(entityListener); + void removeTopologyEventListener(TopologyEventListener topologyEventListener) { + if (topologyEventListener != null) { + topologyEventListeners.remove(topologyEventListener); } } diff --git a/management/testing/integration-tests/pom.xml b/management/testing/integration-tests/pom.xml index a38ecbf579..4072a7d096 100644 --- a/management/testing/integration-tests/pom.xml +++ b/management/testing/integration-tests/pom.xml @@ -63,16 +63,6 @@ ${galvan.version} test - - junit - junit - test - - - org.hamcrest - hamcrest-core - 1.3 - ch.qos.logback logback-classic diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/AbstractTest.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/AbstractTest.java index 02f06c1aeb..9a20dfd048 100644 --- a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/AbstractTest.java +++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/AbstractTest.java @@ -66,7 +66,7 @@ public abstract class AbstractTest { protected TmsAgentService tmsAgentService; @Rule - public Timeout timeout = Timeout.seconds(60); + public Timeout timeout = Timeout.seconds(90); protected final void commonSetUp(Cluster cluster) throws Exception { this.cluster = cluster; @@ -76,8 +76,8 @@ protected final void commonSetUp(Cluster cluster) throws Exception { connectManagementClients(cluster.getConnectionURI()); - addWebappNode(cluster.getConnectionURI().resolve("pet-clinic")); - addWebappNode(cluster.getConnectionURI().resolve("pet-clinic")); + addWebappNode(cluster.getConnectionURI().resolve("/pet-clinic")); + addWebappNode(cluster.getConnectionURI().resolve("/pet-clinic")); getCaches("pets"); getCaches("clients"); diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverTest.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverIT.java similarity index 98% rename from management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverTest.java rename to management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverIT.java index c9efa3fb77..76447651f0 100644 --- a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverTest.java +++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverIT.java @@ -33,7 +33,7 @@ * @author Mathieu Carbou */ @Ignore // TODO activate -public class FailoverTest extends AbstractHATest { +public class FailoverIT extends AbstractHATest { @Test public void failover_management() throws Exception { diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/HATest.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/HAIT.java similarity index 98% rename from management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/HATest.java rename to management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/HAIT.java index 78c4c15424..bc38dcd2a0 100644 --- a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/HATest.java +++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/HAIT.java @@ -39,7 +39,7 @@ * @author Mathieu Carbou */ @Ignore // TODO activate -public class HATest extends AbstractHATest { +public class HAIT extends AbstractHATest { @Test public void topology_includes_passives() throws Exception { diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/StartupTest.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/StartupIT.java similarity index 99% rename from management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/StartupTest.java rename to management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/StartupIT.java index 51115d1bba..047087df80 100644 --- a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/StartupTest.java +++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/StartupIT.java @@ -36,7 +36,7 @@ * @author Mathieu Carbou */ @Ignore // TODO activate -public class StartupTest extends AbstractHATest { +public class StartupIT extends AbstractHATest { @Before @Override