Skip to content

Commit 1e8d0f8

Browse files
authored
feat(mysql)!: Update to SDK V4 (#11696)
#### Summary This is based on the MemDB plugin from cloudquery/plugin-sdk#915 Fixes #11755 BEGIN_COMMIT_OVERRIDE feat!: Upgrades the mysql source plugin to use plugin-sdk v4. This version does not contain any user-facing breaking changes, but because it is now using CloudQuery gRPC protocol v3, it does require use of a destination plugin that also supports protocol v3. All recent destination plugin versions support this. END_COMMIT_OVERRIDE <!--
1 parent 56aba8b commit 1e8d0f8

File tree

12 files changed

+143
-175
lines changed

12 files changed

+143
-175
lines changed

.github/workflows/source_mysql.yml

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,6 @@ jobs:
2424
defaults:
2525
run:
2626
working-directory: ./plugins/source/mysql
27-
services:
28-
postgres:
29-
image: mysql:5.7
30-
env:
31-
MYSQL_ROOT_PASSWORD: test
32-
MYSQL_DATABASE: cloudquery
33-
ports:
34-
- 3306:3306
35-
options: >-
36-
--health-cmd "mysqladmin ping"
37-
--health-interval 10s
38-
--health-timeout 5s
39-
--health-retries 5
4027
steps:
4128
- uses: actions/checkout@v3
4229
with:
@@ -57,6 +44,12 @@ jobs:
5744
skip-build-cache: true
5845
- name: Build
5946
run: go build .
47+
# We don't use services as we need to pass additional arguments to the container
48+
- name: Spin up MySQL
49+
run: |
50+
docker run --platform linux/amd64 -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=test -e MYSQL_DATABASE=cloudquery -d mysql:5.7 --innodb_log_file_size=512M --innodb_strict_mode=0
51+
sudo apt update && sudo apt install wait-for-it -y
52+
wait-for-it -h localhost -p 3306
6053
- name: Test mysql
6154
run: make test
6255
validate-release:

plugins/source/mysql/client/client.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,21 @@ package client
33
import (
44
"context"
55
"database/sql"
6+
"encoding/json"
67
"fmt"
78
"time"
89

9-
"github.com/cloudquery/plugin-pb-go/specs"
10-
"github.com/cloudquery/plugin-sdk/v3/plugins/source"
11-
"github.com/cloudquery/plugin-sdk/v3/schema"
10+
"github.com/cloudquery/plugin-sdk/v4/plugin"
11+
"github.com/cloudquery/plugin-sdk/v4/schema"
1212
"github.com/go-sql-driver/mysql"
1313
"github.com/rs/zerolog"
1414
)
1515

1616
type Client struct {
17+
plugin.UnimplementedDestination
1718
logger zerolog.Logger
18-
metrics *source.Metrics
19-
Tables schema.Tables
19+
tables schema.Tables
20+
options plugin.NewClientOptions
2021
db *sql.DB
2122
tableSchema string
2223
}
@@ -27,9 +28,9 @@ func (*Client) ID() string {
2728
return "source-mysql"
2829
}
2930

30-
func Configure(ctx context.Context, logger zerolog.Logger, spec specs.Source, _ source.Options) (schema.ClientMeta, error) {
31+
func Configure(ctx context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewClientOptions) (plugin.Client, error) {
3132
var mySQLSpec Spec
32-
err := spec.UnmarshalSpec(&mySQLSpec)
33+
err := json.Unmarshal(spec, &mySQLSpec)
3334
if err != nil {
3435
return nil, fmt.Errorf("failed to unmarshal spec: %w", err)
3536
}
@@ -56,15 +57,22 @@ func Configure(ctx context.Context, logger zerolog.Logger, spec specs.Source, _
5657
db.SetMaxOpenConns(10)
5758
db.SetMaxIdleConns(10)
5859

59-
c := &Client{logger: logger.With().Str("module", "mysql-source").Logger(), db: db, tableSchema: dsn.DBName}
60-
c.Tables, err = c.listTables(ctx)
60+
c := Client{logger: logger.With().Str("module", "mysql-source").Logger(), db: db, tableSchema: dsn.DBName, options: opts}
61+
c.tables, err = c.listTables(ctx)
6162
if err != nil {
6263
return nil, fmt.Errorf("failed to list tables: %w", err)
6364
}
64-
c.Tables, err = c.Tables.FilterDfs(spec.Tables, spec.SkipTables, spec.SkipDependentTables)
6565
if err != nil {
6666
return nil, fmt.Errorf("failed to apply config to tables: %w", err)
6767
}
6868

6969
return c, nil
7070
}
71+
72+
func (c Client) Tables(ctx context.Context, opts plugin.TableOptions) (schema.Tables, error) {
73+
return c.tables.FilterDfs(opts.Tables, opts.SkipTables, opts.SkipDependentTables)
74+
}
75+
76+
func (c Client) Close(_ context.Context) error {
77+
return c.db.Close()
78+
}

plugins/source/mysql/client/list_tables.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package client
33
import (
44
"context"
55

6-
"github.com/cloudquery/plugin-sdk/v3/schema"
6+
"github.com/cloudquery/plugin-sdk/v4/schema"
77
)
88

99
func Identifier(name string) string {

plugins/source/mysql/client/sync.go

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,34 @@ package client
33
import (
44
"context"
55
"fmt"
6+
"reflect"
67
"strings"
78
"time"
89

910
"github.com/apache/arrow/go/v13/arrow"
10-
"github.com/cloudquery/plugin-sdk/v3/plugins/source"
11-
"github.com/cloudquery/plugin-sdk/v3/schema"
12-
"github.com/cloudquery/plugin-sdk/v3/types"
11+
"github.com/apache/arrow/go/v13/arrow/array"
12+
"github.com/apache/arrow/go/v13/arrow/memory"
13+
"github.com/cloudquery/plugin-sdk/v4/message"
14+
"github.com/cloudquery/plugin-sdk/v4/plugin"
15+
"github.com/cloudquery/plugin-sdk/v4/scalar"
16+
"github.com/cloudquery/plugin-sdk/v4/schema"
17+
"github.com/cloudquery/plugin-sdk/v4/types"
1318
)
1419

15-
func (c *Client) Sync(ctx context.Context, metrics *source.Metrics, res chan<- *schema.Resource) error {
16-
c.metrics = metrics
17-
for _, table := range c.Tables {
18-
if c.metrics.TableClient[table.Name] == nil {
19-
c.metrics.TableClient[table.Name] = make(map[string]*source.TableClientMetrics)
20-
c.metrics.TableClient[table.Name][c.ID()] = &source.TableClientMetrics{}
20+
func (c Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<- message.SyncMessage) error {
21+
if c.options.NoConnection {
22+
return fmt.Errorf("no connection")
23+
}
24+
filtered, err := c.tables.FilterDfs(options.Tables, options.SkipTables, options.SkipDependentTables)
25+
if err != nil {
26+
return err
27+
}
28+
for _, table := range filtered {
29+
res <- &message.SyncMigrateTable{
30+
Table: table,
2131
}
2232
}
23-
24-
return c.syncTables(ctx, res)
33+
return c.syncTables(ctx, filtered, res)
2534
}
2635

2736
func (*Client) createResultsArray(table *schema.Table) []any {
@@ -79,15 +88,14 @@ func (*Client) createResultsArray(table *schema.Table) []any {
7988
return results
8089
}
8190

82-
func (c *Client) syncTable(ctx context.Context, table *schema.Table, res chan<- *schema.Resource) error {
91+
func (c *Client) syncTable(ctx context.Context, table *schema.Table, res chan<- message.SyncMessage) error {
8392
colNames := make([]string, len(table.Columns))
8493
for i, col := range table.Columns {
8594
colNames[i] = Identifier(col.Name)
8695
}
8796
query := "SELECT " + strings.Join(colNames, ",") + " FROM " + Identifier(table.Name)
8897
rows, err := c.db.QueryContext(ctx, query)
8998
if err != nil {
90-
c.metrics.TableClient[table.Name][c.ID()].Errors++
9199
return err
92100
}
93101
defer rows.Close()
@@ -97,36 +105,30 @@ func (c *Client) syncTable(ctx context.Context, table *schema.Table, res chan<-
97105
return fmt.Errorf("failed to read from table %s: %w", table.Name, err)
98106
}
99107
if err != nil {
100-
c.metrics.TableClient[table.Name][c.ID()].Errors++
101108
return err
102109
}
103-
resource, err := c.resourceFromValues(table.Name, values)
104-
if err != nil {
105-
c.metrics.TableClient[table.Name][c.ID()].Errors++
106-
return err
110+
111+
arrowSchema := table.ToArrowSchema()
112+
rb := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
113+
for i := range values {
114+
// Gets the underlying value of the pointer
115+
val := reflect.ValueOf(values[i]).Elem().Interface()
116+
s := scalar.NewScalar(arrowSchema.Field(i).Type)
117+
if err := s.Set(val); err != nil {
118+
return err
119+
}
120+
scalar.AppendToBuilder(rb.Field(i), s)
107121
}
108-
c.metrics.TableClient[table.Name][c.ID()].Resources++
109-
res <- resource
122+
res <- &message.SyncInsert{Record: rb.NewRecord()}
110123
}
111124
return nil
112125
}
113126

114-
func (c *Client) syncTables(ctx context.Context, res chan<- *schema.Resource) error {
115-
for _, table := range c.Tables {
127+
func (c *Client) syncTables(ctx context.Context, tables schema.Tables, res chan<- message.SyncMessage) error {
128+
for _, table := range tables {
116129
if err := c.syncTable(ctx, table, res); err != nil {
117130
return err
118131
}
119132
}
120133
return nil
121134
}
122-
123-
func (c *Client) resourceFromValues(tableName string, values []any) (*schema.Resource, error) {
124-
table := c.Tables.Get(tableName)
125-
resource := schema.NewResourceData(table, nil, values)
126-
for i, col := range table.Columns {
127-
if err := resource.Set(col.Name, values[i]); err != nil {
128-
return nil, err
129-
}
130-
}
131-
return resource, nil
132-
}

plugins/source/mysql/client/transformer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package client
33
import (
44
"github.com/apache/arrow/go/v13/arrow"
55
"github.com/apache/arrow/go/v13/arrow/array"
6-
"github.com/cloudquery/plugin-sdk/v3/types"
6+
"github.com/cloudquery/plugin-sdk/v4/types"
77
)
88

99
func GetValue(arr arrow.Array, i int) (any, error) {

plugins/source/mysql/client/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"strings"
77

88
"github.com/apache/arrow/go/v13/arrow"
9-
"github.com/cloudquery/plugin-sdk/v3/types"
9+
"github.com/cloudquery/plugin-sdk/v4/types"
1010
)
1111

1212
const defaultPrecision = 10

plugins/source/mysql/go.mod

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ go 1.20
44

55
require (
66
github.com/apache/arrow/go/v13 v13.0.0-20230630125530-5a06b2ec2a8e
7-
github.com/cloudquery/plugin-pb-go v1.7.0
8-
github.com/cloudquery/plugin-sdk/v3 v3.10.6
7+
github.com/cloudquery/plugin-sdk/v4 v4.1.1
98
github.com/go-sql-driver/mysql v1.7.0
109
github.com/rs/zerolog v1.29.1
1110
github.com/stretchr/testify v1.8.4
@@ -16,6 +15,7 @@ require (
1615
replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230717001540-8e2219bec8ee
1716

1817
require (
18+
github.com/cloudquery/plugin-pb-go v1.7.1 // indirect
1919
github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect
2020
github.com/davecgh/go-spew v1.1.1 // indirect
2121
github.com/getsentry/sentry-go v0.20.0 // indirect
@@ -27,14 +27,13 @@ require (
2727
github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2 v2.0.0-rc.3 // indirect
2828
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect
2929
github.com/inconshreveable/mousetrap v1.1.0 // indirect
30-
github.com/klauspost/compress v1.16.0 // indirect
30+
github.com/klauspost/compress v1.16.6 // indirect
3131
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
3232
github.com/kr/pretty v0.3.1 // indirect
3333
github.com/mattn/go-colorable v0.1.13 // indirect
3434
github.com/mattn/go-isatty v0.0.19 // indirect
3535
github.com/pierrec/lz4/v4 v4.1.17 // indirect
3636
github.com/pmezard/go-difflib v1.0.0 // indirect
37-
github.com/spf13/cast v1.5.0 // indirect
3837
github.com/spf13/cobra v1.6.1 // indirect
3938
github.com/spf13/pflag v1.0.5 // indirect
4039
github.com/thoas/go-funk v0.9.3 // indirect

plugins/source/mysql/go.sum

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
4141
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
4242
github.com/cloudquery/arrow/go/v13 v13.0.0-20230717001540-8e2219bec8ee h1:YTL32wlLEntGAqAwceD4+LKzkBDa1sI2/MAeNRkcsyg=
4343
github.com/cloudquery/arrow/go/v13 v13.0.0-20230717001540-8e2219bec8ee/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
44-
github.com/cloudquery/plugin-pb-go v1.7.0 h1:0HN1xCyw90ab4Nsywc0gTxK987ophmaGXTp2ulUxUm8=
45-
github.com/cloudquery/plugin-pb-go v1.7.0/go.mod h1:R0Wse6NbJDZIHcRQjJ1sZGYDo3mrIDm4k3El1YUrvGA=
44+
github.com/cloudquery/plugin-pb-go v1.7.1 h1:+g+gcJTp+On1+AVvIrMTgp+K5JyhKagX2VUpIFeHl+M=
45+
github.com/cloudquery/plugin-pb-go v1.7.1/go.mod h1:R0Wse6NbJDZIHcRQjJ1sZGYDo3mrIDm4k3El1YUrvGA=
4646
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
4747
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
48-
github.com/cloudquery/plugin-sdk/v3 v3.10.6 h1:KqTsLZ6OA1h8BUMeMcU6BAD6TBW6ojgQaC4zDZMgvu0=
49-
github.com/cloudquery/plugin-sdk/v3 v3.10.6/go.mod h1:QhBaVgiNyQ3P6uAzJWOYpYykHXL+WDZffwg1riTwv60=
48+
github.com/cloudquery/plugin-sdk/v4 v4.1.1 h1:7DzSRgtix4WEKH9OV/uau1ir0gpjt+xqupAv4Ovs08k=
49+
github.com/cloudquery/plugin-sdk/v4 v4.1.1/go.mod h1:bBwijlFd4jPeQRLuLQPasdomeXw3khDiMXpQjIG2E5g=
5050
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
5151
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
5252
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
@@ -61,7 +61,6 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
6161
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
6262
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
6363
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
64-
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
6564
github.com/getsentry/sentry-go v0.20.0 h1:bwXW98iMRIWxn+4FgPW7vMrjmbym6HblXALmhjHmQaQ=
6665
github.com/getsentry/sentry-go v0.20.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
6766
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
@@ -147,8 +146,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
147146
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
148147
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
149148
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
150-
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
151-
github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
149+
github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk=
150+
github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
152151
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
153152
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
154153
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -185,8 +184,6 @@ github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJ
185184
github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc=
186185
github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU=
187186
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
188-
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
189-
github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU=
190187
github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA=
191188
github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY=
192189
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=

plugins/source/mysql/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
package main
22

33
import (
4+
"context"
5+
"log"
6+
47
"github.com/cloudquery/cloudquery/plugins/source/mysql/resources/plugin"
5-
"github.com/cloudquery/plugin-sdk/v3/serve"
8+
"github.com/cloudquery/plugin-sdk/v4/serve"
69
)
710

811
const sentryDSN = "https://[email protected]/4504830878416896"
912

1013
func main() {
11-
serve.Source(plugin.Plugin(), serve.WithSourceSentryDSN(sentryDSN))
14+
pluginServe := serve.Plugin(plugin.Plugin(), serve.WithPluginSentryDSN(sentryDSN))
15+
if err := pluginServe.Serve(context.Background()); err != nil {
16+
log.Fatalf("failed to serve plugin: %v", err)
17+
}
1218
}

plugins/source/mysql/resources/plugin/plugin.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,15 @@ package plugin
22

33
import (
44
"github.com/cloudquery/cloudquery/plugins/source/mysql/client"
5-
"github.com/cloudquery/plugin-sdk/v3/plugins/source"
5+
"github.com/cloudquery/plugin-sdk/v4/plugin"
66
)
77

88
var Version = "Development"
99

10-
func Plugin() *source.Plugin {
11-
return source.NewPlugin(
10+
func Plugin() *plugin.Plugin {
11+
return plugin.NewPlugin(
1212
"mysql",
1313
Version,
14-
nil,
1514
client.Configure,
16-
source.WithDynamicTableOption(getDynamicTables),
17-
source.WithNoInternalColumns(),
18-
source.WithUnmanaged(),
1915
)
2016
}

0 commit comments

Comments
 (0)