Skip to content

Commit 33b5382

Browse files
authored
fix!: Upgrade S3 plugin-sdk and filetypes to v2 (#10167)
Closes #10112 BEGIN_COMMIT_OVERRIDE feat: Update to use [Apache Arrow](https://arrow.apache.org/) type system BREAKING-CHANGE: This release introduces an internal change to our type system to use [Apache Arrow](https://arrow.apache.org/). This should not have any visible breaking changes for CSV or JSON output formats, however the Parquet output changes for UUID columns, which now have dashes, and timestamps, which now uses the default Arrow time format (e.g. `2023-01-02 12:23:45`). If you encounter an issue during the upgrade, please submit a [bug report](https://github.com/cloudquery/cloudquery/issues/new/choose). END_COMMIT_OVERRIDE
1 parent 07c7edb commit 33b5382

File tree

12 files changed

+85
-636
lines changed

12 files changed

+85
-636
lines changed

plugins/destination/s3/client/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
1212
"github.com/aws/aws-sdk-go-v2/service/s3"
1313

14-
"github.com/cloudquery/filetypes"
15-
"github.com/cloudquery/plugin-sdk/plugins/destination"
16-
"github.com/cloudquery/plugin-sdk/specs"
14+
"github.com/cloudquery/filetypes/v2"
15+
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
16+
"github.com/cloudquery/plugin-sdk/v2/specs"
1717
"github.com/rs/zerolog"
1818
)
1919

plugins/destination/s3/client/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package client
33
import (
44
"testing"
55

6-
"github.com/cloudquery/filetypes"
7-
"github.com/cloudquery/plugin-sdk/plugins/destination"
8-
"github.com/cloudquery/plugin-sdk/specs"
6+
"github.com/cloudquery/filetypes/v2"
7+
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
8+
"github.com/cloudquery/plugin-sdk/v2/specs"
99
)
1010

1111
const bucket = "cq-playground-test"

plugins/destination/s3/client/deletestale.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/cloudquery/plugin-sdk/schema"
8+
"github.com/cloudquery/plugin-sdk/v2/schema"
99
)
1010

