Skip to content

Commit

Permalink
renamed classes
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Jan 3, 2025
1 parent 865f22e commit 9d6b618
Show file tree
Hide file tree
Showing 29 changed files with 152 additions and 152 deletions.
10 changes: 5 additions & 5 deletions USERGUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ Creation of the new mapping starts by pressing `Add Mapping`. On the next modal

1. `JSON`: if your payload is in JSON format
1. `FLAT_FILE`: if your payload is in a CSV format
1. `GENERIC_BINARY`: if your payload is in HEX format
1. `PROTOBUF_STATIC`: if your payload is a serialized protobuf message
1. `BINARY`: if your payload is in HEX format
1. `PROTOBUF_INTERNAL`: if your payload is a serialized protobuf message
1. `PROCESSOR_EXTENSION`: if you want to process the message yourself, by registering a processor extension

<p align="center">
Expand All @@ -134,14 +134,14 @@ The wizard to define a mapping consists of the steps:

- `JSON`
- `FLAT_FILE`
- `GENERIC_BINARY`
- `PROTOBUF_STATIC`
- `BINARY`
- `PROTOBUF_INTERNAL`
- `PROCESSOR_EXTENSION`

---

**NOTE:**
Payload for `FLAT_FILE` and `GENERIC_BINARY` are wrapped.
Payload for `FLAT_FILE` and `BINARY` are wrapped.
For example for a flat file messages:

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
import dynamic.mapping.model.Mapping;
import dynamic.mapping.model.MappingServiceRepresentation;
import dynamic.mapping.model.QOS;
import dynamic.mapping.processor.inbound.AsynchronousDispatcherInbound;
import dynamic.mapping.processor.inbound.DispatcherInbound;
import dynamic.mapping.processor.model.ProcessingContext;
import lombok.AllArgsConstructor;
import lombok.Data;
Expand Down Expand Up @@ -135,7 +135,7 @@ public static class Certificate {

@Getter
@Setter
protected AsynchronousDispatcherInbound dispatcher;
protected DispatcherInbound dispatcher;

protected ObjectMapper objectMapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import dynamic.mapping.core.ConnectorStatusEvent;
import dynamic.mapping.model.Mapping;
import dynamic.mapping.model.QOS;
import dynamic.mapping.processor.inbound.AsynchronousDispatcherInbound;
import dynamic.mapping.processor.inbound.DispatcherInbound;
import dynamic.mapping.processor.model.C8YRequest;
import dynamic.mapping.processor.model.ProcessingContext;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -130,7 +130,7 @@ private static String removeDateCommentLine(String pt) {

public KafkaClient(ConfigurationRegistry configurationRegistry,
ConnectorConfiguration connectorConfiguration,
AsynchronousDispatcherInbound dispatcher, String additionalSubscriptionIdTest, String tenant)
DispatcherInbound dispatcher, String additionalSubscriptionIdTest, String tenant)
throws FileNotFoundException, IOException {
this();
this.configurationRegistry = configurationRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import dynamic.mapping.connector.core.client.ConnectorType;
import dynamic.mapping.model.Mapping;
import dynamic.mapping.model.QOS;
import dynamic.mapping.processor.inbound.AsynchronousDispatcherInbound;
import dynamic.mapping.processor.inbound.DispatcherInbound;
import dynamic.mapping.processor.model.C8YRequest;
import dynamic.mapping.processor.model.ProcessingContext;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -113,7 +113,7 @@ public MQTTClient() {

public MQTTClient(ConfigurationRegistry configurationRegistry,
ConnectorConfiguration connectorConfiguration,
AsynchronousDispatcherInbound dispatcher, String additionalSubscriptionIdTest, String tenant) {
DispatcherInbound dispatcher, String additionalSubscriptionIdTest, String tenant) {
this();
this.configurationRegistry = configurationRegistry;
this.mappingComponent = configurationRegistry.getMappingComponent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import dynamic.mapping.connector.core.ConnectorPropertyType;
import dynamic.mapping.connector.core.ConnectorSpecification;
import dynamic.mapping.connector.core.client.ConnectorType;
import dynamic.mapping.processor.inbound.AsynchronousDispatcherInbound;
import dynamic.mapping.processor.inbound.DispatcherInbound;
import dynamic.mapping.configuration.ConnectorConfiguration;
import dynamic.mapping.connector.core.ConnectorProperty;
import dynamic.mapping.core.ConfigurationRegistry;
Expand Down Expand Up @@ -81,7 +81,7 @@ private static String getClientId(String identifier, String suffix) {

public MQTTServiceClient(ConfigurationRegistry configurationRegistry,
ConnectorConfiguration connectorConfiguration,
AsynchronousDispatcherInbound dispatcher, String additionalSubscriptionIdTest, String tenant) {
DispatcherInbound dispatcher, String additionalSubscriptionIdTest, String tenant) {
this();
this.configurationRegistry = configurationRegistry;
this.mappingComponent = configurationRegistry.getMappingComponent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import dynamic.mapping.connector.core.registry.ConnectorRegistryException;
import dynamic.mapping.connector.kafka.KafkaClient;
import dynamic.mapping.model.MappingServiceRepresentation;
import dynamic.mapping.processor.inbound.AsynchronousDispatcherInbound;
import dynamic.mapping.processor.outbound.AsynchronousDispatcherOutbound;
import dynamic.mapping.processor.inbound.DispatcherInbound;
import dynamic.mapping.processor.outbound.DispatcherOutbound;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
Expand Down Expand Up @@ -220,7 +220,7 @@ public AConnectorClient initializeConnectorByConfiguration(ConnectorConfiguratio
}
connectorRegistry.registerClient(tenant, connectorClient);
// initialize AsynchronousDispatcherInbound
AsynchronousDispatcherInbound dispatcherInbound = new AsynchronousDispatcherInbound(configurationRegistry,
DispatcherInbound dispatcherInbound = new DispatcherInbound(configurationRegistry,
connectorClient);
configurationRegistry.initializePayloadProcessorsInbound(tenant);
connectorClient.setDispatcher(dispatcherInbound);
Expand All @@ -236,7 +236,7 @@ public void initializeOutboundMapping(String tenant, ServiceConfiguration servic
if (serviceConfiguration.isOutboundMappingEnabled()) {
// initialize AsynchronousDispatcherOutbound
configurationRegistry.initializePayloadProcessorsOutbound(connectorClient);
AsynchronousDispatcherOutbound dispatcherOutbound = new AsynchronousDispatcherOutbound(
DispatcherOutbound dispatcherOutbound = new DispatcherOutbound(
configurationRegistry, connectorClient);
// Only initialize Connectors which are enabled
if (connectorClient.getConnectorConfiguration().isEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import dynamic.mapping.model.MappingServiceRepresentation;
import dynamic.mapping.notification.C8YNotificationSubscriber;
import dynamic.mapping.processor.extension.ExtensibleProcessor;
import dynamic.mapping.processor.inbound.BasePayloadProcessorInbound;
import dynamic.mapping.processor.inbound.BaseProcessorInbound;
import dynamic.mapping.processor.inbound.FlatFileProcessorInbound;
import dynamic.mapping.processor.inbound.GenericBinaryProcessorInbound;
import dynamic.mapping.processor.inbound.BinaryProcessorInbound;
import dynamic.mapping.processor.inbound.JSONProcessorInbound;
import dynamic.mapping.processor.model.MappingType;
import dynamic.mapping.processor.outbound.BasePayloadProcessorOutbound;
import dynamic.mapping.processor.outbound.BaseProcessorOutbound;
import dynamic.mapping.processor.outbound.JSONProcessorOutbound;
import dynamic.mapping.processor.processor.fixed.StaticProtobufProcessor;
import dynamic.mapping.processor.processor.fixed.InternalProtobufProcessor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -50,12 +50,12 @@ public class ConfigurationRegistry {

// structure: <tenant, <mappingType, extensibleProcessorInbound>>
@Getter
private Map<String, Map<MappingType, BasePayloadProcessorInbound<?>>> payloadProcessorsInbound = new HashMap<>();
private Map<String, Map<MappingType, BaseProcessorInbound<?>>> payloadProcessorsInbound = new HashMap<>();

// structure: <tenant, <connectorIdentifier, <mappingType,
// extensibleProcessorOutbound>>>
@Getter
private Map<String, Map<String, Map<MappingType, BasePayloadProcessorOutbound<?>>>> payloadProcessorsOutbound = new HashMap<>();
private Map<String, Map<String, Map<MappingType, BaseProcessorOutbound<?>>>> payloadProcessorsOutbound = new HashMap<>();

@Getter
private Map<String, ServiceConfiguration> serviceConfigurations = new HashMap<>();
Expand Down Expand Up @@ -123,13 +123,13 @@ public void setServiceConfigurationComponent(@Lazy ServiceConfigurationComponent
@Autowired
private ExecutorService processingCachePool;

public Map<MappingType, BasePayloadProcessorInbound<?>> createPayloadProcessorsInbound(String tenant) {
public Map<MappingType, BaseProcessorInbound<?>> createPayloadProcessorsInbound(String tenant) {
ExtensibleProcessor extensibleProcessor = getExtensibleProcessors().get(tenant);
return Map.of(
MappingType.JSON, new JSONProcessorInbound(this),
MappingType.FLAT_FILE, new FlatFileProcessorInbound(this),
MappingType.GENERIC_BINARY, new GenericBinaryProcessorInbound(this),
MappingType.PROTOBUF_STATIC, new StaticProtobufProcessor(this),
MappingType.BINARY, new BinaryProcessorInbound(this),
MappingType.PROTOBUF_INTERNAL, new InternalProtobufProcessor(this),
MappingType.EXTENSION_SOURCE, extensibleProcessor,
MappingType.EXTENSION_SOURCE_TARGET, extensibleProcessor);
}
Expand Down Expand Up @@ -159,7 +159,7 @@ public AConnectorClient createConnectorClient(ConnectorConfiguration connectorCo
return connectorClient;
}

public Map<MappingType, BasePayloadProcessorOutbound<?>> createPayloadProcessorsOutbound(
public Map<MappingType, BaseProcessorOutbound<?>> createPayloadProcessorsOutbound(
AConnectorClient connectorClient) {
return Map.of(
MappingType.JSON, new JSONProcessorOutbound(this, connectorClient));
Expand All @@ -172,7 +172,7 @@ public void initializePayloadProcessorsInbound(String tenant) {
}

public void initializePayloadProcessorsOutbound(AConnectorClient connectorClient) {
Map<String, Map<MappingType, BasePayloadProcessorOutbound<?>>> processorPerTenant = payloadProcessorsOutbound
Map<String, Map<MappingType, BaseProcessorOutbound<?>>> processorPerTenant = payloadProcessorsOutbound
.get(connectorClient.getTenant());
if (processorPerTenant == null) {
// log.info("Tenant {} - HIER III {} {}", connectorClient.getTenant(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ static public ArrayList<ValidationError> isSubstitutionValid(Mapping mapping) {
if (mapping.snoopStatus != SnoopStatus.ENABLED && mapping.snoopStatus != SnoopStatus.STARTED
&& !mapping.mappingType.equals(MappingType.EXTENSION_SOURCE)
&& !mapping.mappingType.equals(MappingType.EXTENSION_SOURCE_TARGET)
&& !mapping.mappingType.equals(MappingType.PROTOBUF_STATIC)
&& !mapping.mappingType.equals(MappingType.PROTOBUF_INTERNAL)
&& !mapping.direction.equals(Direction.OUTBOUND)) {
if (count > 1) {
result.add(ValidationError.Only_One_Substitution_Defining_Device_Identifier_Can_Be_Used);
Expand Down Expand Up @@ -417,7 +417,7 @@ static Collection<ValidationError> areJSONTemplatesValid(Mapping mapping) {
}

if (!mapping.mappingType.equals(MappingType.EXTENSION_SOURCE)
&& !mapping.mappingType.equals(MappingType.PROTOBUF_STATIC)) {
&& !mapping.mappingType.equals(MappingType.PROTOBUF_INTERNAL)) {
try {
new JSONObject(mapping.targetTemplate);
} catch (JSONException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import dynamic.mapping.model.Device;
import dynamic.mapping.notification.websocket.CustomWebSocketClient;
import dynamic.mapping.notification.websocket.NotificationCallback;
import dynamic.mapping.processor.outbound.AsynchronousDispatcherOutbound;
import dynamic.mapping.processor.outbound.DispatcherOutbound;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.ArrayStack;
Expand Down Expand Up @@ -93,7 +93,7 @@ public void setConfigurationRegistry(@Lazy ConfigurationRegistry configurationRe

// structure: <tenant, <connectorIdentifier, asynchronousDispatcherOutbound>>
@Getter
private Map<String, Map<String, AsynchronousDispatcherOutbound>> dispatcherOutboundMaps = new HashMap<>();
private Map<String, Map<String, DispatcherOutbound>> dispatcherOutboundMaps = new HashMap<>();

@Value("${C8Y.baseURL}")
private String baseUrl;
Expand All @@ -112,8 +112,8 @@ public void setConfigurationRegistry(@Lazy ConfigurationRegistry configurationRe
// structure: <tenant, <connectorIdentifier, tokenSeed>>
private Map<String, Map<String, String>> deviceTokenPerConnector = new HashMap<>();

public void addSubscriber(String tenant, String identifier, AsynchronousDispatcherOutbound dispatcherOutbound) {
Map<String, AsynchronousDispatcherOutbound> dispatcherOutboundMap = getDispatcherOutboundMaps().get(tenant);
public void addSubscriber(String tenant, String identifier, DispatcherOutbound dispatcherOutbound) {
Map<String, DispatcherOutbound> dispatcherOutboundMap = getDispatcherOutboundMaps().get(tenant);
if (dispatcherOutboundMap == null) {
dispatcherOutboundMap = new HashMap<>();
dispatcherOutboundMap.put(identifier, dispatcherOutbound);
Expand Down Expand Up @@ -156,7 +156,7 @@ public void initDeviceClient() {
try {
// For each dispatcher/connector create a new connection
if (dispatcherOutboundMaps.get(tenant) != null) {
for (AsynchronousDispatcherOutbound dispatcherOutbound : dispatcherOutboundMaps.get(tenant)
for (DispatcherOutbound dispatcherOutbound : dispatcherOutboundMaps.get(tenant)
.values()) {
//Only connect if connector is enabled
if(dispatcherOutbound.getConnectorClient().getConnectorConfiguration().isEnabled()){
Expand Down Expand Up @@ -307,7 +307,7 @@ public CompletableFuture<NotificationSubscriptionRepresentation> subscribeDevice
"Tenant {} - No Outbound dispatcher for any connector is registered, add a connector first!",
tenant);

for (AsynchronousDispatcherOutbound dispatcherOutbound : dispatcherOutboundMaps.get(tenant)
for (DispatcherOutbound dispatcherOutbound : dispatcherOutboundMaps.get(tenant)
.values()) {
String tokenSeed = DEVICE_SUBSCRIBER
+ dispatcherOutbound.getConnectorClient().getConnectorIdent()
Expand Down Expand Up @@ -515,8 +515,8 @@ public void removeConnector(String tenant, String connectorIdentifier) {
}
}

public void addConnector(String tenant, String connectorIdentifier, AsynchronousDispatcherOutbound dispatcherOutbound) {
Map<String, AsynchronousDispatcherOutbound> dispatcherOutboundMap = getDispatcherOutboundMaps().get(tenant);
public void addConnector(String tenant, String connectorIdentifier, DispatcherOutbound dispatcherOutbound) {
Map<String, DispatcherOutbound> dispatcherOutboundMap = getDispatcherOutboundMaps().get(tenant);
if (dispatcherOutboundMap == null) {
dispatcherOutboundMap = new HashMap<>();
getDispatcherOutboundMaps().put(tenant, dispatcherOutboundMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import dynamic.mapping.model.ExtensionStatus;
import dynamic.mapping.model.Mapping;
import dynamic.mapping.processor.ProcessingException;
import dynamic.mapping.processor.inbound.BasePayloadProcessorInbound;
import dynamic.mapping.processor.inbound.BaseProcessorInbound;
import dynamic.mapping.processor.model.ProcessingContext;
import lombok.extern.slf4j.Slf4j;
import dynamic.mapping.core.ConfigurationRegistry;
Expand All @@ -37,7 +37,7 @@
import java.util.Map;

@Slf4j
public class ExtensibleProcessor extends BasePayloadProcessorInbound<byte[]> {
public class ExtensibleProcessor extends BaseProcessorInbound<byte[]> {

private Map<String, Extension> extensions = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@
import java.util.concurrent.TimeUnit;

@Slf4j
public abstract class BasePayloadProcessorInbound<T> {
public abstract class BaseProcessorInbound<T> {

public BasePayloadProcessorInbound(ConfigurationRegistry configurationRegistry) {
public BaseProcessorInbound(ConfigurationRegistry configurationRegistry) {
this.objectMapper = configurationRegistry.getObjectMapper();
this.c8yAgent = configurationRegistry.getC8yAgent();
this.processingCachePool = configurationRegistry.getProcessingCachePool();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import java.util.Map;

//@Service
public class GenericBinaryProcessorInbound extends JSONProcessorInbound {
public class BinaryProcessorInbound extends JSONProcessorInbound {

public GenericBinaryProcessorInbound(ConfigurationRegistry configurationRegistry) {
public BinaryProcessorInbound(ConfigurationRegistry configurationRegistry) {
super(configurationRegistry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
*/

@Slf4j
public class AsynchronousDispatcherInbound implements GenericMessageCallback {
public class DispatcherInbound implements GenericMessageCallback {

private AConnectorClient connectorClient;

Expand All @@ -80,7 +80,7 @@ public class AsynchronousDispatcherInbound implements GenericMessageCallback {

private ConfigurationRegistry configurationRegistry;

public AsynchronousDispatcherInbound(ConfigurationRegistry configurationRegistry,
public DispatcherInbound(ConfigurationRegistry configurationRegistry,
AConnectorClient connectorClient) {
this.connectorClient = connectorClient;
this.cachedThreadPool = configurationRegistry.getCachedThreadPool();
Expand All @@ -90,7 +90,7 @@ public AsynchronousDispatcherInbound(ConfigurationRegistry configurationRegistry

public static class MappingInboundTask<T> implements Callable<List<ProcessingContext<?>>> {
List<Mapping> resolvedMappings;
Map<MappingType, BasePayloadProcessorInbound<?>> payloadProcessorsInbound;
Map<MappingType, BaseProcessorInbound<?>> payloadProcessorsInbound;
ConnectorMessage connectorMessage;
MappingComponent mappingComponent;
C8YAgent c8yAgent;
Expand Down Expand Up @@ -138,7 +138,7 @@ public List<ProcessingContext<?>> call() throws Exception {
&& connectorClient.getMappingsDeployedInbound().containsKey(mapping.identifier)) {
MappingStatus mappingStatus = mappingComponent.getMappingStatus(tenant, mapping);
// identify the correct processor based on the mapping type
BasePayloadProcessorInbound processor = payloadProcessorsInbound.get(mapping.mappingType);
BaseProcessorInbound processor = payloadProcessorsInbound.get(mapping.mappingType);
try {
if (processor != null) {
inboundProcessingCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JSONProcessorInbound extends BasePayloadProcessorInbound<Object> {
public class JSONProcessorInbound extends BaseProcessorInbound<Object> {

public JSONProcessorInbound(ConfigurationRegistry configurationRegistry) {
super(configurationRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
public enum MappingType {
JSON("JSON"),
FLAT_FILE("FLAT_FILE"),
GENERIC_BINARY("GENERIC_BINARY"),
PROTOBUF_STATIC("PROTOBUF_STATIC"),
BINARY("BINARY"),
PROTOBUF_INTERNAL("PROTOBUF_INTERNAL"),
EXTENSION_SOURCE("EXTENSION_SOURCE"),
EXTENSION_SOURCE_TARGET("EXTENSION_SOURCE_TARGET");

Expand Down
Loading

0 comments on commit 9d6b618

Please sign in to comment.