Merge pull request #22 from fgrosse/proto-keys

Support protobuf encoded keys

fgrosse authored Dec 18, 2023
2 parents f5bf241 + 7721936 commit 10db297

Showing 6 changed files with 66 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md

@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 - Support tombstone messages (i.e., messages without payload)
+- Support for Protocol buffers encoded Kafka message keys
 
 ## [v1.4.0] - 2023-08-21
 - Omit empty fields from configuration file
7 changes: 4 additions & 3 deletions internal/config.go

@@ -56,11 +56,12 @@ type TopicConfig struct {
 }
 
 type TopicSchemaConfig struct {
-    Type  string           `yaml:"type"` // "avro" or "proto"
-    Proto TopicProtoConfig `yaml:"proto,omitempty"`
+    Type  string       `yaml:"type,omitempty"` // "avro" or "proto"
+    Proto SchemaConfig `yaml:"proto,omitempty"`
+    Key   SchemaConfig `yaml:"key,omitempty"`
 }
 
-type TopicProtoConfig struct {
+type SchemaConfig struct {
     Type string `yaml:"type,omitempty"`
     File string `yaml:"file,omitempty"`
 }
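For illustration, a topic's schema block in the YAML configuration might now look like the sketch below. Only the schema, type, proto, key, and file keys follow from the yaml tags in the structs above; the message types and file names are hypothetical:

    schema:
      type: proto
      proto:
        type: example.Event     # value message type (hypothetical)
        file: events.proto      # hypothetical schema file
      key:
        type: example.EventKey  # key message type (hypothetical)
        file: events.proto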
12 changes: 9 additions & 3 deletions internal/decoder.go

@@ -51,8 +51,11 @@ func NewTopicDecoder(topic string, conf Configuration) (Decoder, error) {
 
         return NewProtoDecoder(ProtoConfig{
             Includes: conf.Proto.Includes,
-            File:     topicConf.Schema.Proto.File,
-            Type:     topicConf.Schema.Proto.Type,
+            Key:      topicConf.Schema.Key,
+            Value: SchemaConfig{
+                File: topicConf.Schema.Proto.File,
+                Type: topicConf.Schema.Proto.Type,
+            },
         })
     default:
         return new(StringDecoder), nil
@@ -72,13 +75,16 @@ func (d *StringDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error) {
 // NewMessage creates a new Message from a given Kafka message.
 // The Key and Value are copied into the Message as is (i.e. without decoding).
 func NewMessage(m *sarama.ConsumerMessage) *Message {
+    ts := m.Timestamp.UTC()
+    ts = ts.Truncate(time.Second)
+
     msg := &Message{
         Key:       m.Key,
         Value:     m.Value,
         Topic:     m.Topic,
         Partition: m.Partition,
         Offset:    m.Offset,
-        Timestamp: m.Timestamp,
+        Timestamp: ts,
     }
 
     msg.Headers = map[string][]string{}
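A quick illustration of the timestamp normalization added to NewMessage above (the concrete time is an arbitrary example):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Same normalization as NewMessage: convert to UTC, then drop sub-second precision.
        ts := time.Date(2023, 12, 18, 13, 37, 42, 987000000, time.FixedZone("CET", 3600))
        fmt.Println(ts.UTC().Truncate(time.Second)) // 2023-12-18 12:37:42 +0000 UTC
    }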
7 changes: 5 additions & 2 deletions internal/encoder.go

@@ -27,8 +27,11 @@ func NewTopicEncoder(topic string, conf Configuration) (Encoder, error) {
 
         return NewProtoEncoder(ProtoConfig{
             Includes: conf.Proto.Includes,
-            File:     topicConf.Schema.Proto.File,
-            Type:     topicConf.Schema.Proto.Type,
+            Key:      topicConf.Schema.Key,
+            Value: SchemaConfig{
+                File: topicConf.Schema.Proto.File,
+                Type: topicConf.Schema.Proto.Type,
+            },
         })
     default:
         return new(StringEncoder), nil
13 changes: 10 additions & 3 deletions internal/fetch.go

@@ -65,8 +65,7 @@ func FetchMessages(broker *sarama.Broker, topic string, partition int32, offset
     var messages []*sarama.ConsumerMessage
     for _, records := range block.RecordsSet {
         if records.MsgSet != nil {
-            // For our current Kafka version messages are returned in this
-            // field but maybe newer versions may use RecordBatch.
+            // Old Kafka versions return messages in this field and newer versions use RecordBatch instead.
             messages = append(messages, parseMessages(topic, partition, records.MsgSet)...)
             isPartial = isPartial || records.MsgSet.PartialTrailingMessage
         }
@@ -78,7 +77,15 @@
     }
 
     if len(messages) == 0 && isPartial {
+        if fetchSizeBytes == maxFetchSizeBytes {
+            return nil, errors.Errorf("received partial message but fetch size is already at maximum")
+        }
+
+        fetchSizeBytes *= 2
+        if fetchSizeBytes > maxFetchSizeBytes {
+            fetchSizeBytes = maxFetchSizeBytes
+        }
+
         debugLogger.Printf("Received partial response and trying again with bigger fetch size offset=%v new-fetch-size=%d", offset, fetchSizeBytes)
         return FetchMessages(broker, topic, partition, offset, fetchSizeBytes, debugLogger)
     }
@@ -115,7 +122,7 @@ func fetchOffsetRequest(topic string, partition int32, offset int64, fetchSizeBytes
 
         // Version relates to the Kafka version of the server. Version 3
         // can be used when the Kafka version is >= v0.10.0.
-        Version: 3,
+        Version: 5,
 
         // Isolation is a feature of a newer Kafka version but I copied this
         // setting here from the sarama library just in case we upgrade one day.
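The retry path above doubles the fetch size after a partial response, clamps it to the maximum, and errors out once an attempt at the cap has already failed. A minimal standalone sketch of that policy (the cap value here is hypothetical; the real one is defined in fetch.go):

    package main

    import "fmt"

    const maxFetchSizeBytes = int32(16 << 20) // assumed cap, for illustration only

    // nextFetchSize returns the size to retry with, or ok=false when the
    // previous attempt already used the maximum and retrying cannot help.
    func nextFetchSize(current int32) (next int32, ok bool) {
        if current == maxFetchSizeBytes {
            return 0, false
        }
        next = current * 2
        if next > maxFetchSizeBytes {
            next = maxFetchSizeBytes
        }
        return next, true
    }

    func main() {
        size := int32(1 << 20) // start at 1 MiB
        for ok := true; ok; size, ok = nextFetchSize(size) {
            fmt.Println(size) // 1 MiB, 2 MiB, 4 MiB, 8 MiB, 16 MiB (in bytes)
        }
    }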
55 changes: 37 additions & 18 deletions internal/proto.go

@@ -13,7 +13,8 @@ import (
 )
 
 type ProtoDecoder struct {
-    typ *desc.MessageDescriptor
+    key   *desc.MessageDescriptor
+    value *desc.MessageDescriptor
 }
 
 type ProtoEncoder struct {
@@ -22,52 +23,70 @@ type ProtoEncoder struct {
 
 type ProtoConfig struct {
     Includes []string
-    File     string
-    Type     string
+    Key      SchemaConfig
+    Value    SchemaConfig
 }
 
 func NewProtoDecoder(conf ProtoConfig) (*ProtoDecoder, error) {
     p := protoparse.Parser{
         ImportPaths: conf.Includes,
     }
 
-    dd, err := p.ParseFiles(conf.File)
+    files := []string{conf.Value.File}
+    if conf.Key.File != "" {
+        files = append(files, conf.Key.File)
+    }
+
+    dd, err := p.ParseFiles(files...)
     if err != nil {
         return nil, fmt.Errorf("failed to parse proto: %w", err)
     }
 
     dec := &ProtoDecoder{}
     for _, descr := range dd {
-        dec.typ = descr.FindMessage(conf.Type)
-        if dec.typ != nil {
-            break
+        if dec.value == nil {
+            dec.value = descr.FindMessage(conf.Value.Type)
+        }
+        if dec.key == nil {
+            dec.key = descr.FindMessage(conf.Key.Type)
         }
     }
 
-    if dec.typ == nil {
-        return nil, fmt.Errorf("could not find message %q", conf.Type)
+    if dec.value == nil {
+        return nil, fmt.Errorf("could not find message %q", conf.Value.Type)
     }
 
     return dec, nil
 }
 
 func (d *ProtoDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error) {
-    val, err := d.decode(kafkaMsg.Value)
+    msg := NewMessage(kafkaMsg)
+    val, err := d.decode(d.value, kafkaMsg.Value)
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("value: %w", err)
     }
 
-    msg := NewMessage(kafkaMsg)
     msg.Value = val
+
+    if d.key == nil {
+        return msg, nil
+    }
+
+    key, err := d.decode(d.key, kafkaMsg.Key)
+    if err != nil {
+        return nil, fmt.Errorf("key: %w", err)
+    }
+
+    msg.Key = key
+
     return msg, nil
 }
 
-func (d *ProtoDecoder) decode(value []byte) (json.RawMessage, error) {
-    protoMsg := dynamic.NewMessage(d.typ)
+func (*ProtoDecoder) decode(descriptor *desc.MessageDescriptor, value []byte) (json.RawMessage, error) {
+    protoMsg := dynamic.NewMessage(descriptor)
     err := protoMsg.Unmarshal(value)
     if err != nil {
-        return nil, fmt.Errorf("failed to decode message as proto %s: %w", d.typ.GetFullyQualifiedName(), err)
+        return nil, fmt.Errorf("failed to decode message as proto %s: %w", descriptor.GetFullyQualifiedName(), err)
     }
 
     marshaler := &jsonpb.Marshaler{
@@ -90,21 +109,21 @@ func NewProtoEncoder(conf ProtoConfig) (*ProtoEncoder, error) {
         ImportPaths: conf.Includes,
     }
 
-    dd, err := p.ParseFiles(conf.File)
+    dd, err := p.ParseFiles(conf.Value.File)
     if err != nil {
         return nil, errors.Wrap(err, "failed to parse proto")
     }
 
     enc := &ProtoEncoder{}
     for _, descr := range dd {
-        enc.typ = descr.FindMessage(conf.Type)
+        enc.typ = descr.FindMessage(conf.Value.Type)
         if enc.typ != nil {
             break
         }
     }
 
     if enc.typ == nil {
-        return nil, errors.Errorf("could not find message %q", conf.Type)
+        return nil, errors.Errorf("could not find message %q", conf.Value.Type)
     }
 
     return enc, nil
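Tying the pieces together, here is a hedged sketch (assuming it lives in the same internal package) of decoding a message whose key and value are both protobuf-encoded; the include directory, file name, and message types are invented for illustration:

    // decodeExample shows the new key-aware ProtoConfig shape in use.
    func decodeExample(kafkaMsg *sarama.ConsumerMessage) (*Message, error) {
        dec, err := NewProtoDecoder(ProtoConfig{
            Includes: []string{"./proto"}, // hypothetical include directory
            Value:    SchemaConfig{File: "events.proto", Type: "example.Event"},
            Key:      SchemaConfig{File: "events.proto", Type: "example.EventKey"},
        })
        if err != nil {
            return nil, err
        }

        // If Key.Type were left empty, dec.key would stay nil and the raw
        // key bytes would be passed through unchanged.
        return dec.Decode(kafkaMsg)
    }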
