Skip to content

Commit

Permalink
Merge pull request #22 from cultureamp/DPT-411/no-replace-strings
Browse files Browse the repository at this point in the history
Redshift Transformer bug fix for serializing arrays
  • Loading branch information
xphir authored Nov 13, 2023
2 parents a426b8b + cba9e78 commit 3014757
Show file tree
Hide file tree
Showing 6 changed files with 444 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ plugins {
}

// Package version
version = "0.7.3"
version = "0.7.4"

repositories {
// Use Maven Central for resolving dependencies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ class RedShiftComplexDataTypeTransformer<R : ConnectRecord<R>> : Transformation<
realSchema = SchemaBuilder.string().build()
}
val converted = jsonConverter.fromConnectData("", realSchema, realValue)
var fieldString = objectMapper.readTree(converted).toString()
return fieldString.replace("\"[", "[").replace("]\"", "]").replace("\"{", "{").replace("}\"", "}")
return objectMapper.writeValueAsString(objectMapper.readTree(converted))
}

private fun buildWithSchema(sourceRecord: Any?, fieldNamePrefix: String, newRecord: Struct) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import com.mongodb.kafka.connect.util.ClassHelper
import com.mongodb.kafka.connect.util.ConfigHelper
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaAndValue
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.source.SourceRecord
import org.junit.Before
import java.io.File
import java.nio.file.Files
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue

