From 59ec8ef1f3c7490131f55a0e0d51152748c3b45a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Mon, 18 Dec 2023 10:34:42 +0100 Subject: [PATCH 1/2] Support protobuf encoded keys --- internal/config.go | 7 +++--- internal/decoder.go | 12 +++++++--- internal/encoder.go | 7 ++++-- internal/fetch.go | 13 ++++++++--- internal/proto.go | 55 ++++++++++++++++++++++++++++++--------------- 5 files changed, 65 insertions(+), 29 deletions(-) diff --git a/internal/config.go b/internal/config.go index 2d62623..792001e 100644 --- a/internal/config.go +++ b/internal/config.go @@ -56,11 +56,12 @@ type TopicConfig struct { } type TopicSchemaConfig struct { - Type string `yaml:"type"` // "avro" or "proto" - Proto TopicProtoConfig `yaml:"proto,omitempty"` + Type string `yaml:"type,omitempty"` // "avro" or "proto" + Proto SchemaConfig `yaml:"proto,omitempty"` + Key SchemaConfig `yaml:"key,omitempty"` } -type TopicProtoConfig struct { +type SchemaConfig struct { Type string `yaml:"type,omitempty"` File string `yaml:"file,omitempty"` } diff --git a/internal/decoder.go b/internal/decoder.go index e9eb536..3f05906 100644 --- a/internal/decoder.go +++ b/internal/decoder.go @@ -51,8 +51,11 @@ func NewTopicDecoder(topic string, conf Configuration) (Decoder, error) { return NewProtoDecoder(ProtoConfig{ Includes: conf.Proto.Includes, - File: topicConf.Schema.Proto.File, - Type: topicConf.Schema.Proto.Type, + Key: topicConf.Schema.Key, + Value: SchemaConfig{ + File: topicConf.Schema.Proto.File, + Type: topicConf.Schema.Proto.Type, + }, }) default: return new(StringDecoder), nil @@ -72,13 +75,16 @@ func (d *StringDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, erro // NewMessage creates a new Message from a given Kafka message. // The Key and Value are copied into the Message as is (i.e. without decoding). func NewMessage(m *sarama.ConsumerMessage) *Message { + ts := m.Timestamp.UTC() + ts = ts.Truncate(time.Second) + msg := &Message{ Key: m.Key, Value: m.Value, Topic: m.Topic, Partition: m.Partition, Offset: m.Offset, - Timestamp: m.Timestamp, + Timestamp: ts, } msg.Headers = map[string][]string{} diff --git a/internal/encoder.go b/internal/encoder.go index 95e3c2d..55432f0 100644 --- a/internal/encoder.go +++ b/internal/encoder.go @@ -27,8 +27,11 @@ func NewTopicEncoder(topic string, conf Configuration) (Encoder, error) { return NewProtoEncoder(ProtoConfig{ Includes: conf.Proto.Includes, - File: topicConf.Schema.Proto.File, - Type: topicConf.Schema.Proto.Type, + Key: topicConf.Schema.Key, + Value: SchemaConfig{ + File: topicConf.Schema.Proto.File, + Type: topicConf.Schema.Proto.Type, + }, }) default: return new(StringEncoder), nil diff --git a/internal/fetch.go b/internal/fetch.go index 87b9897..c9c5038 100644 --- a/internal/fetch.go +++ b/internal/fetch.go @@ -65,8 +65,7 @@ func FetchMessages(broker *sarama.Broker, topic string, partition int32, offset var messages []*sarama.ConsumerMessage for _, records := range block.RecordsSet { if records.MsgSet != nil { - // For our current Kafka version messages are returned in this - // field but maybe newer versions may use RecordBatch. + // Old Kafka versions return messages in this field and newer versions use RecordBatch instead. messages = append(messages, parseMessages(topic, partition, records.MsgSet)...) isPartial = isPartial || records.MsgSet.PartialTrailingMessage } @@ -78,7 +77,15 @@ func FetchMessages(broker *sarama.Broker, topic string, partition int32, offset } if len(messages) == 0 && isPartial { + if fetchSizeBytes == maxFetchSizeBytes { + return nil, errors.Errorf("received partial message but fetch size is already at maximum") + } + fetchSizeBytes *= 2 + if fetchSizeBytes > maxFetchSizeBytes { + fetchSizeBytes = maxFetchSizeBytes + } + debugLogger.Printf("Received partial response and trying again with bigger fetch size offset=%v new-fetch-size=%d", offset, fetchSizeBytes) return FetchMessages(broker, topic, partition, offset, fetchSizeBytes, debugLogger) } @@ -115,7 +122,7 @@ func fetchOffsetRequest(topic string, partition int32, offset int64, fetchSizeBy // Version relates to the Kafka version of the server. Version 3 // can be used when the Kafka version is >= v0.10.0. - Version: 3, + Version: 5, // Isolation is a feature of a newer Kafka version but I copied this // setting here from the sarama library just in case upgrade one day. diff --git a/internal/proto.go b/internal/proto.go index 6732537..0903868 100644 --- a/internal/proto.go +++ b/internal/proto.go @@ -13,7 +13,8 @@ import ( ) type ProtoDecoder struct { - typ *desc.MessageDescriptor + key *desc.MessageDescriptor + value *desc.MessageDescriptor } type ProtoEncoder struct { @@ -22,8 +23,8 @@ type ProtoEncoder struct { type ProtoConfig struct { Includes []string - File string - Type string + Key SchemaConfig + Value SchemaConfig } func NewProtoDecoder(conf ProtoConfig) (*ProtoDecoder, error) { @@ -31,43 +32,61 @@ func NewProtoDecoder(conf ProtoConfig) (*ProtoDecoder, error) { ImportPaths: conf.Includes, } - dd, err := p.ParseFiles(conf.File) + files := []string{conf.Value.File} + if conf.Key.File != "" { + files = append(files, conf.Key.File) + } + + dd, err := p.ParseFiles(files...) if err != nil { return nil, fmt.Errorf("failed to parse proto: %w", err) } dec := &ProtoDecoder{} for _, descr := range dd { - dec.typ = descr.FindMessage(conf.Type) - if dec.typ != nil { - break + if dec.value == nil { + dec.value = descr.FindMessage(conf.Value.Type) + } + if dec.key == nil { + dec.key = descr.FindMessage(conf.Key.Type) } } - if dec.typ == nil { - return nil, fmt.Errorf("could not find message %q", conf.Type) + if dec.value == nil { + return nil, fmt.Errorf("could not find message %q", conf.Value.Type) } return dec, nil } func (d *ProtoDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error) { - val, err := d.decode(kafkaMsg.Value) + msg := NewMessage(kafkaMsg) + val, err := d.decode(d.value, kafkaMsg.Value) if err != nil { - return nil, err + return nil, fmt.Errorf("value: %w", err) } - msg := NewMessage(kafkaMsg) msg.Value = val + if d.key == nil { + return msg, nil + } + + key, err := d.decode(d.key, kafkaMsg.Key) + if err != nil { + return nil, fmt.Errorf("key: %w", err) + } + + msg.Key = key + return msg, nil } -func (d *ProtoDecoder) decode(value []byte) (json.RawMessage, error) { - protoMsg := dynamic.NewMessage(d.typ) +func (*ProtoDecoder) decode(descriptor *desc.MessageDescriptor, value []byte) (json.RawMessage, error) { + protoMsg := dynamic.NewMessage(descriptor) err := protoMsg.Unmarshal(value) if err != nil { - return nil, fmt.Errorf("failed to decode message as proto %s: %w", d.typ.GetFullyQualifiedName(), err) + return nil, fmt.Errorf("failed to decode message as proto %s: %w", descriptor.GetFullyQualifiedName(), err) } marshaler := &jsonpb.Marshaler{ @@ -90,21 +109,21 @@ func NewProtoEncoder(conf ProtoConfig) (*ProtoEncoder, error) { ImportPaths: conf.Includes, } - dd, err := p.ParseFiles(conf.File) + dd, err := p.ParseFiles(conf.Value.File) if err != nil { return nil, errors.Wrap(err, "failed to parse proto") } enc := &ProtoEncoder{} for _, descr := range dd { - enc.typ = descr.FindMessage(conf.Type) + enc.typ = descr.FindMessage(conf.Value.Type) if enc.typ != nil { break } } if enc.typ == nil { - return nil, errors.Errorf("could not find message %q", conf.Type) + return nil, errors.Errorf("could not find message %q", conf.Value.Type) } return enc, nil From 7721936ad7f07037aeb769e3e8a57db445dd0af2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Mon, 18 Dec 2023 10:37:46 +0100 Subject: [PATCH 2/2] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b935c09..36a466d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - Support tombstone messages (i.e., messages without payload) +- Support for Protocol buffers encoded Kafka message keys ## [v1.4.0] - 2023-08-21 - Omit empty fields from configuration file