@@ -3,136 +3,153 @@ package client
33import (
44 "context"
55 "fmt"
6- "sort"
7- "strings"
86
7+ "github.com/apache/arrow/go/v13/arrow"
98 "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/queries"
10- "github.com/cloudquery/plugin-sdk/schema"
11- "github.com/cloudquery/plugin-sdk/specs"
9+ "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/typeconv"
10+ "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/util"
11+ "github.com/cloudquery/plugin-pb-go/specs"
12+ "github.com/cloudquery/plugin-sdk/v2/schema"
13+ "golang.org/x/exp/slices"
1214 "golang.org/x/sync/errgroup"
1315)
1416
1517// Migrate relies on the CLI/client to lock before running migration.
16- func (c * Client ) Migrate (ctx context.Context , tables schema.Tables ) error {
17- currentSchema , err := c .getTableDefinitions (ctx )
18+ func (c * Client ) Migrate (ctx context.Context , scs schema.Schemas ) error {
19+ have , err := c .getTableDefinitions (ctx , scs )
1820 if err != nil {
1921 return err
2022 }
2123
22- newSchema := queries .NormalizedTables (tables )
24+ want , err := typeconv .CanonizedSchemas (scs )
25+ if err != nil {
26+ return err
27+ }
2328 if c .mode != specs .MigrateModeForced {
24- nonSafeMigratableTables , changes := c . nonAutoMigratableTables ( newSchema , currentSchema )
25- if len (nonSafeMigratableTables ) > 0 {
26- return fmt .Errorf ("tables %s with changes %v require force migration. use 'migrate_mode: forced' " , strings . Join ( nonSafeMigratableTables , "," ), changes )
29+ unsafe := unsafeSchemaChanges ( have , want )
30+ if len (unsafe ) > 0 {
31+ return fmt .Errorf ("'migrate_mode: forced' is required for the following changes: \n %s " , util . SchemasChangesPrettified ( unsafe ) )
2732 }
2833 }
2934
3035 const maxConcurrentMigrate = 10
3136 eg , ctx := errgroup .WithContext (ctx )
3237 eg .SetLimit (maxConcurrentMigrate )
3338
34- for _ , table := range newSchema {
35- table := table
39+ for _ , want := range want {
40+ want := want
3641 eg .Go (func () (err error ) {
37- c .logger .Info ().Str ("table" , table .Name ).Msg ("Migrating table started" )
42+ tableName := schema .TableName (want )
43+ c .logger .Info ().Str ("table" , tableName ).Msg ("Migrating table started" )
3844 defer func () {
39- c .logger .Err (err ).Str ("table" , table . Name ).Msg ("Migrating table done" )
45+ c .logger .Err (err ).Str ("table" , tableName ).Msg ("Migrating table done" )
4046 }()
41- if len (table . Columns ) == 0 {
42- c .logger .Warn ().Str ("table" , table . Name ).Msg ("Table with no columns, skip" )
47+ if len (want . Fields () ) == 0 {
48+ c .logger .Warn ().Str ("table" , tableName ).Msg ("Table with no columns, skip" )
4349 return nil
4450 }
4551
46- current := currentSchema . Get ( table . Name )
47- if current == nil {
48- return c .createTable (ctx , table )
52+ have := have . SchemaByName ( tableName )
53+ if have == nil {
54+ return c .createTable (ctx , want )
4955 }
5056
51- return c .autoMigrate (ctx , table , current )
57+ return c .autoMigrate (ctx , have , want )
5258 })
5359 }
5460
5561 return eg .Wait ()
5662}
5763
58- func (c * Client ) nonAutoMigratableTables (tables schema.Tables , currentTables schema.Tables ) ([]string , [][]schema.TableColumnChange ) {
59- var result []string
60- var tableChanges [][]schema.TableColumnChange
61- for _ , t := range tables {
62- current := currentTables .Get (t .Name )
64+ func unsafeSchemaChanges (have , want schema.Schemas ) map [string ]schema.FieldChanges {
65+ result := make (map [string ]schema.FieldChanges )
66+ for _ , w := range want {
67+ current := have .SchemaByName (schema .TableName (w ))
6368 if current == nil {
6469 continue
6570 }
66- changes := t .GetChanges (current )
67- if ! c .canSafelyMigrate (changes ) {
68- result = append (result , t .Name )
69- tableChanges = append (tableChanges , changes )
71+ unsafe := unsafeChanges (schema .GetSchemaChanges (w , current ))
72+ if len (unsafe ) > 0 {
73+ result [schema .TableName (w )] = unsafe
7074 }
7175 }
72- return result , tableChanges
76+ return result
7377}
7478
75- func ( * Client ) canSafelyMigrate ( changes []schema.TableColumnChange ) bool {
76- for _ , change := range changes {
77- needsDrop := needsTableDrop ( change )
78- if needsDrop {
79- return false
79+ func unsafeChanges ( changes []schema.FieldChange ) schema. FieldChanges {
80+ unsafe := make ([]schema. FieldChange , 0 , len ( changes ))
81+ for _ , c := range changes {
82+ if needsTableDrop ( c ) {
83+ unsafe = append ( unsafe , c )
8084 }
8185 }
82- return true
86+ return slices . Clip ( unsafe )
8387}
8488
85- func (c * Client ) createTable (ctx context.Context , table * schema.Table ) (err error ) {
86- c .logger .Debug ().Str ("table" , table .Name ).Msg ("Table doesn't exist, creating" )
89+ func (c * Client ) createTable (ctx context.Context , sc * arrow.Schema ) (err error ) {
90+ c .logger .Debug ().Str ("table" , schema .TableName (sc )).Msg ("Table doesn't exist, creating" )
91+
92+ query , err := queries .CreateTable (sc , c .spec .Cluster , c .spec .Engine )
93+ if err != nil {
94+ return err
95+ }
8796
88- return c .conn .Exec (ctx , queries .CreateTable (table , c .spec .Cluster , c .spec .Engine ))
97+ if err := c .conn .Exec (ctx , query ); err != nil {
98+ return fmt .Errorf ("failed to create table, query:\n %s\n error: %w" , query , err )
99+ }
100+ return nil
89101}
90102
91- func (c * Client ) dropTable (ctx context.Context , table * schema. Table ) ( err error ) {
92- c .logger .Debug ().Str ("table" , table . Name ).Msg ("Dropping table" )
103+ func (c * Client ) dropTable (ctx context.Context , sc * arrow. Schema ) error {
104+ c .logger .Debug ().Str ("table" , schema . TableName ( sc ) ).Msg ("Dropping table" )
93105
94- return c .conn .Exec (ctx , queries .DropTable (table , c .spec .Cluster ))
106+ return c .conn .Exec (ctx , queries .DropTable (sc , c .spec .Cluster ))
95107}
96108
97- func needsTableDrop (change schema.TableColumnChange ) bool {
109+ func needsTableDrop (change schema.FieldChange ) bool {
98110 // We can safely add a nullable column without dropping the table
99- if change .Type == schema .TableColumnChangeTypeAdd && ! change .Current .CreationOptions . NotNull {
111+ if change .Type == schema .TableColumnChangeTypeAdd && change .Current .Nullable {
100112 return false
101113 }
102114
103115 // We can safely ignore removal of nullable columns without dropping the table
104- if change .Type == schema .TableColumnChangeTypeRemove && ! change .Previous .CreationOptions . NotNull {
116+ if change .Type == schema .TableColumnChangeTypeRemove && change .Previous .Nullable {
105117 return false
106118 }
107119
120+ // TODO: add check for update + new type is extending the current type (uint8 -> uint16, float32 -> float64, new struct field, etc).
108121 return true
109122}
110123
111- func changesSorter (a , b schema.TableColumnChange ) bool {
112- return needsTableDrop (a ) && ! needsTableDrop (b )
113- }
124+ func (c * Client ) autoMigrate (ctx context.Context , have , want * arrow.Schema ) error {
125+ changes := schema .GetSchemaChanges (want , have )
114126
115- func (c * Client ) autoMigrate (ctx context.Context , table * schema.Table , current * schema.Table ) (err error ) {
116- changes := table .GetChanges (current )
117- sort .SliceStable (changes , func (i , j int ) bool {
118- return changesSorter (changes [i ], changes [j ])
119- })
127+ if unsafe := unsafeChanges (changes ); len (unsafe ) > 0 {
128+ // we can get here only with migrate_mode: forced
129+ if err := c .dropTable (ctx , have ); err != nil {
130+ return err
131+ }
132+
133+ return c .createTable (ctx , want )
134+ }
135+
136+ tableName := schema .TableName (want )
120137 for _ , change := range changes {
121- switch {
122- case change .Type == schema .TableColumnChangeTypeAdd && ! change .Current .CreationOptions .NotNull :
123- c .logger .Debug ().Str ("table" , table .Name ).Str ("column" , change .Current .Name ).Msg ("Adding new column" )
124- err := c .conn .Exec (ctx , queries .AddColumn (table .Name , c .spec .Cluster , & change .Current ))
125- if err != nil {
126- return err
127- }
128- case change .Type == schema .TableColumnChangeTypeRemove && ! change .Previous .CreationOptions .NotNull :
138+ // we only handle new columns
139+ if change .Type != schema .TableColumnChangeTypeAdd {
129140 continue
130- default :
131- err := c .dropTable (ctx , table )
132- if err != nil {
133- return err
134- }
135- return c .createTable (ctx , table )
141+ }
142+
143+ c .logger .Debug ().Str ("table" , tableName ).Str ("column" , change .Current .Name ).Msg ("Adding new column" )
144+
145+ query , err := queries .AddColumn (tableName , c .spec .Cluster , change .Current )
146+ if err != nil {
147+ return err
148+ }
149+
150+ err = c .conn .Exec (ctx , query )
151+ if err != nil {
152+ return err
136153 }
137154 }
138155
0 commit comments