Skip to content

Commit cd43190

Browse files
committed
Add consume command
1 parent d5bad67 commit cd43190

9 files changed

Lines changed: 252 additions & 18 deletions

File tree

cmd/consume/command.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package consume
2+
3+
import (
4+
"log"
5+
6+
"github.com/Shopify/sarama"
7+
"github.com/fgrosse/kafkactl/pkg"
8+
"github.com/spf13/cobra"
9+
)
10+
11+
type command struct {
12+
BaseCommand
13+
*cobra.Command
14+
logger *log.Logger
15+
debug *log.Logger
16+
}
17+
18+
type BaseCommand interface {
19+
Configuration() *pkg.Configuration
20+
SaramaConfig() *sarama.Config
21+
ConnectClient(*sarama.Config) (sarama.Client, error)
22+
}
23+
24+
func Command(base BaseCommand, logger, debug *log.Logger) *cobra.Command {
25+
cmd := &command{
26+
BaseCommand: base,
27+
logger: logger,
28+
debug: debug,
29+
}
30+
31+
cmd.Command = cmd.ConsumeCmd()
32+
return cmd.Command
33+
}

cmd/consume/consume.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package consume
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"os"
8+
"strconv"
9+
"sync"
10+
11+
"github.com/Shopify/sarama"
12+
"github.com/fgrosse/cli"
13+
"github.com/fgrosse/kafkactl/pkg"
14+
"github.com/pkg/errors"
15+
"github.com/spf13/cobra"
16+
"github.com/spf13/viper"
17+
)
18+
19+
func (cmd *command) ConsumeCmd() *cobra.Command {
20+
produceCmd := &cobra.Command{
21+
Use: "consume <topic>",
22+
Args: cobra.ExactArgs(1),
23+
Short: "Consume messages from a Kafka topic and print them to stdout",
24+
Long: `Consume messages from a Kafka topic and print them to stdout.
25+
26+
By default, the --output flag is set to "raw" which means that the command will
27+
only print the message values followed by a newline. You can set --output=json
28+
in order to print each consumed message as a JSON object which will contain the
29+
partition and offset information in addition to the message value.
30+
31+
Values will automatically be decoded using the topic schema configuration from the
32+
kafkactl configuration file (e.g. to decode proto messages and print them as JSON).
33+
If no configuration matches the topic name, message values will be assumed to be
34+
unicode strings.
35+
36+
This command will block as long as it is connected to Kafka. You can stop reading
37+
messages by sending SIGINT, SIGQUIT or SIGTERM to the process (e.g. by pressing ctrl+c).
38+
`,
39+
Example: `
40+
# Read and print all messages from "example-topic" without joining a consumer group
41+
kafkactl consume example-topic
42+
43+
# Read messages only from a specific partition
44+
kafkactl consume example-topic --partition=1
45+
46+
# Join the "test" consumer group and print all messages that are assigned to this member
47+
kafkactl consume example-topic --group=test
48+
`,
49+
RunE: func(_ *cobra.Command, args []string) error {
50+
ctx := cli.Context()
51+
topic := args[0]
52+
partition := viper.GetInt32("partition")
53+
offset := viper.GetString("offset")
54+
outputEncoding := viper.GetString("output")
55+
return cmd.consume(ctx, topic, partition, offset, outputEncoding)
56+
},
57+
}
58+
59+
flags := produceCmd.Flags()
60+
flags.Int32("partition", -1, "Kafka topic partition. -1 means all partitions")
61+
flags.String("offset", "newest", `either "oldest", "newest" or an integer`)
62+
flags.StringP("output", "o", "raw", "output format. One of raw|json. See --help output for more information")
63+
// TODO: support joining a consumer group
64+
65+
return produceCmd
66+
}
67+
68+
func (cmd *command) consume(ctx context.Context, topic string, partition int32, offsetStr, outputEncoding string) error {
69+
var offset int64
70+
switch offsetStr {
71+
case "oldest", "first":
72+
offset = sarama.OffsetOldest
73+
case "newest", "last":
74+
offset = sarama.OffsetNewest
75+
default:
76+
n, err := strconv.Atoi(offsetStr)
77+
if err != nil {
78+
return fmt.Errorf("failed to parse --offset as integer: %w", err)
79+
}
80+
offset = int64(n)
81+
}
82+
83+
conf := cmd.Configuration()
84+
dec, err := pkg.NewTopicDecoder(topic, *conf)
85+
if err != nil {
86+
return err
87+
}
88+
89+
messages, err := cmd.simpleConsumer(ctx, topic, partition, offset)
90+
if err != nil {
91+
return err
92+
}
93+
94+
for msg := range messages {
95+
decoded, err := dec.Decode(msg)
96+
if err != nil {
97+
return fmt.Errorf("failed to decode message from Kafka: %w", err)
98+
}
99+
100+
switch outputEncoding {
101+
case "raw":
102+
fmt.Fprintln(os.Stdout, decoded.Value)
103+
case "json":
104+
val, err := json.Marshal(decoded)
105+
if err != nil {
106+
return err
107+
}
108+
fmt.Fprintln(os.Stdout, string(val))
109+
}
110+
111+
}
112+
113+
return nil
114+
}
115+
116+
func (cmd *command) simpleConsumer(ctx context.Context, topic string, partition int32, offset int64) (<-chan *sarama.ConsumerMessage, error) {
117+
conf := cmd.SaramaConfig()
118+
conf.Consumer.Return.Errors = false // TODO
119+
120+
brokers := cmd.Configuration().Brokers()
121+
c, err := sarama.NewConsumer(brokers, conf)
122+
if err != nil {
123+
return nil, fmt.Errorf("failed to create consumer: %w", err)
124+
}
125+
126+
if partition >= 0 {
127+
return cmd.consumeSinglePartition(ctx, c, topic, partition, offset)
128+
}
129+
130+
return cmd.consumeAllPartitions(ctx, c, topic, offset)
131+
}
132+
133+
func (cmd *command) consumeSinglePartition(ctx context.Context, c sarama.Consumer, topic string, partition int32, offset int64) (<-chan *sarama.ConsumerMessage, error) {
134+
con, err := c.ConsumePartition(topic, partition, offset)
135+
if err != nil {
136+
return nil, errors.Wrap(err, "failed to consume topic partition")
137+
}
138+
139+
go func() {
140+
<-ctx.Done()
141+
con.AsyncClose()
142+
}()
143+
144+
cmd.debug.Printf("Consuming topic %q partition %d starting at offset %d", topic, partition, offset)
145+
return con.Messages(), nil
146+
}
147+
148+
func (cmd *command) consumeAllPartitions(ctx context.Context, c sarama.Consumer, topic string, offset int64) (<-chan *sarama.ConsumerMessage, error) {
149+
partitions, err := c.Partitions(topic)
150+
if err != nil {
151+
return nil, errors.Wrap(err, "get partitions")
152+
}
153+
154+
var wg sync.WaitGroup
155+
messages := make(chan *sarama.ConsumerMessage, len(partitions))
156+
157+
for _, partition := range partitions {
158+
con, err := c.ConsumePartition(topic, partition, offset)
159+
if err != nil {
160+
return nil, errors.Wrapf(err, "consume partition %d", partition)
161+
}
162+
163+
output := func(partitionMessages <-chan *sarama.ConsumerMessage) {
164+
defer wg.Done()
165+
for msg := range partitionMessages {
166+
select {
167+
case messages <- msg:
168+
case <-ctx.Done():
169+
return
170+
}
171+
}
172+
}
173+
174+
wg.Add(1)
175+
go output(con.Messages())
176+
177+
// Close this partition consumer when the context is done.
178+
go func() {
179+
<-ctx.Done()
180+
con.AsyncClose()
181+
}()
182+
}
183+
184+
// When all individual partition consumers are done, close the messages channel.
185+
go func() {
186+
wg.Wait()
187+
close(messages)
188+
}()
189+
190+
return messages, nil
191+
}

