Skip to content

Commit 9e20642

Browse files
authored
feat(mongodb): Create unique indexes for PKs (#9851)
Fixes #9614. Checking with `db.system.profile.find().limit(5).pretty()` after `db.setProfilingLevel(2)`, I don't see any `COLLSCAN`s. ~We're not adding indexes for `_cq_source_name`, `_cq_sync_time`, so `DeleteStale` is probably still slow.~ Verified with `db.system.profile.find({ "planSummary": "COLLSCAN", "ns": { "$ne": "mdb.system.profile" } })` as well.
1 parent eefbad5 commit 9e20642

File tree

2 files changed

+110
-3
lines changed

2 files changed

+110
-3
lines changed

plugins/destination/mongodb/client/deletestale.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ import (
1111
func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error {
1212
for _, table := range tables {
1313
// delete all records that are not in the source and are older than syncTime
14-
if _, err := c.client.Database(c.pluginSpec.Database).Collection(table.Name).DeleteMany(ctx, bson.M{"_cq_source_name": source, "_cq_sync_time": bson.M{"$lt": syncTime}}); err != nil {
14+
if _, err := c.client.Database(c.pluginSpec.Database).Collection(table.Name).DeleteMany(ctx, bson.M{
15+
schema.CqSourceNameColumn.Name: source,
16+
schema.CqSyncTimeColumn.Name: bson.M{"$lt": syncTime},
17+
}); err != nil {
1518
return err
1619
}
1720
}

plugins/destination/mongodb/client/migrate.go

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,115 @@ package client
22

33
import (
	"context"
	"errors"
	"fmt"

	"github.com/cloudquery/plugin-sdk/schema"
	"github.com/cloudquery/plugin-sdk/specs"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)
813

9-
// MongoDB. No migrations needed :)
10-
func (*Client) Migrate(context.Context, schema.Tables) error {
14+
func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
15+
for _, t := range tables {
16+
if err := c.migrateTable(ctx, t); err != nil {
17+
return err
18+
}
19+
}
20+
21+
return nil
22+
}
23+
24+
func (c *Client) migrateTable(ctx context.Context, table *schema.Table) error {
25+
for _, mdl := range c.getIndexTemplates(table) {
26+
res, err := c.client.Database(c.pluginSpec.Database).Collection(table.Name).Indexes().CreateOne(ctx, mdl)
27+
switch {
28+
case err == nil:
29+
c.logger.Debug().Str("index_name", res).Str("table", table.Name).Msg("created index")
30+
case isIndexConflictError(err):
31+
c.logger.Debug().Str("index_name", res).Str("table", table.Name).Err(err).Msg("create index conflict")
32+
if err := c.migrateTableOnConflict(ctx, table, mdl); err != nil {
33+
return err
34+
}
35+
case isIndexOptionsConflictError(err):
36+
c.logger.Debug().Str("table", table.Name).Err(err).Msg("skipped create index")
37+
default:
38+
return fmt.Errorf("create index on %s: %w", table.Name, err)
39+
}
40+
}
41+
42+
for _, subTable := range table.Relations {
43+
if err := c.migrateTable(ctx, subTable); err != nil {
44+
return err
45+
}
46+
}
47+
1148
return nil
1249
}
50+
51+
func (c *Client) migrateTableOnConflict(ctx context.Context, table *schema.Table, mdl mongo.IndexModel) error {
52+
if c.spec.MigrateMode != specs.MigrateModeForced {
53+
return fmt.Errorf("collection %s requires forced migration due to changes in unique indexes. use 'migrate_mode: forced'", table.Name)
54+
}
55+
56+
if _, err := c.client.Database(c.pluginSpec.Database).Collection(table.Name).Indexes().DropOne(ctx, *mdl.Options.Name); err != nil {
57+
return fmt.Errorf("drop index on %s: %w", table.Name, err)
58+
}
59+
res, err := c.client.Database(c.pluginSpec.Database).Collection(table.Name).Indexes().CreateOne(ctx, mdl)
60+
if err != nil {
61+
return fmt.Errorf("recreate index on %s: %w", table.Name, err)
62+
}
63+
c.logger.Debug().Str("index_name", res).Str("table", table.Name).Msg("recreated index")
64+
return nil
65+
}
66+
67+
func (c *Client) getIndexTemplates(table *schema.Table) []mongo.IndexModel {
68+
var indexes []mongo.IndexModel
69+
70+
pks := table.PrimaryKeys()
71+
if len(pks) > 0 {
72+
indexCols := bson.D{}
73+
for _, col := range pks {
74+
indexCols = append(indexCols, bson.E{Key: col, Value: 1})
75+
}
76+
77+
pkIndexName := "cq_pk"
78+
indexes = append(indexes, mongo.IndexModel{
79+
Keys: indexCols,
80+
Options: &options.IndexOptions{
81+
Unique: &[]bool{true}[0],
82+
Name: &pkIndexName,
83+
},
84+
})
85+
}
86+
87+
if c.spec.WriteMode == specs.WriteModeOverwriteDeleteStale {
88+
delIndexName := "cq_del"
89+
indexes = append(indexes, mongo.IndexModel{
90+
Keys: bson.D{{Key: schema.CqSourceNameColumn.Name, Value: 1}, {Key: schema.CqSyncTimeColumn.Name, Value: 1}},
91+
Options: &options.IndexOptions{
92+
Name: &delIndexName,
93+
},
94+
})
95+
}
96+
97+
return indexes
98+
}
99+
100+
func isIndexConflictError(err error) bool {
101+
cmdErr, ok := err.(mongo.CommandError)
102+
if !ok {
103+
return false
104+
}
105+
return cmdErr.Name == "IndexKeySpecsConflict"
106+
}
107+
108+
func isIndexOptionsConflictError(err error) bool {
109+
cmdErr, ok := err.(mongo.CommandError)
110+
if !ok {
111+
return false
112+
}
113+
114+
// This is either "Index already exists with a different name: %s" error or due to uniqueness change
115+
return cmdErr.Name == "IndexOptionsConflict"
116+
}

0 commit comments

Comments
 (0)