Skip to content

Commit 99e7ad9

Browse files
authored
fix: Don't create duplicate columns for unique constraints (#12518)
#### Summary Fixes #12440 We can't use `like` otherwise an ordinal of `1` matches both `indkey = 11` and `indkey = 1` <!--
1 parent b92ada1 commit 99e7ad9

File tree

2 files changed

+114
-15
lines changed

2 files changed

+114
-15
lines changed

plugins/source/postgresql/client/list_tables.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
// this returns the following table in sorted manner:
1111
// +----------------+-------------+-------------+------------+----------------+-----------+-----------+---------------------+
12-
// | ordinal_position | table_name | column_name | data_type | is_primary_key | not_null | is_unique | pk_constraint_name |
12+
// | ordinal_position | table_name | column_name | data_type | is_primary_key | not_null | is_unique | constraint_name |
1313
// +----------------+-------------+-------------+------------+----------------+-----------+-----------+---------------------+
1414
// | 1 | users | id | bigint | YES | true | true | cq_users_pk |
1515
// | 2 | users | name | text | NO | false | false | |
@@ -23,18 +23,18 @@ SELECT
2323
pg_attribute.attname AS column_name,
2424
pg_catalog.format_type(pg_attribute.atttypid, pg_attribute.atttypmod) AS data_type,
2525
CASE
26-
WHEN conkey IS NOT NULL AND array_position(conkey, pg_attribute.attnum) > 0 THEN true
26+
WHEN conkey IS NOT NULL AND contype = 'p' AND array_position(conkey, pg_attribute.attnum) > 0 THEN true
2727
ELSE false
2828
END AS is_primary_key,
2929
CASE
3030
WHEN pg_attribute.attnotnull THEN true
3131
ELSE false
3232
END AS not_null,
3333
CASE
34-
WHEN pg_index.indisunique THEN true
35-
ELSE false
36-
END AS is_unique,
37-
COALESCE(pg_constraint.conname, '') AS primary_key_constraint_name
34+
WHEN conkey IS NOT NULL AND (contype = 'p' OR contype = 'u') AND array_position(conkey, pg_attribute.attnum) > 0 THEN true
35+
ELSE false
36+
END AS is_unique,
37+
COALESCE(pg_constraint.conname, '') AS constraint_name
3838
FROM
3939
pg_catalog.pg_attribute
4040
INNER JOIN
@@ -44,9 +44,7 @@ FROM
4444
LEFT JOIN
4545
pg_catalog.pg_constraint ON pg_constraint.conrelid = pg_attribute.attrelid
4646
AND conkey IS NOT NULL AND array_position(conkey, pg_attribute.attnum) > 0
47-
AND contype = 'p'
48-
LEFT JOIN pg_catalog.pg_index ON pg_index.indrelid = pg_attribute.attrelid
49-
AND pg_index.indkey::text LIKE '%%' || pg_attribute.attnum || '%%'
47+
AND (contype = 'p' OR contype = 'u')
5048
INNER JOIN
5149
information_schema.columns ON columns.table_name = pg_class.relname AND columns.column_name = pg_attribute.attname AND columns.table_schema = pg_catalog.pg_namespace.nspname
5250
WHERE
@@ -70,9 +68,9 @@ func (c *Client) listTables(ctx context.Context) (schema.Tables, error) {
7068

7169
for rows.Next() {
7270
var ordinalPosition int
73-
var tableName, columnName, columnType, pkName string
71+
var tableName, columnName, columnType, constraintName string
7472
var isPrimaryKey, notNull, isUnique bool
75-
if err := rows.Scan(&ordinalPosition, &tableName, &columnName, &columnType, &isPrimaryKey, &notNull, &isUnique, &pkName); err != nil {
73+
if err := rows.Scan(&ordinalPosition, &tableName, &columnName, &columnType, &isPrimaryKey, &notNull, &isUnique, &constraintName); err != nil {
7674
return nil, err
7775
}
7876
table := tableMap[tableName]
@@ -85,8 +83,8 @@ func (c *Client) listTables(ctx context.Context) (schema.Tables, error) {
8583
tables = append(tables, table)
8684
}
8785

88-
if pkName != "" {
89-
table.PkConstraintName = pkName
86+
if isPrimaryKey && constraintName != "" {
87+
table.PkConstraintName = constraintName
9088
}
9189
table.Columns = append(table.Columns, schema.Column{
9290
Name: columnName,

plugins/source/postgresql/resources/plugin/plugin_test.go

Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cloudquery/plugin-sdk/v4/message"
1818
"github.com/cloudquery/plugin-sdk/v4/plugin"
1919
"github.com/cloudquery/plugin-sdk/v4/scalar"
20+
"github.com/cloudquery/plugin-sdk/v4/schema"
2021
"github.com/cloudquery/plugin-sdk/v4/types"
2122
"github.com/google/uuid"
2223
pgx_zero_log "github.com/jackc/pgx-zerolog"
@@ -197,6 +198,34 @@ func createTestTable(ctx context.Context, conn *pgxpool.Pool, tableName string)
197198
return nil
198199
}
199200

201+
func createTableWithUniqueKeys(ctx context.Context, conn *pgxpool.Pool, tableName string) error {
202+
var query = `
203+
create table %s (
204+
column1 int primary key,
205+
column2 int unique,
206+
column3 int unique,
207+
column4 int unique,
208+
column5 int unique,
209+
column6 int unique,
210+
column7 int unique,
211+
column8 int unique,
212+
column9 int unique,
213+
column10 int unique,
214+
column11 int unique,
215+
column12 int unique,
216+
column13 int unique,
217+
column14 int unique,
218+
column15 int unique,
219+
column16 int
220+
)
221+
`
222+
223+
if _, err := conn.Exec(ctx, fmt.Sprintf(query, tableName)); err != nil {
224+
return err
225+
}
226+
return nil
227+
}
228+
200229
func insertTestTable(ctx context.Context, conn *pgxpool.Pool, tableName string, testCases []testCase) error {
201230
var query = ""
202231
query += "INSERT INTO " + pgx.Identifier{tableName}.Sanitize() + " ("
@@ -294,9 +323,9 @@ func assertRecord(t *testing.T, actualRecord arrow.Record, expected []testCase)
294323
if len(expected) != int(actualRecord.NumCols()) {
295324
t.Fatalf("expected record to have %d columns, got %d", len(expected), actualRecord.NumCols())
296325
}
297-
schema := actualRecord.Schema()
326+
sc := actualRecord.Schema()
298327
for i, val := range expected {
299-
actualScalar := scalar.NewScalar(schema.Field(i).Type)
328+
actualScalar := scalar.NewScalar(sc.Field(i).Type)
300329
actualVal, err := getValue(actualRecord.Column(i), 0)
301330
if err != nil {
302331
t.Fatal(err)
@@ -485,3 +514,75 @@ func IsContextDeadlineExceeded(err error) bool {
485514
}
486515
return deadlineExceeded
487516
}
517+
518+
func TestMigrate(t *testing.T) {
519+
p := Plugin()
520+
ctx := context.Background()
521+
l := zerolog.New(zerolog.NewTestWriter(t)).Output(
522+
zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.StampMicro},
523+
).Level(zerolog.DebugLevel).With().Timestamp().Logger()
524+
p.SetLogger(l)
525+
spec := client.Spec{
526+
ConnectionString: getTestConnectionString(),
527+
PgxLogLevel: client.LogLevelTrace,
528+
}
529+
specBytes, err := json.Marshal(spec)
530+
if err != nil {
531+
t.Fatal(err)
532+
}
533+
conn, err := getTestConnection(ctx, l, spec.ConnectionString)
534+
if err != nil {
535+
t.Fatal(err)
536+
}
537+
defer conn.Close()
538+
539+
testTable := "test_pg_migrate"
540+
if _, err := conn.Exec(ctx, "DROP TABLE IF EXISTS test_pg_migrate"); err != nil {
541+
t.Fatal(err)
542+
}
543+
if err := createTableWithUniqueKeys(ctx, conn, testTable); err != nil {
544+
t.Fatal(err)
545+
}
546+
547+
// Init the plugin so we can call migrate
548+
if err := p.Init(ctx, specBytes, plugin.NewClientOptions{}); err != nil {
549+
t.Fatal(err)
550+
}
551+
res := make(chan message.SyncMessage, 1)
552+
g := errgroup.Group{}
553+
g.Go(func() error {
554+
defer close(res)
555+
opts := plugin.SyncOptions{Tables: []string{testTable}}
556+
return p.Sync(ctx, opts, res)
557+
})
558+
var table *schema.Table
559+
for r := range res {
560+
m, ok := r.(*message.SyncMigrateTable)
561+
if ok {
562+
table = m.Table
563+
}
564+
}
565+
err = g.Wait()
566+
if err != nil {
567+
t.Fatal("got unexpected error:", err)
568+
}
569+
570+
require.Equal(t, schema.ColumnList{
571+
{Name: "column1", Type: &arrow.Int32Type{}, PrimaryKey: true, Unique: true, NotNull: true},
572+
{Name: "column2", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
573+
{Name: "column3", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
574+
{Name: "column4", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
575+
{Name: "column5", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
576+
{Name: "column6", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
577+
{Name: "column7", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
578+
{Name: "column8", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
579+
{Name: "column9", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
580+
{Name: "column10", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
581+
{Name: "column11", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
582+
{Name: "column12", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
583+
{Name: "column13", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
584+
{Name: "column14", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
585+
{Name: "column15", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: true, NotNull: false},
586+
{Name: "column16", Type: &arrow.Int32Type{}, PrimaryKey: false, Unique: false, NotNull: false},
587+
}, table.Columns)
588+
}

0 commit comments

Comments
 (0)