cmd/get/get_topics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
type Topic struct {
1919
Name string
2020
Partitions []PartitionMetadata `table:"-"`
21-
Consumers []string `table:"-"`
21+
ConsumerGroups []string `table:"-"`
2222
NumPartitions int32 `json:"-" yaml:"-" table:"PARTITIONS"`
2323
ReplicationFactor int16 `table:"REPLICATION"`
2424
Retention string `table:"RETENTION"`
@@ -210,7 +210,7 @@ func (cmd *command) assignTopicConsumers(admin sarama.ClusterAdmin, topics []Top
210210
continue
211211
}
212212

213-
topic.Consumers = groups
213+
topic.ConsumerGroups = groups
214214
topics[i] = topic
215215
}
216216

cmd/produce/produce.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ func (cmd *command) ProduceCmd() *cobra.Command {
1717
Use: "produce <topic>",
1818
Short: "Read messages from stdin and write them to a Kafka topic",
1919
Args: cobra.ExactArgs(1),
20+
Example: `
21+
# Write each line entered into your terminal as new message to the Kafka topic "example-topic"
22+
kafkactl produce example-topic
23+
24+
# Read newline delimited messages from a file and send them to the Kafka topic "example-topic"
25+
cat example-file | kafkactl produce example-topic
26+
`,
2027
RunE: func(_ *cobra.Command, args []string) error {
2128
ctx := cli.Context()
2229
topic := args[0]

cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/Shopify/sarama"
1414
"github.com/fgrosse/kafkactl/cmd/config"
15+
"github.com/fgrosse/kafkactl/cmd/consume"
1516
"github.com/fgrosse/kafkactl/cmd/context"
1617
"github.com/fgrosse/kafkactl/cmd/create"
1718
"github.com/fgrosse/kafkactl/cmd/delete"
@@ -59,6 +60,7 @@ func New() *Kafkactl {
5960
cmd.AddCommand(delete.Command(cmd, logger, debug))
6061
cmd.AddCommand(update.Command(cmd, logger, debug))
6162
cmd.AddCommand(produce.Command(cmd, logger, debug))
63+
cmd.AddCommand(consume.Command(cmd, logger, debug))
6264

6365
cmd.PersistentPreRunE = cmd.initConfig
6466

cmd/update/update_topic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Hint: You can use "kafkactl get config --topic <name>" to see the current
2424
`,
2525
Example: `
2626
# Update the topic retention configuration
27-
kafkactl update topic my-topic retention.ms=12000`,
27+
kafkactl update topic "example-topic" retention.ms=12000`,
2828
RunE: func(_ *cobra.Command, args []string) error {
2929
name := args[0]
3030
kvPairs, err := cmd.parseKeyValuePairs(args[1:])

pkg/decoder.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
package pkg
22

33
import (
4-
"encoding/json"
54
"fmt"
65
"os"
76

87
"github.com/Shopify/sarama"
98
)
109

1110
type Message struct {
12-
Topic string `json:"topic"`
13-
Partition int32 `json:"partition"`
14-
Offset int64 `json:"offset"`
15-
Value json.RawMessage `json:"value"`
11+
Topic string `json:"topic"`
12+
Partition int32 `json:"partition"`
13+
Offset int64 `json:"offset"`
14+
Value any `json:"value"`
1615
}
1716

1817
type Decoder interface {
@@ -27,7 +26,7 @@ func NewTopicDecoder(topic string, conf Configuration) (Decoder, error) {
2726

2827
switch {
2928
case topicConf == nil:
30-
return new(RawDecoder), nil
29+
return new(StringDecoder), nil
3130
case topicConf.Schema.Proto.Type != "":
3231
for i, s := range conf.Proto.Includes {
3332
conf.Proto.Includes[i] = os.ExpandEnv(s)
@@ -39,15 +38,16 @@ func NewTopicDecoder(topic string, conf Configuration) (Decoder, error) {
3938
Type: topicConf.Schema.Proto.Type,
4039
})
4140
default:
42-
return new(RawDecoder), nil
41+
return new(StringDecoder), nil
4342
}
4443
}
4544

46-
type RawDecoder struct{}
45+
// The StringDecoder assumes that the values of all consumed messages are unicode strings.
46+
type StringDecoder struct{}
4747

48-
func (d *RawDecoder) Decode(msg *sarama.ConsumerMessage) (*Message, error) {
48+
func (d *StringDecoder) Decode(msg *sarama.ConsumerMessage) (*Message, error) {
4949
return &Message{
50-
Value: msg.Value,
50+
Value: string(msg.Value),
5151
Topic: msg.Topic,
5252
Partition: msg.Partition,
5353
Offset: msg.Offset,

pkg/encoder.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func NewTopicEncoder(topic string, conf Configuration) (Encoder, error) {
1919

2020
switch {
2121
case topicConf == nil:
22-
return new(RawEncoder), nil
22+
return new(StringEncoder), nil
2323
case topicConf.Schema.Proto.Type != "":
2424
for i, s := range conf.Proto.Includes {
2525
conf.Proto.Includes[i] = os.ExpandEnv(s)
@@ -31,12 +31,12 @@ func NewTopicEncoder(topic string, conf Configuration) (Encoder, error) {
3131
Type: topicConf.Schema.Proto.Type,
3232
})
3333
default:
34-
return new(RawEncoder), nil
34+
return new(StringEncoder), nil
3535
}
3636
}
3737

38-
type RawEncoder struct{}
38+
type StringEncoder struct{}
3939

40-
func (e *RawEncoder) Encode(msg string) (sarama.Encoder, error) {
40+
func (e *StringEncoder) Encode(msg string) (sarama.Encoder, error) {
4141
return sarama.StringEncoder(msg), nil
4242
}

pkg/proto.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pkg
22

33
import (
4+
"encoding/json"
45
"fmt"
56

67
"github.com/Shopify/sarama"
@@ -69,7 +70,7 @@ func (d *ProtoDecoder) Decode(msg *sarama.ConsumerMessage) (*Message, error) {
6970
}
7071

7172
return &Message{
72-
Value: value,
73+
Value: json.RawMessage(value),
7374
Topic: msg.Topic,
7475
Partition: msg.Partition,
7576
Offset: msg.Offset,

0 commit comments

Comments
 (0)