Skip to content

Commit

Permalink
Merge pull request #23 from fgrosse/improvements
Browse files Browse the repository at this point in the history
Improve "get consumer-group" and "get message"
  • Loading branch information
fgrosse authored Dec 18, 2023
2 parents 10db297 + af8adaf commit 913d6de
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 52 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
- Support tombstone messages (i.e., messages without payload)
- Support for Protocol buffers encoded Kafka message keys
- Support for Protocol buffers encoded Kafka message keys
- Support base 64 output encoding in `kafkactl get message`
- Add `--regex` flag to `kafkactl get group` to filter which consumer groups to show
- Add `--fetch-offsets` flag to `kafkactl get group` to skip fetching consumer group offsets
- Add `--topic` flag to let `kafkactl get group` only fetch offsets for a specific topic

## [v1.4.0] - 2023-08-21
- Omit empty fields from configuration file
Expand Down
126 changes: 81 additions & 45 deletions cmd/get/get_consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package get
import (
"encoding/json"
"fmt"
"regexp"
"sort"
"strings"

Expand All @@ -22,8 +23,8 @@ type ConsumerGroup struct {
CoordinatorAddr string `table:"-"`
Members []GroupMember `table:"-"`
Offsets []GroupOffset `table:"-"`
Clients string `table:"CLIENTS" json:"-" yaml:"-" table:"CLIENTS"`
OffsetsSummary string `table:"OFFSETS" json:"-" yaml:"-"`
Clients string `table:"-" json:"-" yaml:"-"`
OffsetsSummary string `table:"-" json:"-" yaml:"-"`
}

// GroupMember contains information displayed by "kafkactl get consumer".
Expand All @@ -45,7 +46,7 @@ type GroupOffset struct {
}

func (cmd *command) GetConsumerGroupsCmd() *cobra.Command {
return &cobra.Command{
getConsumerGroupsCmd := &cobra.Command{
Use: "consumer-group [name]...",
Args: cobra.MaximumNArgs(1),
Aliases: []string{"consumer-groups", "consumer", "consumers", "group", "groups"},
Expand All @@ -64,27 +65,72 @@ func (cmd *command) GetConsumerGroupsCmd() *cobra.Command {
if len(args) > 0 {
name = args[0]
}

fetchOffsets := viper.GetBool("fetch-offsets")
topicFilter := viper.GetString("topic")

regex := viper.GetString("regex")
encoding := viper.GetString("output")
return cmd.getConsumerGroups(name, encoding)
return cmd.getConsumerGroups(name, regex, fetchOffsets, topicFilter, encoding)
},
}

flags := getConsumerGroupsCmd.Flags()
flags.StringP("regex", "e", "", "only show groups which match this regular expression")
flags.Bool("fetch-offsets", false, `show consumer group topic offsets (slow on large clusters)`)
flags.String("topic", "", `show topic offsets for a specific topic only (faster on large clusters)`)

return getConsumerGroupsCmd
}

func (cmd *command) getConsumerGroups(name, encoding string) error {
func (cmd *command) getConsumerGroups(name, regex string, fetchOffsets bool, topicFilter, encoding string) error {
var rexp *regexp.Regexp
if regex != "" {
var err error
rexp, err = regexp.Compile(regex)
if err != nil {
return fmt.Errorf("failed to compile regular expression: %w", err)
}
}

conf, err := cmd.SaramaConfig()
if err != nil {
return fmt.Errorf("config: %w", err)
}

admin, err := cmd.ConnectAdmin()
if err != nil {
return err
}

defer admin.Close()

client, err := cmd.ConnectClient(conf)
if err != nil {
return err
}

defer client.Close()

var groups []string
if name == "" {
var err error
groups, err = cmd.listGroups()
groups, err = cmd.listGroups(rexp)
if err != nil {
return fmt.Errorf("failed to list all consumer groups: %w", err)
}
} else {
groups = []string{name}
}

groupDescriptions, err := admin.DescribeConsumerGroups(groups)
if err != nil {
return fmt.Errorf("failed to describe consumer groups: %w", err)
}

var result []ConsumerGroup
for _, group := range groups {
g, err := cmd.getConsumerGroup(group)
g, err := cmd.getConsumerGroup(client, group, groupDescriptions, fetchOffsets, topicFilter)
if err != nil {
return err
}
Expand All @@ -94,7 +140,7 @@ func (cmd *command) getConsumerGroups(name, encoding string) error {
return cli.Print(encoding, result)
}

func (cmd *command) listGroups() ([]string, error) {
func (cmd *command) listGroups(regex *regexp.Regexp) ([]string, error) {
admin, err := cmd.ConnectAdmin()
if err != nil {
return nil, err
Expand All @@ -109,28 +155,21 @@ func (cmd *command) listGroups() ([]string, error) {

var groups []string
for name := range resp {
if regex != nil && !regex.MatchString(name) {
continue
}

groups = append(groups, name)
}

sort.Strings(groups)
return groups, nil
}

func (cmd *command) getConsumerGroup(groupID string) (ConsumerGroup, error) {
func (cmd *command) getConsumerGroup(client sarama.Client, groupID string, groupDescriptions []*sarama.GroupDescription, fetchOffsets bool, topicFilter string) (ConsumerGroup, error) {
var description ConsumerGroup

conf, err := cmd.SaramaConfig()
if err != nil {
return description, fmt.Errorf("config: %w", err)
}

client, err := cmd.ConnectClient(conf)
if err != nil {
return description, err
}

defer client.Close()

cmd.debug.Printf("Determining coordinator for group %q", groupID)
coordinator, err := client.Coordinator(groupID)
if err != nil {
return description, fmt.Errorf("failed to determine consumer group coordinator: %w", err)
Expand All @@ -139,28 +178,16 @@ func (cmd *command) getConsumerGroup(groupID string) (ConsumerGroup, error) {
description.CoordinatorID = coordinator.ID()
description.CoordinatorAddr = coordinator.Addr()

cmd.debug.Printf("Retrieving consumer meta data for group %q", groupID)
metadataReq := &sarama.ConsumerMetadataRequest{ConsumerGroup: groupID}
metadataResp, err := coordinator.GetConsumerMetadata(metadataReq)
if err != nil {
return description, fmt.Errorf("failed to get consumer meta data: %w", err)
}
if metadataResp.Err != sarama.ErrNoError {
return description, metadataResp.Err
var g *sarama.GroupDescription
for _, descr := range groupDescriptions {
if descr.GroupId == groupID {
g = descr
break
}
}

cmd.debug.Printf("Fetching group description for %q", groupID)
describeReq := &sarama.DescribeGroupsRequest{Groups: []string{groupID}}
describeResp, err := coordinator.DescribeGroups(describeReq)
if err != nil {
return description, fmt.Errorf("failed to get group description: %w", err)
}
if len(describeResp.Groups) != 1 {
return description, fmt.Errorf("unexpected number of groups in Kafka response: want 1 but got %d", len(describeResp.Groups))
}
g := describeResp.Groups[0]
if g.Err != sarama.ErrNoError {
return description, fmt.Errorf("group description error: %w", g.Err)
if g == nil {
return description, fmt.Errorf("missing group description for group %q", groupID)
}

description.GroupID = g.GroupId
Expand All @@ -180,25 +207,34 @@ func (cmd *command) getConsumerGroup(groupID string) (ConsumerGroup, error) {
cmd.logger.Printf("ERROR: invalid member meta data in client %q: %s", id, err)
continue
}
if meta == nil {
cmd.logger.Printf("ERROR: member metadata is nil for group %q", groupID)
continue
}

for _, t := range meta.Topics {
if isInternalTopic(t) {
continue
}

mem.Topics = append(mem.Topics, t)
mem.UserData = meta.UserData
// mem.UserData = meta.UserData
}

description.Members = append(description.Members, mem)
description.Clients += mem.ClientID + ","
}
description.Clients = strings.Trim(description.Clients, ",")

if !fetchOffsets {
return description, nil
}

var topics []string
if t := viper.GetString("topic"); t != "" {
topics = []string{t}
if topicFilter != "" {
topics = []string{topicFilter}
} else {
// TODO: only fetch topics consumed by this group
cmd.debug.Println("Fetching all topics and partitions")
topics, err = client.Topics()
if err != nil {
Expand All @@ -214,7 +250,7 @@ func (cmd *command) getConsumerGroup(groupID string) (ConsumerGroup, error) {
}
}

cmd.debug.Println("Fetching topic offsets")
cmd.debug.Println("Fetching topic offsets") // TODO: possible in a single request?
partitionOffsets := map[string]map[int32]int64{}
for topic, partitions := range topicPartitions {
topicOffsets := map[int32]int64{}
Expand Down
25 changes: 19 additions & 6 deletions cmd/get/get_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package get

import (
"context"
"encoding/base64"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -41,7 +42,7 @@ func (cmd *command) GetMessageCmd() *cobra.Command {
flags.Int32("partition", 0, "Kafka topic partition")

// change default for --output flag
flags.StringP("output", "o", "json", "Output format. One of json|raw")
flags.StringP("output", "o", "json", "Output format. One of json|raw|base64")

_ = getMessageCmd.MarkFlagRequired("offset")
_ = getMessageCmd.MarkFlagRequired("topic")
Expand All @@ -50,7 +51,7 @@ func (cmd *command) GetMessageCmd() *cobra.Command {
}

func (cmd *command) getMessage(ctx context.Context, offset, topic string, partition int32, encoding string) error {
if encoding != "json" && encoding != "raw" {
if encoding != "json" && encoding != "raw" && encoding != "base64" {
return errors.New("only JSON and raw output are supported by this sub command")
}

Expand All @@ -72,17 +73,29 @@ func (cmd *command) getMessage(ctx context.Context, offset, topic string, partit

msg, err := cmd.fetchMessageForOffset(topic, partition, int64(offset))
if err != nil {
return err
return fmt.Errorf("fetch message: %w", err)
}

if encoding == "raw" {
switch encoding {
case "raw":
_, err := os.Stdout.Write(msg.Value)
return err
fmt.Fprintln(os.Stdout)
if err != nil {
return fmt.Errorf("write: %w", err)
}
return nil
case "base64":
_, err := base64.NewEncoder(base64.StdEncoding, os.Stdout).Write(msg.Value)
fmt.Fprintln(os.Stdout)
if err != nil {
return fmt.Errorf("write: %w", err)
}
return nil
}

decoded, err := dec.Decode(msg)
if err != nil {
return err
return fmt.Errorf("decode: %w", err)
}

return cli.Print(encoding, decoded)
Expand Down

0 comments on commit 913d6de

Please sign in to comment.