Skip to content

Commit 26e7d0e

Browse files
authored
feat(clickhouse)!: Upgrade to github.com/cloudquery/plugin-sdk/v2 (#10284)
Closes #9847 BEGIN_COMMIT_OVERRIDE feat: Update to use [Apache Arrow](https://arrow.apache.org/) type system (#10284) BREAKING-CHANGE: This release introduces an internal change to our type system to use [Apache Arrow](https://arrow.apache.org/). This should not have any visible breaking changes, however due to the size of the change we are introducing it under a major version bump to communicate that it might have some bugs that we weren't able to catch during our internal tests. If you encounter an issue during the upgrade, please submit a [bug report](https://github.com/cloudquery/cloudquery/issues/new/choose). feat: Allow nullable columns in primary keys (#10284) BREAKING-CHANGE: This change enables [`allow_nullable_key`](https://clickhouse.com/docs/en/operations/settings/settings#allow-nullable-key) for tables (#10284). END_COMMIT_OVERRIDE
1 parent a55da3d commit 26e7d0e

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

97 files changed

+3567
-710
lines changed

plugins/destination/clickhouse/client/client.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88

99
"github.com/ClickHouse/clickhouse-go/v2"
1010
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
11-
"github.com/cloudquery/plugin-sdk/plugins/destination"
12-
"github.com/cloudquery/plugin-sdk/schema"
13-
"github.com/cloudquery/plugin-sdk/specs"
11+
"github.com/cloudquery/plugin-pb-go/specs"
12+
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
13+
"github.com/cloudquery/plugin-sdk/v2/schema"
1414
"github.com/rs/zerolog"
1515
)
1616

