diff --git a/build.gradle.kts b/build.gradle.kts index 67321a4..53c7fa8 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -16,7 +16,7 @@ plugins { } // Package version -version = "0.7.2" +version = "0.7.3" repositories { // Use Maven Central for resolving dependencies. diff --git a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt index bb2b636..b68e7ca 100644 --- a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt +++ b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt @@ -189,7 +189,8 @@ class RedShiftComplexDataTypeTransformer> : Transformation< if (sourceValue != null) { updatedValue.put("tombstone", false) buildWithSchema(sourceValue, "", updatedValue) - } else { + } + if (sourceValue == null || sourceValue.get("body") == null) { updatedValue.put("tombstone", true) } return newRecord(record, updatedSchema, updatedValue) diff --git a/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformerTest.kt b/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformerTest.kt index 46af773..14acae9 100644 --- a/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformerTest.kt +++ b/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformerTest.kt @@ -66,6 +66,23 @@ class RedShiftComplexDataTypeTransformerTest { assertTrue(hasNoComplexTypes(transformedRecord)) } + @Test + fun `can transform ECST Employee data with null body`() { + + val avroRecord = payload("com/cultureamp/employee-data.employees-v2.json") + val sourceRecord = SourceRecord( + null, + null, + "employee data ecst test", + avroRecord.schema(), + avroRecord.value() + ) + + val transformedRecord = transformer.apply(sourceRecord) + hasNoComplexTypes(sourceRecord) + assertTrue(hasNoComplexTypes(transformedRecord)) + } + @Test fun `can transform ECST Employee data that has key as field`() { diff --git a/src/test/resources/com/cultureamp/employee-data.employees-v2.json b/src/test/resources/com/cultureamp/employee-data.employees-v2.json new file mode 100644 index 0000000..24bc15b --- /dev/null +++ b/src/test/resources/com/cultureamp/employee-data.employees-v2.json @@ -0,0 +1,52 @@ +{ + "id": "c63526f8-dec7-4ef8-96d8-18756076f064", + "account_id": "0a05e2a3-7258-4cf5-a7f4-e21b08c030c5", + "employee_id": "c63526f8-dec7-4ef8-96d8-18756076f064", + "event_created_at": 1536899741117, + "body": null, + "metadata": { + "correlation_id": { + "string": "b9098254-a1db-4114-9a39-baa17ab18fbf" + }, + "causation_id": null, + "executor_id": { + "string": "379907ca-632c-4e83-89c4-9dbe0e759ad3" + }, + "service": "Influx" + }, + "test_array_of_structs": [ + { + "demographic_id": { + "string": "5c579970-684e-4911-a077-6bf407fb478d" + }, + "demographic_value_id": { + "string": "427b936f-e932-4673-95a2-acd3e3b900b1" + } + }, + { + "demographic_id": { + "string": "460f6b2d-03c5-46cf-ba55-aa14477a12dc" + }, + "demographic_value_id": { + "string": "ecc0db2e-486e-4f4a-a54a-db21673e1a2b" + } + } + ], + "test_map": { + "added_users_count": 0, + "ignored_new_demographics_count": 0, + "ignored_users_count": 0, + "inactive_updated_users_count": 0, + "reactivated_users_count": 0, + "removed_users_count": 0, + "updated_users_count": 0 + }, + "test_string_array": [ + "a", "b", "c" + ], + "test_array_of_arrays": [ + ["a", "b", "c"], + ["e"], + ["f", "g"] + ] +} \ No newline at end of file diff --git a/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc b/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc index 2c7f801..0109fe6 100644 --- a/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc +++ b/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc @@ -33,7 +33,8 @@ }, { "name": "body", - "type": { + "type": [ "null", + { "type": "record", "name": "Employee", "namespace": "com.cultureamp.employee.v1", @@ -214,6 +215,7 @@ } ] } + ] }, { "name": "metadata",