Skip to content

Commit

Permalink
handling error when jextension jar could not be loaded
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Nov 28, 2024
1 parent 816b4d3 commit e8a06fe
Showing 1 changed file with 97 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static java.util.Map.entry;

Expand Down Expand Up @@ -178,17 +176,18 @@ public ExternalIDRepresentation resolveExternalId2GlobalId(String tenant, ID ide
try {
ExternalIDRepresentation resultInner = this.getInboundExternalIdCache(tenant)
.getIdByExternalId(identity);
Counter.builder("dynmapper_inbound_identity_requests_total").tag("tenant", tenant).register(Metrics.globalRegistry).increment();
Counter.builder("dynmapper_inbound_identity_requests_total").tag("tenant", tenant)
.register(Metrics.globalRegistry).increment();
if (resultInner == null) {
resultInner = identityApi.resolveExternalId2GlobalId(identity, context);
this.getInboundExternalIdCache(tenant).putIdForExternalId(identity,
resultInner);


} else {
log.debug("Tenant {} - Cache hit for external ID {} -> {}", tenant, identity.getValue(),
resultInner.getManagedObject().getId().getValue());
Counter.builder("dynmapper_inbound_identity_cache_hits_total").tag("tenant", tenant).register(Metrics.globalRegistry).increment();
Counter.builder("dynmapper_inbound_identity_cache_hits_total").tag("tenant", tenant)
.register(Metrics.globalRegistry).increment();
}
return resultInner;
} catch (SDKException e) {
Expand All @@ -201,7 +200,7 @@ public ExternalIDRepresentation resolveExternalId2GlobalId(String tenant, ID ide

public ExternalIDRepresentation resolveGlobalId2ExternalId(String tenant, GId gid, String idType,
ProcessingContext<?> context) {
//TODO Use Cache
// TODO Use Cache
if (idType == null) {
idType = "c8y_Serial";
}
Expand Down Expand Up @@ -331,60 +330,61 @@ public AConnectorClient.Certificate loadCertificateByName(String certificateName
return null;
}
}
//TODO Change this to use ExecutorService + Virtual Threads when available

// TODO Change this to use ExecutorService + Virtual Threads when available
public CompletableFuture<AbstractExtensibleRepresentation> createMEAOAsync(ProcessingContext<?> context)
throws ProcessingException {
return CompletableFuture.supplyAsync(() -> {
String tenant = context.getTenant();
StringBuffer error = new StringBuffer("");
C8YRequest currentRequest = context.getCurrentRequest();
String payload = currentRequest.getRequest();
API targetAPI = context.getMapping().getTargetAPI();
AbstractExtensibleRepresentation result = subscriptionsService.callForTenant(tenant, () -> {
MicroserviceCredentials contextCredentials = removeAppKeyHeaderFromContext(contextService.getContext());
return contextService.callWithinContext(contextCredentials, () -> {
AbstractExtensibleRepresentation rt = null;
try {
if (targetAPI.equals(API.EVENT)) {
EventRepresentation eventRepresentation = configurationRegistry.getObjectMapper().readValue(
payload,
EventRepresentation.class);
rt = eventApi.create(eventRepresentation);
log.info("Tenant {} - New event posted: {}", tenant, rt);
} else if (targetAPI.equals(API.ALARM)) {
AlarmRepresentation alarmRepresentation = configurationRegistry.getObjectMapper().readValue(
payload,
AlarmRepresentation.class);
rt = alarmApi.create(alarmRepresentation);
log.info("Tenant {} - New alarm posted: {}", tenant, rt);
} else if (targetAPI.equals(API.MEASUREMENT)) {
MeasurementRepresentation measurementRepresentation = jsonParser
.parse(MeasurementRepresentation.class, payload);
rt = measurementApi.create(measurementRepresentation);
log.info("Tenant {} - New measurement posted: {}", tenant, rt);
} else if (targetAPI.equals(API.OPERATION)) {
OperationRepresentation operationRepresentation = jsonParser
.parse(OperationRepresentation.class, payload);
rt = deviceControlApi.create(operationRepresentation);
log.info("Tenant {} - New operation posted: {}", tenant, rt);
} else {
log.error("Tenant {} - Not existing API!", tenant);
}
} catch (JsonProcessingException e) {
log.error("Tenant {} - Could not map payload: {} {}", tenant, targetAPI, payload);
error.append("Could not map payload: " + targetAPI + "/" + payload);
} catch (SDKException s) {
log.error("Tenant {} - Could not sent payload to c8y: {} {}: ", tenant, targetAPI, payload, s);
error.append("Could not sent payload to c8y: " + targetAPI + "/" + payload + "/" + s);
String tenant = context.getTenant();
StringBuffer error = new StringBuffer("");
C8YRequest currentRequest = context.getCurrentRequest();
String payload = currentRequest.getRequest();
API targetAPI = context.getMapping().getTargetAPI();
AbstractExtensibleRepresentation result = subscriptionsService.callForTenant(tenant, () -> {
MicroserviceCredentials contextCredentials = removeAppKeyHeaderFromContext(contextService.getContext());
return contextService.callWithinContext(contextCredentials, () -> {
AbstractExtensibleRepresentation rt = null;
try {
if (targetAPI.equals(API.EVENT)) {
EventRepresentation eventRepresentation = configurationRegistry.getObjectMapper().readValue(
payload,
EventRepresentation.class);
rt = eventApi.create(eventRepresentation);
log.info("Tenant {} - New event posted: {}", tenant, rt);
} else if (targetAPI.equals(API.ALARM)) {
AlarmRepresentation alarmRepresentation = configurationRegistry.getObjectMapper().readValue(
payload,
AlarmRepresentation.class);
rt = alarmApi.create(alarmRepresentation);
log.info("Tenant {} - New alarm posted: {}", tenant, rt);
} else if (targetAPI.equals(API.MEASUREMENT)) {
MeasurementRepresentation measurementRepresentation = jsonParser
.parse(MeasurementRepresentation.class, payload);
rt = measurementApi.create(measurementRepresentation);
log.info("Tenant {} - New measurement posted: {}", tenant, rt);
} else if (targetAPI.equals(API.OPERATION)) {
OperationRepresentation operationRepresentation = jsonParser
.parse(OperationRepresentation.class, payload);
rt = deviceControlApi.create(operationRepresentation);
log.info("Tenant {} - New operation posted: {}", tenant, rt);
} else {
log.error("Tenant {} - Not existing API!", tenant);
}
return rt;
});
} catch (JsonProcessingException e) {
log.error("Tenant {} - Could not map payload: {} {}", tenant, targetAPI, payload);
error.append("Could not map payload: " + targetAPI + "/" + payload);
} catch (SDKException s) {
log.error("Tenant {} - Could not sent payload to c8y: {} {}: ", tenant, targetAPI, payload, s);
error.append("Could not sent payload to c8y: " + targetAPI + "/" + payload + "/" + s);
}
return rt;
});
if (!error.toString().equals("")) {
throw new CompletionException(new ProcessingException(error.toString()));
}
return result;
});
if (!error.toString().equals("")) {
throw new CompletionException(new ProcessingException(error.toString()));
}
return result;
});
}

public AbstractExtensibleRepresentation createMEAO(ProcessingContext<?> context)
Expand All @@ -405,27 +405,30 @@ public AbstractExtensibleRepresentation createMEAO(ProcessingContext<?> context)
payload,
EventRepresentation.class);
rt = eventApi.create(eventRepresentation);
if(serviceConfiguration.logPayload )
if (serviceConfiguration.logPayload)
log.info("Tenant {} - New event posted: {}", tenant, rt);
else
log.info("Tenant {} - New event posted with Id {}", tenant, ((EventRepresentation)rt).getId().getValue());
log.info("Tenant {} - New event posted with Id {}", tenant,
((EventRepresentation) rt).getId().getValue());
} else if (targetAPI.equals(API.ALARM)) {
AlarmRepresentation alarmRepresentation = configurationRegistry.getObjectMapper().readValue(
payload,
AlarmRepresentation.class);
rt = alarmApi.create(alarmRepresentation);
if(serviceConfiguration.logPayload )
if (serviceConfiguration.logPayload)
log.info("Tenant {} - New alarm posted: {}", tenant, rt);
else
log.info("Tenant {} - New alarm posted with Id {}", tenant, ((AlarmRepresentation)rt).getId().getValue());
log.info("Tenant {} - New alarm posted with Id {}", tenant,
((AlarmRepresentation) rt).getId().getValue());
} else if (targetAPI.equals(API.MEASUREMENT)) {
MeasurementRepresentation measurementRepresentation = jsonParser
.parse(MeasurementRepresentation.class, payload);
rt = measurementApi.create(measurementRepresentation);
if(serviceConfiguration.logPayload )
if (serviceConfiguration.logPayload)
log.info("Tenant {} - New measurement posted: {}", tenant, rt);
else
log.info("Tenant {} - New measurement posted with Id {}", tenant, ((MeasurementRepresentation) rt).getId().getValue());
log.info("Tenant {} - New measurement posted with Id {}", tenant,
((MeasurementRepresentation) rt).getId().getValue());
} else if (targetAPI.equals(API.OPERATION)) {
OperationRepresentation operationRepresentation = jsonParser
.parse(OperationRepresentation.class, payload);
Expand All @@ -450,7 +453,8 @@ public AbstractExtensibleRepresentation createMEAO(ProcessingContext<?> context)
return result;
}

public ManagedObjectRepresentation upsertDevice(String tenant, ID identity, ProcessingContext<?> context, ExternalIDRepresentation extId)
public ManagedObjectRepresentation upsertDevice(String tenant, ID identity, ProcessingContext<?> context,
ExternalIDRepresentation extId)
throws ProcessingException {
StringBuffer error = new StringBuffer("");
C8YRequest currentRequest = context.getCurrentRequest();
Expand All @@ -462,7 +466,8 @@ public ManagedObjectRepresentation upsertDevice(String tenant, ID identity, Proc
currentRequest.getRequest(),
ManagedObjectRepresentation.class);
try {
//ExternalIDRepresentation extId = resolveExternalId2GlobalId(tenant, identity, context);
// ExternalIDRepresentation extId = resolveExternalId2GlobalId(tenant, identity,
// context);
if (extId == null) {
// Device does not exist
// append external id to name
Expand All @@ -482,8 +487,8 @@ public ManagedObjectRepresentation upsertDevice(String tenant, ID identity, Proc
mor.setId(null);

mor = inventoryApi.create(mor, context);
//TODO Add/Update new managed object to IdentityCache
if(serviceConfiguration.logPayload)
// TODO Add/Update new managed object to IdentityCache
if (serviceConfiguration.logPayload)
log.info("Tenant {} - New device created: {}", tenant, mor);
else
log.info("Tenant {} - New device created with Id {}", tenant, mor.getId().getValue());
Expand All @@ -492,7 +497,7 @@ public ManagedObjectRepresentation upsertDevice(String tenant, ID identity, Proc
// Device exists - update needed
mor.setId(extId.getManagedObject().getId());
mor = inventoryApi.update(mor, context);
if(serviceConfiguration.logPayload)
if (serviceConfiguration.logPayload)
log.info("Tenant {} - Device updated: {}", tenant, mor);
else
log.info("Tenant {} - Device {} updated.", tenant, mor.getId().getValue());
Expand Down Expand Up @@ -521,10 +526,12 @@ public void loadProcessorExtensions(String tenant) {
boolean external = (Boolean) props.get("external");
log.debug("Tenant {} - Trying to load extension id: {}, name: {}", tenant, extension.getId().getValue(),
extName);
InputStream downloadInputStream = null;
FileOutputStream outputStream = null;
try {
if (external) {
// step 1 download extension for binary repository
InputStream downloadInputStream = binaryApi.downloadFile(extension.getId());
downloadInputStream = binaryApi.downloadFile(extension.getId());

// step 2 create temporary file,because classloader needs a url resource
File tempFile = File.createTempFile(extName, "jar");
Expand All @@ -535,7 +542,7 @@ public void loadProcessorExtensions(String tenant) {
log.debug("Tenant {} - CanonicalPath: {}, Path: {}, PathWithProtocol: {}", tenant, canonicalPath,
path,
pathWithProtocol);
FileOutputStream outputStream = new FileOutputStream(tempFile);
outputStream = new FileOutputStream(tempFile);
IOUtils.copy(downloadInputStream, outputStream);

// step 3 parse list of extensions
Expand All @@ -548,9 +555,27 @@ public void loadProcessorExtensions(String tenant) {
external);
}
} catch (IOException e) {
log.error("Tenant {} - Exception occurred, When loading extension, starting without extensions: ",
tenant,
e);
log.error("Tenant {} - IO Exception occurred when loading extension: ", tenant, e);
} catch (SecurityException e) {
log.error("Tenant {} - Security Exception occurred when loading extension: ", tenant, e);
} catch (IllegalArgumentException e) {
log.error("Tenant {} - Invalid argument Exception occurred when loading extension: ", tenant, e);
} finally {
// Consider cleaning up resources here
if (downloadInputStream != null) {
try {
downloadInputStream.close();
} catch (IOException e) {
log.warn("Tenant {} - Failed to close download stream", tenant, e);
}
}
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
log.warn("Tenant {} - Failed to close output stream", tenant, e);
}
}
}
}
}
Expand Down Expand Up @@ -789,7 +814,8 @@ public InboundExternalIdCache getInboundExternalIdCache(String tenant) {
public void clearInboundExternalIdCache(String tenant, boolean recreate, int inboundExternalIdCacheSize) {
InboundExternalIdCache inboundExternalIdCache = inboundExternalIdCaches.get(tenant);
if (inboundExternalIdCache != null) {
//FIXME Recreating the cache creates a new instance of InboundExternalIdCache which causes issues with Metering
// FIXME Recreating the cache creates a new instance of InboundExternalIdCache
// which causes issues with Metering
if (recreate) {
inboundExternalIdCaches.put(tenant, new InboundExternalIdCache(inboundExternalIdCacheSize, tenant));
} else {
Expand Down

0 comments on commit e8a06fe

Please sign in to comment.