Skip to content

Commit 1f2079e

Browse files
authored
feat(kafka): Upgrade to SDK v4 (#11910)
1 parent aae76f9 commit 1f2079e

File tree

12 files changed

+159
-158
lines changed

12 files changed

+159
-158
lines changed

plugins/destination/kafka/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# CloudQuery Kafka Destination Plugin
22

3-
This destination plugin lets you sync data from a CloudQuery source to a PostgreSQL compatible database.
3+
This destination plugin lets you sync data from a CloudQuery source to Kafka brokers.
44

55
## Links
66

plugins/destination/kafka/client/client.go

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,54 +3,53 @@ package client
33
import (
44
"context"
55
"crypto/tls"
6+
"encoding/json"
67
"fmt"
78
"time"
89

9-
"github.com/cloudquery/filetypes/v3"
10-
"github.com/cloudquery/plugin-pb-go/specs"
11-
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
12-
1310
"github.com/Shopify/sarama"
11+
"github.com/cloudquery/filetypes/v4"
12+
"github.com/cloudquery/plugin-sdk/v4/plugin"
1413
"github.com/rs/zerolog"
1514
)
1615

1716
type Client struct {
18-
destination.UnimplementedManagedWriter
17+
plugin.UnimplementedSource
1918

2019
conf *sarama.Config
2120
producer sarama.SyncProducer
2221

23-
logger zerolog.Logger
24-
spec specs.Destination
25-
pluginSpec Spec
26-
metrics destination.Metrics
22+
logger zerolog.Logger
23+
spec *Spec
2724

2825
*filetypes.Client
2926
}
3027

31-
func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (destination.Client, error) {
32-
if spec.WriteMode != specs.WriteModeAppend {
33-
return nil, fmt.Errorf("destination only supports append mode")
34-
}
28+
func New(_ context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewClientOptions) (plugin.Client, error) {
3529
c := &Client{
3630
logger: logger.With().Str("module", "dest-kafka").Logger(),
3731
}
32+
if opts.NoConnection {
33+
return c, nil
34+
}
3835

39-
c.spec = spec
40-
if err := spec.UnmarshalSpec(&c.pluginSpec); err != nil {
36+
if err := json.Unmarshal(spec, &c.spec); err != nil {
4137
return nil, fmt.Errorf("failed to unmarshal spec: %w", err)
4238
}
43-
c.pluginSpec.SetDefaults()
44-
if err := c.pluginSpec.Validate(); err != nil {
39+
if err := c.spec.Validate(); err != nil {
40+
return nil, err
41+
}
42+
c.spec.SetDefaults()
43+
if err := c.spec.Validate(); err != nil {
4544
return nil, err
4645
}
47-
if c.pluginSpec.Verbose {
46+
if c.spec.Verbose {
4847
sarama.Logger = NewSaramaLoggerAdapter(logger)
4948
}
5049

5150
c.conf = sarama.NewConfig()
52-
if c.pluginSpec.MaxMetadataRetries != 0 {
53-
c.conf.Metadata.Retry.Max = c.pluginSpec.MaxMetadataRetries
51+
if c.spec.MaxMetadataRetries != 0 {
52+
c.conf.Metadata.Retry.Max = c.spec.MaxMetadataRetries
5453
}
5554
c.conf.Metadata.Retry.Backoff = time.Millisecond * 500
5655
c.conf.Producer.Retry.Max = 1
@@ -59,24 +58,24 @@ func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (de
5958
c.conf.Metadata.Full = true
6059
c.conf.Version = sarama.V1_0_0_0
6160
c.conf.Metadata.Full = true
62-
c.conf.ClientID = "cq-destination-kafka-" + c.spec.Name
61+
c.conf.ClientID = c.spec.ClientID
6362

64-
if c.pluginSpec.SaslUsername != "" {
63+
if c.spec.SaslUsername != "" {
6564
c.conf.Net.SASL.Enable = true
66-
c.conf.Net.SASL.User = c.pluginSpec.SaslUsername
67-
c.conf.Net.SASL.Password = c.pluginSpec.SaslPassword
65+
c.conf.Net.SASL.User = c.spec.SaslUsername
66+
c.conf.Net.SASL.Password = c.spec.SaslPassword
6867
c.conf.Net.TLS.Enable = true
6968
c.conf.Net.TLS.Config = &tls.Config{InsecureSkipVerify: true}
7069
c.conf.Net.SASL.Handshake = true
7170
}
7271

7372
var err error
74-
c.producer, err = sarama.NewSyncProducer(c.pluginSpec.Brokers, c.conf)
73+
c.producer, err = sarama.NewSyncProducer(c.spec.Brokers, c.conf)
7574
if err != nil {
7675
return nil, err
7776
}
7877

79-
filetypesClient, err := filetypes.NewClient(c.pluginSpec.FileSpec)
78+
filetypesClient, err := filetypes.NewClient(c.spec.FileSpec)
8079
if err != nil {
8180
return nil, fmt.Errorf("failed to create filetypes client: %w", err)
8281
}
@@ -85,6 +84,6 @@ func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (de
8584
return c, nil
8685
}
8786

88-
func (c *Client) Close(ctx context.Context) error {
87+
func (c *Client) Close(_ context.Context) error {
8988
return c.producer.Close()
9089
}
Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package client
22

33
import (
4+
"context"
5+
"encoding/json"
46
"os"
57
"strings"
68
"testing"
79

8-
"github.com/cloudquery/filetypes/v3"
9-
"github.com/cloudquery/plugin-pb-go/specs"
10-
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
10+
"github.com/cloudquery/filetypes/v4"
11+
"github.com/cloudquery/plugin-sdk/v4/plugin"
1112
)
1213

1314
const (
@@ -22,29 +23,31 @@ func getenv(key, fallback string) string {
2223
return value
2324
}
2425

25-
func TestPgPlugin(t *testing.T) {
26-
destination.PluginTestSuiteRunner(t,
27-
func() *destination.Plugin {
28-
return destination.NewPlugin("kafka", "development", New)
26+
func TestPlugin(t *testing.T) {
27+
ctx := context.Background()
28+
p := plugin.NewPlugin("kafka", "development", New)
29+
b, err := json.Marshal(&Spec{
30+
Brokers: strings.Split(getenv("CQ_DEST_KAFKA_CONNECTION_STRING", defaultConnectionString), ","),
31+
SaslUsername: getenv("CQ_DEST_KAFKA_SASL_USERNAME", ""),
32+
SaslPassword: getenv("CQ_DEST_KAFKA_SASL_PASSWORD", ""),
33+
Verbose: true,
34+
MaxMetadataRetries: 15,
35+
FileSpec: &filetypes.FileSpec{
36+
Format: filetypes.FormatTypeJSON,
2937
},
30-
specs.Destination{
31-
Spec: &Spec{
32-
Brokers: strings.Split(getenv("CQ_DEST_KAFKA_CONNECTION_STRING", defaultConnectionString), ","),
33-
SaslUsername: getenv("CQ_DEST_KAFKA_SASL_USERNAME", ""),
34-
SaslPassword: getenv("CQ_DEST_KAFKA_SASL_PASSWORD", ""),
35-
Verbose: true,
36-
MaxMetadataRetries: 15,
37-
FileSpec: &filetypes.FileSpec{
38-
Format: filetypes.FormatTypeJSON,
39-
},
40-
},
41-
},
42-
destination.PluginTestSuiteTests{
43-
SkipOverwrite: true,
44-
SkipMigrateAppend: true,
45-
SkipMigrateOverwrite: true,
46-
SkipMigrateOverwriteForce: true,
47-
SkipMigrateAppendForce: true,
38+
})
39+
if err != nil {
40+
t.Fatal(err)
41+
}
42+
if err := p.Init(ctx, b, plugin.NewClientOptions{}); err != nil {
43+
t.Fatal(err)
44+
}
45+
plugin.TestWriterSuiteRunner(t,
46+
p,
47+
plugin.WriterTestSuiteTests{
48+
SkipUpsert: true,
49+
SkipMigrate: true,
50+
SkipDeleteStale: true,
4851
},
4952
)
5053
}

plugins/destination/kafka/client/deletestale.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

plugins/destination/kafka/client/metrics.go

Lines changed: 0 additions & 9 deletions
This file was deleted.

plugins/destination/kafka/client/migrate.go

Lines changed: 0 additions & 12 deletions
This file was deleted.

plugins/destination/kafka/client/read.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ import (
77

88
"github.com/Shopify/sarama"
99
"github.com/apache/arrow/go/v13/arrow"
10-
"github.com/cloudquery/plugin-sdk/v3/schema"
10+
"github.com/cloudquery/plugin-sdk/v4/schema"
1111
)
1212

1313
const (
1414
maxWaitTime = 3 * time.Second
1515
)
1616

17-
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error {
18-
consumer, err := sarama.NewConsumer(c.pluginSpec.Brokers, c.conf)
17+
func (c *Client) Read(ctx context.Context, table *schema.Table, res chan<- arrow.Record) error {
18+
consumer, err := sarama.NewConsumer(c.spec.Brokers, c.conf)
1919
if err != nil {
2020
return err
2121
}
@@ -30,7 +30,7 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName strin
3030
case <-ctx.Done():
3131
return ctx.Err()
3232
case msg := <-partitionConsumer.Messages():
33-
if err := c.Client.Read(bytes.NewReader(msg.Value), table, sourceName, res); err != nil {
33+
if err := c.Client.Read(bytes.NewReader(msg.Value), table, res); err != nil {
3434
return err
3535
}
3636
case err := <-partitionConsumer.Errors():

plugins/destination/kafka/client/spec.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package client
33
import (
44
"fmt"
55

6-
"github.com/cloudquery/filetypes/v3"
6+
"github.com/cloudquery/filetypes/v4"
77
)
88

99
type Spec struct {
@@ -14,15 +14,29 @@ type Spec struct {
1414
// This is currently only used for testing to wait for
1515
// kafka cluster to be ready in GitHub actions.
1616
MaxMetadataRetries int `json:"max_metadata_retries,omitempty"`
17+
18+
ClientID string `json:"client_id,omitempty"`
19+
1720
*filetypes.FileSpec
21+
22+
BatchSize int `json:"batch_size"`
1823
}
1924

2025
func (s *Spec) SetDefaults() {
2126
if s.FileSpec == nil {
2227
s.FileSpec = &filetypes.FileSpec{}
2328
}
2429
s.FileSpec.SetDefaults()
30+
31+
if s.ClientID == "" {
32+
s.ClientID = "cq-destination-kafka"
33+
}
34+
35+
if s.BatchSize == 0 {
36+
s.BatchSize = 1000
37+
}
2538
}
39+
2640
func (s *Spec) Validate() error {
2741
if len(s.Brokers) == 0 {
2842
return fmt.Errorf("at least one broker is required")

0 commit comments

Comments
 (0)