@@ -2,14 +2,13 @@ package client
22
33import (
44 "context"
5- "fmt"
65 "os"
76 "testing"
87 "time"
98
10- "github.com/cloudquery/plugin-pb-go/specs "
11- "github.com/cloudquery/plugin-sdk/v3/plugins/source "
12- "github.com/cloudquery/plugin-sdk/v3 /schema"
9+ "github.com/cloudquery/plugin-sdk/v4/message "
10+ "github.com/cloudquery/plugin-sdk/v4/scheduler "
11+ "github.com/cloudquery/plugin-sdk/v4 /schema"
1312 "github.com/golang/mock/gomock"
1413 "github.com/rs/zerolog"
1514 v1 "k8s.io/api/core/v1"
@@ -26,99 +25,70 @@ func WithTestNamespaces(namespaces ...v1.Namespace) TestOption {
2625}
2726
2827func K8sMockTestHelper (t * testing.T , table * schema.Table , builder func (* testing.T , * gomock.Controller ) kubernetes.Interface , opts ... TestOption ) {
29- version := "vDev"
30-
3128 t .Helper ()
32-
3329 table .IgnoreInTests = false
3430
3531 mockController := gomock .NewController (t )
3632 l := zerolog .New (zerolog .NewTestWriter (t )).Output (
3733 zerolog.ConsoleWriter {Out : os .Stderr , TimeFormat : time .StampMicro },
3834 ).Level (zerolog .DebugLevel ).With ().Timestamp ().Logger ()
39- configureFunc := func (ctx context.Context , logger zerolog.Logger , s specs.Source , _ source.Options ) (schema.ClientMeta , error ) {
40- var k8sSpec Spec
41- if err := s .UnmarshalSpec (& k8sSpec ); err != nil {
42- return nil , fmt .Errorf ("failed to unmarshal k8s spec: %w" , err )
43- }
35+ c := & Client {
36+ logger : l ,
37+ Context : "testContext" ,
38+ contexts : []string {"testContext" },
39+ namespaces : map [string ][]v1.Namespace {},
40+ }
41+ c .clients = map [string ]kubernetes.Interface {"testContext" : builder (t , mockController )}
42+ for _ , opt := range opts {
43+ opt (c )
44+ }
45+ sched := scheduler .NewScheduler (scheduler .WithLogger (l ))
46+ messages , err := sched .SyncAll (context .Background (), c , schema.Tables {table })
47+ if err != nil {
48+ t .Fatalf ("failed to sync: %v" , err )
49+ }
50+ records := filterInserts (messages ).GetRecordsForTable (table )
51+ emptyColumns := schema .FindEmptyColumns (table , records )
52+ if len (emptyColumns ) > 0 {
53+ t .Fatalf ("empty columns: %v" , emptyColumns )
54+ }
55+ }
4456
45- c := & Client {
46- logger : logger ,
47- Context : "testContext" ,
48- spec : & k8sSpec ,
49- contexts : []string {"testContext" },
50- namespaces : map [string ][]v1.Namespace {},
51- }
52- c .clients = map [string ]kubernetes.Interface {"testContext" : builder (t , mockController )}
53- for _ , opt := range opts {
54- opt (c )
57+ func filterInserts (msgs message.SyncMessages ) message.SyncInserts {
58+ inserts := []* message.SyncInsert {}
59+ for _ , msg := range msgs {
60+ if m , ok := msg .(* message.SyncInsert ); ok {
61+ inserts = append (inserts , m )
5562 }
56- return c , nil
5763 }
58-
59- plugin := source .NewPlugin (
60- table .Name ,
61- version ,
62- []* schema.Table {
63- table ,
64- },
65- configureFunc ,
66- )
67- plugin .SetLogger (l )
68- source .TestPluginSync (t , plugin , specs.Source {
69- Name : "dev" ,
70- Path : "cloudquery/dev" ,
71- Version : version ,
72- Tables : []string {table .Name },
73- Destinations : []string {"mock-destination" },
74- })
64+ return inserts
7565}
7666
7767func APIExtensionsMockTestHelper (t * testing.T , table * schema.Table , builder func (* testing.T , * gomock.Controller ) apiextensionsclientset.Interface , opts ... TestOption ) {
78- version := "vDev"
79-
8068 t .Helper ()
81-
8269 table .IgnoreInTests = false
83-
8470 mockController := gomock .NewController (t )
8571 l := zerolog .New (zerolog .NewTestWriter (t )).Output (
8672 zerolog.ConsoleWriter {Out : os .Stderr , TimeFormat : time .StampMicro },
8773 ).Level (zerolog .DebugLevel ).With ().Timestamp ().Logger ()
88- configureFunc := func (ctx context.Context , logger zerolog.Logger , s specs.Source , _ source.Options ) (schema.ClientMeta , error ) {
89- var k8sSpec Spec
90- if err := s .UnmarshalSpec (& k8sSpec ); err != nil {
91- return nil , fmt .Errorf ("failed to unmarshal k8s spec: %w" , err )
92- }
93-
94- c := & Client {
95- logger : logger ,
96- Context : "testContext" ,
97- spec : & k8sSpec ,
98- contexts : []string {"testContext" },
99- namespaces : map [string ][]v1.Namespace {},
100- }
101- c .apiExtensions = map [string ]apiextensionsclientset.Interface {"testContext" : builder (t , mockController )}
102- for _ , opt := range opts {
103- opt (c )
104- }
105- return c , nil
74+ c := & Client {
75+ logger : l ,
76+ Context : "testContext" ,
77+ contexts : []string {"testContext" },
78+ namespaces : map [string ][]v1.Namespace {},
79+ }
80+ c .apiExtensions = map [string ]apiextensionsclientset.Interface {"testContext" : builder (t , mockController )}
81+ for _ , opt := range opts {
82+ opt (c )
83+ }
84+ sched := scheduler .NewScheduler (scheduler .WithLogger (l ))
85+ messages , err := sched .SyncAll (context .Background (), c , schema.Tables {table })
86+ if err != nil {
87+ t .Fatalf ("failed to sync: %v" , err )
88+ }
89+ records := filterInserts (messages ).GetRecordsForTable (table )
90+ emptyColumns := schema .FindEmptyColumns (table , records )
91+ if len (emptyColumns ) > 0 {
92+ t .Fatalf ("empty columns: %v" , emptyColumns )
10693 }
107-
108- plugin := source .NewPlugin (
109- table .Name ,
110- version ,
111- []* schema.Table {
112- table ,
113- },
114- configureFunc ,
115- )
116- plugin .SetLogger (l )
117- source .TestPluginSync (t , plugin , specs.Source {
118- Name : "dev" ,
119- Path : "cloudquery/dev" ,
120- Version : version ,
121- Tables : []string {table .Name },
122- Destinations : []string {"mock-destination" },
123- })
12494}
0 commit comments