@@ -17,6 +17,7 @@ import (
1717 "github.com/cloudquery/plugin-sdk/v4/message"
1818 "github.com/cloudquery/plugin-sdk/v4/plugin"
1919 "github.com/cloudquery/plugin-sdk/v4/scalar"
20+ "github.com/cloudquery/plugin-sdk/v4/schema"
2021 "github.com/cloudquery/plugin-sdk/v4/types"
2122 "github.com/google/uuid"
2223 pgx_zero_log "github.com/jackc/pgx-zerolog"
@@ -197,6 +198,34 @@ func createTestTable(ctx context.Context, conn *pgxpool.Pool, tableName string)
197198 return nil
198199}
199200
201+ func createTableWithUniqueKeys (ctx context.Context , conn * pgxpool.Pool , tableName string ) error {
202+ var query = `
203+ create table %s (
204+ column1 int primary key,
205+ column2 int unique,
206+ column3 int unique,
207+ column4 int unique,
208+ column5 int unique,
209+ column6 int unique,
210+ column7 int unique,
211+ column8 int unique,
212+ column9 int unique,
213+ column10 int unique,
214+ column11 int unique,
215+ column12 int unique,
216+ column13 int unique,
217+ column14 int unique,
218+ column15 int unique,
219+ column16 int
220+ )
221+ `
222+
223+ if _ , err := conn .Exec (ctx , fmt .Sprintf (query , tableName )); err != nil {
224+ return err
225+ }
226+ return nil
227+ }
228+
200229func insertTestTable (ctx context.Context , conn * pgxpool.Pool , tableName string , testCases []testCase ) error {
201230 var query = ""
202231 query += "INSERT INTO " + pgx.Identifier {tableName }.Sanitize () + " ("
@@ -485,3 +514,75 @@ func IsContextDeadlineExceeded(err error) bool {
485514 }
486515 return deadlineExceeded
487516}
517+
518+ func TestMigrate (t * testing.T ) {
519+ p := Plugin ()
520+ ctx := context .Background ()
521+ l := zerolog .New (zerolog .NewTestWriter (t )).Output (
522+ zerolog.ConsoleWriter {Out : os .Stderr , TimeFormat : time .StampMicro },
523+ ).Level (zerolog .DebugLevel ).With ().Timestamp ().Logger ()
524+ p .SetLogger (l )
525+ spec := client.Spec {
526+ ConnectionString : getTestConnectionString (),
527+ PgxLogLevel : client .LogLevelTrace ,
528+ }
529+ specBytes , err := json .Marshal (spec )
530+ if err != nil {
531+ t .Fatal (err )
532+ }
533+ conn , err := getTestConnection (ctx , l , spec .ConnectionString )
534+ if err != nil {
535+ t .Fatal (err )
536+ }
537+ defer conn .Close ()
538+
539+ testTable := "test_pg_migrate"
540+ if _ , err := conn .Exec (ctx , "DROP TABLE IF EXISTS test_pg_migrate" ); err != nil {
541+ t .Fatal (err )
542+ }
543+ if err := createTableWithUniqueKeys (ctx , conn , testTable ); err != nil {
544+ t .Fatal (err )
545+ }
546+
547+ // Init the plugin so we can call migrate
548+ if err := p .Init (ctx , specBytes , plugin.NewClientOptions {}); err != nil {
549+ t .Fatal (err )
550+ }
551+ res := make (chan message.SyncMessage , 1 )
552+ g := errgroup.Group {}
553+ g .Go (func () error {
554+ defer close (res )
555+ opts := plugin.SyncOptions {Tables : []string {testTable }}
556+ return p .Sync (ctx , opts , res )
557+ })
558+ var table * schema.Table
559+ for r := range res {
560+ switch r := r .(type ) {
561+ case * message.SyncMigrateTable :
562+ table = r .Table
563+ }
564+ }
565+ err = g .Wait ()
566+ if err != nil {
567+ t .Fatal ("got unexpected error:" , err )
568+ }
569+
570+ require .Equal (t , schema.ColumnList {
571+ {Name : "column1" , Type : & arrow.Int32Type {}, PrimaryKey : true , Unique : true , NotNull : true },
572+ {Name : "column2" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
573+ {Name : "column3" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
574+ {Name : "column4" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
575+ {Name : "column5" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
576+ {Name : "column6" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
577+ {Name : "column7" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
578+ {Name : "column8" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
579+ {Name : "column9" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
580+ {Name : "column10" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
581+ {Name : "column11" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
582+ {Name : "column12" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
583+ {Name : "column13" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
584+ {Name : "column14" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
585+ {Name : "column15" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : true , NotNull : false },
586+ {Name : "column16" , Type : & arrow.Int32Type {}, PrimaryKey : false , Unique : false , NotNull : false },
587+ }, table .Columns )
588+ }
0 commit comments