Skip to content

Commit

Permalink
source-mysql: fix converter bug (#50965)
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar authored Jan 7, 2025
1 parent 14aee8b commit ac054a6
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 158 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ application {
airbyteBulkConnector {
core = 'extract'
toolkits = ['extract-jdbc', 'extract-cdc']
cdk = '0.249'
cdk = '0.255'
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.10.0-rc.7
dockerImageTag: 3.10.0-rc.8
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@ import io.airbyte.cdk.read.cdc.NoConversion
import io.airbyte.cdk.read.cdc.NullFallThrough
import io.airbyte.cdk.read.cdc.PartialConverter
import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter
import io.debezium.spi.converter.RelationalColumn
import org.apache.kafka.connect.data.SchemaBuilder

class MySqlSourceCdcBooleanConverter : RelationalColumnCustomConverter {

override val debeziumPropertiesKey: String = "boolean"
override val handlers: List<RelationalColumnCustomConverter.Handler> = listOf(tinyint1Handler)

companion object {
val tinyint1Handler =
RelationalColumnCustomConverter.Handler(
predicate = {
it.typeName().equals("TINYINT", ignoreCase = true) &&
it.length().isPresent &&
it.length().asInt == 1
},
outputSchema = SchemaBuilder.bool(),
partialConverters =
listOf(
NullFallThrough,
PartialConverter { if (it is Number) Converted(it != 0) else NoConversion }
)
override val handlers: List<RelationalColumnCustomConverter.Handler> = listOf(TinyInt1Handler)

data object TinyInt1Handler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("TINYINT", ignoreCase = true) &&
column.length().isPresent &&
column.length().asInt == 1

override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.bool()

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter { if (it is Number) Converted(it != 0) else NoConversion }
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.airbyte.cdk.read.cdc.NoConversion
import io.airbyte.cdk.read.cdc.NullFallThrough
import io.airbyte.cdk.read.cdc.PartialConverter
import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter
import io.debezium.spi.converter.RelationalColumn
import java.time.Duration
import java.time.Instant
import java.time.LocalDate
Expand All @@ -30,159 +31,166 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val handlers: List<RelationalColumnCustomConverter.Handler> =
listOf(
datetimeMillisHandler,
datetimeMicrosHandler,
dateHandler,
timeHandler,
timestampHandler
DatetimeMillisHandler,
DatetimeMicrosHandler,
DateHandler,
TimeHandler,
TimestampHandler
)

companion object {
data object DatetimeMillisHandler : RelationalColumnCustomConverter.Handler {

val datetimeMillisHandler =
RelationalColumnCustomConverter.Handler(
predicate = {
it.typeName().equals("DATETIME", ignoreCase = true) &&
it.length().orElse(0) <= 3
override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("DATETIME", ignoreCase = true) &&
column.length().orElse(0) <= 3

override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string()

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it is LocalDateTime) {
Converted(it.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
},
outputSchema = SchemaBuilder.string(),
partialConverters =
listOf(
NullFallThrough,
PartialConverter {
if (it is LocalDateTime) {
Converted(it.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
// Required for default values.
if (it is Number) {
val delta: Duration = Duration.ofMillis(it.toLong())
val instant: Instant = Instant.EPOCH.plus(delta)
val localDateTime: LocalDateTime =
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
Converted(localDateTime.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
}
)
PartialConverter {
// Required for default values.
if (it is Number) {
val delta: Duration = Duration.ofMillis(it.toLong())
val instant: Instant = Instant.EPOCH.plus(delta)
val localDateTime: LocalDateTime =
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
Converted(localDateTime.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
}
)
}

data object DatetimeMicrosHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("DATETIME", ignoreCase = true) && column.length().orElse(0) > 3

val datetimeMicrosHandler =
RelationalColumnCustomConverter.Handler(
predicate = {
it.typeName().equals("DATETIME", ignoreCase = true) && it.length().orElse(0) > 3
override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string()

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it is LocalDateTime) {
Converted(it.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
},
outputSchema = SchemaBuilder.string(),
partialConverters =
listOf(
NullFallThrough,
PartialConverter {
if (it is LocalDateTime) {
Converted(it.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
// Required for default values.
if (it is Number) {
val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS)
val instant: Instant = Instant.EPOCH.plus(delta)
val localDateTime: LocalDateTime =
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
Converted(localDateTime.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
}
)
PartialConverter {
// Required for default values.
if (it is Number) {
val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS)
val instant: Instant = Instant.EPOCH.plus(delta)
val localDateTime: LocalDateTime =
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
Converted(localDateTime.format(LocalDateTimeCodec.formatter))
} else {
NoConversion
}
}
)
}

data object DateHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("DATE", ignoreCase = true)

override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string()

val dateHandler =
RelationalColumnCustomConverter.Handler(
predicate = { it.typeName().equals("DATE", ignoreCase = true) },
outputSchema = SchemaBuilder.string(),
partialConverters =
listOf(
NullFallThrough,
PartialConverter {
if (it is LocalDate) {
Converted(it.format(LocalDateCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
// Required for default values.
if (it is Number) {
val localDate: LocalDate = LocalDate.ofEpochDay(it.toLong())
Converted(localDate.format(LocalDateCodec.formatter))
} else {
NoConversion
}
}
),
override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it is LocalDate) {
Converted(it.format(LocalDateCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
// Required for default values.
if (it is Number) {
val localDate: LocalDate = LocalDate.ofEpochDay(it.toLong())
Converted(localDate.format(LocalDateCodec.formatter))
} else {
NoConversion
}
}
)
}

data object TimeHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("TIME", ignoreCase = true)

override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string()

val timeHandler =
RelationalColumnCustomConverter.Handler(
predicate = { it.typeName().equals("TIME", ignoreCase = true) },
outputSchema = SchemaBuilder.string(),
partialConverters =
listOf(
NullFallThrough,
PartialConverter {
if (it is Duration) {
val localTime: LocalTime = LocalTime.MIDNIGHT.plus(it)
Converted(localTime.format(LocalTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
// Required for default values.
if (it is Number) {
val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS)
val localTime: LocalTime = LocalTime.ofNanoOfDay(delta.toNanos())
Converted(localTime.format(LocalTimeCodec.formatter))
} else {
NoConversion
}
}
),
override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it is Duration) {
val localTime: LocalTime = LocalTime.MIDNIGHT.plus(it)
Converted(localTime.format(LocalTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
// Required for default values.
if (it is Number) {
val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS)
val localTime: LocalTime = LocalTime.ofNanoOfDay(delta.toNanos())
Converted(localTime.format(LocalTimeCodec.formatter))
} else {
NoConversion
}
}
)
}

data object TimestampHandler : RelationalColumnCustomConverter.Handler {
override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("TIMESTAMP", ignoreCase = true)

val timestampHandler =
RelationalColumnCustomConverter.Handler(
predicate = { it.typeName().equals("TIMESTAMP", ignoreCase = true) },
outputSchema = SchemaBuilder.string(),
partialConverters =
listOf(
NullFallThrough,
PartialConverter {
if (it is ZonedDateTime) {
val offsetDateTime: OffsetDateTime = it.toOffsetDateTime()
Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
// Required for default values.
if (it is String) {
val instant: Instant = Instant.parse(it)
val offsetDateTime: OffsetDateTime =
OffsetDateTime.ofInstant(instant, ZoneOffset.UTC)
Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter))
} else {
NoConversion
}
}
),
override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string()

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it is ZonedDateTime) {
val offsetDateTime: OffsetDateTime = it.toOffsetDateTime()
Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter))
} else {
NoConversion
}
},
PartialConverter {
// Required for default values.
if (it is String) {
val instant: Instant = Instant.parse(it)
val offsetDateTime: OffsetDateTime =
OffsetDateTime.ofInstant(instant, ZoneOffset.UTC)
Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter))
} else {
NoConversion
}
}
)
}
}
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:------------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.10.0-rc.8 | 2025-01-07 | [50965](https://github.com/airbytehq/airbyte/pull/50965) | Fix bug introduced in 3.10.0-rc.3. |
| 3.10.0-rc.7 | 2024-12-27 | [50437](https://github.com/airbytehq/airbyte/pull/50437) | Compatibility with MySQL Views. |
| 3.10.0-rc.6 | 2024-12-18 | [49892](https://github.com/airbytehq/airbyte/pull/49892) | Use a base image: airbyte/java-connector-base:1.0.0 |
| 3.10.0-rc.5 | 2025-01-03 | [50868](https://github.com/airbytehq/airbyte/pull/50868) | Fix exception handling rules declaration. |
Expand Down

0 comments on commit ac054a6

Please sign in to comment.