Skip to content

Commit c86c36a

Browse files
authored
fix!: Upgrade azblob plugin-sdk and filetypes to v2 (#10218)
Closes #10100 BEGIN_COMMIT_OVERRIDE feat: Update to use [Apache Arrow](https://arrow.apache.org/) type system BREAKING-CHANGE: This release introduces an internal change to our type system to use [Apache Arrow](https://arrow.apache.org/). This should not have any visible breaking changes for CSV or JSON output formats; however, the Parquet output changes for UUID columns, which now have dashes, and for timestamps, which now use the default Arrow time format (e.g. `2023-01-02 12:23:45`). If you encounter an issue during the upgrade, please submit a [bug report](https://github.com/cloudquery/cloudquery/issues/new/choose). END_COMMIT_OVERRIDE
1 parent 859212b commit c86c36a

File tree

10 files changed

+62
-670
lines changed

10 files changed

+62
-670
lines changed

plugins/destination/azblob/client/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ import (
66
"strings"
77

88
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
9+
"github.com/cloudquery/filetypes/v2"
910

1011
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
11-
"github.com/cloudquery/filetypes"
12-
"github.com/cloudquery/plugin-sdk/plugins/destination"
13-
"github.com/cloudquery/plugin-sdk/specs"
12+
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
13+
"github.com/cloudquery/plugin-sdk/v2/specs"
1414
"github.com/rs/zerolog"
1515
)
1616

plugins/destination/azblob/client/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package client
33
import (
44
"testing"
55

6-
"github.com/cloudquery/filetypes"
7-
"github.com/cloudquery/plugin-sdk/plugins/destination"
8-
"github.com/cloudquery/plugin-sdk/specs"
6+
"github.com/cloudquery/filetypes/v2"
7+
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
8+
"github.com/cloudquery/plugin-sdk/v2/specs"
99
)
1010

1111
const storage_account = "cqdestinationazblob"

plugins/destination/azblob/client/deletestale.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/cloudquery/plugin-sdk/schema"
8+
"github.com/cloudquery/plugin-sdk/v2/schema"
99
)
1010