/**
Expand Down Expand Up @@ -64,6 +66,15 @@ class RedShiftComplexDataTypeTransformerTest {
val transformedRecord = transformer.apply(sourceRecord)
hasNoComplexTypes(sourceRecord)
assertTrue(hasNoComplexTypes(transformedRecord))

val expectedValue = struct(
id, account_id, employee_id, event_created_at, body_source, body_employee_id, body_email, body_name, body_preferred_name, body_locale, body_observer, body_gdpr_erasure_request_id, body_test_map, body_test_map_1, body_test_array_of_structs, body_manager_assignment_manager_id, body_manager_assignment_demographic_id, body_erased, body_created_at, body_updated_at, body_deleted_at, metadata_correlation_id, metadata_causation_id, metadata_executor_id, metadata_service, test_array_of_structs, test_string_array, test_array_of_arrays, test_map,
topic_key = "",
tombstone = false
)

assertEquals(expectedValue, transformedRecord.value())
assertEquals(expectedSchema, transformedRecord.valueSchema())
}

@Test
Expand All @@ -81,6 +92,15 @@ class RedShiftComplexDataTypeTransformerTest {
val transformedRecord = transformer.apply(sourceRecord)
hasNoComplexTypes(sourceRecord)
assertTrue(hasNoComplexTypes(transformedRecord))

val expectedValue = nullBodyStruct(
id, account_id, employee_id, event_created_at, metadata_correlation_id, metadata_causation_id, metadata_executor_id, metadata_service, test_array_of_structs, test_string_array, test_array_of_arrays, test_map,
topic_key = "",
tombstone = true
)

assertEquals(expectedValue, transformedRecord.value())
assertEquals(expectedSchema, transformedRecord.valueSchema())
}

@Test
Expand All @@ -101,6 +121,15 @@ class RedShiftComplexDataTypeTransformerTest {
val transformedRecord = transformer.apply(sourceRecord)
hasNoComplexTypes(sourceRecord)
assertTrue(hasNoComplexTypes(transformedRecord))

val expectedValue = struct(
id, account_id, employee_id, event_created_at, body_source, body_employee_id, body_email, body_name, body_preferred_name, body_locale, body_observer, body_gdpr_erasure_request_id, body_test_map, body_test_map_1, body_test_array_of_structs, body_manager_assignment_manager_id, body_manager_assignment_demographic_id, body_erased, body_created_at, body_updated_at, body_deleted_at, metadata_correlation_id, metadata_causation_id, metadata_executor_id, metadata_service, test_array_of_structs, test_string_array, test_array_of_arrays, test_map,
topic_key = "hellp",
tombstone = false
)

assertEquals(expectedValue, transformedRecord.value())
assertEquals(expectedSchema, transformedRecord.valueSchema())
}

@Test
Expand Down Expand Up @@ -141,6 +170,11 @@ class RedShiftComplexDataTypeTransformerTest {
val transformedRecord = transformer.apply(sourceRecord)
hasNoComplexTypes(sourceRecord)
assertTrue(hasNoComplexTypes(transformedRecord))

val expectedValue = Struct(expectedSchema).put("tombstone", true)

assertEquals(expectedValue, transformedRecord.value())
assertEquals(expectedSchema, transformedRecord.valueSchema())
}

@Test
Expand Down Expand Up @@ -169,13 +203,149 @@ class RedShiftComplexDataTypeTransformerTest {
.toSchemaAndValue(sourceSchema, document.toBsonDocument())
}

private val id = "c63526f8-dec7-4ef8-96d8-18756076f064"
private val account_id = "0a05e2a3-7258-4cf5-a7f4-e21b08c030c5"
private val employee_id = "c63526f8-dec7-4ef8-96d8-18756076f064"
private val event_created_at = 1536899741117
private val body_source = "{\"string\": \"\"}"
private val body_employee_id = null
private val body_email = "{\"string\": \"[email protected]\"}"
private val body_name = "{\"string\": \"Test User 800702\"}"
private val body_preferred_name = null
private val body_locale = null
private val body_observer = false
private val body_gdpr_erasure_request_id = null
private val body_test_map = "{\"added_users_count\":\"0\",\"ignored_new_demographics_count\":\"0\",\"ignored_users_count\":\"0\",\"inactive_updated_users_count\":\"0\",\"reactivated_users_count\":\"0\",\"removed_users_count\":\"0\",\"updated_users_count\":\"0\"}"
private val body_test_map_1 = "\"{}\""
private val body_test_array_of_structs = "[{\"demographic_id\":\"{\\\"string\\\": \\\"[5c579970-684e-4911-a077-6bf407fb478d\\\"}\",\"demographic_value_id\":\"{\\\"string\\\": \\\"427b936f-e932-4673-95a2-acd3e3b900b1\\\"}\"},{\"demographic_id\":\"{\\\"string\\\": \\\"460f6b2d-03c5-46cf-ba55-aa14477a12dc]\\\"}\",\"demographic_value_id\":\"{\\\"string\\\": \\\"ecc0db2e-486e-4f4a-a54a-db21673e1a2b\\\"}\"}]"
private val body_manager_assignment_manager_id = "{\"string\": \"5c579970-684e-4911-a077-6bf407fb478d\"}"
private val body_manager_assignment_demographic_id = "{\"string\": \"427b936f-e932-4673-95a2-acd3e3b900b1\"}"
private val body_erased = false
private val body_created_at = 1536899741113
private val body_updated_at = 1536899741117
private val body_deleted_at = null
private val metadata_correlation_id = "{\"string\": \"b9098254-a1db-4114-9a39-baa17ab18fbf\"}"
private val metadata_causation_id = null
private val metadata_executor_id = "{\"string\": \"379907ca-632c-4e83-89c4-9dbe0e759ad3\"}"
private val metadata_service = "Influx"
private val test_array_of_structs = "[{\"demographic_id\":\"{\\\"string\\\": \\\"5c579970-684e-4911-a077-6bf407fb478d\\\"}\",\"demographic_value_id\":\"{\\\"string\\\": \\\"427b936f-e932-4673-95a2-acd3e3b900b1\\\"}\"},{\"demographic_id\":\"{\\\"string\\\": \\\"460f6b2d-03c5-46cf-ba55-aa14477a12dc\\\"}\",\"demographic_value_id\":\"{\\\"string\\\": \\\"ecc0db2e-486e-4f4a-a54a-db21673e1a2b\\\"}\"}]"
private val test_string_array = "[\"a\",\"b\",\"c\"]"
private val test_array_of_arrays = "[[\"a\",\"b\",\"c\"],[\"e\"],[\"f\",\"g\"]]"
private val test_map = "{\"added_users_count\":0,\"ignored_new_demographics_count\":0,\"ignored_users_count\":0,\"inactive_updated_users_count\":0,\"reactivated_users_count\":0,\"removed_users_count\":0,\"updated_users_count\":0}"

private fun struct(
id: String,
account_id: String,
employee_id: String,
event_created_at: Long,
body_source: String,
body_employee_id: String?,
body_email: String,
body_name: String,
body_preferred_name: String?,
body_locale: String?,
body_observer: Boolean,
body_gdpr_erasure_request_id: String?,
body_test_map: String,
body_test_map_1: String,
body_test_array_of_structs: String,
body_manager_assignment_manager_id: String,
body_manager_assignment_demographic_id: String,
body_erased: Boolean,
body_created_at: Long,
body_updated_at: Long,
body_deleted_at: Long?,
metadata_correlation_id: String,
metadata_causation_id: String?,
metadata_executor_id: String,
metadata_service: String,
test_array_of_structs: String,
test_string_array: String,
test_array_of_arrays: String,
test_map: String,
topic_key: String?,
tombstone: Boolean
): Struct {
val returnStruct = Struct(expectedSchema)
returnStruct.put("id", id)
.put("account_id", account_id)
.put("employee_id", employee_id)
.put("event_created_at", event_created_at)
.put("body_source", body_source)
.put("body_employee_id", body_employee_id)
.put("body_email", body_email)
.put("body_name", body_name)
.put("body_preferred_name", body_preferred_name)
.put("body_locale", body_locale)
.put("body_observer", body_observer)
.put("body_gdpr_erasure_request_id", body_gdpr_erasure_request_id)
.put("body_test_map", body_test_map)
.put("body_test_map_1", body_test_map_1)
.put("body_test_array_of_structs", body_test_array_of_structs)
.put("body_manager_assignment_manager_id", body_manager_assignment_manager_id)
.put("body_manager_assignment_demographic_id", body_manager_assignment_demographic_id)
.put("body_erased", body_erased)
.put("body_created_at", body_created_at)
.put("body_updated_at", body_updated_at)
.put("body_deleted_at", body_deleted_at)
.put("metadata_correlation_id", metadata_correlation_id)
.put("metadata_causation_id", metadata_causation_id)
.put("metadata_executor_id", metadata_executor_id)
.put("metadata_service", metadata_service)
.put("test_array_of_structs", test_array_of_structs)
.put("test_string_array", test_string_array)
.put("test_array_of_arrays", test_array_of_arrays)
.put("test_map", test_map)
.put("tombstone", tombstone)
if (topic_key != "")
returnStruct.put("topic_key", topic_key)
return returnStruct
}

private fun nullBodyStruct(
id: String,
account_id: String,
employee_id: String,
event_created_at: Long,
metadata_correlation_id: String,
metadata_causation_id: String?,
metadata_executor_id: String,
metadata_service: String,
test_array_of_structs: String,
test_string_array: String,
test_array_of_arrays: String,
test_map: String,
topic_key: String?,
tombstone: Boolean
): Struct {
val returnStruct = Struct(expectedSchema)
returnStruct.put("id", id)
.put("account_id", account_id)
.put("employee_id", employee_id)
.put("event_created_at", event_created_at)
.put("metadata_correlation_id", metadata_correlation_id)
.put("metadata_causation_id", metadata_causation_id)
.put("metadata_executor_id", metadata_executor_id)
.put("metadata_service", metadata_service)
.put("test_array_of_structs", test_array_of_structs)
.put("test_string_array", test_string_array)
.put("test_array_of_arrays", test_array_of_arrays)
.put("test_map", test_map)
.put("tombstone", tombstone)
if (topic_key != "")
returnStruct.put("topic_key", topic_key)
return returnStruct
}

private fun fileContent(fileName: String): String {
val url = this.javaClass.classLoader
.getResource(fileName) ?: throw IllegalArgumentException("$fileName is not found 1")

return String(Files.readAllBytes(File(url.file).toPath()))
}

private val expectedSchema = AvroSchema.fromJson(fileContent("com/cultureamp/employee-data.employees-v1-target-schema.avsc"))

private val jsonWriterSettings =
ClassHelper.createInstance(
MongoSourceConfig.OUTPUT_JSON_FORMATTER_CONFIG,
Expand Down
Loading

0 comments on commit 3014757

Please sign in to comment.