Skip to content

Commit

Permalink
Merge pull request Terracotta-OSS#237 from mathieucarbou/issue-154
Browse files Browse the repository at this point in the history
Close Terracotta-OSS#154: Failover support on client side (re-expose management metadata of clients on failover)
  • Loading branch information
anthonydahanne authored Dec 15, 2016
2 parents 3cfa6e5 + 1737b58 commit 694781f
Show file tree
Hide file tree
Showing 25 changed files with 358 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

import org.terracotta.connection.entity.Entity;
import org.terracotta.management.entity.management.ManagementAgent;
import org.terracotta.voltron.proxy.client.EndpointListenerAware;
import org.terracotta.voltron.proxy.client.ServerMessageAware;

/**
* @author Mathieu Carbou
*/
public interface ManagementAgentEntity extends ManagementAgent, Entity, ServerMessageAware {
public interface ManagementAgentEntity extends ManagementAgent, Entity, ServerMessageAware, EndpointListenerAware {

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.management.entity.management.ReconnectData;
import org.terracotta.management.model.Objects;
import org.terracotta.management.model.call.ContextualCall;
import org.terracotta.management.model.call.ContextualReturn;
import org.terracotta.management.model.capabilities.Capability;
import org.terracotta.management.model.context.Context;
import org.terracotta.management.model.context.ContextContainer;
import org.terracotta.management.model.message.ManagementCallMessage;
import org.terracotta.management.model.message.Message;
Expand All @@ -31,10 +33,12 @@
import org.terracotta.management.registry.ManagementRegistry;
import org.terracotta.management.registry.action.ExposedObject;
import org.terracotta.voltron.proxy.MessageListener;
import org.terracotta.voltron.proxy.client.EndpointListener;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
Expand All @@ -52,7 +56,8 @@ public class ManagementAgentService implements Closeable {

private volatile ManagementRegistry registry;
private volatile boolean bridging = false;
private Capability[] previouslyExposed = new Capability[0];
private Capability[] previouslyExposedCapabilities;
private String[] previouslyExposedTags;

private long timeoutMs = 5000;
private Executor managementCallExecutor = new Executor() {
Expand Down Expand Up @@ -92,7 +97,8 @@ private void refresh(Object managedObject) {

public ManagementAgentService(final ManagementAgentEntity entity) {
this.entity = Objects.requireNonNull(entity);
this.entity.registerListener(Message.class, new MessageListener<Message>() {

this.entity.registerMessageListener(Message.class, new MessageListener<Message>() {
@Override
public void onMessage(final Message message) {
LOGGER.trace("onMessage({})", message);
Expand Down Expand Up @@ -130,6 +136,30 @@ public void run() {
}
}
});

this.entity.setEndpointListener(new EndpointListener() {
@Override
public Object onReconnect() {
if (bridging) {
LOGGER.trace("onReconnect()");
Collection<? extends Capability> capabilities = registry == null ? Collections.<Capability>emptyList() : registry.getCapabilities();
Context context = registry == null ? Context.empty() : Context.create(registry.getContextContainer().getName(), registry.getContextContainer().getValue());
return new ReconnectData(
previouslyExposedTags,
registry == null ? null : registry.getContextContainer(),
registry == null ? null : capabilities.toArray(new Capability[capabilities.size()]),
new ContextualNotification(context, "CLIENT_RECONNECTED"));
} else {
return null;
}
}

@Override
public void onDisconnectUnexpectedly() {
LOGGER.trace("onDisconnectUnexpectedly()");
close();
}
});
}

public void init() throws ExecutionException, InterruptedException, TimeoutException {
Expand All @@ -147,6 +177,7 @@ public void init() throws ExecutionException, InterruptedException, TimeoutExcep
*/
public synchronized void setManagementRegistry(ManagementRegistry registry) {
if (!bridging) {
LOGGER.trace("setManagementRegistry({})", registry.getContextContainer().getValue());
this.registry = registry;
registry.addManagementProvider(managementProvider);
bridging = true;
Expand All @@ -156,6 +187,7 @@ public synchronized void setManagementRegistry(ManagementRegistry registry) {
@Override
public synchronized void close() {
if (bridging) {
LOGGER.trace("close()");
registry.removeManagementProvider(managementProvider);
bridging = false;
}
Expand All @@ -180,10 +212,10 @@ public void setCapabilities(ContextContainer contextContainer, Collection<? exte
}

public void setCapabilities(ContextContainer contextContainer, Capability... capabilities) throws ExecutionException, InterruptedException, TimeoutException {
if (!Arrays.deepEquals(previouslyExposed, capabilities)) {
LOGGER.trace("exposeManagementMetadata({})", contextContainer);
if (!Arrays.deepEquals(previouslyExposedCapabilities, capabilities)) {
LOGGER.trace("exposeManagementMetadata({})", contextContainer.getValue());
get(entity.exposeManagementMetadata(null, contextContainer, capabilities));
previouslyExposed = capabilities;
previouslyExposedCapabilities = capabilities;
}
}

Expand All @@ -194,6 +226,7 @@ public void setTags(Collection<String> tags) throws ExecutionException, Interrup
public void setTags(String... tags) throws ExecutionException, InterruptedException, TimeoutException {
LOGGER.trace("setTags({})", Arrays.asList(tags));
get(entity.exposeTags(null, tags));
previouslyExposedTags = tags;
}

public void pushNotification(ContextualNotification notification) throws ExecutionException, InterruptedException, TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.terracotta.management.entity.management;

import org.terracotta.management.model.capabilities.Capability;
import org.terracotta.management.model.context.ContextContainer;
import org.terracotta.management.model.notification.ContextualNotification;

import java.io.Serializable;

/**
* @author Mathieu Carbou
*/
public class ReconnectData implements Serializable {

private static final long serialVersionUID = 1L;

public final String[] tags;
public final ContextContainer contextContainer;
public final Capability[] capabilities;
public final ContextualNotification contextualNotification;

public ReconnectData(String[] tags, ContextContainer contextContainer, Capability[] capabilities, ContextualNotification contextualNotification) {
this.tags = tags;
this.contextContainer = contextContainer;
this.capabilities = capabilities;
this.contextualNotification = contextualNotification;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,33 @@ class ActiveManagementAgent implements ManagementAgent {

@Override
public Future<Void> pushNotification(@ClientId Object caller, ContextualNotification notification) {
clientMonitoringService.pushNotification((ClientDescriptor) caller, notification);
if (notification != null) {
clientMonitoringService.pushNotification((ClientDescriptor) caller, notification);
}
return CompletableFuture.completedFuture(null);
}

@Override
public Future<Void> pushStatistics(@ClientId Object caller, ContextualStatistics... statistics) {
if (statistics.length > 0) {
if (statistics != null && statistics.length > 0) {
clientMonitoringService.pushStatistics((ClientDescriptor) caller, statistics);
}
return CompletableFuture.completedFuture(null);
}

@Override
public Future<Void> exposeManagementMetadata(@ClientId Object caller, ContextContainer contextContainer, Capability... capabilities) {
clientMonitoringService.exposeManagementRegistry((ClientDescriptor) caller, contextContainer, capabilities);
if (contextContainer != null && capabilities != null) {
clientMonitoringService.exposeManagementRegistry((ClientDescriptor) caller, contextContainer, capabilities);
}
return CompletableFuture.completedFuture(null);
}

@Override
public Future<Void> exposeTags(@ClientId Object caller, String... tags) {
clientMonitoringService.exposeTags((ClientDescriptor) caller, tags);
if (tags != null) {
clientMonitoringService.exposeTags((ClientDescriptor) caller, tags);
}
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,34 @@
*/
package org.terracotta.management.entity.management.server;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.entity.ClientDescriptor;
import org.terracotta.management.entity.management.ManagementAgent;
import org.terracotta.management.entity.management.ReconnectData;
import org.terracotta.voltron.proxy.server.ActiveProxiedServerEntity;

/**
* @author Mathieu Carbou
*/
class ActiveManagementAgentServerEntity extends ActiveProxiedServerEntity<ManagementAgent, Void> {
class ActiveManagementAgentServerEntity extends ActiveProxiedServerEntity<ManagementAgent, Void, ReconnectData> {

private static final Logger LOGGER = LoggerFactory.getLogger(ActiveManagementAgentServerEntity.class);

private final ActiveManagementAgent managementAgent;

ActiveManagementAgentServerEntity(ActiveManagementAgent managementAgent) {
super(managementAgent);
this.managementAgent = managementAgent;
}

@Override
protected void onReconnect(ClientDescriptor clientDescriptor, ReconnectData reconnectData) {
if (reconnectData != null) {
LOGGER.trace("onReconnect({})", clientDescriptor);
managementAgent.exposeTags(clientDescriptor, reconnectData.tags);
managementAgent.exposeManagementMetadata(clientDescriptor, reconnectData.contextContainer, reconnectData.capabilities);
managementAgent.pushNotification(clientDescriptor, reconnectData.contextualNotification);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.terracotta.management.entity.management.ManagementAgent;
import org.terracotta.management.entity.management.ManagementAgentConfig;
import org.terracotta.management.entity.management.ManagementAgentVersion;
import org.terracotta.management.entity.management.ReconnectData;
import org.terracotta.management.model.message.Message;
import org.terracotta.management.service.monitoring.ClientMonitoringService;
import org.terracotta.management.service.monitoring.ClientMonitoringServiceConfiguration;
Expand All @@ -34,13 +35,13 @@
/**
* @author Mathieu Carbou
*/
public class ManagementAgentEntityServerService extends ProxyServerEntityService<ManagementAgent, ManagementAgentConfig, Void> {
public class ManagementAgentEntityServerService extends ProxyServerEntityService<ManagementAgent, ManagementAgentConfig, Void, ReconnectData> {

private static final Logger LOGGER = LoggerFactory.getLogger(ManagementAgentEntityServerService.class);

public ManagementAgentEntityServerService() {
//TODO: MATHIEU - PERF: https://github.com/Terracotta-OSS/terracotta-platform/issues/92
super(ManagementAgent.class, ManagementAgentConfig.class, new Class<?>[]{Message.class}, null);
super(ManagementAgent.class, ManagementAgentConfig.class, new Class<?>[]{Message.class}, null, ReconnectData.class);
setCodec(new SerializationCodec());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ClientCache implements Cache {
this.name = name;
this.delegate = delegate;

this.delegate.registerListener(Serializable[].class, message -> {
this.delegate.registerMessageListener(Serializable[].class, message -> {
String cmd = (String) message[0];
if ("remove".equals(cmd)) {
remove((String) message[1], (String) message[2]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/**
* @author Mathieu Carbou
*/
class ActiveCacheServerEntity extends ActiveProxiedServerEntity<Cache, CacheSync> {
class ActiveCacheServerEntity extends ActiveProxiedServerEntity<Cache, CacheSync, Void> {

private static final Logger LOGGER = LoggerFactory.getLogger(ActiveCacheServerEntity.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
/**
* @author Mathieu Carbou
*/
public class CacheEntityServerService extends ProxyServerEntityService<Cache, String, CacheSync> {
public class CacheEntityServerService extends ProxyServerEntityService<Cache, String, CacheSync, Void> {

private static final Logger LOGGER = LoggerFactory.getLogger(CacheEntityServerService.class);

public CacheEntityServerService() {
super(Cache.class, String.class, new Class<?>[]{Serializable[].class}, CacheSync.class);
super(Cache.class, String.class, new Class<?>[]{Serializable[].class}, CacheSync.class, null);
setCodec(new SerializationCodec());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void get_server_states_when_passive_joins() throws Exception {

@Test
//TODO: needs to also test the topology with a galvan test after the failover: (https://github.com/Terracotta-OSS/terracotta-platform/issues/191)
public void failover() throws Exception {
public void failover_management() throws Exception {
// connect passive
stripeControl.startOneServer();
stripeControl.waitForRunningPassivesInStandby();
Expand Down Expand Up @@ -226,6 +226,26 @@ public void failover() throws Exception {
assertThat(
notifs.stream().map(notif -> notif.getAttributes().get("state")).collect(Collectors.toList()),
equalTo(Arrays.asList("ACTIVE", "ACTIVE")));

//TODO: complete with Galvan: passthrough bug prevents m&m so see the new topology and further notifications (https://github.com/Terracotta-OSS/tc-passthrough-testing/issues/74)
//- test topology (like topology_includes_passives), client should have re-exposed their management metadata
//- check notifications: server states
//- check notification that might be there: CLIENT_RECONNECTED and SERVER_ENTITY_FAILOVER_COMPLETE
}

@Test
public void puts_can_be_seen_on_other_clients_after_failover() throws Exception {
// connect passive
stripeControl.startOneServer();
stripeControl.waitForRunningPassivesInStandby();

put(0, "clients", "client1", "Mathieu");

// kill active - passive should take the active role
stripeControl.terminateActive();
stripeControl.waitForActive();

assertThat(get(1, "clients", "client1"), equalTo("Mathieu"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class TmsAgentService {

public TmsAgentService(final TmsAgentEntity entity) {
this.entity = Objects.requireNonNull(entity);
this.entity.registerListener(Message.class, message -> {
this.entity.registerMessageListener(Message.class, message -> {
LOGGER.trace("onMessage({})", message);

switch (message.getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* @author Mathieu Carbou
*/
class ActiveTmsAgentServerEntity extends ActiveProxiedServerEntity<TmsAgent, Void> {
class ActiveTmsAgentServerEntity extends ActiveProxiedServerEntity<TmsAgent, Void, Void> {

private final AtomicBoolean connected = new AtomicBoolean();
private final ActiveTmsAgent tmsAgent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@
/**
* @author Mathieu Carbou
*/
public class TmsAgentEntityServerService extends ProxyServerEntityService<TmsAgent, TmsAgentConfig, Void> {
public class TmsAgentEntityServerService extends ProxyServerEntityService<TmsAgent, TmsAgentConfig, Void, Void> {

private static final Logger LOGGER = LoggerFactory.getLogger(TmsAgentEntityServerService.class);

public TmsAgentEntityServerService() {
super(TmsAgent.class, TmsAgentConfig.class, new Class<?>[]{Message.class}, null);
super(TmsAgent.class, TmsAgentConfig.class, new Class<?>[]{Message.class}, null, null);
setCodec(new SerializationCodec());
}

Expand Down
Loading

0 comments on commit 694781f

Please sign in to comment.