@@ -3,25 +3,34 @@ package client
33import (
44 "context"
55 "fmt"
6+ "reflect"
67 "strings"
78 "time"
89
910 "github.com/apache/arrow/go/v13/arrow"
10- "github.com/cloudquery/plugin-sdk/v3/plugins/source"
11- "github.com/cloudquery/plugin-sdk/v3/schema"
12- "github.com/cloudquery/plugin-sdk/v3/types"
11+ "github.com/apache/arrow/go/v13/arrow/array"
12+ "github.com/apache/arrow/go/v13/arrow/memory"
13+ "github.com/cloudquery/plugin-sdk/v4/message"
14+ "github.com/cloudquery/plugin-sdk/v4/plugin"
15+ "github.com/cloudquery/plugin-sdk/v4/scalar"
16+ "github.com/cloudquery/plugin-sdk/v4/schema"
17+ "github.com/cloudquery/plugin-sdk/v4/types"
1318)
1419
15- func (c * Client ) Sync (ctx context.Context , metrics * source.Metrics , res chan <- * schema.Resource ) error {
16- c .metrics = metrics
17- for _ , table := range c .Tables {
18- if c .metrics .TableClient [table .Name ] == nil {
19- c .metrics .TableClient [table .Name ] = make (map [string ]* source.TableClientMetrics )
20- c .metrics .TableClient [table .Name ][c .ID ()] = & source.TableClientMetrics {}
20+ func (c Client ) Sync (ctx context.Context , options plugin.SyncOptions , res chan <- message.SyncMessage ) error {
21+ if c .options .NoConnection {
22+ return fmt .Errorf ("no connection" )
23+ }
24+ filtered , err := c .tables .FilterDfs (options .Tables , options .SkipTables , options .SkipDependentTables )
25+ if err != nil {
26+ return err
27+ }
28+ for _ , table := range filtered {
29+ res <- & message.SyncMigrateTable {
30+ Table : table ,
2131 }
2232 }
23-
24- return c .syncTables (ctx , res )
33+ return c .syncTables (ctx , filtered , res )
2534}
2635
2736func (* Client ) createResultsArray (table * schema.Table ) []any {
@@ -79,15 +88,14 @@ func (*Client) createResultsArray(table *schema.Table) []any {
7988 return results
8089}
8190
82- func (c * Client ) syncTable (ctx context.Context , table * schema.Table , res chan <- * schema. Resource ) error {
91+ func (c * Client ) syncTable (ctx context.Context , table * schema.Table , res chan <- message. SyncMessage ) error {
8392 colNames := make ([]string , len (table .Columns ))
8493 for i , col := range table .Columns {
8594 colNames [i ] = Identifier (col .Name )
8695 }
8796 query := "SELECT " + strings .Join (colNames , "," ) + " FROM " + Identifier (table .Name )
8897 rows , err := c .db .QueryContext (ctx , query )
8998 if err != nil {
90- c .metrics .TableClient [table .Name ][c .ID ()].Errors ++
9199 return err
92100 }
93101 defer rows .Close ()
@@ -97,36 +105,30 @@ func (c *Client) syncTable(ctx context.Context, table *schema.Table, res chan<-
97105 return fmt .Errorf ("failed to read from table %s: %w" , table .Name , err )
98106 }
99107 if err != nil {
100- c .metrics .TableClient [table .Name ][c .ID ()].Errors ++
101108 return err
102109 }
103- resource , err := c .resourceFromValues (table .Name , values )
104- if err != nil {
105- c .metrics .TableClient [table .Name ][c .ID ()].Errors ++
106- return err
110+
111+ arrowSchema := table .ToArrowSchema ()
112+ rb := array .NewRecordBuilder (memory .DefaultAllocator , arrowSchema )
113+ for i := range values {
114+ // Gets the underlying value of the pointer
115+ val := reflect .ValueOf (values [i ]).Elem ().Interface ()
116+ s := scalar .NewScalar (arrowSchema .Field (i ).Type )
117+ if err := s .Set (val ); err != nil {
118+ return err
119+ }
120+ scalar .AppendToBuilder (rb .Field (i ), s )
107121 }
108- c .metrics .TableClient [table .Name ][c .ID ()].Resources ++
109- res <- resource
122+ res <- & message.SyncInsert {Record : rb .NewRecord ()}
110123 }
111124 return nil
112125}
113126
114- func (c * Client ) syncTables (ctx context.Context , res chan <- * schema. Resource ) error {
115- for _ , table := range c . Tables {
127+ func (c * Client ) syncTables (ctx context.Context , tables schema. Tables , res chan <- message. SyncMessage ) error {
128+ for _ , table := range tables {
116129 if err := c .syncTable (ctx , table , res ); err != nil {
117130 return err
118131 }
119132 }
120133 return nil
121134}
122-
123- func (c * Client ) resourceFromValues (tableName string , values []any ) (* schema.Resource , error ) {
124- table := c .Tables .Get (tableName )
125- resource := schema .NewResourceData (table , nil , values )
126- for i , col := range table .Columns {
127- if err := resource .Set (col .Name , values [i ]); err != nil {
128- return nil , err
129- }
130- }
131- return resource , nil
132- }
0 commit comments