From ac054a641e8f3f87f5cbb04f03d906836736d65d Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Tue, 7 Jan 2025 15:27:17 -0500 Subject: [PATCH] source-mysql: fix converter bug (#50965) --- .../connectors/source-mysql/build.gradle | 2 +- .../connectors/source-mysql/metadata.yaml | 2 +- .../mysql/MySqlSourceCdcBooleanConverter.kt | 32 +- .../mysql/MySqlSourceCdcTemporalConverter.kt | 288 +++++++++--------- docs/integrations/sources/mysql.md | 1 + 5 files changed, 167 insertions(+), 158 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 008277f47aac..40175176fa1a 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -9,7 +9,7 @@ application { airbyteBulkConnector { core = 'extract' toolkits = ['extract-jdbc', 'extract-cdc'] - cdk = '0.249' + cdk = '0.255' } dependencies { diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 20ff8617c6c5..5d5187ef2f87 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.10.0-rc.7 + dockerImageTag: 3.10.0-rc.8 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt index 0f9f04cdccd1..b29fbcdea228 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt @@ -9,27 +9,27 @@ import io.airbyte.cdk.read.cdc.NoConversion import io.airbyte.cdk.read.cdc.NullFallThrough import io.airbyte.cdk.read.cdc.PartialConverter import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter +import io.debezium.spi.converter.RelationalColumn import org.apache.kafka.connect.data.SchemaBuilder class MySqlSourceCdcBooleanConverter : RelationalColumnCustomConverter { override val debeziumPropertiesKey: String = "boolean" - override val handlers: List = listOf(tinyint1Handler) - - companion object { - val tinyint1Handler = - RelationalColumnCustomConverter.Handler( - predicate = { - it.typeName().equals("TINYINT", ignoreCase = true) && - it.length().isPresent && - it.length().asInt == 1 - }, - outputSchema = SchemaBuilder.bool(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { if (it is Number) Converted(it != 0) else NoConversion } - ) + override val handlers: List = listOf(TinyInt1Handler) + + data object TinyInt1Handler : RelationalColumnCustomConverter.Handler { + + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("TINYINT", ignoreCase = true) && + column.length().isPresent && + column.length().asInt == 1 + + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.bool() + + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { if (it is Number) Converted(it != 0) else NoConversion } ) } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt index cecdeed92ec6..ded4475e2c2e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt @@ -13,6 +13,7 @@ import io.airbyte.cdk.read.cdc.NoConversion import io.airbyte.cdk.read.cdc.NullFallThrough import io.airbyte.cdk.read.cdc.PartialConverter import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter +import io.debezium.spi.converter.RelationalColumn import java.time.Duration import java.time.Instant import java.time.LocalDate @@ -30,159 +31,166 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter { override val handlers: List = listOf( - datetimeMillisHandler, - datetimeMicrosHandler, - dateHandler, - timeHandler, - timestampHandler + DatetimeMillisHandler, + DatetimeMicrosHandler, + DateHandler, + TimeHandler, + TimestampHandler ) - companion object { + data object DatetimeMillisHandler : RelationalColumnCustomConverter.Handler { - val datetimeMillisHandler = - RelationalColumnCustomConverter.Handler( - predicate = { - it.typeName().equals("DATETIME", ignoreCase = true) && - it.length().orElse(0) <= 3 + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("DATETIME", ignoreCase = true) && + column.length().orElse(0) <= 3 + + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string() + + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { + if (it is LocalDateTime) { + Converted(it.format(LocalDateTimeCodec.formatter)) + } else { + NoConversion + } }, - outputSchema = SchemaBuilder.string(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { - if (it is LocalDateTime) { - Converted(it.format(LocalDateTimeCodec.formatter)) - } else { - NoConversion - } - }, - PartialConverter { - // Required for default values. - if (it is Number) { - val delta: Duration = Duration.ofMillis(it.toLong()) - val instant: Instant = Instant.EPOCH.plus(delta) - val localDateTime: LocalDateTime = - LocalDateTime.ofInstant(instant, ZoneOffset.UTC) - Converted(localDateTime.format(LocalDateTimeCodec.formatter)) - } else { - NoConversion - } - } - ) + PartialConverter { + // Required for default values. + if (it is Number) { + val delta: Duration = Duration.ofMillis(it.toLong()) + val instant: Instant = Instant.EPOCH.plus(delta) + val localDateTime: LocalDateTime = + LocalDateTime.ofInstant(instant, ZoneOffset.UTC) + Converted(localDateTime.format(LocalDateTimeCodec.formatter)) + } else { + NoConversion + } + } ) + } + + data object DatetimeMicrosHandler : RelationalColumnCustomConverter.Handler { + + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("DATETIME", ignoreCase = true) && column.length().orElse(0) > 3 - val datetimeMicrosHandler = - RelationalColumnCustomConverter.Handler( - predicate = { - it.typeName().equals("DATETIME", ignoreCase = true) && it.length().orElse(0) > 3 + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string() + + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { + if (it is LocalDateTime) { + Converted(it.format(LocalDateTimeCodec.formatter)) + } else { + NoConversion + } }, - outputSchema = SchemaBuilder.string(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { - if (it is LocalDateTime) { - Converted(it.format(LocalDateTimeCodec.formatter)) - } else { - NoConversion - } - }, - PartialConverter { - // Required for default values. - if (it is Number) { - val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS) - val instant: Instant = Instant.EPOCH.plus(delta) - val localDateTime: LocalDateTime = - LocalDateTime.ofInstant(instant, ZoneOffset.UTC) - Converted(localDateTime.format(LocalDateTimeCodec.formatter)) - } else { - NoConversion - } - } - ) + PartialConverter { + // Required for default values. + if (it is Number) { + val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS) + val instant: Instant = Instant.EPOCH.plus(delta) + val localDateTime: LocalDateTime = + LocalDateTime.ofInstant(instant, ZoneOffset.UTC) + Converted(localDateTime.format(LocalDateTimeCodec.formatter)) + } else { + NoConversion + } + } ) + } + + data object DateHandler : RelationalColumnCustomConverter.Handler { + + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("DATE", ignoreCase = true) + + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string() - val dateHandler = - RelationalColumnCustomConverter.Handler( - predicate = { it.typeName().equals("DATE", ignoreCase = true) }, - outputSchema = SchemaBuilder.string(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { - if (it is LocalDate) { - Converted(it.format(LocalDateCodec.formatter)) - } else { - NoConversion - } - }, - PartialConverter { - // Required for default values. - if (it is Number) { - val localDate: LocalDate = LocalDate.ofEpochDay(it.toLong()) - Converted(localDate.format(LocalDateCodec.formatter)) - } else { - NoConversion - } - } - ), + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { + if (it is LocalDate) { + Converted(it.format(LocalDateCodec.formatter)) + } else { + NoConversion + } + }, + PartialConverter { + // Required for default values. + if (it is Number) { + val localDate: LocalDate = LocalDate.ofEpochDay(it.toLong()) + Converted(localDate.format(LocalDateCodec.formatter)) + } else { + NoConversion + } + } ) + } + + data object TimeHandler : RelationalColumnCustomConverter.Handler { + + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("TIME", ignoreCase = true) + + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string() - val timeHandler = - RelationalColumnCustomConverter.Handler( - predicate = { it.typeName().equals("TIME", ignoreCase = true) }, - outputSchema = SchemaBuilder.string(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { - if (it is Duration) { - val localTime: LocalTime = LocalTime.MIDNIGHT.plus(it) - Converted(localTime.format(LocalTimeCodec.formatter)) - } else { - NoConversion - } - }, - PartialConverter { - // Required for default values. - if (it is Number) { - val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS) - val localTime: LocalTime = LocalTime.ofNanoOfDay(delta.toNanos()) - Converted(localTime.format(LocalTimeCodec.formatter)) - } else { - NoConversion - } - } - ), + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { + if (it is Duration) { + val localTime: LocalTime = LocalTime.MIDNIGHT.plus(it) + Converted(localTime.format(LocalTimeCodec.formatter)) + } else { + NoConversion + } + }, + PartialConverter { + // Required for default values. + if (it is Number) { + val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS) + val localTime: LocalTime = LocalTime.ofNanoOfDay(delta.toNanos()) + Converted(localTime.format(LocalTimeCodec.formatter)) + } else { + NoConversion + } + } ) + } + + data object TimestampHandler : RelationalColumnCustomConverter.Handler { + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("TIMESTAMP", ignoreCase = true) - val timestampHandler = - RelationalColumnCustomConverter.Handler( - predicate = { it.typeName().equals("TIMESTAMP", ignoreCase = true) }, - outputSchema = SchemaBuilder.string(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { - if (it is ZonedDateTime) { - val offsetDateTime: OffsetDateTime = it.toOffsetDateTime() - Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter)) - } else { - NoConversion - } - }, - PartialConverter { - // Required for default values. - if (it is String) { - val instant: Instant = Instant.parse(it) - val offsetDateTime: OffsetDateTime = - OffsetDateTime.ofInstant(instant, ZoneOffset.UTC) - Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter)) - } else { - NoConversion - } - } - ), + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string() + + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { + if (it is ZonedDateTime) { + val offsetDateTime: OffsetDateTime = it.toOffsetDateTime() + Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter)) + } else { + NoConversion + } + }, + PartialConverter { + // Required for default values. + if (it is String) { + val instant: Instant = Instant.parse(it) + val offsetDateTime: OffsetDateTime = + OffsetDateTime.ofInstant(instant, ZoneOffset.UTC) + Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter)) + } else { + NoConversion + } + } ) } } diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index a460516aea43..2cf4586bcff3 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -226,6 +226,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:------------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.10.0-rc.8 | 2025-01-07 | [50965](https://github.com/airbytehq/airbyte/pull/50965) | Fix bug introduced in 3.10.0-rc.3. | | 3.10.0-rc.7 | 2024-12-27 | [50437](https://github.com/airbytehq/airbyte/pull/50437) | Compatibility with MySQL Views. | | 3.10.0-rc.6 | 2024-12-18 | [49892](https://github.com/airbytehq/airbyte/pull/49892) | Use a base image: airbyte/java-connector-base:1.0.0 | | 3.10.0-rc.5 | 2025-01-03 | [50868](https://github.com/airbytehq/airbyte/pull/50868) | Fix exception handling rules declaration. |