From 23686d7f429595c55afe470a83719b16a8f409f6 Mon Sep 17 00:00:00 2001 From: Mathieu Carbou Date: Tue, 20 Dec 2016 10:16:39 -0500 Subject: [PATCH] :construction: #191: Support of new upcoming voltron changes for failover --- .../DefaultClientMonitoringService.java | 13 +- .../DefaultConsumerManagementRegistry.java | 15 +- .../monitoring/DefaultManagementService.java | 18 ++- .../monitoring/MonitoringServiceProvider.java | 135 +++++++++--------- .../service/monitoring/TopologyService.java | 20 ++- .../integration/tests/FailoverIT.java | 52 +++++-- .../integration/tests/TopologyIT.java | 6 - 7 files changed, 157 insertions(+), 102 deletions(-) diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultClientMonitoringService.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultClientMonitoringService.java index 3aaac7e0cd..cd57baf572 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultClientMonitoringService.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultClientMonitoringService.java @@ -101,6 +101,8 @@ public void answerManagementCall(ClientDescriptor caller, String managementCallI @Override public void onBecomeActive() { + LOGGER.trace("[{}] onBecomeActive()", this.consumerId); + clear(); } @Override @@ -119,13 +121,16 @@ public void onUnfetch(long consumerId, ClientDescriptor clientDescriptor) { public void onEntityDestroyed(long consumerId) { if (consumerId == this.consumerId) { LOGGER.trace("[{}] onEntityDestroyed()", this.consumerId); - manageableClients.clear(); + clear(); } } @Override public void onEntityFailover(long consumerId) { - onEntityDestroyed(consumerId); + if (consumerId == this.consumerId) { + LOGGER.trace("[{}] onEntityFailover()", this.consumerId); + clear(); + } } void fireMessage(Message message) { @@ -155,4 +160,8 @@ private void send(ClientDescriptor client, Message message) { } } + private void clear() { + manageableClients.clear(); + } + } diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultConsumerManagementRegistry.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultConsumerManagementRegistry.java index 590e7dca51..32eca9dee3 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultConsumerManagementRegistry.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultConsumerManagementRegistry.java @@ -92,6 +92,8 @@ public boolean pushServerEntityNotification(Object managedObjectSource, String t @Override public void onBecomeActive() { + LOGGER.trace("[{}] onBecomeActive()", consumerId); + clear(); } @Override @@ -106,14 +108,21 @@ public void onUnfetch(long consumerId, ClientDescriptor clientDescriptor) { public void onEntityDestroyed(long consumerId) { if (consumerId == this.consumerId) { LOGGER.trace("[{}] onEntityDestroyed()", consumerId); - managementProviders.forEach(ManagementProvider::close); - managementProviders.clear(); + clear(); } } @Override public void onEntityFailover(long consumerId) { - onEntityDestroyed(consumerId); + if (consumerId == this.consumerId) { + LOGGER.trace("[{}] onEntityFailover()", consumerId); + clear(); + } } + private void clear() { + managementProviders.forEach(ManagementProvider::close); + managementProviders.clear(); + previouslyExposed.clear(); + } } diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultManagementService.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultManagementService.java index e5ba39e395..1909085a96 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultManagementService.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/DefaultManagementService.java @@ -122,6 +122,8 @@ public String sendManagementCallRequest(ClientDescriptor caller, final Context c @Override public void onBecomeActive() { + LOGGER.trace("[{}] onBecomeActive()", this.consumerId); + clear(); } @Override @@ -140,14 +142,16 @@ public void onUnfetch(long consumerId, ClientDescriptor clientDescriptor) { public void onEntityDestroyed(long consumerId) { if (consumerId == this.consumerId) { LOGGER.trace("[{}] onEntityDestroyed()", this.consumerId); - managementCallRequests.clear(); - buffer = null; + clear(); } } @Override public void onEntityFailover(long consumerId) { - onEntityDestroyed(consumerId); + if (consumerId == this.consumerId) { + LOGGER.trace("[{}] onEntityFailover()", this.consumerId); + clear(); + } } void fireMessage(Message message) { @@ -203,4 +207,12 @@ private void track(ClientDescriptor caller, String managementCallIdentifier) { .add(managementCallIdentifier); } + private void clear() { + managementCallRequests.clear(); + full = null; + if (buffer != null) { + buffer.clear(); + } + } + } diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/MonitoringServiceProvider.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/MonitoringServiceProvider.java index ebc7cbe01f..16af8facf8 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/MonitoringServiceProvider.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/MonitoringServiceProvider.java @@ -38,12 +38,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Map; -import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -import static java.util.stream.Stream.concat; /** * @author Mathieu Carbou @@ -66,8 +61,8 @@ public class MonitoringServiceProvider implements ServiceProvider, Closeable { private final Map managementServices = new ConcurrentHashMap<>(); private final Map clientMonitoringServices = new ConcurrentHashMap<>(); private final Map consumerManagementRegistries = new ConcurrentHashMap<>(); - private final Map activeEntityMonitoringService = new ConcurrentHashMap<>(); - private final Map passiveEntityMonitoringService = new ConcurrentHashMap<>(); + private final Map activeEntityMonitoringServices = new ConcurrentHashMap<>(); + private final Map passiveEntityMonitoringServices = new ConcurrentHashMap<>(); private final TimeSource timeSource = TimeSource.BEST; private final DefaultSharedManagementRegistry sharedManagementRegistry = new DefaultSharedManagementRegistry(consumerManagementRegistries); @@ -98,32 +93,12 @@ public boolean initialize(ServiceProviderConfiguration configuration, PlatformCo this.topologyService.addTopologyEventListener(new TopologyEventListenerAdapter() { @Override public void onEntityDestroyed(long consumerId) { - LOGGER.trace("[0] onEntityDestroyed({})", consumerId); + LOGGER.trace("[{}] onEntityDestroyed()", consumerId); topologyService.removeTopologyEventListener(managementServices.remove(consumerId)); topologyService.removeTopologyEventListener(clientMonitoringServices.remove(consumerId)); topologyService.removeTopologyEventListener(consumerManagementRegistries.remove(consumerId)); - passiveEntityMonitoringService.remove(consumerId); - activeEntityMonitoringService.remove(consumerId); - } - - @Override - public void onEntityFailover(long consumerId) { - onEntityDestroyed(consumerId); - } - - @Override - public void onBecomeActive() { - // clear some states that can have been created by placeholders entities with restartability on before they become active - // platform does not send us any event about that so we do not know when these placeholder entities get destroyed - Set consumerIds = concat(concat(concat(concat( - managementServices.keySet().stream(), - clientMonitoringServices.keySet().stream()), - consumerManagementRegistries.keySet().stream()), - activeEntityMonitoringService.keySet().stream()), - passiveEntityMonitoringService.keySet().stream() - ).collect(Collectors.toCollection(TreeSet::new)); - LOGGER.trace("[0] onBecomeActive({})", consumerIds); - consumerIds.forEach(this::onEntityDestroyed); + passiveEntityMonitoringServices.remove(consumerId); + activeEntityMonitoringServices.remove(consumerId); } }); return true; @@ -152,25 +127,35 @@ public T getService(long consumerID, ServiceConfiguration configuration) // get or create a shared registry used to do aggregated operations on all consumer registries (i.e. management calls) if (SharedManagementRegistry.class == serviceType) { + LOGGER.trace("[{}] getService({})", consumerID, SharedManagementRegistry.class.getSimpleName()); return serviceType.cast(sharedManagementRegistry); } // get or creates a registry specific to this entity to handle stats and management calls if (ConsumerManagementRegistry.class == serviceType) { if (configuration instanceof ConsumerManagementRegistryConfiguration) { - return serviceType.cast(consumerManagementRegistries.computeIfAbsent(consumerID, cid -> { - ConsumerManagementRegistryConfiguration consumerManagementRegistryConfiguration = (ConsumerManagementRegistryConfiguration) configuration; - StatisticsService statisticsService = statisticsServiceFactory.createStatisticsService(consumerManagementRegistryConfiguration.getStatisticConfiguration()); - DefaultConsumerManagementRegistry consumerManagementRegistry = new DefaultConsumerManagementRegistry( - consumerID, - consumerManagementRegistryConfiguration.getEntityMonitoringService(), - statisticsService); - if (consumerManagementRegistryConfiguration.wantsServerManagementProviders()) { - addServerManagementProviders(consumerID, consumerManagementRegistry); - } - topologyService.addTopologyEventListener(consumerManagementRegistry); - return consumerManagementRegistry; - })); + ConsumerManagementRegistryConfiguration consumerManagementRegistryConfiguration = (ConsumerManagementRegistryConfiguration) configuration; + // in a failover, we are not aware of passive entity destruction so if we find a previous service with the same consumer id, we clean it + // this is true for this service specifically + DefaultConsumerManagementRegistry managementRegistry = consumerManagementRegistries.remove(consumerID); + if (managementRegistry != null) { + LOGGER.trace("[{}] getService({}): clearing previous instance", consumerID, ConsumerManagementRegistry.class.getSimpleName()); + topologyService.removeTopologyEventListener(managementRegistry); + managementRegistry.onEntityDestroyed(consumerID); + } + // create a new registry + LOGGER.trace("[{}] getService({})", consumerID, ConsumerManagementRegistry.class.getSimpleName()); + StatisticsService statisticsService = statisticsServiceFactory.createStatisticsService(consumerManagementRegistryConfiguration.getStatisticConfiguration()); + managementRegistry = new DefaultConsumerManagementRegistry( + consumerID, + consumerManagementRegistryConfiguration.getEntityMonitoringService(), + statisticsService); + if (consumerManagementRegistryConfiguration.wantsServerManagementProviders()) { + addServerManagementProviders(consumerID, managementRegistry); + } + topologyService.addTopologyEventListener(managementRegistry); + consumerManagementRegistries.put(consumerID, managementRegistry); + return serviceType.cast(managementRegistry); } else { throw new IllegalArgumentException("Missing configuration " + ConsumerManagementRegistryConfiguration.class.getSimpleName() + " when requesting service " + serviceType.getName()); } @@ -179,19 +164,21 @@ public T getService(long consumerID, ServiceConfiguration configuration) // get or creates a client-side monitoring service if (ClientMonitoringService.class == serviceType) { if (configuration instanceof ClientMonitoringServiceConfiguration) { - if (!topologyService.isCurrentServerActive()) { - throw new IllegalStateException("Server " + platformConfiguration.getServerName() + " is not active!"); - } - return serviceType.cast(clientMonitoringServices.computeIfAbsent(consumerID, cid -> { + DefaultClientMonitoringService clientMonitoringService = clientMonitoringServices.get(consumerID); + if (clientMonitoringService == null) { + LOGGER.trace("[{}] getService({})", consumerID, ClientMonitoringService.class.getSimpleName()); ClientMonitoringServiceConfiguration clientMonitoringServiceConfiguration = (ClientMonitoringServiceConfiguration) configuration; - DefaultClientMonitoringService clientMonitoringService = new DefaultClientMonitoringService( + clientMonitoringService = new DefaultClientMonitoringService( consumerID, topologyService, firingService, clientMonitoringServiceConfiguration.getClientCommunicator()); topologyService.addTopologyEventListener(clientMonitoringService); - return clientMonitoringService; - })); + clientMonitoringServices.put(consumerID, clientMonitoringService); + } else { + LOGGER.trace("[{}] getService({}): re-using.", consumerID, ClientMonitoringService.class.getSimpleName()); + } + return serviceType.cast(clientMonitoringService); } else { throw new IllegalArgumentException("Missing configuration " + ClientMonitoringServiceConfiguration.class.getSimpleName() + " when requesting service " + serviceType.getName()); } @@ -200,20 +187,22 @@ public T getService(long consumerID, ServiceConfiguration configuration) // get or creates a monitoring accessor service (for tms) if (ManagementService.class == serviceType) { if (configuration instanceof ManagementServiceConfiguration) { - if (!topologyService.isCurrentServerActive()) { - throw new IllegalStateException("Server " + platformConfiguration.getServerName() + " is not active!"); - } - return serviceType.cast(managementServices.computeIfAbsent(consumerID, cid -> { + DefaultManagementService managementService = managementServices.get(consumerID); + if (managementService == null) { + LOGGER.trace("[{}] getService({})", consumerID, ManagementService.class.getSimpleName()); ManagementServiceConfiguration managementServiceConfiguration = (ManagementServiceConfiguration) configuration; - DefaultManagementService managementService = new DefaultManagementService( + managementService = new DefaultManagementService( consumerID, topologyService, firingService, managementServiceConfiguration.getClientCommunicator(), sequenceGenerator); topologyService.addTopologyEventListener(managementService); - return managementService; - })); + managementServices.put(consumerID, managementService); + } else { + LOGGER.trace("[{}] getService({}): re-using.", consumerID, ManagementService.class.getSimpleName()); + } + return serviceType.cast(managementService); } else { throw new IllegalArgumentException("Missing configuration " + ManagementServiceConfiguration.class.getSimpleName() + " when requesting service " + serviceType.getName()); } @@ -222,17 +211,19 @@ public T getService(long consumerID, ServiceConfiguration configuration) // get or creates a monitoring service for an active entity if (ActiveEntityMonitoringService.class == serviceType) { if (configuration instanceof ActiveEntityMonitoringServiceConfiguration) { - if (!topologyService.isCurrentServerActive()) { - throw new IllegalStateException("Server " + platformConfiguration.getServerName() + " is not active!"); - } - return serviceType.cast(activeEntityMonitoringService.computeIfAbsent(consumerID, cid -> { + DefaultActiveEntityMonitoringService activeEntityMonitoringService = this.activeEntityMonitoringServices.get(consumerID); + if (activeEntityMonitoringService == null) { + LOGGER.trace("[{}] getService({})", consumerID, ActiveEntityMonitoringService.class.getSimpleName()); ActiveEntityMonitoringServiceConfiguration activeEntityMonitoringServiceConfiguration = (ActiveEntityMonitoringServiceConfiguration) configuration; - DefaultActiveEntityMonitoringService activeEntityMonitoringService = new DefaultActiveEntityMonitoringService( + activeEntityMonitoringService = new DefaultActiveEntityMonitoringService( consumerID, topologyService, firingService); - return activeEntityMonitoringService; - })); + activeEntityMonitoringServices.put(consumerID, activeEntityMonitoringService); + } else { + LOGGER.trace("[{}] getService({}): re-using.", consumerID, ActiveEntityMonitoringService.class.getSimpleName()); + } + return serviceType.cast(activeEntityMonitoringService); } else { throw new IllegalArgumentException("Missing configuration " + ActiveEntityMonitoringServiceConfiguration.class.getSimpleName() + " when requesting service " + serviceType.getName()); } @@ -241,19 +232,21 @@ public T getService(long consumerID, ServiceConfiguration configuration) // get or creates a monitoring service for a passive entity, bridging calls to IMonitoringProducer if (PassiveEntityMonitoringService.class == serviceType) { if (configuration instanceof PassiveEntityMonitoringServiceConfiguration) { - if (topologyService.isCurrentServerActive()) { - throw new IllegalStateException("Server " + platformConfiguration.getServerName() + " is not passive!"); - } - return serviceType.cast(passiveEntityMonitoringService.computeIfAbsent(consumerID, cid -> { + DefaultPassiveEntityMonitoringService passiveEntityMonitoringService = passiveEntityMonitoringServices.get(consumerID); + if (passiveEntityMonitoringService == null) { + LOGGER.trace("[{}] getService({})", consumerID, PassiveEntityMonitoringService.class.getSimpleName()); PassiveEntityMonitoringServiceConfiguration passiveEntityMonitoringServiceConfiguration = (PassiveEntityMonitoringServiceConfiguration) configuration; IMonitoringProducer monitoringProducer = passiveEntityMonitoringServiceConfiguration.getMonitoringProducer(); if (monitoringProducer == null) { LOGGER.warn("Platform service " + IMonitoringProducer.class.getSimpleName() + " is not accessible."); return null; } - DefaultPassiveEntityMonitoringService passiveEntityMonitoringService = new DefaultPassiveEntityMonitoringService(consumerID, monitoringProducer); - return passiveEntityMonitoringService; - })); + passiveEntityMonitoringService = new DefaultPassiveEntityMonitoringService(consumerID, monitoringProducer); + passiveEntityMonitoringServices.put(consumerID, passiveEntityMonitoringService); + } else { + LOGGER.trace("[{}] getService({}): re-using.", consumerID, PassiveEntityMonitoringService.class.getSimpleName()); + } + return serviceType.cast(passiveEntityMonitoringService); } else { throw new IllegalArgumentException("Missing configuration " + PassiveEntityMonitoringServiceConfiguration.class.getSimpleName() + " when requesting service " + serviceType.getName()); } diff --git a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyService.java b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyService.java index 007dc46853..b3eba1b4ae 100644 --- a/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyService.java +++ b/management/monitoring-service/src/main/java/org/terracotta/management/service/monitoring/TopologyService.java @@ -97,13 +97,27 @@ class TopologyService implements PlatformListener { public synchronized void serverDidBecomeActive(PlatformServer self) { LOGGER.trace("[0] serverDidBecomeActive({})", self.getServerName()); - serverDidJoinStripe(self); + Server server = Server.create(self.getServerName()) + .setBindAddress(self.getBindAddress()) + .setBindPort(self.getBindPort()) + .setBuildId(self.getBuild()) + .setGroupPort(self.getGroupPort()) + .setHostName(self.getHostName()) + .setStartTime(self.getStartTime()) + .setHostAddress(self.getHostAddress()) + .setVersion(self.getVersion()) + .computeUpTime(); + + stripe.addServer(server); currentActive = stripe.getServerByName(self.getServerName()).get(); - currentActive.setState(Server.State.ACTIVE); - currentActive.setActivateTime(timeSource.getTimestamp()); topologyEventListeners.forEach(TopologyEventListener::onBecomeActive); + + firingService.fireNotification(new ContextualNotification(server.getContext(), SERVER_JOINED.name())); + + long time = timeSource.getTimestamp(); + serverStateChanged(self, new ServerState("ACTIVE", time, time)); } @Override diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverIT.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverIT.java index cfe869a9f0..bed058a4f8 100644 --- a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverIT.java +++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverIT.java @@ -15,50 +15,73 @@ */ package org.terracotta.management.integration.tests; +import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.terracotta.management.model.capabilities.descriptors.Settings; import org.terracotta.management.model.cluster.Cluster; import org.terracotta.management.model.cluster.Server; +import org.terracotta.management.model.cluster.ServerEntity; import org.terracotta.management.model.message.Message; import org.terracotta.management.model.notification.ContextualNotification; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** * @author Mathieu Carbou */ -@Ignore("Impacted by https://github.com/Terracotta-OSS/terracotta-core/issues/405") -//TODO: VOLTRON ISSUE ? https://github.com/Terracotta-OSS/terracotta-core/issues/405 public class FailoverIT extends AbstractHATest { - @Test - public void failover_management() throws Exception { + Server oldActive; + Server oldPassive; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + Cluster cluster = tmsAgentService.readTopology(); - Server active = cluster.serverStream().filter(Server::isActive).findFirst().get(); - Server passive = cluster.serverStream().filter(server -> !server.isActive()).findFirst().get(); - assertThat(active.getState(), equalTo(Server.State.ACTIVE)); - assertThat(passive.getState(), equalTo(Server.State.PASSIVE)); + oldActive = cluster.serverStream().filter(Server::isActive).findFirst().get(); + oldPassive = cluster.serverStream().filter(server -> !server.isActive()).findFirst().get(); + assertThat(oldActive.getState(), equalTo(Server.State.ACTIVE)); + assertThat(oldPassive.getState(), equalTo(Server.State.PASSIVE)); // clear buffer tmsAgentService.readMessages(); // kill active - passive should take the active role - System.out.printf("==> terminateActive()"); voltron.getClusterControl().terminateActive(); - System.out.printf("==> waitForActive()"); voltron.getClusterControl().waitForActive(); + } - System.out.printf("==> readTopology()"); - cluster = tmsAgentService.readTopology(); + @Test + @Ignore + public void topology_recovery_after_failover() throws Exception { + Cluster cluster = tmsAgentService.readTopology(); + + // verify new server Server newActive = cluster.serverStream().filter(Server::isActive).findFirst().get(); assertThat(newActive.getState(), equalTo(Server.State.ACTIVE)); - assertThat(newActive.getServerName(), equalTo(passive.getServerName())); + assertThat(newActive.getServerName(), equalTo(oldPassive.getServerName())); + + // removes all random values + String currentTopo = toJson(cluster.toMap()).toString(); + String actual = removeRandomValues(currentTopo); + String expected = readJson("topology.json").toString(); + assertEquals(expected, actual); + } + + @Test + @Ignore + public void notifications_after_failover() throws Exception { // read messages List messages = tmsAgentService.readMessages(); @@ -74,7 +97,7 @@ public void failover_management() throws Exception { assertThat( notifs.stream().map(notif -> notif.getContext().get(Server.NAME_KEY)).collect(Collectors.toList()), - equalTo(Arrays.asList(newActive.getServerName(), newActive.getServerName()))); + equalTo(Arrays.asList(oldPassive.getServerName(), oldPassive.getServerName()))); assertThat( notifs.stream().map(notif -> notif.getAttributes().get("state")).collect(Collectors.toList()), @@ -87,6 +110,7 @@ public void failover_management() throws Exception { } @Test + @Ignore public void puts_can_be_seen_on_other_clients_after_failover() throws Exception { put(0, "clients", "client1", "Mathieu"); diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/TopologyIT.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/TopologyIT.java index 877a3addea..39bcdfe04d 100644 --- a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/TopologyIT.java +++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/TopologyIT.java @@ -42,13 +42,7 @@ public void can_read_topology() throws Exception { Cluster cluster = tmsAgentService.readTopology(); String currentTopo = toJson(cluster.toMap()).toString(); String actual = removeRandomValues(currentTopo); - - String expected = readJson("topology.json").toString(); - - System.out.println("This is the actual topology : " + actual); - System.out.println("This is the expected topology : " + expected); - // and compare assertEquals(expected, actual); }