@@ -5,11 +5,12 @@ import (
55 "fmt"
66 "os"
77 "strings"
8+ "time"
89
910 "github.com/apache/arrow/go/v13/arrow"
1011 "github.com/apache/arrow/go/v13/parquet"
1112 "github.com/apache/arrow/go/v13/parquet/pqarrow"
12- "github.com/cloudquery/plugin-pb-go/specs "
13+ "github.com/cenkalti/backoff/v4 "
1314 "github.com/cloudquery/plugin-sdk/v3/schema"
1415 "github.com/google/uuid"
1516 "golang.org/x/exp/slices"
@@ -46,62 +47,61 @@ func dtContainsList(dt arrow.DataType) bool {
4647 }
4748}
4849
49- func (c * Client ) upsert (ctx context.Context , tmpTableName string , tableName string , table * schema.Table ) error {
50+ func (c * Client ) upsert (ctx context.Context , tmpTableName string , table * schema.Table ) error {
5051 var sb strings.Builder
51- sb .WriteString ("insert into " + tableName + " select * from " + tmpTableName + " on conflict (" )
52+ sb .WriteString ("insert into " + table . Name + " select * from " + tmpTableName + " on conflict (" )
5253 sb .WriteString (strings .Join (table .PrimaryKeys (), ", " ))
5354 sb .WriteString (" ) do update set " )
5455 indices := nonPkIndices (table )
55- for i , indice := range indices {
56- col := table .Columns [indice ]
56+ for i , index := range indices {
57+ if i > 0 {
58+ sb .WriteString (", " )
59+ }
60+ col := table .Columns [index ]
5761 sb .WriteString (col .Name )
5862 sb .WriteString (" = excluded." )
5963 sb .WriteString (col .Name )
60- if i < len (indices )- 1 {
61- sb .WriteString (", " )
62- }
6364 }
64- if _ , err := c .db .ExecContext (ctx , sb .String ()); err != nil {
65- return err
66- }
67- return nil
65+ query := sb .String ()
66+ // per https://duckdb.org/docs/sql/indexes#over-eager-unique-constraint-checking we might need some retries
67+ // as the upsert for tables with PKs is transformed into delete + insert internally
68+ return backoff .Retry (
69+ func () error {
70+ return c .exec (ctx , query )
71+ },
72+ backoff .WithContext (backoff .WithMaxRetries (backoff .NewConstantBackOff (50 * time .Millisecond ), 3 ), ctx ),
73+ )
6874}
6975
70- func (c * Client ) deleteByPK (ctx context.Context , tmpTableName string , tableName string , table * schema.Table ) error {
76+ func (c * Client ) deleteByPK (ctx context.Context , tmpTableName string , table * schema.Table ) error {
7177 var sb strings.Builder
72- sb .WriteString ("delete from " + tableName + " using " + tmpTableName + " where " )
73- pks := table .PrimaryKeys ()
74- for i , col := range pks {
75- sb .WriteString (tableName + "." + col )
76- sb .WriteString (" = " )
77- sb .WriteString (tmpTableName + "." + col )
78- if i < len (pks )- 1 {
78+ sb .WriteString ("delete from " + table .Name + " using " + tmpTableName + " where " )
79+ for i , col := range table .PrimaryKeys () {
80+ if i > 0 {
7981 sb .WriteString (" and " )
8082 }
83+ sb .WriteString (table .Name + "." + col )
84+ sb .WriteString (" = " )
85+ sb .WriteString (tmpTableName + "." + col )
8186 }
82- if _ , err := c .db .ExecContext (ctx , sb .String ()); err != nil {
83- return err
84- }
85- return nil
87+
88+ return c .exec (ctx , sb .String ())
8689}
8790
8891func (c * Client ) copyFromFile (ctx context.Context , tableName string , fileName string , sc * arrow.Schema ) error {
8992 var sb strings.Builder
9093 sb .WriteString ("copy " + tableName + "(" )
9194 for i , col := range sc .Fields () {
92- sb .WriteString (sanitizeID (col .Name ))
93- if i < len (sc .Fields ())- 1 {
95+ if i > 0 {
9496 sb .WriteString (", " )
9597 }
98+ sb .WriteString (sanitizeID (col .Name ))
9699 }
97100 sb .WriteString (") from '" + fileName + "' (FORMAT PARQUET)" )
98- if _ , err := c .db .ExecContext (ctx , sb .String ()); err != nil {
99- return err
100- }
101- return nil
101+ return c .exec (ctx , sb .String ())
102102}
103103
104- func (c * Client ) WriteTableBatch (ctx context.Context , table * schema.Table , records []arrow.Record ) error {
104+ func (c * Client ) WriteTableBatch (ctx context.Context , table * schema.Table , records []arrow.Record ) ( err error ) {
105105 f , err := os .CreateTemp ("" , fmt .Sprintf ("%s-*.parquet" , table .Name ))
106106 if err != nil {
107107 return err
@@ -134,39 +134,46 @@ func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, recor
134134 return err
135135 }
136136
137- if c .spec .WriteMode == specs .WriteModeAppend || len (table .PrimaryKeys ()) == 0 {
138- if err := c .copyFromFile (ctx , table .Name , f .Name (), sc ); err != nil {
139- return err
140- }
141- } else {
142- tmpTableName := table .Name + strings .ReplaceAll (uuid .New ().String (), "-" , "_" )
143- if err := c .createTableIfNotExist (ctx , tmpTableName , table ); err != nil {
144- return fmt .Errorf ("failed to create table %s: %w" , tmpTableName , err )
145- }
146- if err := c .copyFromFile (ctx , tmpTableName , f .Name (), sc ); err != nil {
147- return fmt .Errorf ("failed to copy from file %s: %w" , f .Name (), err )
148- }
137+ if ! c .enabledPks () || len (table .PrimaryKeys ()) == 0 {
138+ return c .copyFromFile (ctx , table .Name , f .Name (), sc )
139+ }
149140
150- // At time of writing (March 2023), duckdb does not support updating list columns.
151- // As a workaround, we delete the row and insert it again. This makes it non-atomic, unfortunately,
152- // but this is unavoidable until support is added to duckdb itself.
153- // See https://github.com/duckdb/duckdb/blob/c5d9afb97bbf0be12216f3b89ae3131afbbc3156/src/storage/table/list_column_data.cpp#L243-L251
154- if containsList (table ) {
155- if err := c .deleteByPK (ctx , tmpTableName , table .Name , table ); err != nil {
156- return err
157- }
158- if _ , err = c .db .ExecContext (ctx , "insert into " + table .Name + " from " + tmpTableName ); err != nil {
159- return fmt .Errorf ("failed to insert into %s from %s: %w" , table .Name , tmpTableName , err )
160- }
161- } else {
162- if err := c .upsert (ctx , tmpTableName , table .Name , table ); err != nil {
163- return err
164- }
165- }
166- if _ , err = c .db .ExecContext (ctx , "drop table " + tmpTableName ); err != nil {
167- return err
141+ tmpTableName := table .Name + strings .ReplaceAll (uuid .New ().String (), "-" , "_" )
142+ if err := c .createTableIfNotExist (ctx , tmpTableName , table ); err != nil {
143+ return fmt .Errorf ("failed to create table %s: %w" , tmpTableName , err )
144+ }
145+ defer func () {
146+ e := c .exec (ctx , "drop table " + tmpTableName )
147+ if err == nil {
148+ // we preserve original error, so update only on nil err
149+ err = e
168150 }
151+ }()
152+ if err := c .copyFromFile (ctx , tmpTableName , f .Name (), sc ); err != nil {
153+ return fmt .Errorf ("failed to copy from file %s: %w" , f .Name (), err )
169154 }
170155
171- return nil
156+ // At time of writing (March 2023), duckdb does not support updating list columns.
157+ // As a workaround, we delete the row and insert it again. This makes it non-atomic, unfortunately,
158+ // but this is unavoidable until support is added to duckdb itself.
159+ // See https://github.com/duckdb/duckdb/blob/c5d9afb97bbf0be12216f3b89ae3131afbbc3156/src/storage/table/list_column_data.cpp#L243-L251
160+ if containsList (table ) {
161+ return c .deleteInsert (ctx , tmpTableName , table )
162+ }
163+
164+ return c .upsert (ctx , tmpTableName , table )
165+ }
166+
167+ func (c * Client ) deleteInsert (ctx context.Context , tmpTableName string , table * schema.Table ) error {
168+ if err := c .deleteByPK (ctx , tmpTableName , table ); err != nil {
169+ return err
170+ }
171+
172+ // per https://duckdb.org/docs/sql/indexes#over-eager-unique-constraint-checking we might need to retry
173+ return backoff .Retry (
174+ func () error {
175+ return c .exec (ctx , "insert into " + table .Name + " from " + tmpTableName )
176+ },
177+ backoff .WithContext (backoff .WithMaxRetries (backoff .NewConstantBackOff (50 * time .Millisecond ), 3 ), ctx ),
178+ )
172179}
0 commit comments