Skip to content

Commit

Permalink
➕ Support for restartability: clear any entity placeholder states tha…
Browse files Browse the repository at this point in the history
…t can have been set when server becomes active
  • Loading branch information
mathieucarbou committed Dec 16, 2016
1 parent 4ef7c3a commit 659de92
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
public class DefaultActiveEntityMonitoringService extends AbstractEntityMonitoringService implements ActiveEntityMonitoringService {

private final TopologyService topologyService;
private final EventService eventService;
private final FiringService firingService;
private final String serverName;

DefaultActiveEntityMonitoringService(long consumerId, TopologyService topologyService, EventService eventService) {
DefaultActiveEntityMonitoringService(long consumerId, TopologyService topologyService, FiringService firingService) {
super(consumerId);
this.topologyService = Objects.requireNonNull(topologyService);
this.eventService = Objects.requireNonNull(eventService);
this.firingService = Objects.requireNonNull(firingService);
this.serverName = topologyService.getCurrentServerName();
}

Expand All @@ -57,7 +57,7 @@ public void pushNotification(ContextualNotification notification) {
logger.trace("[{}] pushNotification({})", getConsumerId(), notification);
topologyService.getEntityContext(serverName, getConsumerId()).ifPresent(context -> {
notification.setContext(notification.getContext().with(context));
eventService.fireNotification(notification);
firingService.fireNotification(notification);
});
}

Expand All @@ -70,14 +70,14 @@ public void pushStatistics(ContextualStatistics... statistics) {
topologyService.getEntityContext(serverName, getConsumerId())
.ifPresent(context -> statistic.setContext(statistic.getContext().with(context)));
}
eventService.fireStatistics(statistics);
firingService.fireStatistics(statistics);
}
}

