Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions plugins/destination/mysql/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ import (
"fmt"
"time"

"github.com/cloudquery/plugin-sdk/plugins/destination"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
"github.com/cloudquery/plugin-sdk/v2/specs"
"github.com/rs/zerolog"

mysql "github.com/go-sql-driver/mysql"
)

type Client struct {
destination.UnimplementedUnmanagedWriter
destination.DefaultReverseTransformer
logger zerolog.Logger

spec specs.Destination
Expand Down
4 changes: 2 additions & 2 deletions plugins/destination/mysql/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"testing"

"github.com/cloudquery/cloudquery/plugins/destination/mysql/resources/plugin"
"github.com/cloudquery/plugin-sdk/plugins/destination"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
"github.com/cloudquery/plugin-sdk/v2/specs"
)

var migrateStrategy = destination.MigrateStrategy{
Expand Down
9 changes: 5 additions & 4 deletions plugins/destination/mysql/client/deletestale.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"fmt"
"time"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/v2/schema"
)

func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error {
for _, table := range tables.FlattenTables() {
query := fmt.Sprintf(`delete from %s where %s = ? and %s < ?`, identifier(table.Name), identifier(schema.CqSourceNameColumn.Name), identifier(schema.CqSyncTimeColumn.Name))
func (c *Client) DeleteStale(ctx context.Context, tables schema.Schemas, source string, syncTime time.Time) error {
for _, table := range tables {
name := schema.TableName(table)
query := fmt.Sprintf(`delete from %s where %s = ? and %s < ?`, identifier(name), identifier(schema.CqSourceNameColumn.Name), identifier(schema.CqSyncTimeColumn.Name))
if _, err := c.db.ExecContext(ctx, query, source, syncTime); err != nil {
return err
}
Expand Down
105 changes: 64 additions & 41 deletions plugins/destination/mysql/client/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,58 +5,70 @@ import (
"fmt"
"strings"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/apache/arrow/go/v12/arrow"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v2/specs"
)

func (c *Client) normalizedTables(tables schema.Tables) schema.Tables {
var normalized schema.Tables
for _, table := range tables.FlattenTables() {
for i := range table.Columns {
// Since multiple schema types can map to the same MySQL type we need to normalize them to avoid false positives when detecting schema changes
// This should never fail we convert an internal schema type to an MySQL type and back
schemaType, _ := SchemaType(table.Name, table.Columns[i].Name, SQLType(table.Columns[i].Type))
table.Columns[i].Type = schemaType
}
// If there are no PKs, we use CqID as PK
pks := table.PrimaryKeys()
if !c.pkEnabled() || len(pks) == 0 {
table.Columns.Get(schema.CqIDColumn.Name).CreationOptions.PrimaryKey = true
}

for i, col := range table.Columns {
table.Columns[i].CreationOptions.NotNull = col.CreationOptions.NotNull || col.CreationOptions.PrimaryKey
func normalizeSchemas(tables schema.Schemas) (schema.Schemas, error) {
var normalized schema.Schemas
for _, sc := range tables {
tableName := schema.TableName(sc)
fields := make([]arrow.Field, 0)
for _, f := range sc.Fields() {
keys := make([]string, 0)
values := make([]string, 0)
origKeys := f.Metadata.Keys()
origValues := f.Metadata.Values()
for k, v := range origKeys {
if v != schema.MetadataUnique {
keys = append(keys, v)
values = append(values, origValues[k])
}
}
normalizedType, err := mySQLTypeToArrowType(tableName, f.Name, arrowTypeToMySqlStr(f.Type))
if err != nil {
return nil, err
}
fields = append(fields, arrow.Field{
Name: f.Name,
Type: normalizedType,
Nullable: f.Nullable && !schema.IsPk(f),
Metadata: arrow.NewMetadata(keys, values),
})
}

normalized = append(normalized, table)
md := sc.Metadata()
normalized = append(normalized, arrow.NewSchema(fields, &md))
}

return normalized
return normalized, nil
}

func (c *Client) nonAutoMigrableTables(tables schema.Tables, schemaTables schema.Tables) (names []string, changes [][]schema.TableColumnChange) {
var tableChanges [][]schema.TableColumnChange
func (c *Client) nonAutoMigrtableTables(tables schema.Schemas, schemaTables schema.Schemas) (names []string, changes [][]schema.FieldChange) {
var tableChanges [][]schema.FieldChange
for _, t := range tables {
schemaTable := schemaTables.Get(t.Name)
tableName := schema.TableName(t)
schemaTable := schemaTables.SchemaByName(tableName)
if schemaTable == nil {
continue
}
changes := t.GetChanges(schemaTable)
changes := schema.GetSchemaChanges(t, schemaTable)
if !c.canAutoMigrate(changes) {
names = append(names, t.Name)
names = append(names, tableName)
tableChanges = append(tableChanges, changes)
}
}
return names, tableChanges
}

func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool {
func (*Client) canAutoMigrate(changes []schema.FieldChange) bool {
for _, change := range changes {
if change.Type == schema.TableColumnChangeTypeAdd && (change.Current.CreationOptions.PrimaryKey || change.Current.CreationOptions.NotNull) {
if change.Type == schema.TableColumnChangeTypeAdd && (schema.IsPk(change.Current) || !change.Current.Nullable) {
return false
}

if change.Type == schema.TableColumnChangeTypeRemove && (change.Previous.CreationOptions.PrimaryKey || change.Previous.CreationOptions.NotNull) {
if change.Type == schema.TableColumnChangeTypeRemove && (schema.IsPk(change.Previous) || !change.Previous.Nullable) {
return false
}

Expand All @@ -67,7 +79,7 @@ func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool {
return true
}

func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table, changes []schema.TableColumnChange) error {
func (c *Client) autoMigrateTable(ctx context.Context, table *arrow.Schema, changes []schema.FieldChange) error {
for _, change := range changes {
if change.Type == schema.TableColumnChangeTypeAdd {
err := c.addColumn(ctx, table, change.Current)
Expand All @@ -81,42 +93,53 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table, chan
}

// Migrate relies on the CLI/client to lock before running migration.
func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
func (c *Client) Migrate(ctx context.Context, tables schema.Schemas) error {
schemaTables, err := c.schemaTables(ctx, tables)
if err != nil {
return err
}

normalizedTables := c.normalizedTables(tables)
normalizedTables, err := normalizeSchemas(tables)
if err != nil {
return err
}

if c.spec.MigrateMode != specs.MigrateModeForced {
nonAutoMigrableTables, changes := c.nonAutoMigrableTables(normalizedTables, schemaTables)
if len(nonAutoMigrableTables) > 0 {
return fmt.Errorf("tables %s with changes %v require force migration. use 'migrate_mode: forced'", strings.Join(nonAutoMigrableTables, ","), changes)
nonAutoMigrtableTables, changes := c.nonAutoMigrtableTables(normalizedTables, schemaTables)
if len(nonAutoMigrtableTables) > 0 {
return fmt.Errorf("tables %s with changes %v require force migration. use 'migrate_mode: forced'", strings.Join(nonAutoMigrtableTables, ","), changes)
}
}

for _, table := range normalizedTables {
c.logger.Info().Str("table", table.Name).Msg("Migrating table")
schemaTable := schemaTables.Get(table.Name)
tableName := schema.TableName(table)
if tableName == "" {
return fmt.Errorf("schema %s has no table name", table.String())
}
c.logger.Info().Str("table", tableName).Msg("Migrating table")
if len(table.Fields()) == 0 {
c.logger.Info().Str("table", tableName).Msg("Table with no columns, skipping")
continue
}
schemaTable := schemaTables.SchemaByName(tableName)
if schemaTable == nil {
c.logger.Info().Str("table", table.Name).Msg("Table doesn't exist, creating")
c.logger.Info().Str("table", tableName).Msg("Table doesn't exist, creating")
if err := c.createTable(ctx, table); err != nil {
return err
}
continue
}

changes := table.GetChanges(schemaTable)
changes := schema.GetSchemaChanges(table, schemaTable)
if c.canAutoMigrate(changes) {
c.logger.Info().Str("table", table.Name).Msg("Table exists, auto-migrating")
c.logger.Info().Str("table", tableName).Msg("Table exists, auto-migrating")
if err := c.autoMigrateTable(ctx, table, changes); err != nil {
return err
}
continue
}

c.logger.Info().Str("table", table.Name).Msg("Table exists, force migration required")
c.logger.Info().Str("table", tableName).Msg("Table exists, force migration required")
if err := c.recreateTable(ctx, table); err != nil {
return err
}
Expand Down
Loading