Skip to content

Commit

Permalink
find deciveIdentifier in protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Dec 25, 2024
1 parent 63fc200 commit e51c6a1
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 55 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@ JSONata4Java/dependency-reduced-pom.xml
dynamic-mapping-ui/node_modules_tmp/
.vscode/launch.json
.env
setEnv.sh
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public void unsubscribe(String topic) throws Exception {
sendSubscriptionEvents(topic, "Unsubscribing");
Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync();
asyncMqttClient.unsubscribe(Mqtt3Unsubscribe.builder().topicFilter(topic).build()).thenRun(() -> {
log.info("Tenant {} - Successfully unsubscribed on topic: {} for connector {}", tenant, topic,
log.info("Tenant {} - Successfully unsubscribed from topic: {} for connector {}", tenant, topic,
connectorName);
}).exceptionally(throwable -> {
log.error("Tenant {} - Failed to subscribe on topic {} with error: ", tenant, topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@
import dynamic.mapping.model.MappingRepresentation;
import dynamic.mapping.processor.ProcessingException;
import dynamic.mapping.processor.model.C8YRequest;
import dynamic.mapping.processor.model.MappingType;
import dynamic.mapping.processor.model.ProcessingContext;
import dynamic.mapping.processor.model.RepairStrategy;

import org.checkerframework.checker.units.qual.m;
import org.springframework.web.bind.annotation.RequestMethod;

import java.io.IOException;
Expand Down Expand Up @@ -93,7 +96,12 @@ public void substituteInTargetAndSend(ProcessingContext<T> context) {
.get().getKey();
int countMaxEntries = postProcessingCache.get(entryWithMaxSubstitutes).size();

List<String> pathsTargetForDeviceIdentifiers = getPathTargetForDeviceIdentifiers(mapping);
List<String> pathsTargetForDeviceIdentifiers;
if (mapping.extension == null || MappingType.PROTOBUF_STATIC.equals(mapping.getMappingType())) {
pathsTargetForDeviceIdentifiers = new ArrayList<>(Arrays.asList(mapping.getGenericDeviceIdentifier()));
} else {
pathsTargetForDeviceIdentifiers = getPathTargetForDeviceIdentifiers(mapping);
}
String firstPathTargetForDeviceIdentifiers = pathsTargetForDeviceIdentifiers.size() > 0
? pathsTargetForDeviceIdentifiers.get(0)
: null;
Expand Down Expand Up @@ -174,7 +182,8 @@ private ProcessingContext<T> getBuildProcessingContext(ProcessingContext<T> cont
}

if (!mapping.targetAPI.equals(API.INVENTORY)) {
// this block resolves the externalId (if used) to the Cumulocity sourceId in substitute.value
// this block resolves the externalId (if used) to the Cumulocity sourceId in
// substitute.value
if (pathsTargetForDeviceIdentifiers.contains(pathTarget) && mapping.useExternalId) {
ExternalIDRepresentation sourceId = c8yAgent.resolveExternalId2GlobalId(tenant,
new ID(mapping.externalIdType, substitute.value.toString()), context);
Expand Down Expand Up @@ -213,7 +222,7 @@ private ProcessingContext<T> getBuildProcessingContext(ProcessingContext<T> cont
substitute.value = null;
} else {
substitute.value = sourceId.getManagedObject().getId().getValue();
}
}
}
substituteValueInPayload(mapping.mappingType, substitute, payloadTarget,
mapping.transformGenericPath2C8YPath(pathTarget));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@

package dynamic.mapping.processor.inbound;

import static java.util.Map.entry;

import dynamic.mapping.connector.core.callback.ConnectorMessage;
import dynamic.mapping.core.ConfigurationRegistry;
import dynamic.mapping.model.Mapping;
import dynamic.mapping.processor.model.PayloadWrapper;
import dynamic.mapping.processor.model.ProcessingContext;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

//@Service
public class FlatFileProcessorInbound extends JSONProcessorInbound {
Expand All @@ -43,7 +46,9 @@ public ProcessingContext<Object> deserializePayload(Mapping mapping, ConnectorMe
String payloadMessage = (message.getPayload() != null
? new String(message.getPayload(), Charset.defaultCharset())
: "");
Object payloadObjectNode = objectMapper.valueToTree(new PayloadWrapper(payloadMessage));
// Object payloadObjectNode = objectMapper.valueToTree(new PayloadWrapper(payloadMessage));
Object payloadObjectNode = new HashMap<>(Map.ofEntries(
entry("message", payloadMessage)));
ProcessingContext<Object> context = new ProcessingContext<Object>();
context.setPayload(payloadObjectNode);
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@

package dynamic.mapping.processor.inbound;

import static java.util.Map.entry;

import dynamic.mapping.connector.core.callback.ConnectorMessage;
import dynamic.mapping.core.ConfigurationRegistry;
import dynamic.mapping.model.Mapping;
import dynamic.mapping.processor.model.PayloadWrapper;
import dynamic.mapping.processor.model.ProcessingContext;
import org.apache.commons.codec.binary.Hex;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

//@Service
public class GenericBinaryProcessorInbound extends JSONProcessorInbound {
Expand All @@ -40,8 +43,10 @@ public GenericBinaryProcessorInbound(ConfigurationRegistry configurationRegistry
@Override
public ProcessingContext<Object> deserializePayload(Mapping mapping, ConnectorMessage message)
throws IOException {
Object payloadObjectNode = objectMapper
.valueToTree(new PayloadWrapper("0x" + Hex.encodeHexString(message.getPayload())));
// Object payloadObjectNode = objectMapper
// .valueToTree(new PayloadWrapper("0x" +
// Hex.encodeHexString(message.getPayload())));
Object payloadObjectNode = new HashMap<>(Map.ofEntries(entry("message", "0x" + Hex.encodeHexString(message.getPayload()))));
ProcessingContext<Object> context = new ProcessingContext<Object>();
context.setPayload(payloadObjectNode);
return context;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class ProcessingContext<O> {

private String sourceId;

private String targetPathIdentifier;

public static final String SOURCE_ID = "source.id";

public boolean hasError() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void extractFromSource(ProcessingContext<byte[]> context)
new MappingSubstitution.SubstituteValue(payloadProtobuf.getUnit(),
MappingSubstitution.SubstituteValue.TYPE.TEXTUAL,
RepairStrategy.DEFAULT))));
postProcessingCache.put(context.getMapping().targetAPI.identifier,
postProcessingCache.put(context.getMapping().getGenericDeviceIdentifier(),
new ArrayList<MappingSubstitution.SubstituteValue>(Arrays.asList(
new MappingSubstitution.SubstituteValue(
payloadProtobuf.getExternalId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,31 @@ public ProtobufMqttClient(Mqtt3BlockingClient sampleClient) {

public static void main(String[] args) {

Mqtt3SimpleAuth simpleAuth = Mqtt3SimpleAuth.builder().username(broker_username)
.password(broker_password.getBytes()).build();
Mqtt3BlockingClient sampleClient = Mqtt3Client.builder()
.serverHost(broker_host)
.serverPort(broker_port)
.identifier(client_id)
.simpleAuth(simpleAuth)
.sslWithDefaultConfig()
.buildBlocking();
ProtobufMqttClient client = new ProtobufMqttClient(sampleClient);
client.testSendMeasurement();
client.testSendAlarm();
if (broker_username == null || broker_username.isEmpty() ||
broker_password == null || broker_password.isEmpty()) {
Mqtt3BlockingClient sampleClient = Mqtt3Client.builder()
.serverHost(broker_host)
.serverPort(broker_port)
.identifier(client_id)
.sslWithDefaultConfig()
.buildBlocking();
ProtobufMqttClient client = new ProtobufMqttClient(sampleClient);
client.testSendMeasurement();
client.testSendAlarm();
} else {
Mqtt3SimpleAuth simpleAuth = Mqtt3SimpleAuth.builder().username(broker_username)
.password(broker_password.getBytes()).build();
Mqtt3BlockingClient sampleClient = Mqtt3Client.builder()
.serverHost(broker_host)
.serverPort(broker_port)
.identifier(client_id)
.simpleAuth(simpleAuth)
.sslWithDefaultConfig()
.buildBlocking();
ProtobufMqttClient client = new ProtobufMqttClient(sampleClient);
client.testSendMeasurement();
client.testSendAlarm();
}

}

Expand Down

0 comments on commit e51c6a1

Please sign in to comment.