@Override
public void answerManagementCall(String managementCallIdentifier, ContextualReturn<?> contextualReturn) {
logger.trace("[{}] answerManagementCall({}, executed={}, error={})", getConsumerId(), managementCallIdentifier, contextualReturn.hasExecuted(), contextualReturn.errorThrown());
eventService.fireManagementCallAnswer(managementCallIdentifier, contextualReturn);
firingService.fireManagementCallAnswer(managementCallIdentifier, contextualReturn);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@
/**
* @author Mathieu Carbou
*/
class DefaultClientMonitoringService implements ClientMonitoringService, EntityListener {
class DefaultClientMonitoringService implements ClientMonitoringService, TopologyEventListener {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClientMonitoringService.class);

private final long consumerId;
private final EventService eventService;
private final FiringService firingService;
private final ClientCommunicator clientCommunicator;
private final TopologyService topologyService;
private final Map<ClientDescriptor, Context> manageableClients = new ConcurrentHashMap<>();

DefaultClientMonitoringService(long consumerId, TopologyService topologyService, EventService eventService, ClientCommunicator clientCommunicator) {
DefaultClientMonitoringService(long consumerId, TopologyService topologyService, FiringService firingService, ClientCommunicator clientCommunicator) {
this.consumerId = consumerId;
this.topologyService = Objects.requireNonNull(topologyService);
this.eventService = Objects.requireNonNull(eventService);
this.firingService = Objects.requireNonNull(firingService);
this.clientCommunicator = Objects.requireNonNull(clientCommunicator);
}

Expand All @@ -60,7 +60,7 @@ public void pushNotification(ClientDescriptor from, ContextualNotification notif
LOGGER.trace("[{}] pushNotification({}, {})", consumerId, from, notification);
topologyService.getClientContext(consumerId, from).ifPresent(context -> {
notification.setContext(notification.getContext().with(context));
eventService.fireNotification(notification);
firingService.fireNotification(notification);
});
}

Expand All @@ -72,7 +72,7 @@ public void pushStatistics(ClientDescriptor from, ContextualStatistics... statis
for (ContextualStatistics statistic : statistics) {
statistic.setContext(statistic.getContext().with(context));
}
eventService.fireStatistics(statistics);
firingService.fireStatistics(statistics);
});
}
}
Expand All @@ -96,7 +96,11 @@ public void exposeManagementRegistry(ClientDescriptor from, ContextContainer con
@Override
public void answerManagementCall(ClientDescriptor caller, String managementCallIdentifier, ContextualReturn<?> contextualReturn) {
LOGGER.trace("[{}] answerManagementCall({})", consumerId, managementCallIdentifier);
eventService.fireManagementCallAnswer(managementCallIdentifier, contextualReturn);
firingService.fireManagementCallAnswer(managementCallIdentifier, contextualReturn);
}

@Override
public void onBecomeActive() {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
/**
* @author Mathieu Carbou
*/
class DefaultConsumerManagementRegistry extends DefaultManagementRegistry implements ConsumerManagementRegistry, EntityListener {
class DefaultConsumerManagementRegistry extends DefaultManagementRegistry implements ConsumerManagementRegistry, TopologyEventListener {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultConsumerManagementRegistry.class);

Expand Down Expand Up @@ -90,6 +90,10 @@ public boolean pushServerEntityNotification(Object managedObjectSource, String t
return false;
}

@Override
public void onBecomeActive() {
}

@Override
public void onFetch(long consumerId, ClientDescriptor clientDescriptor) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ class DefaultDataListener implements DataListener {

private final long consumerId;
private final TopologyService topologyService;
private final EventService eventService;
private final FiringService firingService;

DefaultDataListener(long consumerId, TopologyService topologyService, EventService eventService) {
DefaultDataListener(long consumerId, TopologyService topologyService, FiringService firingService) {
this.consumerId = consumerId;
this.topologyService = Objects.requireNonNull(topologyService);
this.eventService = Objects.requireNonNull(eventService);
this.firingService = Objects.requireNonNull(firingService);
}

// ===========================================================================
Expand All @@ -68,7 +68,7 @@ public void pushBestEffortsData(long consumerId, PlatformServer sender, String n
}
topologyService.getEntityContext(sender.getServerName(), consumerId).ifPresent(context -> {
notification.setContext(notification.getContext().with(context));
eventService.fireNotification(notification);
firingService.fireNotification(notification);
});
break;
}
Expand All @@ -82,7 +82,7 @@ public void pushBestEffortsData(long consumerId, PlatformServer sender, String n
topologyService.getEntityContext(sender.getServerName(), cid)
.ifPresent(context -> statistic.setContext(statistic.getContext().with(context)));
}
eventService.fireStatistics(statistics);
firingService.fireStatistics(statistics);
break;
}

Expand All @@ -106,7 +106,7 @@ public synchronized void setState(long consumerId, PlatformServer sender, String
// handles data coming from DefaultMonitoringService.answerManagementCall()
String managementCallIdentifier = path[2];
ContextualReturn<?> answer = (ContextualReturn<?>) data;
eventService.fireManagementCallAnswer(managementCallIdentifier, answer);
firingService.fireManagementCallAnswer(managementCallIdentifier, answer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
/**
* @author Mathieu Carbou
*/
class DefaultEventService implements EventService, Closeable {
class DefaultFiringService implements FiringService, Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventService.class);
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFiringService.class);

private final SequenceGenerator sequenceGenerator;
private final PlatformConfiguration platformConfiguration;
Expand All @@ -51,7 +51,7 @@ class DefaultEventService implements EventService, Closeable {
// temporary there to simulate an entity callback with IEntityMessenger
private final ExecutorService executorService = Executors.newSingleThreadExecutor();

DefaultEventService(SequenceGenerator sequenceGenerator, PlatformConfiguration platformConfiguration, SharedManagementRegistry sharedManagementRegistry, Map<Long, DefaultManagementService> managementServices, Map<Long, DefaultClientMonitoringService> clientMonitoringServices) {
DefaultFiringService(SequenceGenerator sequenceGenerator, PlatformConfiguration platformConfiguration, SharedManagementRegistry sharedManagementRegistry, Map<Long, DefaultManagementService> managementServices, Map<Long, DefaultClientMonitoringService> clientMonitoringServices) {
this.sequenceGenerator = Objects.requireNonNull(sequenceGenerator);
this.platformConfiguration = Objects.requireNonNull(platformConfiguration);
this.sharedManagementRegistry = Objects.requireNonNull(sharedManagementRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@
/**
* @author Mathieu Carbou
*/
class DefaultManagementService implements ManagementService, EntityListener {
class DefaultManagementService implements ManagementService, TopologyEventListener {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultManagementService.class);

private final long consumerId;
private final EventService eventService;
private final FiringService firingService;
private final ClientCommunicator clientCommunicator;
private final SequenceGenerator sequenceGenerator;
private final TopologyService topologyService;
Expand All @@ -59,10 +59,10 @@ class DefaultManagementService implements ManagementService, EntityListener {
private volatile ReadWriteBuffer<Message> buffer;
private volatile ContextualNotification full;

DefaultManagementService(long consumerId, TopologyService topologyService, EventService eventService, ClientCommunicator clientCommunicator, SequenceGenerator sequenceGenerator) {
DefaultManagementService(long consumerId, TopologyService topologyService, FiringService firingService, ClientCommunicator clientCommunicator, SequenceGenerator sequenceGenerator) {
this.consumerId = consumerId;
this.topologyService = Objects.requireNonNull(topologyService);
this.eventService = Objects.requireNonNull(eventService);
this.firingService = Objects.requireNonNull(firingService);
this.clientCommunicator = Objects.requireNonNull(clientCommunicator);
this.sequenceGenerator = Objects.requireNonNull(sequenceGenerator);
}
Expand Down Expand Up @@ -116,10 +116,14 @@ public String sendManagementCallRequest(ClientDescriptor caller, final Context c
}

track(caller, managementCallIdentifier);
eventService.fireManagementCallRequest(managementCallIdentifier, new ContextualCall<>(fullContext, capabilityName, methodName, returnType, parameters));
firingService.fireManagementCallRequest(managementCallIdentifier, new ContextualCall<>(fullContext, capabilityName, methodName, returnType, parameters));
return managementCallIdentifier;
}

@Override
public void onBecomeActive() {
}

@Override
public void onFetch(long consumerId, ClientDescriptor clientDescriptor) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*
* @author Mathieu Carbou
*/
interface EventService {
interface FiringService {
void fireNotification(ContextualNotification notification);

void fireStatistics(ContextualStatistics[] statistics);
Expand Down
Loading

0 comments on commit 659de92

Please sign in to comment.