Skip to content

Commit e05fbc5

Browse files
committed
Implement get message command
1 parent 781cac7 commit e05fbc5

9 files changed

Lines changed: 492 additions & 9 deletions

File tree

cmd/cmd_get.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ kafkactl get brokers --output=json`,
2020

2121
getCmd.AddCommand(cmd.GetBrokersCmd())
2222
getCmd.AddCommand(cmd.GetTopicsCmd())
23+
getCmd.AddCommand(cmd.GetMessageCmd())
2324
getCmd.AddCommand(cmd.GetConsumerGroupsCmd())
2425
getCmd.AddCommand(cmd.GetConfigCmd())
2526

cmd/cmd_get_message.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"os"
8+
"strconv"
9+
10+
"github.com/Shopify/sarama"
11+
"github.com/fgrosse/cli"
12+
"github.com/spf13/cobra"
13+
"github.com/spf13/viper"
14+
)
15+
16+
func (cmd *Kafkactl) GetMessageCmd() *cobra.Command {
17+
getMessageCmd := &cobra.Command{
18+
Use: "message --topic=foo --offset=123",
19+
Args: cobra.NoArgs,
20+
Short: "Consume messages from a Kafka cluster",
21+
Example: `
22+
# Print message with offset 81041238 from topic my-fancy-topic
23+
kafkactl get message --topic=my-fancy-topic --offset=81041238
24+
25+
# Read offsets from std in and print all corresponding messages
26+
kubectl logs -l app=my-app | jq 'select(…) | .offset' | kafkactl get message --offset=- --topic=my-fancy-topic
27+
`,
28+
RunE: func(_ *cobra.Command, args []string) error {
29+
ctx := cli.Context()
30+
offset := viper.GetString("offset")
31+
topic := viper.GetString("topic")
32+
partition := viper.GetInt32("partition")
33+
encoding := viper.GetString("output")
34+
return cmd.getMessage(ctx, offset, topic, partition, encoding)
35+
},
36+
}
37+
38+
flags := getMessageCmd.Flags()
39+
flags.String("offset", "", `The Kafka offset that should be fetched. Can either be a number or the string "-" to read numbers from stdin (newline delimited)`)
40+
flags.String("topic", "", "Kafka topic")
41+
flags.Int32("partition", 0, "Kafka topic partition")
42+
43+
// change default for --output flag
44+
flags.StringP("output", "o", "json", "Output format. One of json|raw")
45+
46+
_ = getMessageCmd.MarkFlagRequired("offset")
47+
_ = getMessageCmd.MarkFlagRequired("topic")
48+
49+
return getMessageCmd
50+
}
51+
52+
func (cmd *Kafkactl) getMessage(ctx context.Context, offset, topic string, partition int32, encoding string) error {
53+
if encoding != "json" && encoding != "raw" {
54+
return errors.New("only JSON and raw output are supported by this sub command")
55+
}
56+
57+
if topic == "" {
58+
return errors.New("empty topic flag")
59+
}
60+
61+
dec, err := cmd.topicDecoder(topic)
62+
if err != nil {
63+
return err
64+
}
65+
66+
printMessage := func(offsetStr string) error {
67+
offset, err := strconv.Atoi(offsetStr)
68+
if err != nil {
69+
return fmt.Errorf("failed to parse offset from stdin: %w", err)
70+
}
71+
72+
msg, err := cmd.fetchMessageForOffset(topic, partition, int64(offset))
73+
if err != nil {
74+
return err
75+
}
76+
77+
if encoding == "raw" {
78+
_, err := os.Stdout.Write(msg.Value)
79+
return err
80+
}
81+
82+
decoded, err := dec.Decode(msg)
83+
if err != nil {
84+
return err
85+
}
86+
87+
return cli.Print(encoding, decoded)
88+
}
89+
90+
if offset == "-" {
91+
for line := range cli.ReadLines(ctx) {
92+
err := printMessage(line)
93+
if err != nil {
94+
return err
95+
}
96+
}
97+
return nil
98+
}
99+
100+
return printMessage(offset)
101+
}
102+
103+
func (cmd *Kafkactl) fetchMessageForOffset(topic string, partition int32, offset int64) (*sarama.ConsumerMessage, error) {
104+
conf := cmd.saramaConfig()
105+
conf.Metadata.Full = false // we are only interested in very specific topics
106+
conf.Producer.Return.Successes = true
107+
108+
client, err := cmd.connectClientWithConfig(conf)
109+
if err != nil {
110+
return nil, err
111+
}
112+
113+
defer client.Close()
114+
115+
broker, err := client.Leader(topic, partition)
116+
if err != nil {
117+
return nil, fmt.Errorf("failed to determine leader for partition %d of topic %q: %w", partition, topic, err)
118+
}
119+
120+
return cmd.FetchMessage(broker, topic, partition, offset)
121+
}

cmd/config.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,46 @@ import (
66
"net/url"
77
"os"
88
"path/filepath"
9+
"regexp"
910

11+
"github.com/pkg/errors"
1012
"github.com/spf13/viper"
1113
"gopkg.in/yaml.v3"
1214
)
1315

1416
type Configuration struct {
15-
APIVersion string `yaml:"api_version"`
16-
Contexts []ContextConfiguration `yaml:"contexts"`
17-
17+
APIVersion string `yaml:"api_version"`
1818
CurrentContext string `yaml:"current_context"`
1919
PreviousContext string `yaml:"previous_context"`
20+
21+
Contexts []ContextConfiguration `yaml:"contexts"`
22+
Topics []*TopicConfig `yaml:"topics"`
23+
Proto GlobalProtoDecoderConfig `yaml:"proto"`
2024
}
2125

2226
type ContextConfiguration struct {
2327
Name string
2428
Brokers []string
2529
}
2630

31+
type TopicConfig struct {
32+
Name string
33+
Decode TopicDecoderConfig
34+
}
35+
36+
type TopicDecoderConfig struct {
37+
Proto TopicProtoDecoderConfig
38+
}
39+
40+
type TopicProtoDecoderConfig struct {
41+
Type string
42+
File string
43+
}
44+
45+
type GlobalProtoDecoderConfig struct {
46+
Includes []string
47+
}
48+
2749
func NewConfiguration() Configuration {
2850
return Configuration{APIVersion: "v1"}
2951
}
@@ -254,6 +276,20 @@ func (conf *Configuration) Brokers() []string {
254276
return brokers
255277
}
256278

279+
func (conf *Configuration) TopicConfig(topic string) (*TopicConfig, error) {
280+
for i, topicConf := range conf.Topics {
281+
re, err := regexp.Compile("^" + topicConf.Name + "$")
282+
if err != nil {
283+
return nil, errors.Errorf(`topic configuration "name" field of topic %d is no valid regular expression: %v`, i, err)
284+
}
285+
if re.MatchString(topic) {
286+
return topicConf, nil
287+
}
288+
}
289+
290+
return nil, nil
291+
}
292+
257293
func ensurePort(addr string) string {
258294
u, err := url.Parse("kafka://" + addr)
259295
if err == nil && u.Port() == "" {

cmd/decoder.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package cmd
2+
3+
import (
4+
"encoding/json"
5+
"os"
6+
7+
"github.com/Shopify/sarama"
8+
"github.com/pkg/errors"
9+
)
10+
11+
type Message struct {
12+
Topic string `json:"topic"`
13+
Partition int32 `json:"partition"`
14+
Offset int64 `json:"offset"`
15+
Value json.RawMessage `json:"value"`
16+
}
17+
18+
type Decoder interface {
19+
Decode(*sarama.ConsumerMessage) (*Message, error)
20+
}
21+
22+
type RawDecoder struct{}
23+
24+
func (cmd *Kafkactl) topicDecoder(topic string) (Decoder, error) {
25+
topicConf, err := cmd.conf.TopicConfig(topic)
26+
if err != nil {
27+
return nil, errors.Wrap(err, "failed to load topic configuration")
28+
}
29+
30+
switch {
31+
case topicConf == nil:
32+
return new(RawDecoder), nil
33+
case topicConf.Decode.Proto.Type != "":
34+
for i, s := range cmd.conf.Proto.Includes {
35+
cmd.conf.Proto.Includes[i] = os.ExpandEnv(s)
36+
}
37+
38+
return NewProtoDecoder(ProtoDecoderConfig{
39+
Includes: cmd.conf.Proto.Includes,
40+
File: topicConf.Decode.Proto.File,
41+
Type: topicConf.Decode.Proto.Type,
42+
})
43+
default:
44+
return new(RawDecoder), nil
45+
}
46+
}
47+
48+
func (d *RawDecoder) Decode(msg *sarama.ConsumerMessage) (*Message, error) {
49+
return &Message{
50+
Value: msg.Value,
51+
Topic: msg.Topic,
52+
Partition: msg.Partition,
53+
Offset: msg.Offset,
54+
}, nil
55+
}

0 commit comments

Comments
 (0)