diff --git a/management/testing/integration-tests/pom.xml b/management/testing/integration-tests/pom.xml
index 9078e09b61..a38ecbf579 100644
--- a/management/testing/integration-tests/pom.xml
+++ b/management/testing/integration-tests/pom.xml
@@ -68,6 +68,23 @@
junit
test
+
+ org.hamcrest
+ hamcrest-core
+ 1.3
+
+
+ ch.qos.logback
+ logback-classic
+ ${logback.version}
+ test
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.6.3
+ test
+
diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/AbstractHATest.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/AbstractHATest.java
new file mode 100644
index 0000000000..b904d50712
--- /dev/null
+++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/AbstractHATest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.terracotta.management.integration.tests;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.terracotta.testing.rules.BasicExternalCluster;
+
+import java.io.File;
+
+import static java.util.Collections.emptyList;
+
+/**
+ * @author Mathieu Carbou
+ */
+public abstract class AbstractHATest extends AbstractTest {
+
+ private final String offheapResource = "primary-server-resource";
+ private final String resourceConfig =
+ ""
+ + ""
+ + "64"
+ + "" +
+ "\n";
+
+ @Rule
+ public org.terracotta.testing.rules.Cluster voltron =
+ new BasicExternalCluster(new File("target/galvan"), 2, emptyList(), "", resourceConfig, "");
+
+ @Before
+ public void setUp() throws Exception {
+ voltron.getClusterControl().waitForActive();
+ voltron.getClusterControl().waitForRunningPassivesInStandby();
+ commonSetUp(voltron);
+ tmsAgentService.readMessages();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ commonTearDown();
+ }
+
+}
diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/AbstractTest.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/AbstractTest.java
new file mode 100644
index 0000000000..02f06c1aeb
--- /dev/null
+++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/AbstractTest.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.terracotta.management.integration.tests;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.terracotta.connection.Connection;
+import org.terracotta.connection.ConnectionFactory;
+import org.terracotta.connection.ConnectionPropertyNames;
+import org.terracotta.management.entity.sample.Cache;
+import org.terracotta.management.entity.sample.client.CacheFactory;
+import org.terracotta.management.entity.tms.TmsAgentConfig;
+import org.terracotta.management.entity.tms.client.TmsAgentEntity;
+import org.terracotta.management.entity.tms.client.TmsAgentEntityFactory;
+import org.terracotta.management.entity.tms.client.TmsAgentService;
+import org.terracotta.management.model.capabilities.context.CapabilityContext;
+import org.terracotta.management.model.stats.ContextualStatistics;
+import org.terracotta.management.registry.collect.StatisticConfiguration;
+import org.terracotta.testing.rules.Cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Mathieu Carbou
+ */
+public abstract class AbstractTest {
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private Connection managementConnection;
+ private Cluster cluster;
+
+ protected final List webappNodes = new ArrayList<>();
+ protected final Map> caches = new HashMap<>();
+ protected TmsAgentService tmsAgentService;
+
+ @Rule
+ public Timeout timeout = Timeout.seconds(60);
+
+ protected final void commonSetUp(Cluster cluster) throws Exception {
+ this.cluster = cluster;
+
+ mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
+ mapper.addMixIn(CapabilityContext.class, CapabilityContextMixin.class);
+
+ connectManagementClients(cluster.getConnectionURI());
+
+ addWebappNode(cluster.getConnectionURI().resolve("pet-clinic"));
+ addWebappNode(cluster.getConnectionURI().resolve("pet-clinic"));
+
+ getCaches("pets");
+ getCaches("clients");
+ }
+
+ protected final void commonTearDown() throws Exception {
+ closeNodes();
+ if (managementConnection != null) {
+ managementConnection.close();
+ }
+ if (cluster != null) {
+ cluster.getClusterControl().terminateAllServers();
+ }
+ }
+
+ protected JsonNode readJson(String file) {
+ try {
+ return mapper.readTree(new File(AbstractTest.class.getResource("/" + file).toURI()));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected JsonNode toJson(Object o) {
+ try {
+ return mapper.readTree(mapper.writeValueAsString(o));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected int size(int nodeIdx, String cacheName) {
+ return caches.get(cacheName).get(nodeIdx).size();
+ }
+
+ protected String get(int nodeIdx, String cacheName, String key) {
+ return caches.get(cacheName).get(nodeIdx).get(key);
+ }
+
+ protected void put(int nodeIdx, String cacheName, String key, String value) {
+ caches.get(cacheName).get(nodeIdx).put(key, value);
+ }
+
+ protected void remove(int nodeIdx, String cacheName, String key) {
+ caches.get(cacheName).get(nodeIdx).remove(key);
+ }
+
+ protected void closeNodes() {
+ webappNodes.forEach(cacheFactory -> {
+ try {
+ cacheFactory.getConnection().close();
+ } catch (IOException ignored) {
+ }
+ });
+ }
+
+ protected void getCaches(String name) {
+ caches.put(name, webappNodes.stream().map(cacheFactory -> cacheFactory.getCache(name)).collect(Collectors.toList()));
+ }
+
+ protected void addWebappNode(URI uri) throws Exception {
+ StatisticConfiguration statisticConfiguration = new StatisticConfiguration()
+ .setAverageWindowDuration(1, TimeUnit.MINUTES)
+ .setHistorySize(100)
+ .setHistoryInterval(1, TimeUnit.SECONDS)
+ .setTimeToDisable(5, TimeUnit.SECONDS);
+ CacheFactory cacheFactory = new CacheFactory(uri, statisticConfiguration);
+ cacheFactory.init();
+ webappNodes.add(cacheFactory);
+ }
+
+ public static abstract class CapabilityContextMixin {
+ @JsonIgnore
+ public abstract Collection getRequiredAttributeNames();
+
+ @JsonIgnore
+ public abstract Collection getRequiredAttributes();
+ }
+
+ private void connectManagementClients(URI uri) throws Exception {
+ // connects to server
+ Properties properties = new Properties();
+ properties.setProperty(ConnectionPropertyNames.CONNECTION_NAME, getClass().getSimpleName());
+ properties.setProperty(ConnectionPropertyNames.CONNECTION_TIMEOUT, "5000");
+ this.managementConnection = ConnectionFactory.connect(uri, properties);
+
+ // create a tms entity
+ TmsAgentEntityFactory tmsAgentEntityFactory = new TmsAgentEntityFactory(managementConnection, getClass().getSimpleName());
+ TmsAgentEntity tmsAgentEntity = tmsAgentEntityFactory.retrieveOrCreate(new TmsAgentConfig()
+ .setMaximumUnreadMessages(1024 * 1024)
+ .setStatisticConfiguration(new StatisticConfiguration()
+ .setAverageWindowDuration(1, TimeUnit.MINUTES)
+ .setHistorySize(100)
+ .setHistoryInterval(1, TimeUnit.SECONDS)
+ .setTimeToDisable(5, TimeUnit.SECONDS)));
+ this.tmsAgentService = new TmsAgentService(tmsAgentEntity);
+ this.tmsAgentService.setOperationTimeout(10, TimeUnit.SECONDS);
+ }
+
+ protected void queryAllRemoteStatsUntil(Predicate> test) throws Exception {
+ List extends ContextualStatistics> statistics;
+ do {
+ statistics = tmsAgentService.readMessages()
+ .stream()
+ .filter(message -> message.getType().equals("STATISTICS"))
+ .flatMap(message -> message.unwrap(ContextualStatistics.class).stream())
+ .collect(Collectors.toList());
+ // PLEASE KEEP THIS ! Really useful when troubleshooting stats!
+ /*if (!statistics.isEmpty()) {
+ System.out.println("received at " + System.currentTimeMillis() + ":");
+ statistics.stream()
+ .flatMap(o -> o.getStatistics().entrySet().stream())
+ .forEach(System.out::println);
+ }*/
+ Thread.sleep(500);
+ } while (!Thread.currentThread().isInterrupted() && (statistics.isEmpty() || !test.test(statistics)));
+ assertFalse(Thread.currentThread().isInterrupted());
+ assertTrue(test.test(statistics));
+ }
+
+}
diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverTest.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverTest.java
new file mode 100644
index 0000000000..c9efa3fb77
--- /dev/null
+++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/FailoverTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.terracotta.management.integration.tests;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.terracotta.management.model.cluster.Cluster;
+import org.terracotta.management.model.cluster.Server;
+import org.terracotta.management.model.message.Message;
+import org.terracotta.management.model.notification.ContextualNotification;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Mathieu Carbou
+ */
+@Ignore // TODO activate
+public class FailoverTest extends AbstractHATest {
+
+ @Test
+ public void failover_management() throws Exception {
+ Cluster cluster = tmsAgentService.readTopology();
+ Server active = cluster.serverStream().filter(Server::isActive).findFirst().get();
+ Server passive = cluster.serverStream().filter(server -> !server.isActive()).findFirst().get();
+ assertThat(active.getState(), equalTo(Server.State.ACTIVE));
+ assertThat(passive.getState(), equalTo(Server.State.PASSIVE));
+
+ // clear buffer
+ tmsAgentService.readMessages();
+
+ // kill active - passive should take the active role
+ voltron.getClusterControl().terminateActive();
+ voltron.getClusterControl().waitForActive();
+
+ cluster = tmsAgentService.readTopology();
+ Server newActive = cluster.serverStream().filter(Server::isActive).findFirst().get();
+ assertThat(newActive.getState(), equalTo(Server.State.ACTIVE));
+ assertThat(newActive.getServerName(), equalTo(passive.getServerName()));
+
+ // read messages
+ List messages = tmsAgentService.readMessages();
+ assertThat(messages.size(), equalTo(3));
+
+ List notifs = messages.stream()
+ .filter(message -> message.getType().equals("NOTIFICATION"))
+ .flatMap(message -> message.unwrap(ContextualNotification.class).stream())
+ .filter(notif -> notif.getType().equals("SERVER_STATE_CHANGED"))
+ .collect(Collectors.toList());
+
+ assertThat(
+ notifs.stream().map(notif -> notif.getContext().get(Server.NAME_KEY)).collect(Collectors.toList()),
+ equalTo(Arrays.asList(newActive.getServerName(), newActive.getServerName())));
+
+ assertThat(
+ notifs.stream().map(notif -> notif.getAttributes().get("state")).collect(Collectors.toList()),
+ equalTo(Arrays.asList("ACTIVE", "ACTIVE")));
+
+ //TODO: complete with Galvan
+ //- test topology (like topology_includes_passives), client should have re-exposed their management metadata
+ //- check notifications: server states
+ //- check notification that might be there: CLIENT_RECONNECTED and SERVER_ENTITY_FAILOVER_COMPLETE
+ }
+
+}
diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/HATest.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/HATest.java
new file mode 100644
index 0000000000..78c4c15424
--- /dev/null
+++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/HATest.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.terracotta.management.integration.tests;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.terracotta.management.model.capabilities.descriptors.Settings;
+import org.terracotta.management.model.cluster.Cluster;
+import org.terracotta.management.model.cluster.Server;
+import org.terracotta.management.model.cluster.ServerEntity;
+import org.terracotta.management.model.message.Message;
+import org.terracotta.management.model.notification.ContextualNotification;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Mathieu Carbou
+ */
+@Ignore // TODO activate
+public class HATest extends AbstractHATest {
+
+ @Test
+ public void topology_includes_passives() throws Exception {
+ Cluster cluster = tmsAgentService.readTopology();
+
+ // removes all random values
+
+ cluster.serverStream().forEach(server -> {
+ server.setActivateTime(0);
+ server.setStartTime(0);
+ server.setBuildId("Build ID");
+ });
+
+ cluster.serverEntityStream()
+ .map(ServerEntity::getManagementRegistry)
+ .flatMap(managementRegistry -> Stream.of(
+ managementRegistry.flatMap(r -> r.getCapability("ServerCacheSettings")),
+ managementRegistry.flatMap(r -> r.getCapability("OffHeapResourceSettings"))))
+ .forEach(capability -> {
+ if (capability.isPresent()) {
+ capability.get()
+ .getDescriptors(Settings.class)
+ .stream()
+ .filter(settings -> settings.containsKey("time")).forEach(settings -> settings.set("time", 0));
+ }
+ });
+
+ Server passive = cluster.serverStream().filter(server -> !server.isActive()).findFirst().get();
+ final String[] currentPassive = {toJson(passive.toMap()).toString()};
+ cluster.clientStream().forEach(client -> currentPassive[0] = currentPassive[0]
+ .replace(passive.getServerName(), "stripe-PASSIVE"));
+
+ // and compare
+ assertEquals(readJson("passive.json").toString(), currentPassive[0]);
+ }
+
+ @Test
+ public void get_notifications_when_passive_leaves() throws Exception {
+ Server active = tmsAgentService.readTopology().serverStream().filter(Server::isActive).findFirst().get();
+ Server passive = tmsAgentService.readTopology().serverStream().filter(server -> !server.isActive()).findFirst().get();
+ assertThat(active.getState(), equalTo(Server.State.ACTIVE));
+ assertThat(passive.getState(), equalTo(Server.State.PASSIVE));
+
+ // clear notification buffer
+ tmsAgentService.readMessages();
+
+ // remove one passive
+ voltron.getClusterControl().terminateOnePassive();
+
+ // read messages
+ List messages = tmsAgentService.readMessages();
+ assertThat(messages.size(), equalTo(2));
+ Map> map = messages.stream().collect(Collectors.groupingBy(Message::getType));
+ assertThat(map.size(), equalTo(2));
+ assertThat(map.keySet(), hasItem("TOPOLOGY"));
+ assertThat(map.keySet(), hasItem("NOTIFICATION"));
+ assertThat(map.get("NOTIFICATION").size(), equalTo(1));
+
+ List notifs = map.get("NOTIFICATION").stream()
+ .flatMap(message -> message.unwrap(ContextualNotification.class).stream())
+ .collect(Collectors.toList());
+
+ assertThat(
+ notifs.stream().map(ContextualNotification::getType).collect(Collectors.toList()),
+ equalTo(Arrays.asList("SERVER_LEFT")));
+
+ assertThat(
+ notifs.get(0).getContext().get(Server.NAME_KEY),
+ equalTo(passive.getServerName()));
+ }
+
+}
diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/SimpleGalvanIT.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/SimpleGalvanIT.java
index bca3f7cbfd..08749d977d 100644
--- a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/SimpleGalvanIT.java
+++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/SimpleGalvanIT.java
@@ -15,11 +15,13 @@
*/
package org.terracotta.management.integration.tests;
+import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.terracotta.management.entity.sample.client.CacheFactory;
import org.terracotta.management.registry.collect.StatisticConfiguration;
import org.terracotta.testing.rules.BasicExternalCluster;
@@ -52,25 +54,29 @@ public static void waitForActive() throws Exception {
}
@Rule
- public final TestName testName = new TestName();
+ public Timeout timeout = Timeout.seconds(30);
- @Test
- public void simpleTest_one_active() throws Exception {
- URI uri = CLUSTER.getConnectionURI();
-
- System.out.println(uri);
+ CacheFactory cacheFactory;
+ @Before
+ public void setUp() throws Exception {
StatisticConfiguration statisticConfiguration = new StatisticConfiguration()
.setAverageWindowDuration(1, TimeUnit.MINUTES)
.setHistorySize(100)
.setHistoryInterval(1, TimeUnit.SECONDS)
.setTimeToDisable(5, TimeUnit.SECONDS);
- CacheFactory cacheFactory = new CacheFactory(uri.toString() + "/pif", statisticConfiguration);
- cacheFactory.init();
- try {
- cacheFactory.getCache("paf");
- } finally {
- cacheFactory.close();
- }
+ URI uri = CLUSTER.getConnectionURI().resolve("/pif");
+ cacheFactory = new CacheFactory(uri, statisticConfiguration);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ cacheFactory.getConnection().close();
+ }
+
+ @Test
+ public void simpleTest_one_active() throws Exception {
+ cacheFactory.init(); // create and fetches management entity
+ cacheFactory.getCache("paf"); // create and fetch sample entity
}
}
diff --git a/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/StartupTest.java b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/StartupTest.java
new file mode 100644
index 0000000000..51115d1bba
--- /dev/null
+++ b/management/testing/integration-tests/src/test/java/org/terracotta/management/integration/tests/StartupTest.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.terracotta.management.integration.tests;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.terracotta.management.model.cluster.Cluster;
+import org.terracotta.management.model.cluster.Server;
+import org.terracotta.management.model.message.Message;
+import org.terracotta.management.model.notification.ContextualNotification;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Mathieu Carbou
+ */
+@Ignore // TODO activate
+public class StartupTest extends AbstractHATest {
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ // this sequence is so tha twe can have a stripe of 2 servers bu starting with only 1 active
+ // galvan does not have an easier way to do that
+ voltron.getClusterControl().waitForActive();
+ voltron.getClusterControl().waitForRunningPassivesInStandby();
+ voltron.getClusterControl().terminateOnePassive();
+
+ commonSetUp(voltron);
+ tmsAgentService.readMessages();
+
+ // start a passive after management is connected to active
+ voltron.getClusterControl().startOneServer();
+ voltron.getClusterControl().waitForRunningPassivesInStandby();
+ }
+
+ @Test
+ public void get_notifications_when_passive_joins() throws Exception {
+ Server active = tmsAgentService.readTopology().serverStream().filter(Server::isActive).findFirst().get();
+ Server passive = tmsAgentService.readTopology().serverStream().filter(server -> !server.isActive()).findFirst().get();
+ assertThat(active.getState(), equalTo(Server.State.ACTIVE));
+ assertThat(passive.getState(), equalTo(Server.State.PASSIVE));
+
+ // read messages
+ List messages = tmsAgentService.readMessages();
+ assertThat(messages.size(), equalTo(20));
+ Map> map = messages.stream().collect(Collectors.groupingBy(Message::getType));
+ assertThat(map.size(), equalTo(2));
+ assertThat(map.keySet(), hasItem("TOPOLOGY"));
+ assertThat(map.keySet(), hasItem("NOTIFICATION"));
+ assertThat(map.get("NOTIFICATION").size(), equalTo(19));
+
+ List notifs = map.get("NOTIFICATION").stream()
+ .flatMap(message -> message.unwrap(ContextualNotification.class).stream())
+ .collect(Collectors.toList());
+
+ assertThat(
+ notifs.stream().map(ContextualNotification::getType).collect(Collectors.toList()),
+ equalTo(Arrays.asList(
+ "SERVER_JOINED",
+ "SERVER_STATE_CHANGED", "SERVER_STATE_CHANGED",
+ "SERVER_ENTITY_CREATED",
+ "ENTITY_REGISTRY_AVAILABLE", "ENTITY_REGISTRY_UPDATED", "SERVER_CACHE_CREATED",
+ "SYNC_START", "SYNC_END",
+ "SERVER_ENTITY_CREATED", "ENTITY_REGISTRY_AVAILABLE", "SERVER_ENTITY_CREATED", "ENTITY_REGISTRY_AVAILABLE", "ENTITY_REGISTRY_UPDATED", "SERVER_CACHE_CREATED",
+ "SYNC_START", "SYNC_END",
+ "SERVER_ENTITY_CREATED", "SERVER_STATE_CHANGED")));
+
+ List serverStateNotifs = messages.stream()
+ .filter(message -> message.getType().equals("NOTIFICATION"))
+ .flatMap(message -> message.unwrap(ContextualNotification.class).stream())
+ .filter(notif -> notif.getType().equals("SERVER_STATE_CHANGED"))
+ .collect(Collectors.toList());
+
+ assertThat(
+ serverStateNotifs.stream().map(notif -> notif.getContext().get(Server.NAME_KEY)).collect(Collectors.toList()),
+ equalTo(Arrays.asList(passive.getServerName(), passive.getServerName(), passive.getServerName())));
+
+ assertThat(
+ serverStateNotifs.stream().map(notif -> notif.getAttributes().get("state")).collect(Collectors.toList()),
+ equalTo(Arrays.asList("UNINITIALIZED", "SYNCHRONIZING", "PASSIVE")));
+ }
+
+ @Test
+ public void get_server_states_when_passive_joins() throws Exception {
+ Server active = tmsAgentService.readTopology().serverStream().filter(Server::isActive).findFirst().get();
+ Server passive = tmsAgentService.readTopology().serverStream().filter(server -> !server.isActive()).findFirst().get();
+ assertThat(active.getState(), equalTo(Server.State.ACTIVE));
+ assertThat(passive.getState(), equalTo(Server.State.PASSIVE));
+
+ // read messages
+ List messages = tmsAgentService.readMessages();
+ List notifs = messages.stream()
+ .filter(message -> message.getType().equals("NOTIFICATION"))
+ .flatMap(message -> message.unwrap(ContextualNotification.class).stream())
+ .filter(notif -> notif.getType().equals("SERVER_STATE_CHANGED"))
+ .collect(Collectors.toList());
+
+
+ }
+
+ @Test
+ public void failover_management() throws Exception {
+ Cluster cluster = tmsAgentService.readTopology();
+ Server active = cluster.serverStream().filter(Server::isActive).findFirst().get();
+ Server passive = cluster.serverStream().filter(server -> !server.isActive()).findFirst().get();
+ assertThat(active.getState(), equalTo(Server.State.ACTIVE));
+ assertThat(passive.getState(), equalTo(Server.State.PASSIVE));
+
+ // clear buffer
+ tmsAgentService.readMessages();
+
+ // kill active - passive should take the active role
+ voltron.getClusterControl().terminateActive();
+ voltron.getClusterControl().waitForActive();
+
+ cluster = tmsAgentService.readTopology();
+ Server newActive = cluster.serverStream().filter(Server::isActive).findFirst().get();
+ assertThat(newActive.getState(), equalTo(Server.State.ACTIVE));
+ assertThat(newActive.getServerName(), equalTo(passive.getServerName()));
+
+ // read messages
+ List messages = tmsAgentService.readMessages();
+ assertThat(messages.size(), equalTo(3));
+
+ List notifs = messages.stream()
+ .filter(message -> message.getType().equals("NOTIFICATION"))
+ .flatMap(message -> message.unwrap(ContextualNotification.class).stream())
+ .filter(notif -> notif.getType().equals("SERVER_STATE_CHANGED"))
+ .collect(Collectors.toList());
+
+ assertThat(
+ notifs.stream().map(notif -> notif.getContext().get(Server.NAME_KEY)).collect(Collectors.toList()),
+ equalTo(Arrays.asList(newActive.getServerName(), newActive.getServerName())));
+
+ assertThat(
+ notifs.stream().map(notif -> notif.getAttributes().get("state")).collect(Collectors.toList()),
+ equalTo(Arrays.asList("ACTIVE", "ACTIVE")));
+
+ //TODO: complete with Galvan
+ //- test topology (like topology_includes_passives), client should have re-exposed their management metadata
+ //- check notifications: server states
+ //- check notification that might be there: CLIENT_RECONNECTED and SERVER_ENTITY_FAILOVER_COMPLETE
+ }
+
+}
diff --git a/management/testing/integration-tests/src/test/resources/logback.xml b/management/testing/integration-tests/src/test/resources/logback.xml
new file mode 100644
index 0000000000..db2a973f7d
--- /dev/null
+++ b/management/testing/integration-tests/src/test/resources/logback.xml
@@ -0,0 +1,42 @@
+
+
+
+
+
+ true
+
+
+
+
+
+
+ ${CONSOLE_LOG_PATTERN}
+ utf8
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/client/CacheFactory.java b/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/client/CacheFactory.java
index 1c5c0ef3b5..7f05bb4ad2 100644
--- a/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/client/CacheFactory.java
+++ b/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/client/CacheFactory.java
@@ -48,7 +48,7 @@ public class CacheFactory implements Closeable {
private Connection connection;
private CacheEntityFactory cacheEntityFactory;
- public CacheFactory(String uri) {
+ public CacheFactory(URI uri) {
this(uri, new StatisticConfiguration()
.setAverageWindowDuration(1, TimeUnit.MINUTES)
.setHistorySize(100)
@@ -56,10 +56,9 @@ public CacheFactory(String uri) {
.setTimeToDisable(30, TimeUnit.SECONDS));
}
- public CacheFactory(String uri, StatisticConfiguration statisticConfiguration) {
- URI u = URI.create(uri);
+ public CacheFactory(URI u, StatisticConfiguration statisticConfiguration) {
if (u.getPath() == null || u.getPath().isEmpty()) {
- throw new IllegalArgumentException(uri);
+ throw new IllegalArgumentException(u.toString());
}
this.uri = u;
this.management = new Management(new ContextContainer("appName", u.getPath().substring(1)), statisticConfiguration);
@@ -91,6 +90,10 @@ public Cache getCache(String name) {
});
}
+ public Connection getConnection() {
+ return connection;
+ }
+
@Override
public void close() {
management.close();
diff --git a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/AbstractTest.java b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/AbstractTest.java
index a1fdab4ff2..e04fc3773c 100644
--- a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/AbstractTest.java
+++ b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/AbstractTest.java
@@ -21,6 +21,8 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
import org.terracotta.connection.Connection;
import org.terracotta.connection.ConnectionFactory;
import org.terracotta.connection.ConnectionPropertyNames;
@@ -47,7 +49,6 @@
import java.io.File;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.math.BigInteger;
import java.net.URI;
import java.util.ArrayList;
@@ -56,11 +57,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -81,6 +78,9 @@ public abstract class AbstractTest {
protected final Map> caches = new HashMap<>();
protected TmsAgentService tmsAgentService;
+ @Rule
+ public Timeout timeout = Timeout.seconds(60);
+
protected AbstractTest() {
this(0);
}
@@ -135,19 +135,9 @@ public void setUp() throws Exception {
public void tearDown() throws Exception {
closeNodes();
if (managementConnection != null) {
- interruptVoltron(() -> {
- try {
- managementConnection.close();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
- }
- try {
- stripeControl.tearDown();
- } catch (Error ignored) {
- // because passthrough fails shutting down in case the passive failed to take over the active
+ managementConnection.close();
}
+ stripeControl.tearDown();
}
protected JsonNode readJson(String file) {
@@ -183,7 +173,12 @@ protected void remove(int nodeIdx, String cacheName, String key) {
}
protected void closeNodes() {
- webappNodes.forEach(cacheFactory -> interruptVoltron(cacheFactory::close));
+ webappNodes.forEach(cacheFactory -> {
+ try {
+ cacheFactory.getConnection().close();
+ } catch (IOException ignored) {
+ }
+ });
}
protected void getCaches(String name) {
@@ -196,7 +191,7 @@ protected void addWebappNode() throws Exception {
.setHistorySize(100)
.setHistoryInterval(1, TimeUnit.SECONDS)
.setTimeToDisable(5, TimeUnit.SECONDS);
- CacheFactory cacheFactory = new CacheFactory("passthrough://stripe-1:9510/pet-clinic", statisticConfiguration);
+ CacheFactory cacheFactory = new CacheFactory(URI.create("passthrough://stripe-1:9510/pet-clinic"), statisticConfiguration);
cacheFactory.init();
webappNodes.add(cacheFactory);
}
@@ -250,38 +245,4 @@ protected void queryAllRemoteStatsUntil(Predicate task = new FutureTask<>(() -> {
- try {
- runnable.run();
- } catch (RuntimeException e) {
- Throwable t = e.getCause();
- while (t != null && !(t instanceof InterruptedException)) {
- t = t.getCause();
- }
- if (t == null) {
- throw e;
- } else {
- Thread.currentThread().interrupt();
- }
- }
- }, null);
- Thread thread = new Thread(task, getClass().getSimpleName() + "-tearDown");
- thread.start();
- try {
- task.get(2, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e.getCause());
- } catch (TimeoutException e) {
- thread.interrupt();
- throw new RuntimeException(e);
- }
- }
}
diff --git a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/CacheEntityFeaturesTest.java b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/CacheEntityFeaturesTest.java
index d78654ee8b..8e3b18c3c6 100644
--- a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/CacheEntityFeaturesTest.java
+++ b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/CacheEntityFeaturesTest.java
@@ -18,6 +18,8 @@
import org.junit.Test;
import org.terracotta.management.entity.sample.client.CacheFactory;
+import java.net.URI;
+
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -30,13 +32,13 @@ public class CacheEntityFeaturesTest extends AbstractTest {
@Test
public void cache_remains_active_on_server_on_client_close() throws Exception {
- CacheFactory cacheFactory = new CacheFactory("passthrough://stripe-1:9510/cat-clinic");
+ CacheFactory cacheFactory = new CacheFactory(URI.create("passthrough://stripe-1:9510/cat-clinic"));
cacheFactory.init();
Cache cache = cacheFactory.getCache("cache");
cache.put("client1", "Mat");
cacheFactory.close();
- cacheFactory = new CacheFactory("passthrough://stripe-1:9510/cat-clinic");
+ cacheFactory = new CacheFactory(URI.create("passthrough://stripe-1:9510/cat-clinic"));
cacheFactory.init();
cache = cacheFactory.getCache("cache");
try {
diff --git a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ClientCacheLocalManagementTest.java b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ClientCacheLocalManagementTest.java
index 00ee534e79..b6b4463460 100644
--- a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ClientCacheLocalManagementTest.java
+++ b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ClientCacheLocalManagementTest.java
@@ -120,7 +120,7 @@ public void can_do_local_management_calls() throws Exception {
assertThat(size(0, "pets"), equalTo(0));
}
- @Test(timeout = 20_000)
+ @Test
public void can_query_local_stats() throws Exception {
System.out.println("Please be patient... Test can take about 5s...");
diff --git a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ClientCacheRemoteManagementTest.java b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ClientCacheRemoteManagementTest.java
index 5b0315975b..7b6bdd0031 100644
--- a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ClientCacheRemoteManagementTest.java
+++ b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ClientCacheRemoteManagementTest.java
@@ -59,7 +59,7 @@ public void can_access_remote_management_registry_of_client() throws Exception {
assertEquals(readJson("client-descriptors.json"), toJson(registry.getCapabilities()));
}
- @Test(timeout = 30_000)
+ @Test
public void can_do_remote_management_calls_on_client() throws Exception {
Client client = tmsAgentService.readTopology()
.clientStream()
@@ -88,7 +88,7 @@ public void can_do_remote_management_calls_on_client() throws Exception {
assertThat(tmsAgentService.call(context, "CacheCalls", "size", int.class).waitForReturn(), is(0));
}
- @Test(timeout = 30_000)
+ @Test
public void can_receive_client_statistics() throws Exception {
System.out.println("Please be patient... Test can take about 15s...");
triggerClientStatComputation();
diff --git a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ServerCacheManagementTest.java b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ServerCacheManagementTest.java
index 3c5c28db9f..68b259493c 100644
--- a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ServerCacheManagementTest.java
+++ b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ServerCacheManagementTest.java
@@ -75,7 +75,7 @@ public void can_access_remote_management_registry_on_server() throws Exception {
assertThat(registry.getCapability("StatisticCollectorCapability"), is(notNullValue()));
}
- @Test(timeout = 60_000L)
+ @Test
public void can_do_remote_management_calls_on_server() throws Exception {
ServerEntity serverEntity = tmsAgentService.readTopology()
.activeServerEntityStream()
@@ -111,7 +111,7 @@ public void can_do_remote_management_calls_on_server() throws Exception {
assertThat(tmsAgentService.call(context, "ServerCacheCalls", "size", int.class).waitForReturn(), is(0));
}
- @Test(timeout = 60_000)
+ @Test
public void can_receive_server_statistics() throws Exception {
System.out.println("Please be patient... Test can take about 15s...");
triggerServerStatComputation();
diff --git a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ha/HATest.java b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ha/HATest.java
index 62f124dcd5..e28a5c8906 100644
--- a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ha/HATest.java
+++ b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ha/HATest.java
@@ -79,7 +79,6 @@ public void topology_includes_passives() throws Exception {
}
@Test
- //TODO: needs to confirm that with a galvan test: the sequence of notifications seems weird on passthrough. All notifs are there, but there are duplicates (https://github.com/Terracotta-OSS/terracotta-platform/issues/191)
public void get_notifications_when_passive_joins() throws Exception {
// clear
tmsAgentService.readMessages();
@@ -185,7 +184,6 @@ public void get_server_states_when_passive_joins() throws Exception {
}
@Test
- //TODO: needs to also test the topology with a galvan test after the failover: (https://github.com/Terracotta-OSS/terracotta-platform/issues/191)
public void failover_management() throws Exception {
// connect passive
stripeControl.startOneServer();
@@ -226,11 +224,6 @@ public void failover_management() throws Exception {
assertThat(
notifs.stream().map(notif -> notif.getAttributes().get("state")).collect(Collectors.toList()),
equalTo(Arrays.asList("ACTIVE", "ACTIVE")));
-
- //TODO: complete with Galvan: passthrough bug prevents m&m so see the new topology and further notifications (https://github.com/Terracotta-OSS/tc-passthrough-testing/issues/74)
- //- test topology (like topology_includes_passives), client should have re-exposed their management metadata
- //- check notifications: server states
- //- check notification that might be there: CLIENT_RECONNECTED and SERVER_ENTITY_FAILOVER_COMPLETE
}
@Test