Skip to content

Commit c0b9f0a

Browse files
authored
fix(race): Fix race in DuckDB delete-stale (#11215)
1 parent 1b7f65b commit c0b9f0a

File tree

7 files changed, with 95 additions and 86 deletions.

7 files changed, with 95 additions and 86 deletions.

plugins/destination/duckdb/client/client.go

Lines changed: 16 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -25,28 +25,30 @@ type Client struct {
2525

2626
var _ destination.Client = (*Client)(nil)
2727

28-
func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (destination.Client, error) {
28+
func New(ctx context.Context, logger zerolog.Logger, dstSpec specs.Destination) (destination.Client, error) {
2929
var err error
3030
c := &Client{
3131
logger: logger.With().Str("module", "duckdb-dest").Logger(),
32+
spec: dstSpec,
3233
}
3334

34-
var duckdbSpec Spec
35-
c.spec = spec
36-
if err := spec.UnmarshalSpec(&duckdbSpec); err != nil {
35+
var spec Spec
36+
if err := dstSpec.UnmarshalSpec(&spec); err != nil {
3737
return nil, fmt.Errorf("failed to unmarshal duckdb spec: %w", err)
3838
}
39-
c.connector, err = duckdb.NewConnector(duckdbSpec.ConnectionString, nil)
40-
db := sql.OpenDB(c.connector)
39+
40+
c.connector, err = duckdb.NewConnector(spec.ConnectionString, nil)
4141
if err != nil {
4242
return nil, err
4343
}
44-
c.db = db
45-
_, err = c.db.ExecContext(ctx, "INSTALL 'json'; LOAD 'json';")
44+
45+
c.db = sql.OpenDB(c.connector)
46+
47+
err = c.exec(ctx, "INSTALL 'json'; LOAD 'json';")
4648
if err != nil {
4749
return nil, err
4850
}
49-
_, err = c.db.ExecContext(ctx, "INSTALL 'parquet'; LOAD 'parquet';")
51+
err = c.exec(ctx, "INSTALL 'parquet'; LOAD 'parquet';")
5052
if err != nil {
5153
return nil, err
5254
}
@@ -69,3 +71,8 @@ func (c *Client) Close(_ context.Context) error {
6971
func (c *Client) Metrics() destination.Metrics {
7072
return c.metrics
7173
}
74+
75+
func (c *Client) exec(ctx context.Context, query string, args ...any) error {
76+
_, err := c.db.ExecContext(ctx, query, args...)
77+
return err
78+
}

plugins/destination/duckdb/client/delete_stale.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,10 +18,10 @@ func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source s
1818
sb.WriteString(" = $1 and ")
1919
sb.WriteString(schema.CqSyncTimeColumn.Name)
2020
sb.WriteString(" < to_timestamp($2)")
21-
sql := sb.String()
22-
if _, err := c.db.ExecContext(ctx, sql, source, syncTime.Unix()); err != nil {
21+
if err := c.exec(ctx, sb.String(), source, syncTime.Unix()); err != nil {
2322
return err
2423
}
2524
}
25+
2626
return nil
2727
}

plugins/destination/duckdb/client/migrate.go

Lines changed: 4 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -177,18 +177,15 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
177177

178178
func (c *Client) recreateTable(ctx context.Context, table *schema.Table) error {
179179
sql := "drop table if exists " + sanitizeID(table.Name)
180-
if _, err := c.db.ExecContext(ctx, sql); err != nil {
181-
return fmt.Errorf("failed to drop table %s: %w", table.Name, err)
180+
if err := c.exec(ctx, sql); err != nil {
181+
return err
182182
}
183183
return c.createTableIfNotExist(ctx, table.Name, table)
184184
}
185185

186186
func (c *Client) addColumn(ctx context.Context, tableName string, columnName string, columnType string) error {
187187
sql := "alter table " + sanitizeID(tableName) + " add column " + sanitizeID(columnName) + " " + columnType
188-
if _, err := c.db.ExecContext(ctx, sql); err != nil {
189-
return fmt.Errorf("failed to add column %s on table %s: %w", columnName, tableName, err)
190-
}
191-
return nil
188+
return c.exec(ctx, sql)
192189
}
193190

194191
func (c *Client) createTableIfNotExist(ctx context.Context, tableName string, table *schema.Table) error {
@@ -227,11 +224,7 @@ func (c *Client) createTableIfNotExist(ctx context.Context, tableName string, ta
227224
sb.WriteString(")")
228225
}
229226
sb.WriteString(")")
230-
_, err := c.db.ExecContext(ctx, sb.String())
231-
if err != nil {
232-
return fmt.Errorf("failed to create table with '%s': %w", sb.String(), err)
233-
}
234-
return nil
227+
return c.exec(ctx, sb.String())
235228
}
236229

237230
func (c *Client) isColumnUnique(ctx context.Context, tableName string, columName string) (bool, error) {

plugins/destination/duckdb/client/read.go

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -39,8 +39,7 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, _ string, res ch
3939
}
4040
sb.WriteString(") to '" + f.Name() + "' (FORMAT PARQUET)")
4141

42-
_, err = c.db.ExecContext(ctx, sb.String())
43-
if err != nil {
42+
if err := c.exec(ctx, sb.String()); err != nil {
4443
return err
4544
}
4645
f, err = os.Open(fName)

plugins/destination/duckdb/client/write.go

Lines changed: 69 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -5,11 +5,12 @@ import (
55
"fmt"
66
"os"
77
"strings"
8+
"time"
89

910
"github.com/apache/arrow/go/v13/arrow"
1011
"github.com/apache/arrow/go/v13/parquet"
1112
"github.com/apache/arrow/go/v13/parquet/pqarrow"
12-
"github.com/cloudquery/plugin-pb-go/specs"
13+
"github.com/cenkalti/backoff/v4"
1314
"github.com/cloudquery/plugin-sdk/v3/schema"
1415
"github.com/google/uuid"
1516
"golang.org/x/exp/slices"
@@ -46,62 +47,61 @@ func dtContainsList(dt arrow.DataType) bool {
4647
}
4748
}
4849

49-
func (c *Client) upsert(ctx context.Context, tmpTableName string, tableName string, table *schema.Table) error {
50+
func (c *Client) upsert(ctx context.Context, tmpTableName string, table *schema.Table) error {
5051
var sb strings.Builder
51-
sb.WriteString("insert into " + tableName + " select * from " + tmpTableName + " on conflict (")
52+
sb.WriteString("insert into " + table.Name + " select * from " + tmpTableName + " on conflict (")
5253
sb.WriteString(strings.Join(table.PrimaryKeys(), ", "))
5354
sb.WriteString(" ) do update set ")
5455
indices := nonPkIndices(table)
55-
for i, indice := range indices {
56-
col := table.Columns[indice]
56+
for i, index := range indices {
57+
if i > 0 {
58+
sb.WriteString(", ")
59+
}
60+
col := table.Columns[index]
5761
sb.WriteString(col.Name)
5862
sb.WriteString(" = excluded.")
5963
sb.WriteString(col.Name)
60-
if i < len(indices)-1 {
61-
sb.WriteString(", ")
62-
}
6364
}
64-
if _, err := c.db.ExecContext(ctx, sb.String()); err != nil {
65-
return err
66-
}
67-
return nil
65+
query := sb.String()
66+
// per https://duckdb.org/docs/sql/indexes#over-eager-unique-constraint-checking we might need some retries
67+
// as the upsert for tables with PKs is transformed into delete + insert internally
68+
return backoff.Retry(
69+
func() error {
70+
return c.exec(ctx, query)
71+
},
72+
backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(50*time.Millisecond), 3), ctx),
73+
)
6874
}
6975

70-
func (c *Client) deleteByPK(ctx context.Context, tmpTableName string, tableName string, table *schema.Table) error {
76+
func (c *Client) deleteByPK(ctx context.Context, tmpTableName string, table *schema.Table) error {
7177
var sb strings.Builder
72-
sb.WriteString("delete from " + tableName + " using " + tmpTableName + " where ")
73-
pks := table.PrimaryKeys()
74-
for i, col := range pks {
75-
sb.WriteString(tableName + "." + col)
76-
sb.WriteString(" = ")
77-
sb.WriteString(tmpTableName + "." + col)
78-
if i < len(pks)-1 {
78+
sb.WriteString("delete from " + table.Name + " using " + tmpTableName + " where ")
79+
for i, col := range table.PrimaryKeys() {
80+
if i > 0 {
7981
sb.WriteString(" and ")
8082
}
83+
sb.WriteString(table.Name + "." + col)
84+
sb.WriteString(" = ")
85+
sb.WriteString(tmpTableName + "." + col)
8186
}
82-
if _, err := c.db.ExecContext(ctx, sb.String()); err != nil {
83-
return err
84-
}
85-
return nil
87+
88+
return c.exec(ctx, sb.String())
8689
}
8790

8891
func (c *Client) copyFromFile(ctx context.Context, tableName string, fileName string, sc *arrow.Schema) error {
8992
var sb strings.Builder
9093
sb.WriteString("copy " + tableName + "(")
9194
for i, col := range sc.Fields() {
92-
sb.WriteString(sanitizeID(col.Name))
93-
if i < len(sc.Fields())-1 {
95+
if i > 0 {
9496
sb.WriteString(", ")
9597
}
98+
sb.WriteString(sanitizeID(col.Name))
9699
}
97100
sb.WriteString(") from '" + fileName + "' (FORMAT PARQUET)")
98-
if _, err := c.db.ExecContext(ctx, sb.String()); err != nil {
99-
return err
100-
}
101-
return nil
101+
return c.exec(ctx, sb.String())
102102
}
103103

104-
func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, records []arrow.Record) error {
104+
func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, records []arrow.Record) (err error) {
105105
f, err := os.CreateTemp("", fmt.Sprintf("%s-*.parquet", table.Name))
106106
if err != nil {
107107
return err
@@ -134,39 +134,46 @@ func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, recor
134134
return err
135135
}
136136

137-
if c.spec.WriteMode == specs.WriteModeAppend || len(table.PrimaryKeys()) == 0 {
138-
if err := c.copyFromFile(ctx, table.Name, f.Name(), sc); err != nil {
139-
return err
140-
}
141-
} else {
142-
tmpTableName := table.Name + strings.ReplaceAll(uuid.New().String(), "-", "_")
143-
if err := c.createTableIfNotExist(ctx, tmpTableName, table); err != nil {
144-
return fmt.Errorf("failed to create table %s: %w", tmpTableName, err)
145-
}
146-
if err := c.copyFromFile(ctx, tmpTableName, f.Name(), sc); err != nil {
147-
return fmt.Errorf("failed to copy from file %s: %w", f.Name(), err)
148-
}
137+
if !c.enabledPks() || len(table.PrimaryKeys()) == 0 {
138+
return c.copyFromFile(ctx, table.Name, f.Name(), sc)
139+
}
149140

150-
// At time of writing (March 2023), duckdb does not support updating list columns.
151-
// As a workaround, we delete the row and insert it again. This makes it non-atomic, unfortunately,
152-
// but this is unavoidable until support is added to duckdb itself.
153-
// See https://github.com/duckdb/duckdb/blob/c5d9afb97bbf0be12216f3b89ae3131afbbc3156/src/storage/table/list_column_data.cpp#L243-L251
154-
if containsList(table) {
155-
if err := c.deleteByPK(ctx, tmpTableName, table.Name, table); err != nil {
156-
return err
157-
}
158-
if _, err = c.db.ExecContext(ctx, "insert into "+table.Name+" from "+tmpTableName); err != nil {
159-
return fmt.Errorf("failed to insert into %s from %s: %w", table.Name, tmpTableName, err)
160-
}
161-
} else {
162-
if err := c.upsert(ctx, tmpTableName, table.Name, table); err != nil {
163-
return err
164-
}
165-
}
166-
if _, err = c.db.ExecContext(ctx, "drop table "+tmpTableName); err != nil {
167-
return err
141+
tmpTableName := table.Name + strings.ReplaceAll(uuid.New().String(), "-", "_")
142+
if err := c.createTableIfNotExist(ctx, tmpTableName, table); err != nil {
143+
return fmt.Errorf("failed to create table %s: %w", tmpTableName, err)
144+
}
145+
defer func() {
146+
e := c.exec(ctx, "drop table "+tmpTableName)
147+
if err == nil {
148+
// we preserve original error, so update only on nil err
149+
err = e
168150
}
151+
}()
152+
if err := c.copyFromFile(ctx, tmpTableName, f.Name(), sc); err != nil {
153+
return fmt.Errorf("failed to copy from file %s: %w", f.Name(), err)
169154
}
170155

171-
return nil
156+
// At time of writing (March 2023), duckdb does not support updating list columns.
157+
// As a workaround, we delete the row and insert it again. This makes it non-atomic, unfortunately,
158+
// but this is unavoidable until support is added to duckdb itself.
159+
// See https://github.com/duckdb/duckdb/blob/c5d9afb97bbf0be12216f3b89ae3131afbbc3156/src/storage/table/list_column_data.cpp#L243-L251
160+
if containsList(table) {
161+
return c.deleteInsert(ctx, tmpTableName, table)
162+
}
163+
164+
return c.upsert(ctx, tmpTableName, table)
165+
}
166+
167+
func (c *Client) deleteInsert(ctx context.Context, tmpTableName string, table *schema.Table) error {
168+
if err := c.deleteByPK(ctx, tmpTableName, table); err != nil {
169+
return err
170+
}
171+
172+
// per https://duckdb.org/docs/sql/indexes#over-eager-unique-constraint-checking we might need to retry
173+
return backoff.Retry(
174+
func() error {
175+
return c.exec(ctx, "insert into "+table.Name+" from "+tmpTableName)
176+
},
177+
backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(50*time.Millisecond), 3), ctx),
178+
)
172179
}

plugins/destination/duckdb/go.mod

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@ go 1.19
44

55
require (
66
github.com/apache/arrow/go/v13 v13.0.0-20230601164043-3299d12efc91
7+
github.com/cenkalti/backoff/v4 v4.2.1
78
github.com/cloudquery/plugin-pb-go v1.0.8
89
github.com/cloudquery/plugin-sdk/v3 v3.10.3
910
github.com/google/uuid v1.3.0

plugins/destination/duckdb/go.sum

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,8 @@ github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG
4040
github.com/apache/thrift v0.18.1 h1:lNhK/1nqjbwbiOPDBPFJVKxgDEGSepKuTh6OLiXW8kg=
4141
github.com/apache/thrift v0.18.1/go.mod h1:rdQn/dCcDKEWjjylUeueum4vQEjG2v8v2PqriUnbr+I=
4242
github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M=
43+
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
44+
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
4345
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
4446
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
4547
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=

0 commit comments

Comments
 (0)