diff --git a/management/management-entity/management-entity-client/src/main/java/org/terracotta/management/entity/management/client/ManagementAgentEntity.java b/management/management-entity/management-entity-client/src/main/java/org/terracotta/management/entity/management/client/ManagementAgentEntity.java index 6fdc07e776..c8f6c66cc5 100644 --- a/management/management-entity/management-entity-client/src/main/java/org/terracotta/management/entity/management/client/ManagementAgentEntity.java +++ b/management/management-entity/management-entity-client/src/main/java/org/terracotta/management/entity/management/client/ManagementAgentEntity.java @@ -17,11 +17,12 @@ import org.terracotta.connection.entity.Entity; import org.terracotta.management.entity.management.ManagementAgent; +import org.terracotta.voltron.proxy.client.EndpointListenerAware; import org.terracotta.voltron.proxy.client.ServerMessageAware; /** * @author Mathieu Carbou */ -public interface ManagementAgentEntity extends ManagementAgent, Entity, ServerMessageAware { +public interface ManagementAgentEntity extends ManagementAgent, Entity, ServerMessageAware, EndpointListenerAware { } diff --git a/management/management-entity/management-entity-client/src/main/java/org/terracotta/management/entity/management/client/ManagementAgentService.java b/management/management-entity/management-entity-client/src/main/java/org/terracotta/management/entity/management/client/ManagementAgentService.java index 6d8ed8bb22..66142c92c4 100644 --- a/management/management-entity/management-entity-client/src/main/java/org/terracotta/management/entity/management/client/ManagementAgentService.java +++ b/management/management-entity/management-entity-client/src/main/java/org/terracotta/management/entity/management/client/ManagementAgentService.java @@ -17,10 +17,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.terracotta.management.entity.management.ReconnectData; import org.terracotta.management.model.Objects; import org.terracotta.management.model.call.ContextualCall; import org.terracotta.management.model.call.ContextualReturn; import org.terracotta.management.model.capabilities.Capability; +import org.terracotta.management.model.context.Context; import org.terracotta.management.model.context.ContextContainer; import org.terracotta.management.model.message.ManagementCallMessage; import org.terracotta.management.model.message.Message; @@ -31,10 +33,12 @@ import org.terracotta.management.registry.ManagementRegistry; import org.terracotta.management.registry.action.ExposedObject; import org.terracotta.voltron.proxy.MessageListener; +import org.terracotta.voltron.proxy.client.EndpointListener; import java.io.Closeable; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -52,7 +56,8 @@ public class ManagementAgentService implements Closeable { private volatile ManagementRegistry registry; private volatile boolean bridging = false; - private Capability[] previouslyExposed = new Capability[0]; + private Capability[] previouslyExposedCapabilities; + private String[] previouslyExposedTags; private long timeoutMs = 5000; private Executor managementCallExecutor = new Executor() { @@ -92,7 +97,8 @@ private void refresh(Object managedObject) { public ManagementAgentService(final ManagementAgentEntity entity) { this.entity = Objects.requireNonNull(entity); - this.entity.registerListener(Message.class, new MessageListener() { + + this.entity.registerMessageListener(Message.class, new MessageListener() { @Override public void onMessage(final Message message) { LOGGER.trace("onMessage({})", message); @@ -130,6 +136,30 @@ public void run() { } } }); + + this.entity.setEndpointListener(new EndpointListener() { + @Override + public Object onReconnect() { + if (bridging) { + LOGGER.trace("onReconnect()"); + Collection capabilities = registry == null ? Collections.emptyList() : registry.getCapabilities(); + Context context = registry == null ? Context.empty() : Context.create(registry.getContextContainer().getName(), registry.getContextContainer().getValue()); + return new ReconnectData( + previouslyExposedTags, + registry == null ? null : registry.getContextContainer(), + registry == null ? null : capabilities.toArray(new Capability[capabilities.size()]), + new ContextualNotification(context, "CLIENT_RECONNECTED")); + } else { + return null; + } + } + + @Override + public void onDisconnectUnexpectedly() { + LOGGER.trace("onDisconnectUnexpectedly()"); + close(); + } + }); } public void init() throws ExecutionException, InterruptedException, TimeoutException { @@ -147,6 +177,7 @@ public void init() throws ExecutionException, InterruptedException, TimeoutExcep */ public synchronized void setManagementRegistry(ManagementRegistry registry) { if (!bridging) { + LOGGER.trace("setManagementRegistry({})", registry.getContextContainer().getValue()); this.registry = registry; registry.addManagementProvider(managementProvider); bridging = true; @@ -156,6 +187,7 @@ public synchronized void setManagementRegistry(ManagementRegistry registry) { @Override public synchronized void close() { if (bridging) { + LOGGER.trace("close()"); registry.removeManagementProvider(managementProvider); bridging = false; } @@ -180,10 +212,10 @@ public void setCapabilities(ContextContainer contextContainer, Collection tags) throws ExecutionException, Interrup public void setTags(String... tags) throws ExecutionException, InterruptedException, TimeoutException { LOGGER.trace("setTags({})", Arrays.asList(tags)); get(entity.exposeTags(null, tags)); + previouslyExposedTags = tags; } public void pushNotification(ContextualNotification notification) throws ExecutionException, InterruptedException, TimeoutException { diff --git a/management/management-entity/management-entity-common/src/main/java/org/terracotta/management/entity/management/ReconnectData.java b/management/management-entity/management-entity-common/src/main/java/org/terracotta/management/entity/management/ReconnectData.java new file mode 100644 index 0000000000..07d0a32c7b --- /dev/null +++ b/management/management-entity/management-entity-common/src/main/java/org/terracotta/management/entity/management/ReconnectData.java @@ -0,0 +1,42 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.terracotta.management.entity.management; + +import org.terracotta.management.model.capabilities.Capability; +import org.terracotta.management.model.context.ContextContainer; +import org.terracotta.management.model.notification.ContextualNotification; + +import java.io.Serializable; + +/** + * @author Mathieu Carbou + */ +public class ReconnectData implements Serializable { + + private static final long serialVersionUID = 1L; + + public final String[] tags; + public final ContextContainer contextContainer; + public final Capability[] capabilities; + public final ContextualNotification contextualNotification; + + public ReconnectData(String[] tags, ContextContainer contextContainer, Capability[] capabilities, ContextualNotification contextualNotification) { + this.tags = tags; + this.contextContainer = contextContainer; + this.capabilities = capabilities; + this.contextualNotification = contextualNotification; + } +} diff --git a/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ActiveManagementAgent.java b/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ActiveManagementAgent.java index 02a29ec1fe..ad8ce08590 100644 --- a/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ActiveManagementAgent.java +++ b/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ActiveManagementAgent.java @@ -42,13 +42,15 @@ class ActiveManagementAgent implements ManagementAgent { @Override public Future pushNotification(@ClientId Object caller, ContextualNotification notification) { - clientMonitoringService.pushNotification((ClientDescriptor) caller, notification); + if (notification != null) { + clientMonitoringService.pushNotification((ClientDescriptor) caller, notification); + } return CompletableFuture.completedFuture(null); } @Override public Future pushStatistics(@ClientId Object caller, ContextualStatistics... statistics) { - if (statistics.length > 0) { + if (statistics != null && statistics.length > 0) { clientMonitoringService.pushStatistics((ClientDescriptor) caller, statistics); } return CompletableFuture.completedFuture(null); @@ -56,13 +58,17 @@ public Future pushStatistics(@ClientId Object caller, ContextualStatistics @Override public Future exposeManagementMetadata(@ClientId Object caller, ContextContainer contextContainer, Capability... capabilities) { - clientMonitoringService.exposeManagementRegistry((ClientDescriptor) caller, contextContainer, capabilities); + if (contextContainer != null && capabilities != null) { + clientMonitoringService.exposeManagementRegistry((ClientDescriptor) caller, contextContainer, capabilities); + } return CompletableFuture.completedFuture(null); } @Override public Future exposeTags(@ClientId Object caller, String... tags) { - clientMonitoringService.exposeTags((ClientDescriptor) caller, tags); + if (tags != null) { + clientMonitoringService.exposeTags((ClientDescriptor) caller, tags); + } return CompletableFuture.completedFuture(null); } diff --git a/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ActiveManagementAgentServerEntity.java b/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ActiveManagementAgentServerEntity.java index 4c6f000b3e..c4adc7b762 100644 --- a/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ActiveManagementAgentServerEntity.java +++ b/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ActiveManagementAgentServerEntity.java @@ -15,14 +15,34 @@ */ package org.terracotta.management.entity.management.server; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.terracotta.entity.ClientDescriptor; import org.terracotta.management.entity.management.ManagementAgent; +import org.terracotta.management.entity.management.ReconnectData; import org.terracotta.voltron.proxy.server.ActiveProxiedServerEntity; /** * @author Mathieu Carbou */ -class ActiveManagementAgentServerEntity extends ActiveProxiedServerEntity { +class ActiveManagementAgentServerEntity extends ActiveProxiedServerEntity { + + private static final Logger LOGGER = LoggerFactory.getLogger(ActiveManagementAgentServerEntity.class); + + private final ActiveManagementAgent managementAgent; + ActiveManagementAgentServerEntity(ActiveManagementAgent managementAgent) { super(managementAgent); + this.managementAgent = managementAgent; + } + + @Override + protected void onReconnect(ClientDescriptor clientDescriptor, ReconnectData reconnectData) { + if (reconnectData != null) { + LOGGER.trace("onReconnect({})", clientDescriptor); + managementAgent.exposeTags(clientDescriptor, reconnectData.tags); + managementAgent.exposeManagementMetadata(clientDescriptor, reconnectData.contextContainer, reconnectData.capabilities); + managementAgent.pushNotification(clientDescriptor, reconnectData.contextualNotification); + } } } diff --git a/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ManagementAgentEntityServerService.java b/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ManagementAgentEntityServerService.java index 7f4e680241..e93eeedebe 100644 --- a/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ManagementAgentEntityServerService.java +++ b/management/management-entity/management-entity-server/src/main/java/org/terracotta/management/entity/management/server/ManagementAgentEntityServerService.java @@ -23,6 +23,7 @@ import org.terracotta.management.entity.management.ManagementAgent; import org.terracotta.management.entity.management.ManagementAgentConfig; import org.terracotta.management.entity.management.ManagementAgentVersion; +import org.terracotta.management.entity.management.ReconnectData; import org.terracotta.management.model.message.Message; import org.terracotta.management.service.monitoring.ClientMonitoringService; import org.terracotta.management.service.monitoring.ClientMonitoringServiceConfiguration; @@ -34,13 +35,13 @@ /** * @author Mathieu Carbou */ -public class ManagementAgentEntityServerService extends ProxyServerEntityService { +public class ManagementAgentEntityServerService extends ProxyServerEntityService { private static final Logger LOGGER = LoggerFactory.getLogger(ManagementAgentEntityServerService.class); public ManagementAgentEntityServerService() { //TODO: MATHIEU - PERF: https://github.com/Terracotta-OSS/terracotta-platform/issues/92 - super(ManagementAgent.class, ManagementAgentConfig.class, new Class[]{Message.class}, null); + super(ManagementAgent.class, ManagementAgentConfig.class, new Class[]{Message.class}, null, ReconnectData.class); setCodec(new SerializationCodec()); } diff --git a/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/client/ClientCache.java b/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/client/ClientCache.java index 69994bb5c2..462ab43211 100644 --- a/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/client/ClientCache.java +++ b/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/client/ClientCache.java @@ -53,7 +53,7 @@ public class ClientCache implements Cache { this.name = name; this.delegate = delegate; - this.delegate.registerListener(Serializable[].class, message -> { + this.delegate.registerMessageListener(Serializable[].class, message -> { String cmd = (String) message[0]; if ("remove".equals(cmd)) { remove((String) message[1], (String) message[2]); diff --git a/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/server/ActiveCacheServerEntity.java b/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/server/ActiveCacheServerEntity.java index 30d3cc0f3f..0393f46358 100644 --- a/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/server/ActiveCacheServerEntity.java +++ b/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/server/ActiveCacheServerEntity.java @@ -27,7 +27,7 @@ /** * @author Mathieu Carbou */ -class ActiveCacheServerEntity extends ActiveProxiedServerEntity { +class ActiveCacheServerEntity extends ActiveProxiedServerEntity { private static final Logger LOGGER = LoggerFactory.getLogger(ActiveCacheServerEntity.class); diff --git a/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/server/CacheEntityServerService.java b/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/server/CacheEntityServerService.java index 3be35eb7c9..2e5330a3dd 100644 --- a/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/server/CacheEntityServerService.java +++ b/management/testing/sample-entity/src/main/java/org/terracotta/management/entity/sample/server/CacheEntityServerService.java @@ -29,12 +29,12 @@ /** * @author Mathieu Carbou */ -public class CacheEntityServerService extends ProxyServerEntityService { +public class CacheEntityServerService extends ProxyServerEntityService { private static final Logger LOGGER = LoggerFactory.getLogger(CacheEntityServerService.class); public CacheEntityServerService() { - super(Cache.class, String.class, new Class[]{Serializable[].class}, CacheSync.class); + super(Cache.class, String.class, new Class[]{Serializable[].class}, CacheSync.class, null); setCodec(new SerializationCodec()); } diff --git a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ha/HATest.java b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ha/HATest.java index d101d882d4..62f124dcd5 100644 --- a/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ha/HATest.java +++ b/management/testing/sample-entity/src/test/java/org/terracotta/management/entity/sample/ha/HATest.java @@ -186,7 +186,7 @@ public void get_server_states_when_passive_joins() throws Exception { @Test //TODO: needs to also test the topology with a galvan test after the failover: (https://github.com/Terracotta-OSS/terracotta-platform/issues/191) - public void failover() throws Exception { + public void failover_management() throws Exception { // connect passive stripeControl.startOneServer(); stripeControl.waitForRunningPassivesInStandby(); @@ -226,6 +226,26 @@ public void failover() throws Exception { assertThat( notifs.stream().map(notif -> notif.getAttributes().get("state")).collect(Collectors.toList()), equalTo(Arrays.asList("ACTIVE", "ACTIVE"))); + + //TODO: complete with Galvan: passthrough bug prevents m&m so see the new topology and further notifications (https://github.com/Terracotta-OSS/tc-passthrough-testing/issues/74) + //- test topology (like topology_includes_passives), client should have re-exposed their management metadata + //- check notifications: server states + //- check notification that might be there: CLIENT_RECONNECTED and SERVER_ENTITY_FAILOVER_COMPLETE + } + + @Test + public void puts_can_be_seen_on_other_clients_after_failover() throws Exception { + // connect passive + stripeControl.startOneServer(); + stripeControl.waitForRunningPassivesInStandby(); + + put(0, "clients", "client1", "Mathieu"); + + // kill active - passive should take the active role + stripeControl.terminateActive(); + stripeControl.waitForActive(); + + assertThat(get(1, "clients", "client1"), equalTo("Mathieu")); } } diff --git a/management/tms-entity/tms-entity-client/src/main/java/org/terracotta/management/entity/tms/client/TmsAgentService.java b/management/tms-entity/tms-entity-client/src/main/java/org/terracotta/management/entity/tms/client/TmsAgentService.java index 79b30f2948..2fe1f67946 100644 --- a/management/tms-entity/tms-entity-client/src/main/java/org/terracotta/management/entity/tms/client/TmsAgentService.java +++ b/management/tms-entity/tms-entity-client/src/main/java/org/terracotta/management/entity/tms/client/TmsAgentService.java @@ -58,7 +58,7 @@ public class TmsAgentService { public TmsAgentService(final TmsAgentEntity entity) { this.entity = Objects.requireNonNull(entity); - this.entity.registerListener(Message.class, message -> { + this.entity.registerMessageListener(Message.class, message -> { LOGGER.trace("onMessage({})", message); switch (message.getType()) { diff --git a/management/tms-entity/tms-entity-server/src/main/java/org/terracotta/management/entity/tms/server/ActiveTmsAgentServerEntity.java b/management/tms-entity/tms-entity-server/src/main/java/org/terracotta/management/entity/tms/server/ActiveTmsAgentServerEntity.java index 00679d6600..12a42e08b6 100644 --- a/management/tms-entity/tms-entity-server/src/main/java/org/terracotta/management/entity/tms/server/ActiveTmsAgentServerEntity.java +++ b/management/tms-entity/tms-entity-server/src/main/java/org/terracotta/management/entity/tms/server/ActiveTmsAgentServerEntity.java @@ -25,7 +25,7 @@ /** * @author Mathieu Carbou */ -class ActiveTmsAgentServerEntity extends ActiveProxiedServerEntity { +class ActiveTmsAgentServerEntity extends ActiveProxiedServerEntity { private final AtomicBoolean connected = new AtomicBoolean(); private final ActiveTmsAgent tmsAgent; diff --git a/management/tms-entity/tms-entity-server/src/main/java/org/terracotta/management/entity/tms/server/TmsAgentEntityServerService.java b/management/tms-entity/tms-entity-server/src/main/java/org/terracotta/management/entity/tms/server/TmsAgentEntityServerService.java index dd440f4a59..e2080607c8 100644 --- a/management/tms-entity/tms-entity-server/src/main/java/org/terracotta/management/entity/tms/server/TmsAgentEntityServerService.java +++ b/management/tms-entity/tms-entity-server/src/main/java/org/terracotta/management/entity/tms/server/TmsAgentEntityServerService.java @@ -41,12 +41,12 @@ /** * @author Mathieu Carbou */ -public class TmsAgentEntityServerService extends ProxyServerEntityService { +public class TmsAgentEntityServerService extends ProxyServerEntityService { private static final Logger LOGGER = LoggerFactory.getLogger(TmsAgentEntityServerService.class); public TmsAgentEntityServerService() { - super(TmsAgent.class, TmsAgentConfig.class, new Class[]{Message.class}, null); + super(TmsAgent.class, TmsAgentConfig.class, new Class[]{Message.class}, null, null); setCodec(new SerializationCodec()); } diff --git a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ClientProxyFactory.java b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ClientProxyFactory.java index 2c3e21ba44..4e21d28f5d 100644 --- a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ClientProxyFactory.java +++ b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ClientProxyFactory.java @@ -17,6 +17,7 @@ import org.terracotta.connection.entity.Entity; import org.terracotta.entity.EntityClientEndpoint; +import org.terracotta.voltron.proxy.Codec; import org.terracotta.voltron.proxy.CommonProxyFactory; import org.terracotta.voltron.proxy.ProxyEntityMessage; import org.terracotta.voltron.proxy.ProxyEntityResponse; @@ -31,14 +32,16 @@ public class ClientProxyFactory { public static T createEntityProxy(Class clientType, Class type, EntityClientEndpoint entityClientEndpoint, - Class[] messageTypes) { - return createProxy(clientType, type, entityClientEndpoint, messageTypes); + Class[] messageTypes, + Codec codec) { + return createProxy(clientType, type, entityClientEndpoint, messageTypes, codec); } public static T createProxy(Class clientType, Class type, EntityClientEndpoint entityClientEndpoint, - Class[] messageTypes) { + Class[] messageTypes, + Codec codec) { if (entityClientEndpoint == null) { throw new NullPointerException("EntityClientEndpoint has to be provided!"); @@ -59,7 +62,8 @@ public static T createProxy(Class clientType, interfaces, new VoltronProxyInvocationHandler( entityClientEndpoint, - CommonProxyFactory.invert(CommonProxyFactory.createResponseTypeMappings(type, messageTypes)).values()) + CommonProxyFactory.invert(CommonProxyFactory.createResponseTypeMappings(type, messageTypes)).values(), + codec) )); } diff --git a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/EndpointListener.java b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/EndpointListener.java new file mode 100644 index 0000000000..d9c45032bd --- /dev/null +++ b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/EndpointListener.java @@ -0,0 +1,27 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.terracotta.voltron.proxy.client; + +/** + * @author Mathieu Carbou + */ +public interface EndpointListener { + + Object onReconnect(); + + void onDisconnectUnexpectedly(); + +} diff --git a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/EndpointListenerAware.java b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/EndpointListenerAware.java new file mode 100644 index 0000000000..499624a81f --- /dev/null +++ b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/EndpointListenerAware.java @@ -0,0 +1,23 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.terracotta.voltron.proxy.client; + +/** + * @author Alex Snaps + */ +public interface EndpointListenerAware { + void setEndpointListener(EndpointListener endpointListener); +} diff --git a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ProxyEndpointDelegate.java b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ProxyEndpointDelegate.java deleted file mode 100644 index 6b2901e18a..0000000000 --- a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ProxyEndpointDelegate.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright Terracotta, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.terracotta.voltron.proxy.client; - -import org.terracotta.entity.EndpointDelegate; -import org.terracotta.entity.EntityResponse; -import org.terracotta.voltron.proxy.MessageListener; -import org.terracotta.voltron.proxy.ProxyEntityResponse; - -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * @author Alex Snaps - */ -class ProxyEndpointDelegate implements EndpointDelegate { - - private final ConcurrentMap, CopyOnWriteArrayList> listeners; - - public ProxyEndpointDelegate(final ConcurrentMap, CopyOnWriteArrayList> listeners) { - this.listeners = listeners; - } - - @Override - public void handleMessage(EntityResponse messageFromServer) { - ProxyEntityResponse response = (ProxyEntityResponse)messageFromServer; - final Class aClass = response.getResponseType(); - for (MessageListener messageListener : listeners.get(aClass)) { - messageListener.onMessage(response.getResponse()); - } - } - - @Override - public byte[] createExtendedReconnectData() { - // no idea?! - return new byte[0]; - } - - @Override - public void didDisconnectUnexpectedly() { - // no idea?! - } -} diff --git a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ProxyEntityClientService.java b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ProxyEntityClientService.java index e8368be082..951b58cbb3 100644 --- a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ProxyEntityClientService.java +++ b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ProxyEntityClientService.java @@ -47,7 +47,7 @@ public boolean handlesEntityType(final Class aClass) { @Override public T create(EntityClientEndpoint endpoint) { - return ClientProxyFactory.createEntityProxy(clientType, type, endpoint, messageTypes); + return ClientProxyFactory.createEntityProxy(clientType, type, endpoint, messageTypes, messageCodec.getCodec()); } @Override diff --git a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ServerMessageAware.java b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ServerMessageAware.java index c79357af9b..c77016cd40 100644 --- a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ServerMessageAware.java +++ b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/ServerMessageAware.java @@ -22,5 +22,5 @@ */ public interface ServerMessageAware { - void registerListener(Class type, MessageListener listener); + void registerMessageListener(Class type, MessageListener listener); } diff --git a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/VoltronProxyInvocationHandler.java b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/VoltronProxyInvocationHandler.java index 4d1c58d3d2..b2460de29f 100644 --- a/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/VoltronProxyInvocationHandler.java +++ b/voltron-proxy/voltron-proxy-client/src/main/java/org/terracotta/voltron/proxy/client/VoltronProxyInvocationHandler.java @@ -16,10 +16,13 @@ package org.terracotta.voltron.proxy.client; import org.terracotta.connection.entity.Entity; +import org.terracotta.entity.EndpointDelegate; import org.terracotta.entity.EntityClientEndpoint; +import org.terracotta.entity.EntityResponse; import org.terracotta.entity.InvocationBuilder; import org.terracotta.entity.InvokeFuture; import org.terracotta.exception.EntityException; +import org.terracotta.voltron.proxy.Codec; import org.terracotta.voltron.proxy.MessageListener; import org.terracotta.voltron.proxy.MethodDescriptor; import org.terracotta.voltron.proxy.ProxyEntityMessage; @@ -43,12 +46,14 @@ class VoltronProxyInvocationHandler implements InvocationHandler { private static final Method close; - private static final Method registerListener; + private static final Method registerMessageListener; + private static final Method setEndpointListener; static { try { close = Entity.class.getDeclaredMethod("close"); - registerListener = ServerMessageAware.class.getDeclaredMethod("registerListener", Class.class, MessageListener.class); + registerMessageListener = ServerMessageAware.class.getDeclaredMethod("registerMessageListener", Class.class, MessageListener.class); + setEndpointListener = EndpointListenerAware.class.getDeclaredMethod("setEndpointListener", EndpointListener.class); } catch (NoSuchMethodException e) { throw new AssertionError("Someone changed some method signature here!!!"); } @@ -56,41 +61,82 @@ class VoltronProxyInvocationHandler implements InvocationHandler { private final EntityClientEndpoint entityClientEndpoint; private final ConcurrentMap, CopyOnWriteArrayList> listeners; + private final Codec codec; - public VoltronProxyInvocationHandler(final EntityClientEndpoint entityClientEndpoint, Collection> events) { + private EndpointListener endpointListener; + + VoltronProxyInvocationHandler(final EntityClientEndpoint entityClientEndpoint, Collection> events, final Codec codec) { this.entityClientEndpoint = entityClientEndpoint; + this.codec = codec; this.listeners = new ConcurrentHashMap, CopyOnWriteArrayList>(); if (events.size() > 0) { for (Class aClass : events) { listeners.put(aClass, new CopyOnWriteArrayList()); } - entityClientEndpoint.setDelegate(new ProxyEndpointDelegate(listeners)); + + entityClientEndpoint.setDelegate(new EndpointDelegate() { + + @SuppressWarnings("unchecked") + @Override + public void handleMessage(EntityResponse messageFromServer) { + ProxyEntityResponse response = (ProxyEntityResponse) messageFromServer; + final Class aClass = response.getResponseType(); + for (MessageListener messageListener : listeners.get(aClass)) { + messageListener.onMessage(response.getResponse()); + } + } + + @Override + public byte[] createExtendedReconnectData() { + if (endpointListener == null) { + return null; + } else { + Object state = endpointListener.onReconnect(); + if (state == null) { + return null; + } + return codec.encode(state.getClass(), state); + } + } + + @Override + public void didDisconnectUnexpectedly() { + if (endpointListener != null) { + endpointListener.onDisconnectUnexpectedly(); + } + } + }); } } @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { - if(close.equals(method)) { + if (close.equals(method)) { entityClientEndpoint.close(); return null; - } else if(registerListener.equals(method)) { + + } else if (registerMessageListener.equals(method)) { final Class eventType = (Class) args[0]; final MessageListener arg = (MessageListener) args[1]; final CopyOnWriteArrayList messageListeners = listeners.get(eventType); - if(messageListeners == null) { + if (messageListeners == null) { throw new IllegalArgumentException("Event type '" + eventType + "' isn't supported"); } messageListeners.add(arg); return null; + + } else if (setEndpointListener.equals(method)) { + this.endpointListener = (EndpointListener) args[0]; + return null; } final MethodDescriptor methodDescriptor = MethodDescriptor.of(method); final InvocationBuilder builder = entityClientEndpoint.beginInvoke() - .message(new ProxyEntityMessage(methodDescriptor, args, false)); + .message(new ProxyEntityMessage(methodDescriptor, args, false)); - if(methodDescriptor.isAsync()) { + if (methodDescriptor.isAsync()) { switch (methodDescriptor.getAck()) { case NONE: break; diff --git a/voltron-proxy/voltron-proxy-client/src/test/java/org/terracotta/voltron/proxy/client/ClientProxyFactoryTest.java b/voltron-proxy/voltron-proxy-client/src/test/java/org/terracotta/voltron/proxy/client/ClientProxyFactoryTest.java index ebb4544771..07d6038c0e 100644 --- a/voltron-proxy/voltron-proxy-client/src/test/java/org/terracotta/voltron/proxy/client/ClientProxyFactoryTest.java +++ b/voltron-proxy/voltron-proxy-client/src/test/java/org/terracotta/voltron/proxy/client/ClientProxyFactoryTest.java @@ -49,8 +49,9 @@ public class ClientProxyFactoryTest { @Test public void addEntityInterfaceToType() { + final SerializationCodec codec = new SerializationCodec(); final EntityClientEndpoint clientEndpoint = mock(EntityClientEndpoint.class); - final Comparable proxy = ClientProxyFactory.createProxy(Comparable.class, Comparable.class, clientEndpoint, null); + final Comparable proxy = ClientProxyFactory.createProxy(Comparable.class, Comparable.class, clientEndpoint, null, codec); assertThat(proxy, instanceOf(Entity.class)); ((Entity) proxy).close(); verify(clientEndpoint).close(); @@ -67,7 +68,7 @@ public void testFakeOutboundCall() throws ExecutionException, InterruptedExcepti when(builder.invoke()).thenReturn(future); when(future.get()).thenReturn(response(Integer.class, 42)); - final PassThrough proxy = ClientProxyFactory.createProxy(PassThrough.class, PassThrough.class, endpoint, null); + final PassThrough proxy = ClientProxyFactory.createProxy(PassThrough.class, PassThrough.class, endpoint, null, codec); assertThat(proxy.sync(), is(42)); } @@ -90,7 +91,7 @@ public T decode(final Class type, final byte[] buffer) { when(future.getWithTimeout(1, TimeUnit.SECONDS)).thenReturn(response(Integer.class, 43)) .thenThrow(new TimeoutException("Blah!")); - final PassThrough proxy = ClientProxyFactory.createProxy(PassThrough.class, PassThrough.class, endpoint, null); + final PassThrough proxy = ClientProxyFactory.createProxy(PassThrough.class, PassThrough.class, endpoint, null, codec); assertThat(proxy.aSync().get(), is(42)); assertThat(proxy.aSync().get(1, TimeUnit.SECONDS), is(43)); try { @@ -112,19 +113,19 @@ public void testRegistersListeners() throws ExecutionException, InterruptedExcep when(builder.invoke()).thenReturn(future); when(future.get()).thenReturn(response(Integer.class, 42)); - final ListenerAware proxy = ClientProxyFactory.createEntityProxy(ListenerAware.class, PassThrough.class, endpoint, new Class[]{String.class, Integer.class, Long.class, Double.class}); + final ListenerAware proxy = ClientProxyFactory.createEntityProxy(ListenerAware.class, PassThrough.class, endpoint, new Class[]{String.class, Integer.class, Long.class, Double.class}, codec); assertThat(proxy.sync(), is(42)); - proxy.registerListener(String.class, new StringMessageListener()); - proxy.registerListener(Integer.class, new MessageListener() { + proxy.registerMessageListener(String.class, new StringMessageListener()); + proxy.registerMessageListener(Integer.class, new MessageListener() { @Override public void onMessage(final Integer message) { throw new UnsupportedOperationException("Implement me!"); } }); - proxy.registerListener(Long.class, new ComplexMessageListener()); - proxy.registerListener(Double.class, new MoreComplexMessageListener()); + proxy.registerMessageListener(Long.class, new ComplexMessageListener()); + proxy.registerMessageListener(Double.class, new MoreComplexMessageListener()); try { - proxy.registerListener(Object.class, new MessageListener() { + proxy.registerMessageListener(Object.class, new MessageListener() { @Override public void onMessage(final Object message) { throw new UnsupportedOperationException("Implement me!"); diff --git a/voltron-proxy/voltron-proxy-client/src/test/java/org/terracotta/voltron/proxy/client/VoltronProxyInvocationHandlerTest.java b/voltron-proxy/voltron-proxy-client/src/test/java/org/terracotta/voltron/proxy/client/VoltronProxyInvocationHandlerTest.java index 5ac454d2da..0132c03af4 100644 --- a/voltron-proxy/voltron-proxy-client/src/test/java/org/terracotta/voltron/proxy/client/VoltronProxyInvocationHandlerTest.java +++ b/voltron-proxy/voltron-proxy-client/src/test/java/org/terracotta/voltron/proxy/client/VoltronProxyInvocationHandlerTest.java @@ -64,7 +64,7 @@ public byte[] encode(final Class type, final Object value) { when(future.get()).thenReturn(ProxyEntityResponse.response(Void.TYPE, null)); Map methodMappings = invert(createMethodMappings(TestInterface.class)); - VoltronProxyInvocationHandler handler = new VoltronProxyInvocationHandler(endpoint, Collections.>emptyList()); + VoltronProxyInvocationHandler handler = new VoltronProxyInvocationHandler(endpoint, Collections.>emptyList(), codec); for (MethodDescriptor method : methodMappings.keySet()) { handler.invoke(null, method.getMethod(), new Object[] { "String", new Object() }); } diff --git a/voltron-proxy/voltron-proxy-server/src/main/java/org/terracotta/voltron/proxy/server/ActiveProxiedServerEntity.java b/voltron-proxy/voltron-proxy-server/src/main/java/org/terracotta/voltron/proxy/server/ActiveProxiedServerEntity.java index 7943bc3d04..b8cc3de801 100644 --- a/voltron-proxy/voltron-proxy-server/src/main/java/org/terracotta/voltron/proxy/server/ActiveProxiedServerEntity.java +++ b/voltron-proxy/voltron-proxy-server/src/main/java/org/terracotta/voltron/proxy/server/ActiveProxiedServerEntity.java @@ -18,6 +18,7 @@ import org.terracotta.entity.ActiveServerEntity; import org.terracotta.entity.ClientDescriptor; import org.terracotta.entity.PassiveSynchronizationChannel; +import org.terracotta.voltron.proxy.Codec; import org.terracotta.voltron.proxy.ProxyEntityMessage; import org.terracotta.voltron.proxy.ProxyEntityResponse; @@ -26,11 +27,14 @@ /** * @author Alex Snaps */ -public abstract class ActiveProxiedServerEntity implements ActiveServerEntity { +public abstract class ActiveProxiedServerEntity implements ActiveServerEntity { private final T entity; private final ProxyInvoker invoker; + private S synchronizer; + private Codec codec; + private Class reconnectDataType; public ActiveProxiedServerEntity(T entity) { this.entity = Objects.requireNonNull(entity); @@ -38,7 +42,7 @@ public ActiveProxiedServerEntity(T entity) { } @Override - public ProxyEntityResponse invoke(final ClientDescriptor clientDescriptor, final ProxyEntityMessage msg) { + public final ProxyEntityResponse invoke(final ClientDescriptor clientDescriptor, final ProxyEntityMessage msg) { return invoker.invoke(msg, clientDescriptor); } @@ -53,12 +57,19 @@ public void disconnected(ClientDescriptor clientDescriptor) { } @Override - public void handleReconnect(final ClientDescriptor clientDescriptor, final byte[] extendedReconnectData) { - // Don't care I think + public final void handleReconnect(final ClientDescriptor clientDescriptor, final byte[] extendedReconnectData) { + if (reconnectDataType != null && codec != null) { + R state = null; + if (extendedReconnectData != null && extendedReconnectData.length > 0) { + state = codec.decode(reconnectDataType, extendedReconnectData); + + } + onReconnect(clientDescriptor, state); + } } @Override - public void synchronizeKeyToPassive(final PassiveSynchronizationChannel channel, final int concurrencyKey) { + public final void synchronizeKeyToPassive(final PassiveSynchronizationChannel channel, final int concurrencyKey) { if (synchronizer != null) { SyncProxyFactory.setCurrentChannel(channel); try { @@ -87,24 +98,31 @@ public void destroy() { protected void synchronizeKeyToPassive(int concurrencyKey) { } - protected void fireMessage(Class type, M message, boolean echo) {invoker.fireMessage(type, message, echo);} + protected void onReconnect(ClientDescriptor clientDescriptor, R state) { + } + + protected final void fireMessage(Class type, M message, boolean echo) {invoker.fireMessage(type, message, echo);} - protected void fireMessage(Class type, M message, ClientDescriptor[] clients) {invoker.fireMessage(type, message, clients);} + protected final void fireMessage(Class type, M message, ClientDescriptor[] clients) {invoker.fireMessage(type, message, clients);} - protected T getEntity() { + protected final T getEntity() { return entity; } - protected S getSynchronizer() { + protected final S getSynchronizer() { return synchronizer; } - ProxyInvoker getInvoker() { + final ProxyInvoker getInvoker() { return invoker; } - void setSynchronizer(S synchronizer) { + final void setSynchronizer(S synchronizer) { this.synchronizer = synchronizer; } + final void setReconnect(Class reconnectDataType, Codec codec) { + this.reconnectDataType = reconnectDataType; + this.codec = codec; + } } diff --git a/voltron-proxy/voltron-proxy-server/src/main/java/org/terracotta/voltron/proxy/server/ProxyServerEntityService.java b/voltron-proxy/voltron-proxy-server/src/main/java/org/terracotta/voltron/proxy/server/ProxyServerEntityService.java index 7fdf0c95a8..0235345afc 100644 --- a/voltron-proxy/voltron-proxy-server/src/main/java/org/terracotta/voltron/proxy/server/ProxyServerEntityService.java +++ b/voltron-proxy/voltron-proxy-server/src/main/java/org/terracotta/voltron/proxy/server/ProxyServerEntityService.java @@ -15,12 +15,16 @@ */ package org.terracotta.voltron.proxy.server; +import org.terracotta.entity.ActiveServerEntity; import org.terracotta.entity.BasicServiceConfiguration; import org.terracotta.entity.ClientCommunicator; +import org.terracotta.entity.CommonServerEntity; import org.terracotta.entity.ConcurrencyStrategy; +import org.terracotta.entity.ConfigurationException; import org.terracotta.entity.EntityServerService; import org.terracotta.entity.ExecutionStrategy; import org.terracotta.entity.MessageCodec; +import org.terracotta.entity.PassiveServerEntity; import org.terracotta.entity.ServiceRegistry; import org.terracotta.entity.SyncMessageCodec; import org.terracotta.voltron.proxy.Codec; @@ -33,12 +37,17 @@ import java.util.Set; /** + * @param The interface of the entity proxy implementation + * @param Entity config type + * @param Void, or the interface type to sync to passives + * @param Void, or the reconnect data type * @author Mathieu Carbou */ -public abstract class ProxyServerEntityService implements EntityServerService { +public abstract class ProxyServerEntityService implements EntityServerService { private final Class configType; private final Class[] eventTypes; + private final Class reconnectDataType; private final Class synchronizerType; private final ProxyMessageCodec messageCodec; private final DelegatingSyncMessageCodec syncMessageCodec; @@ -55,7 +64,7 @@ public Set getKeysForSynchronization() { } }; - public ProxyServerEntityService(Class proxyType, Class configType, Class[] eventTypes, Class synchronizerType) { + public ProxyServerEntityService(Class proxyType, Class configType, Class[] eventTypes, Class synchronizerType, Class reconnectDataType) { this.configType = Objects.requireNonNull(configType); this.eventTypes = eventTypes; // can be null @@ -63,20 +72,22 @@ public ProxyServerEntityService(Class proxyType, Class configType, Class getMessageCodec() { + public final MessageCodec getMessageCodec() { return messageCodec; } @Override - public SyncMessageCodec getSyncMessageCodec() { + public final SyncMessageCodec getSyncMessageCodec() { return syncMessageCodec; } @Override - public ActiveProxiedServerEntity createActiveEntity(ServiceRegistry registry, byte[] configuration) { + public final ActiveProxiedServerEntity createActiveEntity(ServiceRegistry registry, byte[] configuration) { C config = null; if (configType == Void.TYPE) { if (configuration != null && configuration.length > 0) { @@ -85,23 +96,27 @@ public ActiveProxiedServerEntity createActiveEntity(ServiceRegistry regist } else { config = configType.cast(messageCodec.getCodec().decode(configType, configuration)); } - ActiveProxiedServerEntity activeEntity = createActiveEntity(registry, config); + ActiveProxiedServerEntity activeEntity = createActiveEntity(registry, config); if (eventTypes != null && eventTypes.length > 0) { ClientCommunicator clientCommunicator = registry.getService(new BasicServiceConfiguration<>(ClientCommunicator.class)); activeEntity.getInvoker().activateEvents(clientCommunicator, eventTypes); } - if(synchronizerType != null) { + if (synchronizerType != null) { S synchronizer = SyncProxyFactory.createProxy(synchronizerType); activeEntity.setSynchronizer(synchronizer); } + if (reconnectDataType != null) { + activeEntity.setReconnect(reconnectDataType, messageCodec.getCodec()); + } + return activeEntity; } @Override - public PassiveProxiedServerEntity createPassiveEntity(ServiceRegistry registry, byte[] configuration) { + public final PassiveProxiedServerEntity createPassiveEntity(ServiceRegistry registry, byte[] configuration) { C config = null; if (configType == Void.TYPE) { if (configuration != null && configuration.length > 0) { @@ -114,27 +129,40 @@ public PassiveProxiedServerEntity createPassiveEntity(ServiceRegistry regi } @Override - public ConcurrencyStrategy getConcurrencyStrategy(byte[] configuration) { + public final ConcurrencyStrategy getConcurrencyStrategy(byte[] configuration) { return concurrencyStrategy; } @Override - public ExecutionStrategy getExecutionStrategy(byte[] configuration) { + public final ExecutionStrategy getExecutionStrategy(byte[] configuration) { return executionStrategy; } - protected Set getKeysForSynchronization() { - return Collections.emptySet(); - } - - protected void setCodec(Codec codec) { + protected final void setCodec(Codec codec) { messageCodec.setCodec(codec); - if(syncMessageCodec != null) { + if (syncMessageCodec != null) { syncMessageCodec.setCodec(codec); } } - protected abstract ActiveProxiedServerEntity createActiveEntity(ServiceRegistry registry, C configuration); + @Override + public final > AP reconfigureEntity(ServiceRegistry registry, AP oldEntity, byte[] configuration) throws ConfigurationException { + if (oldEntity instanceof PassiveServerEntity) { + return (AP) createPassiveEntity(registry, configuration); + } else if (oldEntity instanceof ActiveServerEntity) { + return (AP) createActiveEntity(registry, configuration); + } else { + throw new AssertionError("unknown entity type"); + } + } + + // can be overriden / implemented + + protected Set getKeysForSynchronization() { + return Collections.emptySet(); + } + + protected abstract ActiveProxiedServerEntity createActiveEntity(ServiceRegistry registry, C configuration); protected abstract PassiveProxiedServerEntity createPassiveEntity(ServiceRegistry registry, C configuration); diff --git a/voltron-proxy/voltron-proxy-server/src/test/java/org/terracotta/voltron/proxy/server/EndToEndTest.java b/voltron-proxy/voltron-proxy-server/src/test/java/org/terracotta/voltron/proxy/server/EndToEndTest.java index 105a8a266e..9a3c6553f0 100644 --- a/voltron-proxy/voltron-proxy-server/src/test/java/org/terracotta/voltron/proxy/server/EndToEndTest.java +++ b/voltron-proxy/voltron-proxy-server/src/test/java/org/terracotta/voltron/proxy/server/EndToEndTest.java @@ -31,6 +31,7 @@ import org.terracotta.voltron.proxy.ProxyEntityMessage; import org.terracotta.voltron.proxy.ProxyEntityResponse; import org.terracotta.voltron.proxy.ProxyMessageCodec; +import org.terracotta.voltron.proxy.SerializationCodec; import org.terracotta.voltron.proxy.client.ClientProxyFactory; import org.terracotta.voltron.proxy.client.ServerMessageAware; @@ -59,6 +60,7 @@ public class EndToEndTest { @Test public void testBothEnds() throws ExecutionException, InterruptedException { + final SerializationCodec codec = new SerializationCodec(); final ProxyMessageCodec messageCodec = new ProxyMessageCodec(Comparable.class, null); final ProxyInvoker proxyInvoker = new ProxyInvoker(new Comparable() { public int compareTo(final Object o) { @@ -70,12 +72,13 @@ public int compareTo(final Object o) { when(endpoint.beginInvoke()).thenReturn(builder); - final Comparable proxy = ClientProxyFactory.createProxy(Comparable.class, Comparable.class, endpoint, null); + final Comparable proxy = ClientProxyFactory.createProxy(Comparable.class, Comparable.class, endpoint, null, codec); assertThat(proxy.compareTo("blah!"), is(42)); } @Test public void testServerInitiatedMessageFiring() throws ExecutionException, InterruptedException { + final SerializationCodec codec = new SerializationCodec(); final AtomicReference delegate = new AtomicReference(); final ProxyMessageCodec messageCodec = new ProxyMessageCodec(Comparable.class, new Class[] {String.class}); final ProxyInvoker proxyInvoker = new ProxyInvoker(new Comparable() { @@ -118,9 +121,9 @@ public void didCloseUnexpectedly() { } }; - final ComparableEntity proxy = ClientProxyFactory.createEntityProxy(ComparableEntity.class, Comparable.class, endpoint, new Class[]{String.class}); + final ComparableEntity proxy = ClientProxyFactory.createEntityProxy(ComparableEntity.class, Comparable.class, endpoint, new Class[]{String.class}, codec); final AtomicReference messageReceived = new AtomicReference(); - proxy.registerListener(String.class, new MessageListener() { + proxy.registerMessageListener(String.class, new MessageListener() { @Override public void onMessage(final String message) { messageReceived.set(message); @@ -137,6 +140,7 @@ public void onMessage(final String message) { @Test public void testClientInvokeInitiatedMessageFiring() throws ExecutionException, InterruptedException { + final SerializationCodec codec = new SerializationCodec(); final MessageListener listener = new MessageListener() { @Override public void onMessage(final Integer message) { @@ -161,8 +165,8 @@ public Future send(final ClientDescriptor clientDescriptor, final EntityRe proxyInvoker.addClient(new MyClientDescriptor()); proxyInvoker.addClient(myClient); - final ClientIdAware proxy = ClientProxyFactory.createProxy(ClientIdAware.class, ClientIdAware.class, endpoint, new Class[] {Integer.class}); - proxy.registerListener(Integer.class, listener); + final ClientIdAware proxy = ClientProxyFactory.createProxy(ClientIdAware.class, ClientIdAware.class, endpoint, new Class[] {Integer.class}, codec); + proxy.registerMessageListener(Integer.class, listener); proxy.nothing(); proxy.notMuch(null); assertThat(firingClientIdAware.counter.get(), is(1)); @@ -170,9 +174,10 @@ public Future send(final ClientDescriptor clientDescriptor, final EntityRe @Test public void testClientIdSubstitution() throws ExecutionException, InterruptedException { + final SerializationCodec codec = new SerializationCodec(); final MessageCodec messageCodec = new ProxyMessageCodec(ClientIdAware.class, null); final ProxyInvoker proxyInvoker = new ProxyInvoker(new ClientIdAware() { - public void registerListener(Class type, final MessageListener listener) { + public void registerMessageListener(Class type, final MessageListener listener) { throw new UnsupportedOperationException("Implement me!"); } @@ -195,7 +200,7 @@ public Serializable much(final Serializable foo, final Object id) { when(endpoint.beginInvoke()).thenReturn(builder); - final ClientIdAware proxy = ClientProxyFactory.createProxy(ClientIdAware.class, ClientIdAware.class, endpoint, null); + final ClientIdAware proxy = ClientProxyFactory.createProxy(ClientIdAware.class, ClientIdAware.class, endpoint, null, codec); proxy.nothing(); proxy.notMuch(null); assertThat(proxy.much(12, 12), notNullValue()); @@ -331,7 +336,7 @@ public Serializable much(final Serializable foo, final Object id) { } @Override - public void registerListener(Class type, final MessageListener listener) { + public void registerMessageListener(Class type, final MessageListener listener) { // noop } }