@@ -2,19 +2,31 @@ package client
22
33import (
44 "context"
5-
6- "github.com/cloudquery/plugin-pb-go/specs"
7- "github.com/cloudquery/plugin-sdk/v3/backend"
8- "github.com/cloudquery/plugin-sdk/v3/plugins/source"
9- "github.com/cloudquery/plugin-sdk/v3/schema"
5+ "encoding/json"
6+ "fmt"
7+
8+ "github.com/cloudquery/plugin-sdk/v4/message"
9+ "github.com/cloudquery/plugin-sdk/v4/plugin"
10+ "github.com/cloudquery/plugin-sdk/v4/scheduler"
11+ "github.com/cloudquery/plugin-sdk/v4/schema"
12+ "github.com/cloudquery/plugin-sdk/v4/state"
1013 "github.com/rs/zerolog"
1114 analyticsdata "google.golang.org/api/analyticsdata/v1beta"
1215 "google.golang.org/api/option"
16+ "google.golang.org/grpc"
17+ "google.golang.org/grpc/credentials/insecure"
18+ )
19+
20+ const (
21+ maxMsgSize = 100 * 1024 * 1024 // 100 MiB
1322)
1423
1524type Client struct {
16- service * analyticsdata.Service
17- backend backend.Backend
25+ plugin.UnimplementedDestination
26+ scheduler * scheduler.Scheduler
27+ options plugin.NewClientOptions
28+ service * analyticsdata.Service
29+ backend state.Client
1830
1931 reports []* Report
2032
@@ -34,9 +46,67 @@ func (c *Client) ID() string {
3446 return "googleanalytics:property-id:{" + c .PropertyID + "}"
3547}
3648
37- func Configure (ctx context.Context , logger zerolog.Logger , srcSpec specs.Source , options source.Options ) (schema.ClientMeta , error ) {
49+ func (* Client ) Close (ctx context.Context ) error {
50+ return nil
51+ }
52+
53+ func (c * Client ) Tables (ctx context.Context , options plugin.TableOptions ) (schema.Tables , error ) {
54+ tables := make (schema.Tables , len (c .reports ))
55+ for i , r := range c .reports {
56+ tables [i ] = r .table (c .PropertyID )
57+ }
58+ tables , err := tables .FilterDfs (options .Tables , options .SkipTables , options .SkipDependentTables )
59+ if err != nil {
60+ return nil , err
61+ }
62+ return tables , nil
63+ }
64+
65+ func (c * Client ) Sync (ctx context.Context , options plugin.SyncOptions , res chan <- message.SyncMessage ) error {
66+ if c .options .NoConnection {
67+ return fmt .Errorf ("no connection" )
68+ }
69+
70+ var stateClient state.Client
71+ if options .BackendOptions == nil {
72+ c .logger .Info ().Msg ("No backend options provided, using no state backend" )
73+ stateClient = & state.NoOpClient {}
74+ } else {
75+ conn , err := grpc .DialContext (ctx , options .BackendOptions .Connection ,
76+ grpc .WithTransportCredentials (insecure .NewCredentials ()),
77+ grpc .WithDefaultCallOptions (
78+ grpc .MaxCallRecvMsgSize (maxMsgSize ),
79+ grpc .MaxCallSendMsgSize (maxMsgSize ),
80+ ),
81+ )
82+ if err != nil {
83+ return fmt .Errorf ("failed to dial grpc source plugin at %s: %w" , options .BackendOptions .Connection , err )
84+ }
85+ stateClient , err = state .NewClient (ctx , conn , options .BackendOptions .TableName )
86+ if err != nil {
87+ return fmt .Errorf ("failed to create state client: %w" , err )
88+ }
89+ c .logger .Info ().Str ("table_name" , options .BackendOptions .TableName ).Msg ("Connected to state backend" )
90+ }
91+ c .backend = stateClient
92+
93+ tables , err := c .Tables (ctx , plugin.TableOptions {
94+ Tables : options .Tables ,
95+ SkipTables : options .SkipTables ,
96+ SkipDependentTables : options .SkipDependentTables ,
97+ })
98+ if err != nil {
99+ return err
100+ }
101+ if err := c .scheduler .Sync (ctx , c , tables , res , scheduler .WithSyncDeterministicCQID (options .DeterministicCQID )); err != nil {
102+ return fmt .Errorf ("failed to sync: %w" , err )
103+ }
104+ return nil
105+ }
106+
107+ func Configure (ctx context.Context , logger zerolog.Logger , specBytes []byte , options plugin.NewClientOptions ) (plugin.Client , error ) {
38108 spec := new (Spec )
39- if err := srcSpec . UnmarshalSpec ( & spec ); err != nil {
109+ if err := json . Unmarshal ( specBytes , spec ); err != nil {
40110 return nil , err
41111 }
42112
@@ -65,11 +135,10 @@ func Configure(ctx context.Context, logger zerolog.Logger, srcSpec specs.Source,
65135 return nil , err
66136 }
67137
68- svc .UserAgent = "cloudquery:source-googleanalytics/" + srcSpec . Version
138+ svc .UserAgent = "cloudquery:source-googleanalytics"
69139
70140 c := & Client {
71141 service : svc ,
72- backend : options .Backend ,
73142 StartDate : spec .StartDate ,
74143 PropertyID : spec .PropertyID ,
75144 reports : spec .Reports ,
@@ -78,6 +147,6 @@ func Configure(ctx context.Context, logger zerolog.Logger, srcSpec specs.Source,
78147 Str ("property_id" , spec .PropertyID ).
79148 Logger (),
80149 }
81-
150+ c . scheduler = scheduler . NewScheduler ( scheduler . WithConcurrency ( spec . Concurrency ))
82151 return c , nil
83152}
0 commit comments