11-
func (*Client) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error {
11+
func (*Client) DeleteStale(ctx context.Context, schemas schema.Schemas, sourceName string, syncTime time.Time) error {
1212
return fmt.Errorf("destination plugin doesn't support overwrite-delete-stale mode. please use append mode")
1313
}

plugins/destination/s3/client/migrate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package client
33
import (
44
"context"
55

6-
"github.com/cloudquery/plugin-sdk/schema"
6+
"github.com/cloudquery/plugin-sdk/v2/schema"
77
)
88

9-
func (*Client) Migrate(ctx context.Context, tables schema.Tables) error {
9+
func (*Client) Migrate(ctx context.Context, schemas schema.Schemas) error {
1010
// migrate is not needed in append mode
1111
return nil
1212
}

plugins/destination/s3/client/read.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,24 @@ import (
66
"fmt"
77
"strings"
88

9+
"github.com/apache/arrow/go/v12/arrow"
910
"github.com/aws/aws-sdk-go-v2/aws"
1011
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
1112
"github.com/aws/aws-sdk-go-v2/service/s3"
12-
"github.com/cloudquery/plugin-sdk/schema"
13+
"github.com/cloudquery/plugin-sdk/v2/schema"
1314
)
1415

1516
const maxFileSize = 1024 * 1024 * 20
1617

17-
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []any) error {
18+
func (c *Client) Read(ctx context.Context, arrowSchema *arrow.Schema, sourceName string, res chan<- arrow.Record) error {
19+
tableName := schema.TableName(arrowSchema)
1820
if !c.pluginSpec.NoRotate {
19-
return fmt.Errorf("reading is not supported when no_rotate is false. Table: %q; Source: %q", table.Name, sourceName)
21+
return fmt.Errorf("reading is not supported when no_rotate is false. Table: %q; Source: %q", tableName, sourceName)
2022
}
21-
name := strings.ReplaceAll(c.pluginSpec.Path, PathVarTable, table.Name)
23+
if strings.Contains(c.pluginSpec.Path, PathVarUUID) {
24+
return fmt.Errorf("reading is not supported when path contains uuid variable. Table: %q; Source: %q", tableName, sourceName)
25+
}
26+
name := strings.ReplaceAll(c.pluginSpec.Path, PathVarTable, tableName)
2227
writerAtBuffer := manager.NewWriteAtBuffer(make([]byte, 0, maxFileSize))
2328
_, err := c.downloader.Download(ctx,
2429
writerAtBuffer,
@@ -30,5 +35,5 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName strin
3035
return err
3136
}
3237
r := bytes.NewReader(writerAtBuffer.Bytes())
33-
return c.Client.Read(r, table, sourceName, res)
38+
return c.Client.Read(r, arrowSchema, sourceName, res)
3439
}

plugins/destination/s3/client/spec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"path"
66
"strings"
77

8-
"github.com/cloudquery/filetypes"
8+
filetypes "github.com/cloudquery/filetypes/v2"
99
)
1010

1111
type Spec struct {

plugins/destination/s3/client/spec_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package client
33
import (
44
"testing"
55

6-
"github.com/cloudquery/filetypes"
6+
filetypes "github.com/cloudquery/filetypes/v2"
77
"github.com/google/go-cmp/cmp"
88
"github.com/google/go-cmp/cmp/cmpopts"
99
"github.com/stretchr/testify/require"

plugins/destination/s3/client/write.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,13 @@ import (
1010
"strings"
1111
"time"
1212

13+
"github.com/apache/arrow/go/v12/arrow"
14+
"github.com/apache/arrow/go/v12/arrow/array"
15+
"github.com/apache/arrow/go/v12/arrow/memory"
1316
"github.com/aws/aws-sdk-go-v2/aws"
1417
"github.com/aws/aws-sdk-go-v2/service/s3"
15-
"github.com/cloudquery/plugin-sdk/schema"
18+
"github.com/cloudquery/plugin-sdk/v2/schema"
19+
"github.com/cloudquery/plugin-sdk/v2/types"
1620
"github.com/google/uuid"
1721
)
1822

@@ -28,19 +32,15 @@ const (
2832

2933
var reInvalidJSONKey = regexp.MustCompile(`\W`)
3034

31-
func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, data [][]any) error {
35+
func (c *Client) WriteTableBatch(ctx context.Context, arrowSchema *arrow.Schema, data []arrow.Record) error {
3236
if len(data) == 0 {
3337
return nil
3438
}
39+
tableName := schema.TableName(arrowSchema)
3540

3641
if c.pluginSpec.Athena {
37-
for _, resource := range data {
38-
for u := range resource {
39-
if table.Columns[u].Type != schema.TypeJSON {
40-
continue
41-
}
42-
sanitizeJSONKeys(resource[u])
43-
}
42+
for i, record := range data {
43+
data[i] = sanitizeRecordJSONKeys(record)
4444
}
4545
}
4646

@@ -49,15 +49,15 @@ func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, data
4949

5050
timeNow := time.Now().UTC()
5151

52-
if err := c.Client.WriteTableBatchFile(w, table, data); err != nil {
52+
if err := c.Client.WriteTableBatchFile(w, arrowSchema, data); err != nil {
5353
return err
5454
}
5555
// we don't upload in parallel here because AWS sdk moves the burden to the developer, and
5656
// we don't want to deal with that yet. in the future maybe we can run some benchmarks and see if adding parallelization helps.
5757
r := io.Reader(&b)
5858
if _, err := c.uploader.Upload(ctx, &s3.PutObjectInput{
5959
Bucket: aws.String(c.pluginSpec.Bucket),
60-
Key: aws.String(replacePathVariables(c.pluginSpec.Path, table.Name, uuid.NewString(), timeNow)),
60+
Key: aws.String(replacePathVariables(c.pluginSpec.Path, tableName, uuid.NewString(), timeNow)),
6161
Body: r,
6262
}); err != nil {
6363
return err
@@ -66,10 +66,31 @@ func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, data
6666
return nil
6767
}
6868

69-
// sanitizeJSONKeys replaces all invalid characters in JSON keys with underscores.
70-
// It does the replacement in-place, modifying the original object. This is required
69+
// sanitizeRecordJSONKeys replaces all invalid characters in JSON keys with underscores. This is required
7170
// for compatibility with Athena.
72-
func sanitizeJSONKeys(obj any) {
71+
func sanitizeRecordJSONKeys(record arrow.Record) arrow.Record {
72+
cols := make([]arrow.Array, record.NumCols())
73+
for i, col := range record.Columns() {
74+
if arrow.TypeEqual(col.DataType(), types.NewJSONType()) {
75+
b := types.NewJSONBuilder(array.NewExtensionBuilder(memory.DefaultAllocator, types.NewJSONType()))
76+
for r := 0; r < int(record.NumRows()); r++ {
77+
if col.IsNull(r) {
78+
b.AppendNull()
79+
continue
80+
}
81+
obj := col.GetOneForMarshal(r)
82+
sanitizeJSONKeysForObject(obj)
83+
b.Append(obj)
84+
}
85+
cols[i] = b.NewArray()
86+
continue
87+
}
88+
cols[i] = col
89+
}
90+
return array.NewRecord(record.Schema(), cols, record.NumRows())
91+
}
92+
93+
func sanitizeJSONKeysForObject(obj any) {
7394
value := reflect.ValueOf(obj)
7495
switch value.Kind() {
7596
case reflect.Map:
@@ -79,14 +100,14 @@ func sanitizeJSONKeys(obj any) {
79100
if k.Kind() == reflect.String {
80101
nk := reInvalidJSONKey.ReplaceAllString(k.String(), "_")
81102
v := iter.Value()
82-
sanitizeJSONKeys(v.Interface())
103+
sanitizeJSONKeysForObject(v.Interface())
83104
value.SetMapIndex(k, reflect.Value{})
84105
value.SetMapIndex(reflect.ValueOf(nk), v)
85106
}
86107
}
87108
case reflect.Slice:
88109
for i := 0; i < value.Len(); i++ {
89-
sanitizeJSONKeys(value.Index(i).Interface())
110+
sanitizeJSONKeysForObject(value.Index(i).Interface())
90111
}
91112
}
92113
}

plugins/destination/s3/client/write_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestSanitizeJSONKeys(t *testing.T) {
2525
"foo-bar": &[]string{"baz"}[0],
2626
},
2727
}
28-
sanitizeJSONKeys(m)
28+
sanitizeJSONKeysForObject(m)
2929
want := map[string]any{
3030
"foo": "bar",
3131
"bar": map[string]any{

plugins/destination/s3/go.mod

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,24 @@ module github.com/cloudquery/cloudquery/plugins/destination/s3
33
go 1.19
44

55
require (
6+
github.com/apache/arrow/go/v12 v12.0.0-20230417014917-9888ac36c142
67
github.com/aws/aws-sdk-go-v2 v1.17.8
78
github.com/aws/aws-sdk-go-v2/config v1.18.21
89
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.62
910
github.com/aws/aws-sdk-go-v2/service/s3 v1.31.3
10-
github.com/cloudquery/filetypes v1.6.2
11-
github.com/cloudquery/plugin-sdk v1.45.0
11+
github.com/cloudquery/filetypes/v2 v2.0.2
12+
github.com/cloudquery/plugin-sdk/v2 v2.3.7
1213
github.com/google/go-cmp v0.5.9
1314
github.com/google/uuid v1.3.0
1415
github.com/rs/zerolog v1.29.0
1516
github.com/stretchr/testify v1.8.2
1617
)
1718

18-
replace github.com/apache/arrow/go/v12 => github.com/cloudquery/arrow/go/v12 v12.0.0-20230317130341-c648117570af
19+
replace github.com/apache/arrow/go/v12 => github.com/cloudquery/arrow/go/v12 v12.0.0-20230419074556-00ceafa3b033
1920

2021
require (
22+
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
2123
github.com/andybalholm/brotli v1.0.5 // indirect
22-
github.com/apache/arrow/go/v12 v12.0.0-20230331222054-7e19111f2f81 // indirect
2324
github.com/goccy/go-json v0.10.2 // indirect
2425
github.com/google/flatbuffers v2.0.8+incompatible // indirect
2526
github.com/klauspost/asmfmt v1.3.2 // indirect
@@ -33,15 +34,12 @@ require (
3334
)
3435

3536
require (
36-
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
3737
github.com/apache/thrift v0.18.1 // indirect
3838
github.com/davecgh/go-spew v1.1.1 // indirect
3939
github.com/golang/snappy v0.0.4 // indirect
4040
github.com/klauspost/compress v1.16.3 // indirect
4141
github.com/pierrec/lz4/v4 v4.1.17 // indirect
4242
github.com/pmezard/go-difflib v1.0.0 // indirect
43-
github.com/xitongsys/parquet-go v1.6.2 // indirect
44-
github.com/xitongsys/parquet-go-source v0.0.0-20230312005205-fbbcdea5f512 // indirect
4543
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect; indirect // indirect
4644
gopkg.in/yaml.v3 v3.0.1 // indirect
4745
)
@@ -75,10 +73,10 @@ require (
7573
github.com/spf13/cobra v1.6.1 // indirect
7674
github.com/spf13/pflag v1.0.5 // indirect
7775
github.com/thoas/go-funk v0.9.3 // indirect; indirect // indirect
78-
golang.org/x/net v0.8.0 // indirect; indirect // indirect
76+
golang.org/x/net v0.9.0 // indirect; indirect // indirect
7977
golang.org/x/sync v0.1.0 // indirect
80-
golang.org/x/sys v0.6.0 // indirect
81-
golang.org/x/text v0.8.0 // indirect
78+
golang.org/x/sys v0.7.0 // indirect
79+
golang.org/x/text v0.9.0 // indirect
8280
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect; indirect // indirect
8381
google.golang.org/grpc v1.54.0 // indirect
8482
google.golang.org/protobuf v1.30.0 // indirect

0 commit comments

Comments
 (0)