@@ -2,11 +2,115 @@ package client
22
33import (
44 "context"
5+ "fmt"
56
67 "github.com/cloudquery/plugin-sdk/schema"
8+ "github.com/cloudquery/plugin-sdk/specs"
9+ "go.mongodb.org/mongo-driver/bson"
10+ "go.mongodb.org/mongo-driver/mongo"
11+ "go.mongodb.org/mongo-driver/mongo/options"
712)
813
9- // MongoDB. No migrations needed :)
10- func (* Client ) Migrate (context.Context , schema.Tables ) error {
14+ func (c * Client ) Migrate (ctx context.Context , tables schema.Tables ) error {
15+ for _ , t := range tables {
16+ if err := c .migrateTable (ctx , t ); err != nil {
17+ return err
18+ }
19+ }
20+
21+ return nil
22+ }
23+
24+ func (c * Client ) migrateTable (ctx context.Context , table * schema.Table ) error {
25+ for _ , mdl := range c .getIndexTemplates (table ) {
26+ res , err := c .client .Database (c .pluginSpec .Database ).Collection (table .Name ).Indexes ().CreateOne (ctx , mdl )
27+ switch {
28+ case err == nil :
29+ c .logger .Debug ().Str ("index_name" , res ).Str ("table" , table .Name ).Msg ("created index" )
30+ case isIndexConflictError (err ):
31+ c .logger .Debug ().Str ("index_name" , res ).Str ("table" , table .Name ).Err (err ).Msg ("create index conflict" )
32+ if err := c .migrateTableOnConflict (ctx , table , mdl ); err != nil {
33+ return err
34+ }
35+ case isIndexOptionsConflictError (err ):
36+ c .logger .Debug ().Str ("table" , table .Name ).Err (err ).Msg ("skipped create index" )
37+ default :
38+ return fmt .Errorf ("create index on %s: %w" , table .Name , err )
39+ }
40+ }
41+
42+ for _ , subTable := range table .Relations {
43+ if err := c .migrateTable (ctx , subTable ); err != nil {
44+ return err
45+ }
46+ }
47+
1148 return nil
1249}
50+
51+ func (c * Client ) migrateTableOnConflict (ctx context.Context , table * schema.Table , mdl mongo.IndexModel ) error {
52+ if c .spec .MigrateMode != specs .MigrateModeForced {
53+ return fmt .Errorf ("collection %s requires forced migration due to changes in unique indexes. use 'migrate_mode: forced'" , table .Name )
54+ }
55+
56+ if _ , err := c .client .Database (c .pluginSpec .Database ).Collection (table .Name ).Indexes ().DropOne (ctx , * mdl .Options .Name ); err != nil {
57+ return fmt .Errorf ("drop index on %s: %w" , table .Name , err )
58+ }
59+ res , err := c .client .Database (c .pluginSpec .Database ).Collection (table .Name ).Indexes ().CreateOne (ctx , mdl )
60+ if err != nil {
61+ return fmt .Errorf ("recreate index on %s: %w" , table .Name , err )
62+ }
63+ c .logger .Debug ().Str ("index_name" , res ).Str ("table" , table .Name ).Msg ("recreated index" )
64+ return nil
65+ }
66+
67+ func (c * Client ) getIndexTemplates (table * schema.Table ) []mongo.IndexModel {
68+ var indexes []mongo.IndexModel
69+
70+ pks := table .PrimaryKeys ()
71+ if len (pks ) > 0 {
72+ indexCols := bson.D {}
73+ for _ , col := range pks {
74+ indexCols = append (indexCols , bson.E {Key : col , Value : 1 })
75+ }
76+
77+ pkIndexName := "cq_pk"
78+ indexes = append (indexes , mongo.IndexModel {
79+ Keys : indexCols ,
80+ Options : & options.IndexOptions {
81+ Unique : & []bool {true }[0 ],
82+ Name : & pkIndexName ,
83+ },
84+ })
85+ }
86+
87+ if c .spec .WriteMode == specs .WriteModeOverwriteDeleteStale {
88+ delIndexName := "cq_del"
89+ indexes = append (indexes , mongo.IndexModel {
90+ Keys : bson.D {{Key : schema .CqSourceNameColumn .Name , Value : 1 }, {Key : schema .CqSyncTimeColumn .Name , Value : 1 }},
91+ Options : & options.IndexOptions {
92+ Name : & delIndexName ,
93+ },
94+ })
95+ }
96+
97+ return indexes
98+ }
99+
100+ func isIndexConflictError (err error ) bool {
101+ cmdErr , ok := err .(mongo.CommandError )
102+ if ! ok {
103+ return false
104+ }
105+ return cmdErr .Name == "IndexKeySpecsConflict"
106+ }
107+
108+ func isIndexOptionsConflictError (err error ) bool {
109+ cmdErr , ok := err .(mongo.CommandError )
110+ if ! ok {
111+ return false
112+ }
113+
114+ // This is either "Index already exists with a different name: %s" error or due to uniqueness change
115+ return cmdErr .Name == "IndexOptionsConflict"
116+ }
0 commit comments