@@ -26,7 +26,7 @@ type Client struct {
2626

2727
var _ destination.Client = (*Client)(nil)
2828

29-
func (*Client) DeleteStale(context.Context, schema.Tables, string, time.Time) error {
29+
func (*Client) DeleteStale(context.Context, schema.Schemas, string, time.Time) error {
3030
return errors.New("DeleteStale is not implemented")
3131
}
3232

plugins/destination/clickhouse/client/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"testing"
77

88
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/resources/plugin"
9-
"github.com/cloudquery/plugin-sdk/plugins/destination"
10-
"github.com/cloudquery/plugin-sdk/specs"
9+
"github.com/cloudquery/plugin-pb-go/specs"
10+
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
1111
)
1212

1313
var migrateStrategy = destination.MigrateStrategy{

plugins/destination/clickhouse/client/migrate.go

Lines changed: 84 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -3,136 +3,153 @@ package client
33
import (
44
"context"
55
"fmt"
6-
"sort"
7-
"strings"
86

7+
"github.com/apache/arrow/go/v13/arrow"
98
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/queries"
10-
"github.com/cloudquery/plugin-sdk/schema"
11-
"github.com/cloudquery/plugin-sdk/specs"
9+
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/typeconv"
10+
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/util"
11+
"github.com/cloudquery/plugin-pb-go/specs"
12+
"github.com/cloudquery/plugin-sdk/v2/schema"
13+
"golang.org/x/exp/slices"
1214
"golang.org/x/sync/errgroup"
1315
)
1416

1517
// Migrate relies on the CLI/client to lock before running migration.
16-
func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
17-
currentSchema, err := c.getTableDefinitions(ctx)
18+
func (c *Client) Migrate(ctx context.Context, scs schema.Schemas) error {
19+
have, err := c.getTableDefinitions(ctx, scs)
1820
if err != nil {
1921
return err
2022
}
2123

22-
newSchema := queries.NormalizedTables(tables)
24+
want, err := typeconv.CanonizedSchemas(scs)
25+
if err != nil {
26+
return err
27+
}
2328
if c.mode != specs.MigrateModeForced {
24-
nonSafeMigratableTables, changes := c.nonAutoMigratableTables(newSchema, currentSchema)
25-
if len(nonSafeMigratableTables) > 0 {
26-
return fmt.Errorf("tables %s with changes %v require force migration. use 'migrate_mode: forced'", strings.Join(nonSafeMigratableTables, ","), changes)
29+
unsafe := unsafeSchemaChanges(have, want)
30+
if len(unsafe) > 0 {
31+
return fmt.Errorf("'migrate_mode: forced' is required for the following changes: \n%s", util.SchemasChangesPrettified(unsafe))
2732
}
2833
}
2934

3035
const maxConcurrentMigrate = 10
3136
eg, ctx := errgroup.WithContext(ctx)
3237
eg.SetLimit(maxConcurrentMigrate)
3338

34-
for _, table := range newSchema {
35-
table := table
39+
for _, want := range want {
40+
want := want
3641
eg.Go(func() (err error) {
37-
c.logger.Info().Str("table", table.Name).Msg("Migrating table started")
42+
tableName := schema.TableName(want)
43+
c.logger.Info().Str("table", tableName).Msg("Migrating table started")
3844
defer func() {
39-
c.logger.Err(err).Str("table", table.Name).Msg("Migrating table done")
45+
c.logger.Err(err).Str("table", tableName).Msg("Migrating table done")
4046
}()
41-
if len(table.Columns) == 0 {
42-
c.logger.Warn().Str("table", table.Name).Msg("Table with no columns, skip")
47+
if len(want.Fields()) == 0 {
48+
c.logger.Warn().Str("table", tableName).Msg("Table with no columns, skip")
4349
return nil
4450
}
4551

46-
current := currentSchema.Get(table.Name)
47-
if current == nil {
48-
return c.createTable(ctx, table)
52+
have := have.SchemaByName(tableName)
53+
if have == nil {
54+
return c.createTable(ctx, want)
4955
}
5056

51-
return c.autoMigrate(ctx, table, current)
57+
return c.autoMigrate(ctx, have, want)
5258
})
5359
}
5460

5561
return eg.Wait()
5662
}
5763

58-
func (c *Client) nonAutoMigratableTables(tables schema.Tables, currentTables schema.Tables) ([]string, [][]schema.TableColumnChange) {
59-
var result []string
60-
var tableChanges [][]schema.TableColumnChange
61-
for _, t := range tables {
62-
current := currentTables.Get(t.Name)
64+
func unsafeSchemaChanges(have, want schema.Schemas) map[string]schema.FieldChanges {
65+
result := make(map[string]schema.FieldChanges)
66+
for _, w := range want {
67+
current := have.SchemaByName(schema.TableName(w))
6368
if current == nil {
6469
continue
6570
}
66-
changes := t.GetChanges(current)
67-
if !c.canSafelyMigrate(changes) {
68-
result = append(result, t.Name)
69-
tableChanges = append(tableChanges, changes)
71+
unsafe := unsafeChanges(schema.GetSchemaChanges(w, current))
72+
if len(unsafe) > 0 {
73+
result[schema.TableName(w)] = unsafe
7074
}
7175
}
72-
return result, tableChanges
76+
return result
7377
}
7478

75-
func (*Client) canSafelyMigrate(changes []schema.TableColumnChange) bool {
76-
for _, change := range changes {
77-
needsDrop := needsTableDrop(change)
78-
if needsDrop {
79-
return false
79+
func unsafeChanges(changes []schema.FieldChange) schema.FieldChanges {
80+
unsafe := make([]schema.FieldChange, 0, len(changes))
81+
for _, c := range changes {
82+
if needsTableDrop(c) {
83+
unsafe = append(unsafe, c)
8084
}
8185
}
82-
return true
86+
return slices.Clip(unsafe)
8387
}
8488

85-
func (c *Client) createTable(ctx context.Context, table *schema.Table) (err error) {
86-
c.logger.Debug().Str("table", table.Name).Msg("Table doesn't exist, creating")
89+
func (c *Client) createTable(ctx context.Context, sc *arrow.Schema) (err error) {
90+
c.logger.Debug().Str("table", schema.TableName(sc)).Msg("Table doesn't exist, creating")
91+
92+
query, err := queries.CreateTable(sc, c.spec.Cluster, c.spec.Engine)
93+
if err != nil {
94+
return err
95+
}
8796

88-
return c.conn.Exec(ctx, queries.CreateTable(table, c.spec.Cluster, c.spec.Engine))
97+
if err := c.conn.Exec(ctx, query); err != nil {
98+
return fmt.Errorf("failed to create table, query:\n%s\nerror: %w", query, err)
99+
}
100+
return nil
89101
}
90102

91-
func (c *Client) dropTable(ctx context.Context, table *schema.Table) (err error) {
92-
c.logger.Debug().Str("table", table.Name).Msg("Dropping table")
103+
func (c *Client) dropTable(ctx context.Context, sc *arrow.Schema) error {
104+
c.logger.Debug().Str("table", schema.TableName(sc)).Msg("Dropping table")
93105

94-
return c.conn.Exec(ctx, queries.DropTable(table, c.spec.Cluster))
106+
return c.conn.Exec(ctx, queries.DropTable(sc, c.spec.Cluster))
95107
}
96108

97-
func needsTableDrop(change schema.TableColumnChange) bool {
109+
func needsTableDrop(change schema.FieldChange) bool {
98110
// We can safely add a nullable column without dropping the table
99-
if change.Type == schema.TableColumnChangeTypeAdd && !change.Current.CreationOptions.NotNull {
111+
if change.Type == schema.TableColumnChangeTypeAdd && change.Current.Nullable {
100112
return false
101113
}
102114

103115
// We can safely ignore removal of nullable columns without dropping the table
104-
if change.Type == schema.TableColumnChangeTypeRemove && !change.Previous.CreationOptions.NotNull {
116+
if change.Type == schema.TableColumnChangeTypeRemove && change.Previous.Nullable {
105117
return false
106118
}
107119

120+
// TODO: add check for update + new type is extending the current type (uint8 -> uint16, float32 -> float64, new struct field, etc).
108121
return true
109122
}
110123

111-
func changesSorter(a, b schema.TableColumnChange) bool {
112-
return needsTableDrop(a) && !needsTableDrop(b)
113-
}
124+
func (c *Client) autoMigrate(ctx context.Context, have, want *arrow.Schema) error {
125+
changes := schema.GetSchemaChanges(want, have)
114126

115-
func (c *Client) autoMigrate(ctx context.Context, table *schema.Table, current *schema.Table) (err error) {
116-
changes := table.GetChanges(current)
117-
sort.SliceStable(changes, func(i, j int) bool {
118-
return changesSorter(changes[i], changes[j])
119-
})
127+
if unsafe := unsafeChanges(changes); len(unsafe) > 0 {
128+
// we can get here only with migrate_mode: forced
129+
if err := c.dropTable(ctx, have); err != nil {
130+
return err
131+
}
132+
133+
return c.createTable(ctx, want)
134+
}
135+
136+
tableName := schema.TableName(want)
120137
for _, change := range changes {
121-
switch {
122-
case change.Type == schema.TableColumnChangeTypeAdd && !change.Current.CreationOptions.NotNull:
123-
c.logger.Debug().Str("table", table.Name).Str("column", change.Current.Name).Msg("Adding new column")
124-
err := c.conn.Exec(ctx, queries.AddColumn(table.Name, c.spec.Cluster, &change.Current))
125-
if err != nil {
126-
return err
127-
}
128-
case change.Type == schema.TableColumnChangeTypeRemove && !change.Previous.CreationOptions.NotNull:
138+
// we only handle new columns
139+
if change.Type != schema.TableColumnChangeTypeAdd {
129140
continue
130-
default:
131-
err := c.dropTable(ctx, table)
132-
if err != nil {
133-
return err
134-
}
135-
return c.createTable(ctx, table)
141+
}
142+
143+
c.logger.Debug().Str("table", tableName).Str("column", change.Current.Name).Msg("Adding new column")
144+
145+
query, err := queries.AddColumn(tableName, c.spec.Cluster, change.Current)
146+
if err != nil {
147+
return err
148+
}
149+
150+
err = c.conn.Exec(ctx, query)
151+
if err != nil {
152+
return err
136153
}
137154
}
138155

plugins/destination/clickhouse/client/read.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import (
55
"reflect"
66

77
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
8+
"github.com/apache/arrow/go/v13/arrow"
89
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/queries"
9-
"github.com/cloudquery/plugin-sdk/schema"
10+
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/typeconv/arrow/values"
1011
)
1112

12-
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []any) error {
13-
query, params := queries.Read(sourceName, table)
13+
func (c *Client) Read(ctx context.Context, sc *arrow.Schema, sourceName string, res chan<- arrow.Record) error {
14+
query, params := queries.Read(sourceName, sc)
1415

1516
rows, err := c.conn.Query(ctx, query, params...)
1617
if err != nil {
@@ -21,13 +22,18 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName strin
2122
columnTypes := rows.ColumnTypes()
2223

2324
for rows.Next() {
24-
resource := rowArr(columnTypes)
25+
row := rowArr(columnTypes)
2526

26-
if err := rows.Scan(resource...); err != nil {
27+
if err := rows.Scan(row...); err != nil {
2728
return err
2829
}
2930

30-
res <- resource
31+
record, err := values.Record(sc, row)
32+
if err != nil {
33+
return err
34+
}
35+
36+
res <- record
3137
}
3238

3339
return nil

plugins/destination/clickhouse/client/reverse_transform.go

Lines changed: 0 additions & 54 deletions
This file was deleted.

plugins/destination/clickhouse/client/table.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,22 @@ import (
44
"context"
55

66
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/queries"
7-
"github.com/cloudquery/plugin-sdk/schema"
7+
"github.com/cloudquery/plugin-sdk/v2/schema"
88
)
99

10-
func (c *Client) getTableDefinitions(ctx context.Context) (schema.Tables, error) {
10+
func (c *Client) getTableDefinitions(ctx context.Context, scs schema.Schemas) (schema.Schemas, error) {
11+
// need proper description without flattened columns
12+
const flattenNested0 = "SET flatten_nested = 0"
13+
if err := c.conn.Exec(ctx, flattenNested0); err != nil {
14+
return nil, err
15+
}
16+
1117
query, params := queries.GetTablesSchema(c.database)
1218
rows, err := c.conn.Query(ctx, query, params...)
1319
if err != nil {
1420
return nil, err
1521
}
1622
defer rows.Close()
1723

18-
return queries.ScanTableSchemas(rows)
24+
return queries.ScanTableSchemas(rows, scs)
1925
}

0 commit comments

Comments (0)