11-
func (*Client) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error {
11+
func (*Client) DeleteStale(ctx context.Context, schemas schema.Schemas, sourceName string, syncTime time.Time) error {
1212
return fmt.Errorf("destination plugin doesn't support overwrite-delete-stale mode. please use append mode")
1313
}

plugins/destination/azblob/client/migrate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package client
33
import (
44
"context"
55

6-
"github.com/cloudquery/plugin-sdk/schema"
6+
"github.com/cloudquery/plugin-sdk/v2/schema"
77
)
88

9-
func (*Client) Migrate(ctx context.Context, tables schema.Tables) error {
9+
func (*Client) Migrate(ctx context.Context, schemas schema.Schemas) error {
1010
// migrate is not needed in append mode
1111
return nil
1212
}
Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,31 @@
11
package client
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
7+
"io"
68

7-
"github.com/cloudquery/plugin-sdk/schema"
9+
"github.com/apache/arrow/go/v12/arrow"
10+
"github.com/cloudquery/plugin-sdk/v2/schema"
811
)
912

10-
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []any) error {
13+
func (c *Client) Read(ctx context.Context, arrowSchema *arrow.Schema, sourceName string, res chan<- arrow.Record) error {
14+
tableName := schema.TableName(arrowSchema)
1115
if !c.pluginSpec.NoRotate {
12-
return fmt.Errorf("reading is not supported when no_rotate is false. Table: %q; Source: %q", table.Name, sourceName)
16+
return fmt.Errorf("reading is not supported when `no_rotate` is false. Table: %q; Source: %q", tableName, sourceName)
1317
}
14-
name := fmt.Sprintf("%s/%s.%s", c.pluginSpec.Path, table.Name, c.pluginSpec.Format)
18+
name := fmt.Sprintf("%s/%s.%s", c.pluginSpec.Path, tableName, c.pluginSpec.Format)
1519

1620
response, err := c.storageClient.DownloadStream(ctx, c.pluginSpec.Container, name, nil)
1721
if err != nil {
1822
return err
1923
}
2024
defer response.Body.Close()
21-
22-
return c.Client.Read(response.Body, table, sourceName, res)
25+
b, err := io.ReadAll(response.Body)
26+
if err != nil {
27+
return err
28+
}
29+
byteReader := bytes.NewReader(b)
30+
return c.Client.Read(byteReader, arrowSchema, sourceName, res)
2331
}

plugins/destination/azblob/client/spec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package client
33
import (
44
"fmt"
55

6-
"github.com/cloudquery/filetypes"
6+
"github.com/cloudquery/filetypes/v2"
77
)
88

99
type Spec struct {

plugins/destination/azblob/client/write.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,23 @@ import (
66
"fmt"
77
"io"
88

9-
"github.com/cloudquery/plugin-sdk/schema"
9+
"github.com/apache/arrow/go/v12/arrow"
10+
"github.com/cloudquery/plugin-sdk/v2/schema"
1011
"github.com/google/uuid"
1112
)
1213

13-
func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, data [][]any) error {
14-
name := fmt.Sprintf("%s/%s.%s.%s", c.pluginSpec.Path, table.Name, c.pluginSpec.Format, uuid.NewString())
14+
func (c *Client) WriteTableBatch(ctx context.Context, arrowSchema *arrow.Schema, data []arrow.Record) error {
15+
if len(data) == 0 {
16+
return nil
17+
}
18+
tableName := schema.TableName(arrowSchema)
19+
name := fmt.Sprintf("%s/%s.%s.%s", c.pluginSpec.Path, tableName, c.pluginSpec.Format, uuid.NewString())
1520
if c.pluginSpec.NoRotate {
16-
name = fmt.Sprintf("%s/%s.%s", c.pluginSpec.Path, table.Name, c.pluginSpec.Format)
21+
name = fmt.Sprintf("%s/%s.%s", c.pluginSpec.Path, tableName, c.pluginSpec.Format)
1722
}
1823
var b bytes.Buffer
1924
w := io.Writer(&b)
20-
if err := c.Client.WriteTableBatchFile(w, table, data); err != nil {
25+
if err := c.Client.WriteTableBatchFile(w, arrowSchema, data); err != nil {
2126
return err
2227
}
2328
r := io.Reader(&b)

plugins/destination/azblob/go.mod

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,42 +5,38 @@ go 1.19
55
require (
66
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.2
77
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
8-
github.com/cloudquery/filetypes v1.6.2
9-
github.com/cloudquery/plugin-sdk v1.45.0
8+
github.com/apache/arrow/go/v12 v12.0.0-20230417014917-9888ac36c142
9+
github.com/cloudquery/filetypes/v2 v2.0.2
10+
github.com/cloudquery/plugin-sdk/v2 v2.3.7
1011
github.com/google/uuid v1.3.0
1112
github.com/rs/zerolog v1.29.0
1213
)
1314

14-
replace github.com/apache/arrow/go/v12 => github.com/cloudquery/arrow/go/v12 v12.0.0-20230317130341-c648117570af
15+
replace github.com/apache/arrow/go/v12 => github.com/cloudquery/arrow/go/v12 v12.0.0-20230419074556-00ceafa3b033
1516

1617
require (
18+
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
1719
github.com/andybalholm/brotli v1.0.5 // indirect
18-
github.com/apache/arrow/go/v12 v12.0.0-20230331222054-7e19111f2f81 // indirect
20+
github.com/dnaeon/go-vcr v1.2.0 // indirect
1921
github.com/goccy/go-json v0.10.2 // indirect
2022
github.com/google/flatbuffers v23.3.3+incompatible // indirect
2123
github.com/klauspost/asmfmt v1.3.2 // indirect
2224
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
2325
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
2426
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
27+
github.com/stretchr/objx v0.5.0 // indirect
2528
github.com/zeebo/xxh3 v1.0.2 // indirect
2629
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
2730
golang.org/x/mod v0.9.0 // indirect
2831
golang.org/x/tools v0.7.0 // indirect
2932
)
3033

3134
require (
32-
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
3335
github.com/apache/thrift v0.18.1 // indirect
34-
github.com/davecgh/go-spew v1.1.1 // indirect
3536
github.com/golang/snappy v0.0.4 // indirect
3637
github.com/klauspost/compress v1.16.3 // indirect
3738
github.com/pierrec/lz4/v4 v4.1.17 // indirect
38-
github.com/pmezard/go-difflib v1.0.0 // indirect
39-
github.com/stretchr/testify v1.8.2 // indirect
40-
github.com/xitongsys/parquet-go v1.6.2 // indirect
41-
github.com/xitongsys/parquet-go-source v0.0.0-20230312005205-fbbcdea5f512 // indirect
4239
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect; indirect // indirect
43-
gopkg.in/yaml.v3 v3.0.1 // indirect
4440
)
4541

4642
require (
@@ -63,10 +59,10 @@ require (
6359
github.com/spf13/pflag v1.0.5 // indirect
6460
github.com/thoas/go-funk v0.9.3 // indirect; indirect // indirect
6561
golang.org/x/crypto v0.7.0 // indirect
66-
golang.org/x/net v0.8.0 // indirect; indirect // indirect
62+
golang.org/x/net v0.9.0 // indirect; indirect // indirect
6763
golang.org/x/sync v0.1.0 // indirect
68-
golang.org/x/sys v0.6.0 // indirect
69-
golang.org/x/text v0.8.0 // indirect
64+
golang.org/x/sys v0.7.0 // indirect
65+
golang.org/x/text v0.9.0 // indirect
7066
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect; indirect // indirect
7167
google.golang.org/grpc v1.54.0 // indirect
7268
google.golang.org/protobuf v1.30.0 // indirect

0 commit comments

Comments
 (0)