-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add joda-time and ByteBuffer support in avro #740
Changes from all commits
bb41853
9ed9065
71abf92
5f23eb7
ae2d53c
4ba749c
fdc9adc
2ff7e54
f0114e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,21 +16,22 @@ | |
|
||
package magnolify.avro | ||
|
||
import java.nio.ByteBuffer | ||
import java.time._ | ||
import java.{util => ju} | ||
import magnolia1._ | ||
import magnolify.shared._ | ||
import magnolify.shims.FactoryCompat | ||
import org.apache.avro.generic.GenericData.EnumSymbol | ||
import org.apache.avro.generic._ | ||
import org.apache.avro.{JsonProperties, LogicalType, LogicalTypes, Schema} | ||
import org.joda.{time => joda} | ||
|
||
import java.nio.{ByteBuffer, ByteOrder} | ||
import java.time._ | ||
import java.{util => ju} | ||
import scala.annotation.{implicitNotFound, nowarn} | ||
import scala.collection.concurrent | ||
import scala.reflect.ClassTag | ||
import scala.jdk.CollectionConverters._ | ||
import scala.collection.compat._ | ||
import scala.jdk.CollectionConverters._ | ||
import scala.reflect.ClassTag | ||
|
||
sealed trait AvroType[T] extends Converter[T, GenericRecord, GenericRecord] { | ||
val schema: Schema | ||
|
@@ -196,14 +197,14 @@ object AvroField { | |
implicit val afLong: AvroField[Long] = id[Long](Schema.Type.LONG) | ||
implicit val afFloat: AvroField[Float] = id[Float](Schema.Type.FLOAT) | ||
implicit val afDouble: AvroField[Double] = id[Double](Schema.Type.DOUBLE) | ||
implicit val afBytes: AvroField[Array[Byte]] = new Aux[Array[Byte], ByteBuffer, ByteBuffer] { | ||
implicit val afByteBuffer: AvroField[ByteBuffer] = new Aux[ByteBuffer, ByteBuffer, ByteBuffer] { | ||
override protected def buildSchema(cm: CaseMapper): Schema = Schema.create(Schema.Type.BYTES) | ||
// `JacksonUtils.toJson` expects `Array[Byte]` for `BYTES` defaults | ||
override def makeDefault(d: Array[Byte])(cm: CaseMapper): Array[Byte] = d | ||
override def from(v: ByteBuffer)(cm: CaseMapper): Array[Byte] = | ||
ju.Arrays.copyOfRange(v.array(), v.position(), v.limit()) | ||
override def to(v: Array[Byte])(cm: CaseMapper): ByteBuffer = ByteBuffer.wrap(v) | ||
Comment on lines
-203
to
-205
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we have a non symmetric definition:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. interesting--yeah, I can't see why we would need to do an array-copy here, your change makes sense to me. |
||
override def makeDefault(d: ByteBuffer)(cm: CaseMapper): Array[Byte] = d.array() | ||
override def from(v: ByteBuffer)(cm: CaseMapper): ByteBuffer = v | ||
override def to(v: ByteBuffer)(cm: CaseMapper): ByteBuffer = v | ||
} | ||
implicit val afBytes: AvroField[Array[Byte]] = from[ByteBuffer](_.array())(ByteBuffer.wrap) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed the behavior to pass the same mutable array from avro to scala |
||
implicit val afCharSequence: AvroField[CharSequence] = id[CharSequence](Schema.Type.STRING) | ||
implicit val afString: AvroField[String] = new Aux[String, String, String] { | ||
override protected def buildSchema(cm: CaseMapper): Schema = { | ||
|
@@ -316,8 +317,37 @@ object AvroField { | |
logicalType[CharSequence](LogicalTypes.uuid())(cs => ju.UUID.fromString(cs.toString))( | ||
_.toString | ||
) | ||
|
||
// date | ||
implicit val afDate: AvroField[LocalDate] = | ||
logicalType[Int](LogicalTypes.date())(x => LocalDate.ofEpochDay(x.toLong))(_.toEpochDay.toInt) | ||
private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1) | ||
implicit val afJodaDate: AvroField[joda.LocalDate] = | ||
logicalType[Int](LogicalTypes.date()) { daysFromEpoch => | ||
EpochJodaDate.plusDays(daysFromEpoch) | ||
} { date => | ||
joda.Days.daysBetween(EpochJodaDate, date).getDays | ||
} | ||
|
||
// duration, as in the avro spec. do not make implicit as there is not a specific type for it | ||
// A duration logical type annotates Avro fixed type of size 12, which stores three little-endian unsigned integers | ||
// that represent durations at different granularities of time. | ||
// The first stores a number in months, the second stores a number in days, and the third stores a number in milliseconds. | ||
val afDuration: AvroField[(Long, Long, Long)] = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, got it. This is probably the least painful solution, then -- can you add documentation (both on how to import it, and on what it represents) to https://github.com/spotify/magnolify/blob/main/docs/avro.md + https://github.com/spotify/magnolify/blob/main/docs/mapping.md ? |
||
logicalType[ByteBuffer](new LogicalType("duration")) { bs => | ||
bs.order(ByteOrder.LITTLE_ENDIAN) | ||
val months = java.lang.Integer.toUnsignedLong(bs.getInt) | ||
val days = java.lang.Integer.toUnsignedLong(bs.getInt) | ||
val millis = java.lang.Integer.toUnsignedLong(bs.getInt) | ||
(months, days, millis) | ||
} { case (months, days, millis) => | ||
ByteBuffer | ||
.allocate(12) | ||
.order(ByteOrder.LITTLE_ENDIAN) | ||
.putInt(months.toInt) | ||
.putInt(days.toInt) | ||
.putInt(millis.toInt) | ||
}(AvroField.fixed(12)(ByteBuffer.wrap)(_.array())) | ||
|
||
def fixed[T: ClassTag]( | ||
size: Int | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like it could be declared as
id[ByteBuffer](Schema.Type. BYTES)
, butid
needs to have an override to accommodated.array()