Skip to content

Commit 96a39d6

Browse files
authored
feat(k8s): Update to SDK V4 (#11963)
Closes #11960
1 parent 9d5b611 commit 96a39d6

File tree

138 files changed

+399
-431
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

138 files changed

+399
-431
lines changed

.github/workflows/source_k8s.yml

Lines changed: 46 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -105,49 +105,49 @@ jobs:
105105
run: goreleaser release --snapshot --clean --skip-validate --skip-publish --skip-sign -f ./plugins/source/k8s/.goreleaser.yaml
106106
env:
107107
GORELEASER_KEY: ${{ secrets.GORELEASER_KEY }}
108-
test-policies:
109-
timeout-minutes: 30
110-
needs: [resolve-runner]
111-
runs-on: ${{ needs.resolve-runner.outputs.runner }}
112-
defaults:
113-
run:
114-
working-directory: ./plugins/source/k8s
115-
services:
116-
postgres:
117-
image: postgres:11
118-
env:
119-
POSTGRES_PASSWORD: pass
120-
POSTGRES_USER: postgres
121-
POSTGRES_DB: postgres
122-
ports:
123-
- 5432:5432
124-
# Set health checks to wait until postgres has started
125-
options: >-
126-
--health-cmd pg_isready
127-
--health-interval 10s
128-
--health-timeout 5s
129-
--health-retries 5
130-
steps:
131-
- name: Checkout
132-
uses: actions/checkout@v3
133-
- name: Set up Go 1.x
134-
uses: erezrokah/setup-go@feat/add_cache_prefix
135-
with:
136-
go-version-file: plugins/source/k8s/go.mod
137-
cache: true
138-
cache-dependency-path: plugins/source/k8s/go.sum
139-
cache-key-prefix: policies-cache-
140-
- name: Build
141-
run: go build .
142-
- name: Setup CloudQuery
143-
uses: cloudquery/setup-cloudquery@v3
144-
with:
145-
version: 'v3.5.0'
146-
- name: Migrate DB
147-
run: cloudquery migrate test/policy_cq_config.yml
148-
env:
149-
CQ_DSN: postgresql://postgres:pass@localhost:5432/postgres
150-
- name: Run all policies
151-
run: cd policies && psql -h localhost -p 5432 -U postgres -d postgres -w -f ./policy.sql
152-
env:
153-
PGPASSWORD: pass
108+
# test-policies:
109+
# timeout-minutes: 30
110+
# needs: [resolve-runner]
111+
# runs-on: ${{ needs.resolve-runner.outputs.runner }}
112+
# defaults:
113+
# run:
114+
# working-directory: ./plugins/source/k8s
115+
# services:
116+
# postgres:
117+
# image: postgres:11
118+
# env:
119+
# POSTGRES_PASSWORD: pass
120+
# POSTGRES_USER: postgres
121+
# POSTGRES_DB: postgres
122+
# ports:
123+
# - 5432:5432
124+
# # Set health checks to wait until postgres has started
125+
# options: >-
126+
# --health-cmd pg_isready
127+
# --health-interval 10s
128+
# --health-timeout 5s
129+
# --health-retries 5
130+
# steps:
131+
# - name: Checkout
132+
# uses: actions/checkout@v3
133+
# - name: Set up Go 1.x
134+
# uses: erezrokah/setup-go@feat/add_cache_prefix
135+
# with:
136+
# go-version-file: plugins/source/k8s/go.mod
137+
# cache: true
138+
# cache-dependency-path: plugins/source/k8s/go.sum
139+
# cache-key-prefix: policies-cache-
140+
# - name: Build
141+
# run: go build .
142+
# - name: Setup CloudQuery
143+
# uses: cloudquery/setup-cloudquery@v3
144+
# with:
145+
# version: 'v3.5.0'
146+
# - name: Migrate DB
147+
# run: cloudquery migrate test/policy_cq_config.yml
148+
# env:
149+
# CQ_DSN: postgresql://postgres:pass@localhost:5432/postgres
150+
# - name: Run all policies
151+
# run: cd policies && psql -h localhost -p 5432 -U postgres -d postgres -w -f ./policy.sql
152+
# env:
153+
# PGPASSWORD: pass

plugins/source/k8s/client/client.go

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ import (
55
"fmt"
66
"strings"
77

8-
"github.com/cloudquery/plugin-pb-go/specs"
9-
"github.com/cloudquery/plugin-sdk/v3/plugins/source"
10-
"github.com/cloudquery/plugin-sdk/v3/schema"
8+
"github.com/cloudquery/plugin-sdk/v4/schema"
119
"github.com/rs/zerolog"
1210
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
1311
"k8s.io/client-go/kubernetes"
@@ -77,13 +75,7 @@ func (c *Client) WithNamespace(namespace string) *Client {
7775
return &newC
7876
}
7977

80-
func Configure(ctx context.Context, logger zerolog.Logger, s specs.Source, _ source.Options) (schema.ClientMeta, error) {
81-
var k8sSpec Spec
82-
83-
if err := s.UnmarshalSpec(&k8sSpec); err != nil {
84-
return nil, fmt.Errorf("failed to unmarshal k8s spec: %w", err)
85-
}
86-
78+
func Configure(ctx context.Context, logger zerolog.Logger, s Spec) (schema.ClientMeta, error) {
8779
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
8880
clientcmd.NewDefaultClientConfigLoadingRules(),
8981
&clientcmd.ConfigOverrides{},
@@ -94,24 +86,24 @@ func Configure(ctx context.Context, logger zerolog.Logger, s specs.Source, _ sou
9486
}
9587

9688
var contexts []string
97-
switch len(k8sSpec.Contexts) {
89+
switch len(s.Contexts) {
9890
case 0:
9991
logger.Debug().Str("context", rawKubeConfig.CurrentContext).Msg("no context set in configuration using current default defined context")
10092
contexts = []string{rawKubeConfig.CurrentContext}
10193
case 1:
102-
if k8sSpec.Contexts[0] == "*" {
94+
if s.Contexts[0] == "*" {
10395
logger.Debug().Msg("loading all available configuration")
10496
for cName := range rawKubeConfig.Contexts {
10597
contexts = append(contexts, cName)
10698
}
10799
} else {
108-
if _, ok := rawKubeConfig.Contexts[k8sSpec.Contexts[0]]; !ok {
109-
return nil, fmt.Errorf("context %q doesn't exist in kube configuration", k8sSpec.Contexts[0])
100+
if _, ok := rawKubeConfig.Contexts[s.Contexts[0]]; !ok {
101+
return nil, fmt.Errorf("context %q doesn't exist in kube configuration", s.Contexts[0])
110102
}
111-
contexts = []string{k8sSpec.Contexts[0]}
103+
contexts = []string{s.Contexts[0]}
112104
}
113105
default:
114-
for _, cName := range k8sSpec.Contexts {
106+
for _, cName := range s.Contexts {
115107
if _, ok := rawKubeConfig.Contexts[cName]; !ok {
116108
return nil, fmt.Errorf("context %q doesn't exist in kube configuration", cName)
117109
}
@@ -128,7 +120,7 @@ func Configure(ctx context.Context, logger zerolog.Logger, s specs.Source, _ sou
128120
clients: make(map[string]kubernetes.Interface),
129121
namespaces: make(map[string][]v1.Namespace),
130122
apiExtensions: make(map[string]apiextensionsclientset.Interface),
131-
spec: &k8sSpec,
123+
spec: &s,
132124
contexts: contexts,
133125
Context: contexts[0],
134126
paths: make(map[string]struct{}),

plugins/source/k8s/client/columns.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package client
22

33
import (
44
"github.com/apache/arrow/go/v13/arrow"
5-
"github.com/cloudquery/plugin-sdk/v3/schema"
5+
"github.com/cloudquery/plugin-sdk/v4/schema"
66
)
77

88
var ContextColumn = schema.Column{Name: "context", Type: arrow.BinaryTypes.String, Resolver: ResolveContext}

plugins/source/k8s/client/helpers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"reflect"
66

7-
"github.com/cloudquery/plugin-sdk/v3/schema"
7+
"github.com/cloudquery/plugin-sdk/v4/schema"
88
"github.com/thoas/go-funk"
99
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1010
"k8s.io/apimachinery/pkg/types"

plugins/source/k8s/client/spec.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
package client
22

3+
const (
4+
defaultConcurrency = 50000
5+
)
6+
37
type Spec struct {
4-
Contexts []string `yaml:"contexts,omitempty" json:"contexts"`
8+
Contexts []string `yaml:"contexts,omitempty" json:"contexts"`
9+
Concurrency int `yaml:"concurrency,omitempty" json:"concurrency"`
10+
}
11+
12+
func (s *Spec) SetDefaults() {
13+
if s.Concurrency == 0 {
14+
s.Concurrency = defaultConcurrency
15+
}
516
}

plugins/source/k8s/client/testing.go

Lines changed: 49 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ package client
22

33
import (
44
"context"
5-
"fmt"
65
"os"
76
"testing"
87
"time"
98

10-
"github.com/cloudquery/plugin-pb-go/specs"
11-
"github.com/cloudquery/plugin-sdk/v3/plugins/source"
12-
"github.com/cloudquery/plugin-sdk/v3/schema"
9+
"github.com/cloudquery/plugin-sdk/v4/message"
10+
"github.com/cloudquery/plugin-sdk/v4/scheduler"
11+
"github.com/cloudquery/plugin-sdk/v4/schema"
1312
"github.com/golang/mock/gomock"
1413
"github.com/rs/zerolog"
1514
v1 "k8s.io/api/core/v1"
@@ -26,99 +25,70 @@ func WithTestNamespaces(namespaces ...v1.Namespace) TestOption {
2625
}
2726

2827
func K8sMockTestHelper(t *testing.T, table *schema.Table, builder func(*testing.T, *gomock.Controller) kubernetes.Interface, opts ...TestOption) {
29-
version := "vDev"
30-
3128
t.Helper()
32-
3329
table.IgnoreInTests = false
3430

3531
mockController := gomock.NewController(t)
3632
l := zerolog.New(zerolog.NewTestWriter(t)).Output(
3733
zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.StampMicro},
3834
).Level(zerolog.DebugLevel).With().Timestamp().Logger()
39-
configureFunc := func(ctx context.Context, logger zerolog.Logger, s specs.Source, _ source.Options) (schema.ClientMeta, error) {
40-
var k8sSpec Spec
41-
if err := s.UnmarshalSpec(&k8sSpec); err != nil {
42-
return nil, fmt.Errorf("failed to unmarshal k8s spec: %w", err)
43-
}
35+
c := &Client{
36+
logger: l,
37+
Context: "testContext",
38+
contexts: []string{"testContext"},
39+
namespaces: map[string][]v1.Namespace{},
40+
}
41+
c.clients = map[string]kubernetes.Interface{"testContext": builder(t, mockController)}
42+
for _, opt := range opts {
43+
opt(c)
44+
}
45+
sched := scheduler.NewScheduler(scheduler.WithLogger(l))
46+
messages, err := sched.SyncAll(context.Background(), c, schema.Tables{table})
47+
if err != nil {
48+
t.Fatalf("failed to sync: %v", err)
49+
}
50+
records := filterInserts(messages).GetRecordsForTable(table)
51+
emptyColumns := schema.FindEmptyColumns(table, records)
52+
if len(emptyColumns) > 0 {
53+
t.Fatalf("empty columns: %v", emptyColumns)
54+
}
55+
}
4456

45-
c := &Client{
46-
logger: logger,
47-
Context: "testContext",
48-
spec: &k8sSpec,
49-
contexts: []string{"testContext"},
50-
namespaces: map[string][]v1.Namespace{},
51-
}
52-
c.clients = map[string]kubernetes.Interface{"testContext": builder(t, mockController)}
53-
for _, opt := range opts {
54-
opt(c)
57+
func filterInserts(msgs message.SyncMessages) message.SyncInserts {
58+
inserts := []*message.SyncInsert{}
59+
for _, msg := range msgs {
60+
if m, ok := msg.(*message.SyncInsert); ok {
61+
inserts = append(inserts, m)
5562
}
56-
return c, nil
5763
}
58-
59-
plugin := source.NewPlugin(
60-
table.Name,
61-
version,
62-
[]*schema.Table{
63-
table,
64-
},
65-
configureFunc,
66-
)
67-
plugin.SetLogger(l)
68-
source.TestPluginSync(t, plugin, specs.Source{
69-
Name: "dev",
70-
Path: "cloudquery/dev",
71-
Version: version,
72-
Tables: []string{table.Name},
73-
Destinations: []string{"mock-destination"},
74-
})
64+
return inserts
7565
}
7666

7767
func APIExtensionsMockTestHelper(t *testing.T, table *schema.Table, builder func(*testing.T, *gomock.Controller) apiextensionsclientset.Interface, opts ...TestOption) {
78-
version := "vDev"
79-
8068
t.Helper()
81-
8269
table.IgnoreInTests = false
83-
8470
mockController := gomock.NewController(t)
8571
l := zerolog.New(zerolog.NewTestWriter(t)).Output(
8672
zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.StampMicro},
8773
).Level(zerolog.DebugLevel).With().Timestamp().Logger()
88-
configureFunc := func(ctx context.Context, logger zerolog.Logger, s specs.Source, _ source.Options) (schema.ClientMeta, error) {
89-
var k8sSpec Spec
90-
if err := s.UnmarshalSpec(&k8sSpec); err != nil {
91-
return nil, fmt.Errorf("failed to unmarshal k8s spec: %w", err)
92-
}
93-
94-
c := &Client{
95-
logger: logger,
96-
Context: "testContext",
97-
spec: &k8sSpec,
98-
contexts: []string{"testContext"},
99-
namespaces: map[string][]v1.Namespace{},
100-
}
101-
c.apiExtensions = map[string]apiextensionsclientset.Interface{"testContext": builder(t, mockController)}
102-
for _, opt := range opts {
103-
opt(c)
104-
}
105-
return c, nil
74+
c := &Client{
75+
logger: l,
76+
Context: "testContext",
77+
contexts: []string{"testContext"},
78+
namespaces: map[string][]v1.Namespace{},
79+
}
80+
c.apiExtensions = map[string]apiextensionsclientset.Interface{"testContext": builder(t, mockController)}
81+
for _, opt := range opts {
82+
opt(c)
83+
}
84+
sched := scheduler.NewScheduler(scheduler.WithLogger(l))
85+
messages, err := sched.SyncAll(context.Background(), c, schema.Tables{table})
86+
if err != nil {
87+
t.Fatalf("failed to sync: %v", err)
88+
}
89+
records := filterInserts(messages).GetRecordsForTable(table)
90+
emptyColumns := schema.FindEmptyColumns(table, records)
91+
if len(emptyColumns) > 0 {
92+
t.Fatalf("empty columns: %v", emptyColumns)
10693
}
107-
108-
plugin := source.NewPlugin(
109-
table.Name,
110-
version,
111-
[]*schema.Table{
112-
table,
113-
},
114-
configureFunc,
115-
)
116-
plugin.SetLogger(l)
117-
source.TestPluginSync(t, plugin, specs.Source{
118-
Name: "dev",
119-
Path: "cloudquery/dev",
120-
Version: version,
121-
Tables: []string{table.Name},
122-
Destinations: []string{"mock-destination"},
123-
})
12494
}

plugins/source/k8s/client/transformers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"reflect"
55

66
"github.com/apache/arrow/go/v13/arrow"
7-
"github.com/cloudquery/plugin-sdk/v3/schema"
8-
"github.com/cloudquery/plugin-sdk/v3/transformers"
7+
"github.com/cloudquery/plugin-sdk/v4/schema"
8+
"github.com/cloudquery/plugin-sdk/v4/transformers"
99
)
1010

1111
var (

0 commit comments

Comments
 (0)