diff --git a/opensearch-kafka-connector/kafka-connect-transforms/src/main/java/org/openg2p/reporting/kafka/connect/transforms/DynamicNewField.java b/opensearch-kafka-connector/kafka-connect-transforms/src/main/java/org/openg2p/reporting/kafka/connect/transforms/DynamicNewField.java index 84cd0d4..b58e986 100644 --- a/opensearch-kafka-connector/kafka-connect-transforms/src/main/java/org/openg2p/reporting/kafka/connect/transforms/DynamicNewField.java +++ b/opensearch-kafka-connector/kafka-connect-transforms/src/main/java/org/openg2p/reporting/kafka/connect/transforms/DynamicNewField.java @@ -54,13 +54,15 @@ public abstract class Config{ String[] inputFields; SingleFieldPath[] inputFieldPaths; String[] inputDefaultValues; + String outputDefaultValue; String[] outputFields; Schema outputSchema; - Config(String type, String[] inputFields, String[] inputDefaultValues, String[] outputFields, Schema outputSchema){ + Config(String type, String[] inputFields, String[] inputDefaultValues, String[] outputFields, String outputDefaultValue, Schema outputSchema){ this.type = type; this.inputFields = inputFields; this.inputDefaultValues = inputDefaultValues; this.outputFields = outputFields; + this.outputDefaultValue = outputDefaultValue; this.outputSchema = outputSchema; this.inputFieldPaths = new SingleFieldPath[inputFields.length]; for(int i=0;i < inputFields.length; i++) { @@ -94,6 +96,7 @@ public class ESQueryConfig extends Config{ String[] inputFields, String[] outputFields, String[] inputDefaultValues, + String outputDefaultValue, String esUrl, String esIndex, String[] esInputFields, @@ -104,7 +107,7 @@ public class ESQueryConfig extends Config{ String esUsername, String esPassword ) { - super(type,inputFields,inputDefaultValues,outputFields,Schema.OPTIONAL_STRING_SCHEMA); + super(type,inputFields,inputDefaultValues,outputFields,outputDefaultValue,Schema.OPTIONAL_STRING_SCHEMA); this.esUrl=esUrl; this.esIndex=esIndex; @@ -191,12 +194,11 @@ else if(inputValues.size()==0){ // do nothing } } - if(responseSource == null) return Collections.nCopies(esOutputFields.length, "Error: No hits found"); for(String esOutputField: esOutputFields){ - if(responseSource.has(esOutputField) && !responseSource.isNull(esOutputField)){ + if(responseSource != null && responseSource.has(esOutputField)){ outputValues.add(responseSource.get(esOutputField)); } else { - outputValues.add(null); + outputValues.add(outputDefaultValue); } } return outputValues; @@ -265,7 +267,8 @@ void close(){ // Base Config public static final String INPUT_FIELDS_CONFIG = "input.fields"; public static final String OUTPUT_FIELDS_CONFIG = "output.fields"; - public static final String DEFAULT_VALUE_CONFIG = "input.default.values"; + public static final String INPUT_DEFAULT_VALUE_CONFIG = "input.default.values"; + public static final String OUTPUT_DEFAULT_VALUE_CONFIG = "output.default.value"; // Elasticsearch Specific Config public static final String ES_URL_CONFIG = "es.url"; @@ -285,7 +288,8 @@ void close(){ .define(TYPE_CONFIG, ConfigDef.Type.STRING, "es", ConfigDef.Importance.HIGH, "This is the type of query made. For now this field is ignored and defaulted to es") .define(INPUT_FIELDS_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Name of the field in the current index") .define(OUTPUT_FIELDS_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Names to give to the new fields") - .define(DEFAULT_VALUE_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Default vlaues for input fields") + .define(INPUT_DEFAULT_VALUE_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Default values for input fields") + .define(OUTPUT_DEFAULT_VALUE_CONFIG, ConfigDef.Type.STRING, "Error: No hits found", ConfigDef.Importance.HIGH, "Default value for output") .define(ES_URL_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Installed Elasticsearch URL") .define(ES_INDEX_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Name of the index in ES to search") @@ -309,7 +313,8 @@ public void configure(Map configs) { String inputFieldBulk = absconf.getString(INPUT_FIELDS_CONFIG); String outputFieldBulk = absconf.getString(OUTPUT_FIELDS_CONFIG); - String inputDefaultValuesBulk = absconf.getString(DEFAULT_VALUE_CONFIG); + String inputDefaultValuesBulk = absconf.getString(INPUT_DEFAULT_VALUE_CONFIG); + String outputDefaultValue = absconf.getString(OUTPUT_DEFAULT_VALUE_CONFIG); if (type.isEmpty() || inputFieldBulk.isEmpty() || outputFieldBulk.isEmpty()) { throw new ConfigException("One of required transform base config fields not set. Required base fields in tranform: " + TYPE_CONFIG + " ," + INPUT_FIELDS_CONFIG + " ," + OUTPUT_FIELDS_CONFIG); @@ -349,6 +354,7 @@ public void configure(Map configs) { inputFields, outputFields, inputDefaultValues, + outputDefaultValue, esUrl, esIndex, esInputFields,