From 65bfde08190dc589e973f51296ebbebe55f9f0ef Mon Sep 17 00:00:00 2001 From: Shraddha Patel Date: Mon, 13 Nov 2023 19:37:05 +1100 Subject: [PATCH 1/3] added unit tests and remove replacement string --- .../RedShiftComplexDataTypeTransformer.kt | 3 +- .../RedShiftComplexDataTypeTransformerTest.kt | 170 +++++++++++ ...loyee-data.employees-v1-target-schema.avsc | 264 ++++++++++++++++++ .../employee-data.employees-v1.json | 4 +- .../employee-data.employees-value-v1.avsc | 9 +- 5 files changed, 443 insertions(+), 7 deletions(-) create mode 100644 src/test/resources/com/cultureamp/employee-data.employees-v1-target-schema.avsc diff --git a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt index b68e7ca..656f2e5 100644 --- a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt +++ b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt @@ -126,8 +126,7 @@ class RedShiftComplexDataTypeTransformer> : Transformation< realSchema = SchemaBuilder.string().build() } val converted = jsonConverter.fromConnectData("", realSchema, realValue) - var fieldString = objectMapper.readTree(converted).toString() - return fieldString.replace("\"[", "[").replace("]\"", "]").replace("\"{", "{").replace("}\"", "}") + return objectMapper.writeValueAsString(objectMapper.readTree(converted)) } private fun buildWithSchema(sourceRecord: Any?, fieldNamePrefix: String, newRecord: Struct) { diff --git a/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformerTest.kt b/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformerTest.kt index 14acae9..0d7f4b6 100644 --- a/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformerTest.kt +++ b/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformerTest.kt @@ -8,11 +8,13 @@ import com.mongodb.kafka.connect.util.ClassHelper import com.mongodb.kafka.connect.util.ConfigHelper import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.data.SchemaAndValue +import org.apache.kafka.connect.data.Struct import org.apache.kafka.connect.source.SourceRecord import org.junit.Before import java.io.File import java.nio.file.Files import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertTrue /** @@ -64,6 +66,15 @@ class RedShiftComplexDataTypeTransformerTest { val transformedRecord = transformer.apply(sourceRecord) hasNoComplexTypes(sourceRecord) assertTrue(hasNoComplexTypes(transformedRecord)) + + val expectedValue = struct( + id, account_id, employee_id, event_created_at, body_source, body_employee_id, body_email, body_name, body_preferred_name, body_locale, body_observer, body_gdpr_erasure_request_id, body_test_map, body_test_map_1, body_test_array_of_structs, body_manager_assignment_manager_id, body_manager_assignment_demographic_id, body_erased, body_created_at, body_updated_at, body_deleted_at, metadata_correlation_id, metadata_causation_id, metadata_executor_id, metadata_service, test_array_of_structs, test_string_array, test_array_of_arrays, test_map, + topic_key = "", + tombstone = false + ) + + assertEquals(expectedValue, transformedRecord.value()) + assertEquals(expectedSchema, transformedRecord.valueSchema()) } @Test @@ -81,6 +92,15 @@ class RedShiftComplexDataTypeTransformerTest { val transformedRecord = transformer.apply(sourceRecord) hasNoComplexTypes(sourceRecord) assertTrue(hasNoComplexTypes(transformedRecord)) + + val expectedValue = nullBodyStruct( + id, account_id, employee_id, event_created_at, metadata_correlation_id, metadata_causation_id, metadata_executor_id, metadata_service, test_array_of_structs, test_string_array, test_array_of_arrays, test_map, + topic_key = "", + tombstone = true + ) + + assertEquals(expectedValue, transformedRecord.value()) + assertEquals(expectedSchema, transformedRecord.valueSchema()) } @Test @@ -101,6 +121,15 @@ class RedShiftComplexDataTypeTransformerTest { val transformedRecord = transformer.apply(sourceRecord) hasNoComplexTypes(sourceRecord) assertTrue(hasNoComplexTypes(transformedRecord)) + + val expectedValue = struct( + id, account_id, employee_id, event_created_at, body_source, body_employee_id, body_email, body_name, body_preferred_name, body_locale, body_observer, body_gdpr_erasure_request_id, body_test_map, body_test_map_1, body_test_array_of_structs, body_manager_assignment_manager_id, body_manager_assignment_demographic_id, body_erased, body_created_at, body_updated_at, body_deleted_at, metadata_correlation_id, metadata_causation_id, metadata_executor_id, metadata_service, test_array_of_structs, test_string_array, test_array_of_arrays, test_map, + topic_key = "hellp", + tombstone = false + ) + + assertEquals(expectedValue, transformedRecord.value()) + assertEquals(expectedSchema, transformedRecord.valueSchema()) } @Test @@ -141,6 +170,11 @@ class RedShiftComplexDataTypeTransformerTest { val transformedRecord = transformer.apply(sourceRecord) hasNoComplexTypes(sourceRecord) assertTrue(hasNoComplexTypes(transformedRecord)) + + val expectedValue = Struct(expectedSchema).put("tombstone", true) + + assertEquals(expectedValue, transformedRecord.value()) + assertEquals(expectedSchema, transformedRecord.valueSchema()) } @Test @@ -169,6 +203,140 @@ class RedShiftComplexDataTypeTransformerTest { .toSchemaAndValue(sourceSchema, document.toBsonDocument()) } + private val id = "c63526f8-dec7-4ef8-96d8-18756076f064" + private val account_id = "0a05e2a3-7258-4cf5-a7f4-e21b08c030c5" + private val employee_id = "c63526f8-dec7-4ef8-96d8-18756076f064" + private val event_created_at = 1536899741117 + private val body_source = "{\"string\": \"\"}" + private val body_employee_id = null + private val body_email = "{\"string\": \"testing800702@namelytest.com\"}" + private val body_name = "{\"string\": \"Test User 800702\"}" + private val body_preferred_name = null + private val body_locale = null + private val body_observer = false + private val body_gdpr_erasure_request_id = null + private val body_test_map = "{\"added_users_count\":\"0\",\"ignored_new_demographics_count\":\"0\",\"ignored_users_count\":\"0\",\"inactive_updated_users_count\":\"0\",\"reactivated_users_count\":\"0\",\"removed_users_count\":\"0\",\"updated_users_count\":\"0\"}" + private val body_test_map_1 = "\"{}\"" + private val body_test_array_of_structs = "[{\"demographic_id\":\"{\\\"string\\\": \\\"[5c579970-684e-4911-a077-6bf407fb478d\\\"}\",\"demographic_value_id\":\"{\\\"string\\\": \\\"427b936f-e932-4673-95a2-acd3e3b900b1\\\"}\"},{\"demographic_id\":\"{\\\"string\\\": \\\"460f6b2d-03c5-46cf-ba55-aa14477a12dc]\\\"}\",\"demographic_value_id\":\"{\\\"string\\\": \\\"ecc0db2e-486e-4f4a-a54a-db21673e1a2b\\\"}\"}]" + private val body_manager_assignment_manager_id = "{\"string\": \"5c579970-684e-4911-a077-6bf407fb478d\"}" + private val body_manager_assignment_demographic_id = "{\"string\": \"427b936f-e932-4673-95a2-acd3e3b900b1\"}" + private val body_erased = false + private val body_created_at = 1536899741113 + private val body_updated_at = 1536899741117 + private val body_deleted_at = null + private val metadata_correlation_id = "{\"string\": \"b9098254-a1db-4114-9a39-baa17ab18fbf\"}" + private val metadata_causation_id = null + private val metadata_executor_id = "{\"string\": \"379907ca-632c-4e83-89c4-9dbe0e759ad3\"}" + private val metadata_service = "Influx" + private val test_array_of_structs = "[{\"demographic_id\":\"{\\\"string\\\": \\\"5c579970-684e-4911-a077-6bf407fb478d\\\"}\",\"demographic_value_id\":\"{\\\"string\\\": \\\"427b936f-e932-4673-95a2-acd3e3b900b1\\\"}\"},{\"demographic_id\":\"{\\\"string\\\": \\\"460f6b2d-03c5-46cf-ba55-aa14477a12dc\\\"}\",\"demographic_value_id\":\"{\\\"string\\\": \\\"ecc0db2e-486e-4f4a-a54a-db21673e1a2b\\\"}\"}]" + private val test_string_array = "[\"a\",\"b\",\"c\"]" + private val test_array_of_arrays = "[[\"a\",\"b\",\"c\"],[\"e\"],[\"f\",\"g\"]]" + private val test_map = "{\"added_users_count\":0,\"ignored_new_demographics_count\":0,\"ignored_users_count\":0,\"inactive_updated_users_count\":0,\"reactivated_users_count\":0,\"removed_users_count\":0,\"updated_users_count\":0}" + + private fun struct( + id: String, + account_id: String, + employee_id: String, + event_created_at: Long, + body_source: String, + body_employee_id: String?, + body_email: String, + body_name: String, + body_preferred_name: String?, + body_locale: String?, + body_observer: Boolean, + body_gdpr_erasure_request_id: String?, + body_test_map: String, + body_test_map_1: String, + body_test_array_of_structs: String, + body_manager_assignment_manager_id: String, + body_manager_assignment_demographic_id: String, + body_erased: Boolean, + body_created_at: Long, + body_updated_at: Long, + body_deleted_at: Long?, + metadata_correlation_id: String, + metadata_causation_id: String?, + metadata_executor_id: String, + metadata_service: String, + test_array_of_structs: String, + test_string_array: String, + test_array_of_arrays: String, + test_map: String, + topic_key: String?, + tombstone: Boolean + ): Struct { + val returnStruct = Struct(expectedSchema) + returnStruct.put("id", id) + .put("account_id", account_id) + .put("employee_id", employee_id) + .put("event_created_at", event_created_at) + .put("body_source", body_source) + .put("body_employee_id", body_employee_id) + .put("body_email", body_email) + .put("body_name", body_name) + .put("body_preferred_name", body_preferred_name) + .put("body_locale", body_locale) + .put("body_observer", body_observer) + .put("body_gdpr_erasure_request_id", body_gdpr_erasure_request_id) + .put("body_test_map", body_test_map) + .put("body_test_map_1", body_test_map_1) + .put("body_test_array_of_structs", body_test_array_of_structs) + .put("body_manager_assignment_manager_id", body_manager_assignment_manager_id) + .put("body_manager_assignment_demographic_id", body_manager_assignment_demographic_id) + .put("body_erased", body_erased) + .put("body_created_at", body_created_at) + .put("body_updated_at", body_updated_at) + .put("body_deleted_at", body_deleted_at) + .put("metadata_correlation_id", metadata_correlation_id) + .put("metadata_causation_id", metadata_causation_id) + .put("metadata_executor_id", metadata_executor_id) + .put("metadata_service", metadata_service) + .put("test_array_of_structs", test_array_of_structs) + .put("test_string_array", test_string_array) + .put("test_array_of_arrays", test_array_of_arrays) + .put("test_map", test_map) + .put("tombstone", tombstone) + if (topic_key != "") + returnStruct.put("topic_key", topic_key) + return returnStruct + } + + private fun nullBodyStruct( + id: String, + account_id: String, + employee_id: String, + event_created_at: Long, + metadata_correlation_id: String, + metadata_causation_id: String?, + metadata_executor_id: String, + metadata_service: String, + test_array_of_structs: String, + test_string_array: String, + test_array_of_arrays: String, + test_map: String, + topic_key: String?, + tombstone: Boolean + ): Struct { + val returnStruct = Struct(expectedSchema) + returnStruct.put("id", id) + .put("account_id", account_id) + .put("employee_id", employee_id) + .put("event_created_at", event_created_at) + .put("metadata_correlation_id", metadata_correlation_id) + .put("metadata_causation_id", metadata_causation_id) + .put("metadata_executor_id", metadata_executor_id) + .put("metadata_service", metadata_service) + .put("test_array_of_structs", test_array_of_structs) + .put("test_string_array", test_string_array) + .put("test_array_of_arrays", test_array_of_arrays) + .put("test_map", test_map) + .put("tombstone", tombstone) + if (topic_key != "") + returnStruct.put("topic_key", topic_key) + return returnStruct + } + private fun fileContent(fileName: String): String { val url = this.javaClass.classLoader .getResource(fileName) ?: throw IllegalArgumentException("$fileName is not found 1") @@ -176,6 +344,8 @@ class RedShiftComplexDataTypeTransformerTest { return String(Files.readAllBytes(File(url.file).toPath())) } + private val expectedSchema = AvroSchema.fromJson(fileContent("com/cultureamp/employee-data.employees-v1-target-schema.avsc")) + private val jsonWriterSettings = ClassHelper.createInstance( MongoSourceConfig.OUTPUT_JSON_FORMATTER_CONFIG, diff --git a/src/test/resources/com/cultureamp/employee-data.employees-v1-target-schema.avsc b/src/test/resources/com/cultureamp/employee-data.employees-v1-target-schema.avsc new file mode 100644 index 0000000..7275fe0 --- /dev/null +++ b/src/test/resources/com/cultureamp/employee-data.employees-v1-target-schema.avsc @@ -0,0 +1,264 @@ +{ + "type": "record", + "name": "Event", + "namespace": "com.cultureamp.employee.v1", + "fields": [ + { + "name": "id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "account_id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "employee_id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "event_created_at", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "body_source", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_employee_id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_email", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_name", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_preferred_name", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_locale", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_observer", + "type": [ + "null", + "boolean" + ], + "default": null + }, + { + "name": "body_gdpr_erasure_request_id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_test_map", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_test_map_1", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_test_array_of_structs", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_manager_assignment_manager_id", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ], + "default": null + }, + { + "name": "body_manager_assignment_demographic_id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "body_erased", + "type": [ + "null", + "boolean" + ], + "default": null + }, + { + "name": "body_created_at", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "body_updated_at", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "body_deleted_at", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-millis" + } + ], + "default": null + }, + { + "name": "metadata_correlation_id", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ], + "default": null + }, + { + "name": "metadata_causation_id", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ], + "default": null + }, + { + "name": "metadata_executor_id", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ], + "default": null + }, + { + "name": "metadata_service", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "test_array_of_structs", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "test_string_array", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "test_array_of_arrays", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "test_map", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "topic_key", + "type": "string", + "default": "" + }, + { + "name": "tombstone", + "type": "boolean", + "default": false + } + ] +} \ No newline at end of file diff --git a/src/test/resources/com/cultureamp/employee-data.employees-v1.json b/src/test/resources/com/cultureamp/employee-data.employees-v1.json index 7157931..557d344 100644 --- a/src/test/resources/com/cultureamp/employee-data.employees-v1.json +++ b/src/test/resources/com/cultureamp/employee-data.employees-v1.json @@ -34,7 +34,7 @@ "test_array_of_structs": [ { "demographic_id": { - "string": "5c579970-684e-4911-a077-6bf407fb478d" + "string": "[5c579970-684e-4911-a077-6bf407fb478d" }, "demographic_value_id": { "string": "427b936f-e932-4673-95a2-acd3e3b900b1" @@ -42,7 +42,7 @@ }, { "demographic_id": { - "string": "460f6b2d-03c5-46cf-ba55-aa14477a12dc" + "string": "460f6b2d-03c5-46cf-ba55-aa14477a12dc]" }, "demographic_value_id": { "string": "ecc0db2e-486e-4f4a-a54a-db21673e1a2b" diff --git a/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc b/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc index 0109fe6..950986f 100644 --- a/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc +++ b/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc @@ -310,9 +310,12 @@ "name": "test_array_of_arrays", "type": [ "null", - { - "type" : "array", - "items" : "string" + { + "type": "array", + "items": { + "type": "array", + "items": "string" + } } ], "default": null From 409842800a46d7aef164a6c73ba13187bdb57337 Mon Sep 17 00:00:00 2001 From: Shraddha Patel Date: Tue, 14 Nov 2023 07:42:51 +1100 Subject: [PATCH 2/3] build version change --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index 53c7fa8..61ae9b0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -16,7 +16,7 @@ plugins { } // Package version -version = "0.7.3" +version = "0.7.4-alpha" repositories { // Use Maven Central for resolving dependencies. From cba9e78c9ba6d5297bb73bb97f0b18fffafea53e Mon Sep 17 00:00:00 2001 From: Shraddha Patel Date: Tue, 14 Nov 2023 09:57:20 +1100 Subject: [PATCH 3/3] update version --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index 61ae9b0..a0ded74 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -16,7 +16,7 @@ plugins { } // Package version -version = "0.7.4-alpha" +version = "0.7.4" repositories { // Use Maven Central for resolving dependencies.