Skip to content

Commit

Permalink
fixing bugs in extensions
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Dec 26, 2024
1 parent 7a09b5f commit 738bf38
Show file tree
Hide file tree
Showing 20 changed files with 277 additions and 396 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@

package dynamic.mapping.processor.extension.external;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import jakarta.ws.rs.ProcessingException;

import dynamic.mapping.model.MappingSubstitution;
import dynamic.mapping.model.MappingSubstitution.SubstituteValue.TYPE;
import dynamic.mapping.processor.extension.ProcessorExtensionSource;
import dynamic.mapping.processor.model.ProcessingContext;
import dynamic.mapping.processor.model.RepairStrategy;
Expand Down Expand Up @@ -59,39 +54,22 @@ public void extractFromSource(ProcessingContext<byte[]> context)
} catch (InvalidProtocolBufferException e) {
throw new ProcessingException(e.getMessage());
}
Map<String, List<MappingSubstitution.SubstituteValue>> postProcessingCache = context
.getPostProcessingCache();

postProcessingCache.put("time",
new ArrayList<MappingSubstitution.SubstituteValue>(
Arrays.asList(new MappingSubstitution.SubstituteValue(
new DateTime(
payloadProtobuf.getTimestamp())
.toString(),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));
postProcessingCache.put("text",
new ArrayList<MappingSubstitution.SubstituteValue>(Arrays.asList(
new MappingSubstitution.SubstituteValue(
payloadProtobuf.getTxt(),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));
postProcessingCache.put("type",
new ArrayList<MappingSubstitution.SubstituteValue>(
Arrays.asList(
new MappingSubstitution.SubstituteValue(
payloadProtobuf
.getEventType(),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));

// as the mapping uses useExternalId we have to map the id to _IDENTITY_.externalId
postProcessingCache.put(context.getMapping().getGenericDeviceIdentifier(),
new ArrayList<MappingSubstitution.SubstituteValue>(Arrays.asList(
new MappingSubstitution.SubstituteValue(
payloadProtobuf.getExternalId(),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));
context.addToProcessingCache("time", new DateTime(
payloadProtobuf.getTimestamp())
.toString(), TYPE.TEXTUAL, RepairStrategy.DEFAULT);
context.addToProcessingCache("text",
payloadProtobuf.getTxt(), TYPE.TEXTUAL, RepairStrategy.DEFAULT);
context.addToProcessingCache("type",
payloadProtobuf.getEventType(), TYPE.TEXTUAL, RepairStrategy.DEFAULT);

// as the mapping uses useExternalId we have to map the id to
// _IDENTITY_.externalId
context.addToProcessingCache(context.getMapping().getGenericDeviceIdentifier(),
payloadProtobuf.getExternalId()
.toString(),
TYPE.TEXTUAL, RepairStrategy.DEFAULT);

log.info("Tenant {} - New event over protobuf: {}, {}, {}, {}", context.getTenant(),
payloadProtobuf.getTimestamp(),
payloadProtobuf.getTxt(), payloadProtobuf.getEventType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@

import com.dashjoin.jsonata.json.Json;

import dynamic.mapping.model.MappingSubstitution;
import dynamic.mapping.model.MappingSubstitution.SubstituteValue.TYPE;
import dynamic.mapping.processor.extension.ProcessorExtensionSource;
import dynamic.mapping.processor.model.ProcessingContext;
import dynamic.mapping.processor.model.RepairStrategy;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;

import jakarta.ws.rs.ProcessingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

@Slf4j
Expand All @@ -45,65 +42,47 @@ public ProcessorExtensionCustomMeasurement() {
@Override
public void extractFromSource(ProcessingContext<byte[]> context)
throws ProcessingException {
Map jsonObject;
try {
jsonObject = (Map) Json.parseJson(new String(context.getPayload(), "UTF-8"));
} catch (Exception e) {
throw new ProcessingException(e.getMessage());
}
Map<String, List<MappingSubstitution.SubstituteValue>> postProcessingCache = context
.getPostProcessingCache();

postProcessingCache.put("time",
new ArrayList<MappingSubstitution.SubstituteValue>(
Arrays.asList(new MappingSubstitution.SubstituteValue(
new DateTime(
jsonObject.get("time"))
.toString(),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));
Map jsonObject = (Map) Json.parseJson(new String(context.getPayload(), "UTF-8"));

Map fragmentTemperatureSeries = Map.of("value", jsonObject.get("temperature"), "unit", jsonObject.get("unit"));
Map fragmentTemperature = Map.of("T", fragmentTemperatureSeries);
context.addToProcessingCache("time", new DateTime(
jsonObject.get("time"))
.toString(), TYPE.TEXTUAL, RepairStrategy.DEFAULT);

postProcessingCache.put("c8y_Fragment_to_remove",
new ArrayList<MappingSubstitution.SubstituteValue>(Arrays.asList(
new MappingSubstitution.SubstituteValue(null,
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.REMOVE_IF_MISSING_OR_NULL))));
Map fragmentTemperatureSeries = Map.of("value", jsonObject.get("temperature"), "unit",
jsonObject.get("unit"));
Map fragmentTemperature = Map.of("T", fragmentTemperatureSeries);

postProcessingCache.put("c8y_Temperature",
new ArrayList<MappingSubstitution.SubstituteValue>(Arrays.asList(
new MappingSubstitution.SubstituteValue(fragmentTemperature,
MappingSubstitution.SubstituteValue.TYPE.OBJECT,
RepairStrategy.DEFAULT))));
// as the mapping uses useExternalId we have to map the id to _IDENTITY_.externalId
postProcessingCache.put(context.getMapping().getGenericDeviceIdentifier(),
new ArrayList<MappingSubstitution.SubstituteValue>(Arrays.asList(
new MappingSubstitution.SubstituteValue(
jsonObject.get("externalId"),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));
context.addToProcessingCache("c8y_Fragment_to_remove", null, TYPE.TEXTUAL,
RepairStrategy.REMOVE_IF_MISSING_OR_NULL);
context.addToProcessingCache("c8y_Temperature",
fragmentTemperature, TYPE.OBJECT, RepairStrategy.DEFAULT);
context.addToProcessingCache("c8y_Temperature",
fragmentTemperature, TYPE.OBJECT, RepairStrategy.DEFAULT);
// as the mapping uses useExternalId we have to map the id to
// _IDENTITY_.externalId
context.addToProcessingCache(context.getMapping().getGenericDeviceIdentifier(),
jsonObject.get("externalId")
.toString(),
TYPE.TEXTUAL, RepairStrategy.DEFAULT);

Number unexpected = Float.NaN;
if (jsonObject.get("unexpected") != null) {
// it is important to use RepairStrategy.CREATE_IF_MISSING as the node
// "unexpected" does not yet exists in the target payload
Map fragmentUnexpectedSeries = Map.of("value", jsonObject.get("unexpected"),"unit", "unknown_unit");
Map fragmentUnexpected = Map.of("U", fragmentUnexpectedSeries);
postProcessingCache.put("c8y_Unexpected",
new ArrayList<MappingSubstitution.SubstituteValue>(
Arrays.asList(new MappingSubstitution.SubstituteValue(
fragmentUnexpected,
MappingSubstitution.SubstituteValue.TYPE.OBJECT,
RepairStrategy.CREATE_IF_MISSING))));
unexpected = (Number)jsonObject.get("unexpected");
Number unexpected = Float.NaN;
if (jsonObject.get("unexpected") != null) {
// it is important to use RepairStrategy.CREATE_IF_MISSING as the node
// "unexpected" does not yet exists in the target payload
Map fragmentUnexpectedSeries = Map.of("value", jsonObject.get("unexpected"), "unit", "unknown_unit");
Map fragmentUnexpected = Map.of("U", fragmentUnexpectedSeries);
context.addToProcessingCache("c8y_Unexpected",
fragmentUnexpected, TYPE.OBJECT, RepairStrategy.CREATE_IF_MISSING);
unexpected = (Number) jsonObject.get("unexpected");
}

log.info("Tenant {} - New measurement over json processor: {}, {}, {}, {}", context.getTenant(),
jsonObject.get("time").toString(),
jsonObject.get("unit").toString(), jsonObject.get("temperature"),
unexpected);
} catch (Exception e) {
throw new ProcessingException(e.getMessage());
}

log.info("Tenant {} - New measurement over json processor: {}, {}, {}, {}", context.getTenant(),
jsonObject.get("time").toString(),
jsonObject.get("unit").toString(), jsonObject.get("temperature"),
unexpected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import dynamic.mapping.model.Mapping;
import dynamic.mapping.model.MappingSubstitution;
import dynamic.mapping.model.MappingSubstitution.SubstituteValue;
import dynamic.mapping.processor.model.ProcessingContext;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -58,7 +59,7 @@ void testDeserializeCustomEvent() {

extension.extractFromSource(context);

ArrayList<MappingSubstitution.SubstituteValue> extractedTypes = (ArrayList) context.getPostProcessingCache().get("type");
ArrayList<SubstituteValue> extractedTypes = (ArrayList) context.getProcessingCache().get("type");
assertEquals( extractedTypes.size(), 1);
MappingSubstitution.SubstituteValue extractedType = extractedTypes.get(0);
log.info("Extracted: {}, {} ", extractedType.value, extractedType.value.getClass().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import lombok.extern.slf4j.Slf4j;
import dynamic.mapping.processor.model.MappingType;
import dynamic.mapping.processor.model.RepairStrategy;
import dynamic.mapping.model.MappingSubstitution.SubstituteValue.TYPE;

import jakarta.validation.constraints.NotNull;
import java.io.Serializable;
Expand Down Expand Up @@ -182,28 +183,28 @@ public static void addNestedValue(DocumentContext jsonObject, String path, Objec
}

public static void processSubstitute(String tenant,
List<MappingSubstitution.SubstituteValue> postProcessingCacheEntry,
List<MappingSubstitution.SubstituteValue> processingCacheEntry,
Object extractedSourceContent, MappingSubstitution substitution, Mapping mapping) {
if (extractedSourceContent == null) {
log.warn("Tenant {} - Substitution {} not in message payload. Check your mapping {}", tenant,
substitution.pathSource, mapping.getMappingTopic());
postProcessingCacheEntry
processingCacheEntry
.add(new MappingSubstitution.SubstituteValue(extractedSourceContent,
MappingSubstitution.SubstituteValue.TYPE.IGNORE, substitution.repairStrategy));
} else if (isTextual(extractedSourceContent)) {
postProcessingCacheEntry.add(
processingCacheEntry.add(
new MappingSubstitution.SubstituteValue(extractedSourceContent,
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL, substitution.repairStrategy));
TYPE.TEXTUAL, substitution.repairStrategy));
} else if (isNumber(extractedSourceContent)) {
postProcessingCacheEntry
processingCacheEntry
.add(new MappingSubstitution.SubstituteValue(extractedSourceContent,
MappingSubstitution.SubstituteValue.TYPE.NUMBER, substitution.repairStrategy));
} else if (isArray(extractedSourceContent)) {
postProcessingCacheEntry
processingCacheEntry
.add(new MappingSubstitution.SubstituteValue(extractedSourceContent,
MappingSubstitution.SubstituteValue.TYPE.ARRAY, substitution.repairStrategy));
} else {
postProcessingCacheEntry
processingCacheEntry
.add(new MappingSubstitution.SubstituteValue(extractedSourceContent,
MappingSubstitution.SubstituteValue.TYPE.OBJECT, substitution.repairStrategy));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,11 @@

package dynamic.mapping.processor.extension.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.joda.time.DateTime;

import com.google.protobuf.InvalidProtocolBufferException;

import dynamic.mapping.model.MappingSubstitution;
import dynamic.mapping.model.MappingSubstitution.SubstituteValue.TYPE;
import dynamic.mapping.processor.extension.ProcessorExtensionSource;
import dynamic.mapping.processor.model.ProcessingContext;
import dynamic.mapping.processor.model.RepairStrategy;
Expand All @@ -46,43 +41,26 @@ public void extractFromSource(ProcessingContext<byte[]> context)
try {
payloadProtobuf = InternalCustomAlarmOuter.InternalCustomAlarm
.parseFrom(context.getPayload());
context.addToProcessingCache("time", new DateTime(
payloadProtobuf.getTimestamp())
.toString(), TYPE.TEXTUAL, RepairStrategy.DEFAULT);
context.addToProcessingCache("text",
payloadProtobuf.getTxt(), TYPE.TEXTUAL, RepairStrategy.DEFAULT);
context.addToProcessingCache("type",
payloadProtobuf.getAlarmType(), TYPE.TEXTUAL, RepairStrategy.DEFAULT);
// as the mapping uses useExternalId we have to map the id to
// _IDENTITY_.externalId
context.addToProcessingCache(context.getMapping().getGenericDeviceIdentifier(),
payloadProtobuf.getExternalId()
.toString(),
TYPE.TEXTUAL, RepairStrategy.DEFAULT);
log.info("Tenant {} - New alarm over protobuf: {}, {}, {}, {}, {}", context.getTenant(),
payloadProtobuf.getTimestamp(),
payloadProtobuf.getTxt(), payloadProtobuf.getAlarmType(),
payloadProtobuf.getExternalId(), payloadProtobuf.getSeverity());
} catch (InvalidProtocolBufferException e) {
throw new ProcessingException(e.getMessage());
}
Map<String, List<MappingSubstitution.SubstituteValue>> postProcessingCache = context
.getPostProcessingCache();

postProcessingCache.put("time",
new ArrayList<MappingSubstitution.SubstituteValue>(
Arrays.asList(new MappingSubstitution.SubstituteValue(
new DateTime(
payloadProtobuf.getTimestamp())
.toString(),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));
postProcessingCache.put("text",
new ArrayList<MappingSubstitution.SubstituteValue>(Arrays.asList(
new MappingSubstitution.SubstituteValue(
payloadProtobuf.getTxt(),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));
postProcessingCache.put("type",
new ArrayList<MappingSubstitution.SubstituteValue>(
Arrays.asList(
new MappingSubstitution.SubstituteValue(
payloadProtobuf
.getAlarmType(),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));
postProcessingCache.put(context.getMapping().getGenericDeviceIdentifier(),
new ArrayList<MappingSubstitution.SubstituteValue>(Arrays.asList(
new MappingSubstitution.SubstituteValue(
payloadProtobuf.getExternalId(),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));
log.info("Tenant {} - New alarm over protobuf: {}, {}, {}, {}, {}", context.getTenant(),
payloadProtobuf.getTimestamp(),
payloadProtobuf.getTxt(), payloadProtobuf.getAlarmType(),
payloadProtobuf.getExternalId(), payloadProtobuf.getSeverity());
}
}
Loading

0 comments on commit 738bf38

Please sign in to comment.