Skip to content

Commit

Permalink
Merge pull request #297 from Cumulocity-IoT/virtualThreads
Browse files Browse the repository at this point in the history
Enabling virtual Threads
  • Loading branch information
switschel authored Jan 6, 2025
2 parents edbc29d + 4212f75 commit 9d10b90
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 114 deletions.
15 changes: 5 additions & 10 deletions dynamic-mapping-service/src/main/java/dynamic/mapping/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import dynamic.mapping.model.MappingTreeNode;
import dynamic.mapping.model.MappingTreeNodeSerializer;
Expand Down Expand Up @@ -98,17 +99,11 @@ public TaskExecutor taskExecutor() {
executor.setQueueCapacity(25);
return executor;
}
//Assuming we can process 25 messages in parallel per CPU-Core
@Bean("processingCachePool")
public ExecutorService processingThreadPool() {
return Executors.newCachedThreadPool();
//return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*25);
}

//Assuming we can process 10 messages in parallel per CPU-Core
@Bean("cachedThreadPool")
public ExecutorService cachedThreadPool() {
return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*10);
@Bean("virtThreadPool")
public ExecutorService virtThreadPool() {
final ThreadFactory factory = Thread.ofVirtual().name("virtThread-",0).factory();
return Executors.newThreadPerTaskExecutor(factory);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ public static class Certificate {

protected ConfigurationRegistry configurationRegistry;

@Getter
protected ExecutorService cachedThreadPool;
@Getter
protected ExecutorService virtThreadPool;

private Future<?> connectTask;
private ScheduledExecutorService housekeepingExecutor = Executors
Expand Down Expand Up @@ -254,23 +254,23 @@ public static class Certificate {
public void submitInitialize() {
if (initializeTask == null || initializeTask.isDone()) {
log.debug("Tenant {} - Initializing...", tenant);
initializeTask = cachedThreadPool.submit(this::initialize);
initializeTask = virtThreadPool.submit(this::initialize);
}
}

public void submitConnect() {
loadConfiguration();
if (connectTask == null || connectTask.isDone()) {
log.debug("Tenant {} - Connecting...", tenant);
connectTask = cachedThreadPool.submit(this::connect);
connectTask = virtThreadPool.submit(this::connect);
}
}

public void submitDisconnect() {
loadConfiguration();
if (connectTask == null || connectTask.isDone()) {
log.debug("Tenant {} - Disconnecting...", tenant);
connectTask = cachedThreadPool.submit(this::disconnect);
connectTask = virtThreadPool.submit(this::disconnect);
}
}

Expand Down Expand Up @@ -769,7 +769,7 @@ private Optional<Mapping> findActiveMappingInbound(Mapping mapping) {
if (cacheMappings == null) {
return Optional.empty();
}

return cacheMappings.values().stream()
.filter(m -> m.id.equals(mapping.id))
.findFirst();
Expand Down Expand Up @@ -910,7 +910,7 @@ private String formatPathLevel(String level, boolean isCurrent) {
*/
protected <T> Optional<T> executeWithTimeout(Callable<T> operation, long timeout, TimeUnit unit) {
try {
Future<T> future = cachedThreadPool.submit(operation);
Future<T> future = virtThreadPool.submit(operation);
return Optional.ofNullable(future.get(timeout, unit));
} catch (Exception e) {
log.warn("Operation timed out or failed: {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public KafkaClient(ConfigurationRegistry configurationRegistry,
this.connectorIdentifier = connectorConfiguration.identifier;
this.connectorStatus = ConnectorStatusEvent.unknown(connectorConfiguration.name, connectorConfiguration.identifier);
this.c8yAgent = configurationRegistry.getC8yAgent();
this.cachedThreadPool = configurationRegistry.getCachedThreadPool();
this.virtThreadPool = configurationRegistry.getVirtThreadPool();
this.objectMapper = configurationRegistry.getObjectMapper();
this.additionalSubscriptionIdTest = additionalSubscriptionIdTest;
this.mappingServiceRepresentation = configurationRegistry.getMappingServiceRepresentations().get(tenant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public MQTTClient(ConfigurationRegistry configurationRegistry,
this.connectorStatus = ConnectorStatusEvent.unknown(connectorConfiguration.name, connectorConfiguration.identifier);
// this.connectorType = connectorConfiguration.connectorType;
this.c8yAgent = configurationRegistry.getC8yAgent();
this.cachedThreadPool = configurationRegistry.getCachedThreadPool();
this.virtThreadPool = configurationRegistry.getVirtThreadPool();
this.objectMapper = configurationRegistry.getObjectMapper();
this.additionalSubscriptionIdTest = additionalSubscriptionIdTest;
this.mappingServiceRepresentation = configurationRegistry.getMappingServiceRepresentations().get(tenant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public MQTTServiceClient(ConfigurationRegistry configurationRegistry,
this.connectorName = connectorConfiguration.name;
this.connectorStatus = ConnectorStatusEvent.unknown(connectorConfiguration.name, connectorConfiguration.identifier);
this.c8yAgent = configurationRegistry.getC8yAgent();
this.cachedThreadPool = configurationRegistry.getCachedThreadPool();
this.virtThreadPool = configurationRegistry.getVirtThreadPool();
this.objectMapper = configurationRegistry.getObjectMapper();
this.additionalSubscriptionIdTest = additionalSubscriptionIdTest;
this.mappingServiceRepresentation = configurationRegistry.getMappingServiceRepresentations().get(tenant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public class BootstrapService {
@Autowired
ConnectorConfigurationComponent connectorConfigurationComponent;

@Qualifier("cachedThreadPool")
private ExecutorService cachedThreadPool;
@Qualifier("virtThreadPool")
private ExecutorService virtThreadPool;

@Autowired
public void setCachedThreadPool(ExecutorService cachedThreadPool) {
this.cachedThreadPool = cachedThreadPool;
public void setVirtThreadPool(ExecutorService virtThreadPool) {
this.virtThreadPool = virtThreadPool;
}

@Value("${APP.additionalSubscriptionIdTest}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,7 @@ public void setServiceConfigurationComponent(@Lazy ServiceConfigurationComponent
@Getter
@Setter
@Autowired
private ExecutorService cachedThreadPool;

@Getter
@Setter
@Autowired
private ExecutorService processingCachePool;
private ExecutorService virtThreadPool;

public Map<MappingType, BaseProcessorInbound<?>> createPayloadProcessorsInbound(String tenant) {
ExtensibleProcessor extensibleProcessor = getExtensibleProcessors().get(tenant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public void setConfigurationRegistry(@Lazy ConfigurationRegistry configurationRe
}

@Autowired
@Qualifier("cachedThreadPool")
private ExecutorService cachedThreadPool;
@Qualifier("virtThreadPool")
private ExecutorService virtThreadPool;

// structure: <tenant, <connectorIdentifier, asynchronousDispatcherOutbound>>
@Getter
Expand Down Expand Up @@ -285,20 +285,21 @@ public NotificationSubscriptionRepresentation createDeviceSubscription(ManagedOb
return notificationSubscriptionRepresentation;
}

public CompletableFuture<NotificationSubscriptionRepresentation> subscribeDeviceAndConnect(
ManagedObjectRepresentation mor,
API api) throws ExecutionException, InterruptedException {
/* Connect to all devices */
String tenant = subscriptionsService.getTenant();
String deviceName = mor.getName();
CompletableFuture<NotificationSubscriptionRepresentation> notificationFut = new CompletableFuture<NotificationSubscriptionRepresentation>();
subscriptionsService.runForTenant(tenant, () -> {
Map<String, String> deviceTokens = deviceTokenPerConnector.get(tenant);
NotificationSubscriptionRepresentation notification = createDeviceSubscription(mor, api);
notificationFut.complete(notification);
if (deviceWSStatusCode.get(tenant) == null
|| (deviceWSStatusCode.get(tenant) != null && deviceWSStatusCode.get(tenant) != 200)) {
log.info("Tenant {} - Device Subscription not connected yet. Will connect...", tenant);
public Future<NotificationSubscriptionRepresentation> subscribeDeviceAndConnect(
ManagedObjectRepresentation mor,
API api) throws ExecutionException, InterruptedException {
/* Connect to all devices */
String tenant = subscriptionsService.getTenant();
String deviceName = mor.getName();
log.info("Tenant {} - Creating new Subscription for Device {} with ID {}", tenant, deviceName,
mor.getId().getValue());

Callable<NotificationSubscriptionRepresentation> callableTask = () -> subscriptionsService.callForTenant(tenant, () -> {
Map<String, String> deviceTokens = deviceTokenPerConnector.get(tenant);
NotificationSubscriptionRepresentation notification = createDeviceSubscription(mor, api);
if (deviceWSStatusCode.get(tenant) == null
|| (deviceWSStatusCode.get(tenant) != null && deviceWSStatusCode.get(tenant) != 200)) {
log.info("Tenant {} - Device Subscription not connected yet. Will connect...", tenant);

try {
// Add Dispatcher for each Connector
Expand Down Expand Up @@ -331,12 +332,13 @@ public CompletableFuture<NotificationSubscriptionRepresentation> subscribeDevice
ExternalIDRepresentation extId = configurationRegistry.getC8yAgent().resolveGlobalId2ExternalId(tenant,
mor.getId(), null, null);
if (extId != null)
activatePushConnectivityStatus(tenant, extId.getExternalId());
});
return notificationFut;
activatePushConnectivityStatus(tenant, extId.getExternalId());
return notification;
});
return virtThreadPool.submit(callableTask);
}

public CompletableFuture<List<NotificationSubscriptionRepresentation>> getNotificationSubscriptionForDevices(
public Future<List<NotificationSubscriptionRepresentation>> getNotificationSubscriptionForDevices(
String deviceId,
String deviceSubscription) {
NotificationSubscriptionFilter filter = new NotificationSubscriptionFilter();
Expand All @@ -352,9 +354,8 @@ public CompletableFuture<List<NotificationSubscriptionRepresentation>> getNotifi
}
filter = filter.byContext("mo");
NotificationSubscriptionFilter finalFilter = filter;
CompletableFuture<List<NotificationSubscriptionRepresentation>> deviceSubListFut = new CompletableFuture<>();
subscriptionsService.runForTenant(subscriptionsService.getTenant(), () -> {
String tenant = subscriptionsService.getTenant();
String tenant = subscriptionsService.getTenant();
Callable<List<NotificationSubscriptionRepresentation>> callableTask = () -> subscriptionsService.callForTenant(tenant, () -> {
List<NotificationSubscriptionRepresentation> deviceSubList = new ArrayList<>();
Iterator<NotificationSubscriptionRepresentation> subIt = subscriptionAPI
.getSubscriptionsByFilter(finalFilter).get().allPages().iterator();
Expand All @@ -367,10 +368,9 @@ public CompletableFuture<List<NotificationSubscriptionRepresentation>> getNotifi
deviceSubList.add(notificationSubscriptionRepresentation);
}
}
deviceSubListFut.complete(deviceSubList);
});

return deviceSubListFut;
return deviceSubList;
});
return virtThreadPool.submit(callableTask);
}

public C8YNotificationSubscription getDeviceSubscriptions(String tenant, String deviceId,
Expand Down Expand Up @@ -622,9 +622,7 @@ public void reconnect() {
if (deviceWSStatusCode.get(tenant) != null && deviceWSStatusCode.get(tenant) == 401
|| deviceClient.getReadyState().equals(ReadyState.NOT_YET_CONNECTED)) {
log.info("Tenant {} - Trying to reconnect ws device client... ", tenant);
subscriptionsService.runForTenant(tenant, () -> {
initDeviceClient();
});
initDeviceClient();
} else if (deviceClient.getReadyState().equals(ReadyState.CLOSING)
|| deviceClient.getReadyState().equals(ReadyState.CLOSED)) {
log.info("Tenant {} - Trying to reconnect ws device client... ", tenant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ public abstract class BaseProcessorInbound<T> {
public BaseProcessorInbound(ConfigurationRegistry configurationRegistry) {
this.objectMapper = configurationRegistry.getObjectMapper();
this.c8yAgent = configurationRegistry.getC8yAgent();
this.processingCachePool = configurationRegistry.getProcessingCachePool();
this.virtThreadPool = configurationRegistry.getVirtThreadPool();
}

protected C8YAgent c8yAgent;

protected ObjectMapper objectMapper;

protected ExecutorService processingCachePool;
protected ExecutorService virtThreadPool;

public abstract T deserializePayload(Mapping mapping, ConnectorMessage message)
throws IOException;
Expand Down Expand Up @@ -149,7 +149,7 @@ public void substituteInTargetAndSend(ProcessingContext<T> context) {
for (int i = 0; i < deviceEntries.size(); i++) {
// for (MappingSubstitution.SubstituteValue device : deviceEntries) {
int finalI = i;
contextFutureList.add(processingCachePool.submit(() -> {
contextFutureList.add(virtThreadPool.submit(() -> {
return getBuildProcessingContext(context, deviceEntries.get(finalI),
finalI, deviceEntries.size());
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,18 @@ public class DispatcherInbound implements GenericMessageCallback {

private AConnectorClient connectorClient;

private ExecutorService cachedThreadPool;
private ExecutorService virtThreadPool;

private MappingComponent mappingComponent;

private ConfigurationRegistry configurationRegistry;

private Counter inboundMessageCounter;

public DispatcherInbound(ConfigurationRegistry configurationRegistry,
AConnectorClient connectorClient) {
this.connectorClient = connectorClient;
this.cachedThreadPool = configurationRegistry.getCachedThreadPool();
this.virtThreadPool = configurationRegistry.getVirtThreadPool();
this.mappingComponent = configurationRegistry.getMappingComponent();
this.configurationRegistry = configurationRegistry;
}
Expand All @@ -99,6 +101,7 @@ public static class MappingInboundTask<T> implements Callable<List<ProcessingCon
Timer inboundProcessingTimer;
Counter inboundProcessingCounter;
AConnectorClient connectorClient;
ExecutorService virtThreadPool;

public MappingInboundTask(ConfigurationRegistry configurationRegistry, List<Mapping> resolvedMappings,
ConnectorMessage message, AConnectorClient connectorClient) {
Expand All @@ -118,6 +121,7 @@ public MappingInboundTask(ConfigurationRegistry configurationRegistry, List<Mapp
this.inboundProcessingCounter = Counter.builder("dynmapper_inbound_message_total")
.tag("tenant", connectorMessage.getTenant()).description("Total number of inbound messages")
.tag("connector", connectorMessage.getConnectorIdentifier()).register(Metrics.globalRegistry);
this.virtThreadPool = configurationRegistry.getVirtThreadPool();

}

Expand Down Expand Up @@ -238,7 +242,7 @@ public Future<List<ProcessingContext<?>>> processMessage(ConnectorMessage messag
return futureProcessingResult;
}

futureProcessingResult = cachedThreadPool.submit(
futureProcessingResult = virtThreadPool.submit(
new MappingInboundTask(configurationRegistry, resolvedMappings,
message, connectorClient));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
import static com.dashjoin.jsonata.Jsonata.jsonata;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;

import org.apache.commons.lang3.mutable.MutableInt;
import org.springframework.web.bind.annotation.RequestMethod;
Expand Down Expand Up @@ -156,7 +153,8 @@ public ProcessingContext<T> substituteInTargetAndSend(ProcessingContext<T> conte
/*
* step 4 prepare target payload for sending to mqttBroker
*/
if (!mapping.targetAPI.equals(API.INVENTORY)) {
if(Arrays.stream(API.values()).anyMatch(v -> mapping.targetAPI.equals(v))) {
//if (!mapping.targetAPI.equals(API.INVENTORY)) {
List<String> topicLevels = payloadTarget.read(Mapping.TOKEN_TOPIC_LEVEL);
if (topicLevels != null && topicLevels.size() > 0) {
// now merge the replaced topic levels
Expand Down Expand Up @@ -207,6 +205,7 @@ public ProcessingContext<T> substituteInTargetAndSend(ProcessingContext<T> conte
}
predecessor = newPredecessor;
} else {
//FIXME Why are INVENTORY API messages ignored?! Needs to be implemented
log.warn("Tenant {} - Ignoring payload: {}, {}, {}", tenant, payloadTarget, mapping.targetAPI,
processingCache.size());
}
Expand Down
Loading

0 comments on commit 9d10b90

Please sign in to comment.