@@ -10,9 +10,13 @@ import (
1010 "strings"
1111 "time"
1212
13+ "github.com/apache/arrow/go/v12/arrow"
14+ "github.com/apache/arrow/go/v12/arrow/array"
15+ "github.com/apache/arrow/go/v12/arrow/memory"
1316 "github.com/aws/aws-sdk-go-v2/aws"
1417 "github.com/aws/aws-sdk-go-v2/service/s3"
15- "github.com/cloudquery/plugin-sdk/schema"
18+ "github.com/cloudquery/plugin-sdk/v2/schema"
19+ "github.com/cloudquery/plugin-sdk/v2/types"
1620 "github.com/google/uuid"
1721)
1822
@@ -28,19 +32,15 @@ const (
2832
2933var reInvalidJSONKey = regexp .MustCompile (`\W` )
3034
31- func (c * Client ) WriteTableBatch (ctx context.Context , table * schema. Table , data [][] any ) error {
35+ func (c * Client ) WriteTableBatch (ctx context.Context , arrowSchema * arrow. Schema , data []arrow. Record ) error {
3236 if len (data ) == 0 {
3337 return nil
3438 }
39+ tableName := schema .TableName (arrowSchema )
3540
3641 if c .pluginSpec .Athena {
37- for _ , resource := range data {
38- for u := range resource {
39- if table .Columns [u ].Type != schema .TypeJSON {
40- continue
41- }
42- sanitizeJSONKeys (resource [u ])
43- }
42+ for i , record := range data {
43+ data [i ] = sanitizeRecordJSONKeys (record )
4444 }
4545 }
4646
@@ -49,15 +49,15 @@ func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, data
4949
5050 timeNow := time .Now ().UTC ()
5151
52- if err := c .Client .WriteTableBatchFile (w , table , data ); err != nil {
52+ if err := c .Client .WriteTableBatchFile (w , arrowSchema , data ); err != nil {
5353 return err
5454 }
5555 // we don't upload in parallel here because AWS sdk moves the burden to the developer, and
5656 // we don't want to deal with that yet. in the future maybe we can run some benchmarks and see if adding parallelization helps.
5757 r := io .Reader (& b )
5858 if _ , err := c .uploader .Upload (ctx , & s3.PutObjectInput {
5959 Bucket : aws .String (c .pluginSpec .Bucket ),
60- Key : aws .String (replacePathVariables (c .pluginSpec .Path , table . Name , uuid .NewString (), timeNow )),
60+ Key : aws .String (replacePathVariables (c .pluginSpec .Path , tableName , uuid .NewString (), timeNow )),
6161 Body : r ,
6262 }); err != nil {
6363 return err
@@ -66,10 +66,31 @@ func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, data
6666 return nil
6767}
6868
69- // sanitizeJSONKeys replaces all invalid characters in JSON keys with underscores.
70- // It does the replacement in-place, modifying the original object. This is required
69+ // sanitizeRecordJSONKeys replaces all invalid characters in JSON keys with underscores. This is required
7170// for compatibility with Athena.
72- func sanitizeJSONKeys (obj any ) {
71+ func sanitizeRecordJSONKeys (record arrow.Record ) arrow.Record {
72+ cols := make ([]arrow.Array , record .NumCols ())
73+ for i , col := range record .Columns () {
74+ if arrow .TypeEqual (col .DataType (), types .NewJSONType ()) {
75+ b := types .NewJSONBuilder (array .NewExtensionBuilder (memory .DefaultAllocator , types .NewJSONType ()))
76+ for r := 0 ; r < int (record .NumRows ()); r ++ {
77+ if col .IsNull (r ) {
78+ b .AppendNull ()
79+ continue
80+ }
81+ obj := col .GetOneForMarshal (r )
82+ sanitizeJSONKeysForObject (obj )
83+ b .Append (obj )
84+ }
85+ cols [i ] = b .NewArray ()
86+ continue
87+ }
88+ cols [i ] = col
89+ }
90+ return array .NewRecord (record .Schema (), cols , record .NumRows ())
91+ }
92+
93+ func sanitizeJSONKeysForObject (obj any ) {
7394 value := reflect .ValueOf (obj )
7495 switch value .Kind () {
7596 case reflect .Map :
@@ -79,14 +100,14 @@ func sanitizeJSONKeys(obj any) {
79100 if k .Kind () == reflect .String {
80101 nk := reInvalidJSONKey .ReplaceAllString (k .String (), "_" )
81102 v := iter .Value ()
82- sanitizeJSONKeys (v .Interface ())
103+ sanitizeJSONKeysForObject (v .Interface ())
83104 value .SetMapIndex (k , reflect.Value {})
84105 value .SetMapIndex (reflect .ValueOf (nk ), v )
85106 }
86107 }
87108 case reflect .Slice :
88109 for i := 0 ; i < value .Len (); i ++ {
89- sanitizeJSONKeys (value .Index (i ).Interface ())
110+ sanitizeJSONKeysForObject (value .Index (i ).Interface ())
90111 }
91112 }
92113}
0 commit comments