diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/core/C8YAgent.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/core/C8YAgent.java index 8c8a5ed7..0742918d 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/core/C8YAgent.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/core/C8YAgent.java @@ -87,8 +87,6 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import static java.util.Map.entry; @@ -178,17 +176,18 @@ public ExternalIDRepresentation resolveExternalId2GlobalId(String tenant, ID ide try { ExternalIDRepresentation resultInner = this.getInboundExternalIdCache(tenant) .getIdByExternalId(identity); - Counter.builder("dynmapper_inbound_identity_requests_total").tag("tenant", tenant).register(Metrics.globalRegistry).increment(); + Counter.builder("dynmapper_inbound_identity_requests_total").tag("tenant", tenant) + .register(Metrics.globalRegistry).increment(); if (resultInner == null) { resultInner = identityApi.resolveExternalId2GlobalId(identity, context); this.getInboundExternalIdCache(tenant).putIdForExternalId(identity, resultInner); - } else { log.debug("Tenant {} - Cache hit for external ID {} -> {}", tenant, identity.getValue(), resultInner.getManagedObject().getId().getValue()); - Counter.builder("dynmapper_inbound_identity_cache_hits_total").tag("tenant", tenant).register(Metrics.globalRegistry).increment(); + Counter.builder("dynmapper_inbound_identity_cache_hits_total").tag("tenant", tenant) + .register(Metrics.globalRegistry).increment(); } return resultInner; } catch (SDKException e) { @@ -201,7 +200,7 @@ public ExternalIDRepresentation resolveExternalId2GlobalId(String tenant, ID ide public ExternalIDRepresentation resolveGlobalId2ExternalId(String tenant, GId gid, String idType, ProcessingContext context) { - //TODO Use Cache + // TODO Use Cache if (idType == null) { idType = "c8y_Serial"; } @@ -331,60 +330,61 @@ public AConnectorClient.Certificate loadCertificateByName(String certificateName return null; } } - //TODO Change this to use ExecutorService + Virtual Threads when available + + // TODO Change this to use ExecutorService + Virtual Threads when available public CompletableFuture createMEAOAsync(ProcessingContext context) throws ProcessingException { return CompletableFuture.supplyAsync(() -> { - String tenant = context.getTenant(); - StringBuffer error = new StringBuffer(""); - C8YRequest currentRequest = context.getCurrentRequest(); - String payload = currentRequest.getRequest(); - API targetAPI = context.getMapping().getTargetAPI(); - AbstractExtensibleRepresentation result = subscriptionsService.callForTenant(tenant, () -> { - MicroserviceCredentials contextCredentials = removeAppKeyHeaderFromContext(contextService.getContext()); - return contextService.callWithinContext(contextCredentials, () -> { - AbstractExtensibleRepresentation rt = null; - try { - if (targetAPI.equals(API.EVENT)) { - EventRepresentation eventRepresentation = configurationRegistry.getObjectMapper().readValue( - payload, - EventRepresentation.class); - rt = eventApi.create(eventRepresentation); - log.info("Tenant {} - New event posted: {}", tenant, rt); - } else if (targetAPI.equals(API.ALARM)) { - AlarmRepresentation alarmRepresentation = configurationRegistry.getObjectMapper().readValue( - payload, - AlarmRepresentation.class); - rt = alarmApi.create(alarmRepresentation); - log.info("Tenant {} - New alarm posted: {}", tenant, rt); - } else if (targetAPI.equals(API.MEASUREMENT)) { - MeasurementRepresentation measurementRepresentation = jsonParser - .parse(MeasurementRepresentation.class, payload); - rt = measurementApi.create(measurementRepresentation); - log.info("Tenant {} - New measurement posted: {}", tenant, rt); - } else if (targetAPI.equals(API.OPERATION)) { - OperationRepresentation operationRepresentation = jsonParser - .parse(OperationRepresentation.class, payload); - rt = deviceControlApi.create(operationRepresentation); - log.info("Tenant {} - New operation posted: {}", tenant, rt); - } else { - log.error("Tenant {} - Not existing API!", tenant); - } - } catch (JsonProcessingException e) { - log.error("Tenant {} - Could not map payload: {} {}", tenant, targetAPI, payload); - error.append("Could not map payload: " + targetAPI + "/" + payload); - } catch (SDKException s) { - log.error("Tenant {} - Could not sent payload to c8y: {} {}: ", tenant, targetAPI, payload, s); - error.append("Could not sent payload to c8y: " + targetAPI + "/" + payload + "/" + s); + String tenant = context.getTenant(); + StringBuffer error = new StringBuffer(""); + C8YRequest currentRequest = context.getCurrentRequest(); + String payload = currentRequest.getRequest(); + API targetAPI = context.getMapping().getTargetAPI(); + AbstractExtensibleRepresentation result = subscriptionsService.callForTenant(tenant, () -> { + MicroserviceCredentials contextCredentials = removeAppKeyHeaderFromContext(contextService.getContext()); + return contextService.callWithinContext(contextCredentials, () -> { + AbstractExtensibleRepresentation rt = null; + try { + if (targetAPI.equals(API.EVENT)) { + EventRepresentation eventRepresentation = configurationRegistry.getObjectMapper().readValue( + payload, + EventRepresentation.class); + rt = eventApi.create(eventRepresentation); + log.info("Tenant {} - New event posted: {}", tenant, rt); + } else if (targetAPI.equals(API.ALARM)) { + AlarmRepresentation alarmRepresentation = configurationRegistry.getObjectMapper().readValue( + payload, + AlarmRepresentation.class); + rt = alarmApi.create(alarmRepresentation); + log.info("Tenant {} - New alarm posted: {}", tenant, rt); + } else if (targetAPI.equals(API.MEASUREMENT)) { + MeasurementRepresentation measurementRepresentation = jsonParser + .parse(MeasurementRepresentation.class, payload); + rt = measurementApi.create(measurementRepresentation); + log.info("Tenant {} - New measurement posted: {}", tenant, rt); + } else if (targetAPI.equals(API.OPERATION)) { + OperationRepresentation operationRepresentation = jsonParser + .parse(OperationRepresentation.class, payload); + rt = deviceControlApi.create(operationRepresentation); + log.info("Tenant {} - New operation posted: {}", tenant, rt); + } else { + log.error("Tenant {} - Not existing API!", tenant); } - return rt; - }); + } catch (JsonProcessingException e) { + log.error("Tenant {} - Could not map payload: {} {}", tenant, targetAPI, payload); + error.append("Could not map payload: " + targetAPI + "/" + payload); + } catch (SDKException s) { + log.error("Tenant {} - Could not sent payload to c8y: {} {}: ", tenant, targetAPI, payload, s); + error.append("Could not sent payload to c8y: " + targetAPI + "/" + payload + "/" + s); + } + return rt; }); - if (!error.toString().equals("")) { - throw new CompletionException(new ProcessingException(error.toString())); - } - return result; }); + if (!error.toString().equals("")) { + throw new CompletionException(new ProcessingException(error.toString())); + } + return result; + }); } public AbstractExtensibleRepresentation createMEAO(ProcessingContext context) @@ -405,27 +405,30 @@ public AbstractExtensibleRepresentation createMEAO(ProcessingContext context) payload, EventRepresentation.class); rt = eventApi.create(eventRepresentation); - if(serviceConfiguration.logPayload ) + if (serviceConfiguration.logPayload) log.info("Tenant {} - New event posted: {}", tenant, rt); else - log.info("Tenant {} - New event posted with Id {}", tenant, ((EventRepresentation)rt).getId().getValue()); + log.info("Tenant {} - New event posted with Id {}", tenant, + ((EventRepresentation) rt).getId().getValue()); } else if (targetAPI.equals(API.ALARM)) { AlarmRepresentation alarmRepresentation = configurationRegistry.getObjectMapper().readValue( payload, AlarmRepresentation.class); rt = alarmApi.create(alarmRepresentation); - if(serviceConfiguration.logPayload ) + if (serviceConfiguration.logPayload) log.info("Tenant {} - New alarm posted: {}", tenant, rt); else - log.info("Tenant {} - New alarm posted with Id {}", tenant, ((AlarmRepresentation)rt).getId().getValue()); + log.info("Tenant {} - New alarm posted with Id {}", tenant, + ((AlarmRepresentation) rt).getId().getValue()); } else if (targetAPI.equals(API.MEASUREMENT)) { MeasurementRepresentation measurementRepresentation = jsonParser .parse(MeasurementRepresentation.class, payload); rt = measurementApi.create(measurementRepresentation); - if(serviceConfiguration.logPayload ) + if (serviceConfiguration.logPayload) log.info("Tenant {} - New measurement posted: {}", tenant, rt); else - log.info("Tenant {} - New measurement posted with Id {}", tenant, ((MeasurementRepresentation) rt).getId().getValue()); + log.info("Tenant {} - New measurement posted with Id {}", tenant, + ((MeasurementRepresentation) rt).getId().getValue()); } else if (targetAPI.equals(API.OPERATION)) { OperationRepresentation operationRepresentation = jsonParser .parse(OperationRepresentation.class, payload); @@ -450,7 +453,8 @@ public AbstractExtensibleRepresentation createMEAO(ProcessingContext context) return result; } - public ManagedObjectRepresentation upsertDevice(String tenant, ID identity, ProcessingContext context, ExternalIDRepresentation extId) + public ManagedObjectRepresentation upsertDevice(String tenant, ID identity, ProcessingContext context, + ExternalIDRepresentation extId) throws ProcessingException { StringBuffer error = new StringBuffer(""); C8YRequest currentRequest = context.getCurrentRequest(); @@ -462,7 +466,8 @@ public ManagedObjectRepresentation upsertDevice(String tenant, ID identity, Proc currentRequest.getRequest(), ManagedObjectRepresentation.class); try { - //ExternalIDRepresentation extId = resolveExternalId2GlobalId(tenant, identity, context); + // ExternalIDRepresentation extId = resolveExternalId2GlobalId(tenant, identity, + // context); if (extId == null) { // Device does not exist // append external id to name @@ -482,8 +487,8 @@ public ManagedObjectRepresentation upsertDevice(String tenant, ID identity, Proc mor.setId(null); mor = inventoryApi.create(mor, context); - //TODO Add/Update new managed object to IdentityCache - if(serviceConfiguration.logPayload) + // TODO Add/Update new managed object to IdentityCache + if (serviceConfiguration.logPayload) log.info("Tenant {} - New device created: {}", tenant, mor); else log.info("Tenant {} - New device created with Id {}", tenant, mor.getId().getValue()); @@ -492,7 +497,7 @@ public ManagedObjectRepresentation upsertDevice(String tenant, ID identity, Proc // Device exists - update needed mor.setId(extId.getManagedObject().getId()); mor = inventoryApi.update(mor, context); - if(serviceConfiguration.logPayload) + if (serviceConfiguration.logPayload) log.info("Tenant {} - Device updated: {}", tenant, mor); else log.info("Tenant {} - Device {} updated.", tenant, mor.getId().getValue()); @@ -521,10 +526,12 @@ public void loadProcessorExtensions(String tenant) { boolean external = (Boolean) props.get("external"); log.debug("Tenant {} - Trying to load extension id: {}, name: {}", tenant, extension.getId().getValue(), extName); + InputStream downloadInputStream = null; + FileOutputStream outputStream = null; try { if (external) { // step 1 download extension for binary repository - InputStream downloadInputStream = binaryApi.downloadFile(extension.getId()); + downloadInputStream = binaryApi.downloadFile(extension.getId()); // step 2 create temporary file,because classloader needs a url resource File tempFile = File.createTempFile(extName, "jar"); @@ -535,7 +542,7 @@ public void loadProcessorExtensions(String tenant) { log.debug("Tenant {} - CanonicalPath: {}, Path: {}, PathWithProtocol: {}", tenant, canonicalPath, path, pathWithProtocol); - FileOutputStream outputStream = new FileOutputStream(tempFile); + outputStream = new FileOutputStream(tempFile); IOUtils.copy(downloadInputStream, outputStream); // step 3 parse list of extensions @@ -548,9 +555,27 @@ public void loadProcessorExtensions(String tenant) { external); } } catch (IOException e) { - log.error("Tenant {} - Exception occurred, When loading extension, starting without extensions: ", - tenant, - e); + log.error("Tenant {} - IO Exception occurred when loading extension: ", tenant, e); + } catch (SecurityException e) { + log.error("Tenant {} - Security Exception occurred when loading extension: ", tenant, e); + } catch (IllegalArgumentException e) { + log.error("Tenant {} - Invalid argument Exception occurred when loading extension: ", tenant, e); + } finally { + // Consider cleaning up resources here + if (downloadInputStream != null) { + try { + downloadInputStream.close(); + } catch (IOException e) { + log.warn("Tenant {} - Failed to close download stream", tenant, e); + } + } + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + log.warn("Tenant {} - Failed to close output stream", tenant, e); + } + } } } } @@ -789,7 +814,8 @@ public InboundExternalIdCache getInboundExternalIdCache(String tenant) { public void clearInboundExternalIdCache(String tenant, boolean recreate, int inboundExternalIdCacheSize) { InboundExternalIdCache inboundExternalIdCache = inboundExternalIdCaches.get(tenant); if (inboundExternalIdCache != null) { - //FIXME Recreating the cache creates a new instance of InboundExternalIdCache which causes issues with Metering + // FIXME Recreating the cache creates a new instance of InboundExternalIdCache + // which causes issues with Metering if (recreate) { inboundExternalIdCaches.put(tenant, new InboundExternalIdCache(inboundExternalIdCacheSize, tenant)); } else {