Skip to content

Commit 20966e9

Browse files
authored
add skip_incremental_checkpoint option (#54)
1 parent a146377 commit 20966e9

File tree

3 files changed

+197
-26
lines changed

3 files changed

+197
-26
lines changed

core/sling/config.go

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,9 @@ func (cfg *Config) SetDefault() {
219219
cfg.MetadataLoadedAt = g.Bool(false)
220220
}
221221

222+
// Normalize loaded_at shorthand and enforce metadata when needed
223+
cfg.normalizeLoadedAtShorthand()
224+
222225
// set vars
223226
for k, v := range cfg.Env {
224227
os.Setenv(k, v)
@@ -329,6 +332,22 @@ func (cfg *Config) sourceIsFile() bool {
329332
return cfg.Options.StdIn || cfg.SrcConn.Info().Type.IsFile()
330333
}
331334

335+
// normalizeLoadedAtShorthand ensures the "." shorthand maps to the loaded_at column
336+
// and enables metadata generation when required for file incrementals.
337+
func (cfg *Config) normalizeLoadedAtShorthand() {
338+
if cfg.Source.UpdateKey == "." {
339+
if cfg.sourceIsFile() {
340+
cfg.Source.UpdateKey = slingLoadedAtColumn
341+
} else {
342+
g.Warn("update_key shorthand '.' is only supported for file sources; leaving as '.'")
343+
}
344+
}
345+
346+
if cfg.Source.UpdateKey == slingLoadedAtColumn && cfg.Mode == IncrementalMode && cfg.sourceIsFile() {
347+
cfg.MetadataLoadedAt = g.Bool(true)
348+
}
349+
}
350+
332351
func (cfg *Config) DetermineType() (Type JobType, err error) {
333352

334353
srcFileProvided := cfg.sourceIsFile()
@@ -364,8 +383,14 @@ func (cfg *Config) DetermineType() (Type JobType, err error) {
364383
if cfg.Source.UpdateKey == "" {
365384
cfg.Source.UpdateKey = "_bigtable_timestamp"
366385
}
367-
} else if srcFileProvided && cfg.Source.UpdateKey == slingLoadedAtColumn {
368-
// need to loaded_at column for file incremental
386+
} else if cfg.Source.UpdateKey == "." && !srcFileProvided {
387+
err = g.Error("update_key shorthand '.' is only supported when source is file")
388+
return
389+
} else if srcFileProvided && g.In(cfg.Source.UpdateKey, slingLoadedAtColumn, ".") {
390+
// need the loaded_at column for file incremental when using the default shorthand
391+
if cfg.Source.UpdateKey == "." {
392+
cfg.Source.UpdateKey = slingLoadedAtColumn
393+
}
369394
cfg.MetadataLoadedAt = g.Bool(true)
370395
} else if cfg.Source.UpdateKey == "" && len(cfg.Source.PrimaryKey()) == 0 {
371396
err = g.Error("must specify value for 'update_key' and/or 'primary_key' for incremental mode. See docs for more details: https://docs.slingdata.io/sling-cli/run/configuration")
@@ -720,6 +745,11 @@ func (cfg *Config) Prepare() (err error) {
720745
}
721746
}
722747

748+
// shorthand validation: "." only supported for file sources
749+
if cfg.Source.UpdateKey == "." && !cfg.sourceIsFile() {
750+
return g.Error("update_key shorthand '.' is only supported when source is file")
751+
}
752+
723753
// compile pre and post sql
724754
if cfg.TgtConn.Type.IsDb() {
725755

@@ -748,6 +778,9 @@ func (cfg *Config) Prepare() (err error) {
748778
}
749779
}
750780

781+
// Normalize loaded_at shorthand and enforce metadata when needed
782+
cfg.normalizeLoadedAtShorthand()
783+
751784
// done
752785
cfg.Prepared = true
753786
return
@@ -1053,6 +1086,21 @@ func (cfg *Config) IgnoreExisting() bool {
10531086
return cfg.Target.Options.IgnoreExisting != nil && *cfg.Target.Options.IgnoreExisting
10541087
}
10551088

1089+
// SkipIncrementalCheckpoint returns true when this run should *not* fetch the
1090+
// incremental watermark (max(update_key)) from the target before an
1091+
// incremental run. This is primarily used for file->Proton restore scenarios
1092+
// where we want to re-insert older records (e.g. older _tp_time values) even
1093+
// if the table already contains newer rows.
1094+
func (cfg *Config) SkipIncrementalCheckpoint() bool {
1095+
if cfg.Source.Options != nil && cfg.Source.Options.SkipIncrementalCheckpoint != nil && *cfg.Source.Options.SkipIncrementalCheckpoint {
1096+
return true
1097+
}
1098+
if cast.ToBool(os.Getenv("SLING_SKIP_INCREMENTAL_CHECKPOINT")) {
1099+
return true
1100+
}
1101+
return false
1102+
}
1103+
10561104
// ColumnsPrepared returns the prepared columns
10571105
func (cfg *Config) ColumnsPrepared() (columns iop.Columns) {
10581106

@@ -1317,21 +1365,27 @@ type SourceOptions struct {
13171365
EmptyAsNull *bool `json:"empty_as_null,omitempty" yaml:"empty_as_null,omitempty"`
13181366
Header *bool `json:"header,omitempty" yaml:"header,omitempty"`
13191367
Flatten *bool `json:"flatten,omitempty" yaml:"flatten,omitempty"`
1320-
FieldsPerRec *int `json:"fields_per_rec,omitempty" yaml:"fields_per_rec,omitempty"`
1321-
Compression *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
1322-
Format *dbio.FileType `json:"format,omitempty" yaml:"format,omitempty"`
1323-
NullIf *string `json:"null_if,omitempty" yaml:"null_if,omitempty"`
1324-
DatetimeFormat string `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
1325-
SkipBlankLines *bool `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"`
1326-
Delimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
1327-
Escape string `json:"escape,omitempty" yaml:"escape,omitempty"`
1328-
Quote string `json:"quote,omitempty" yaml:"quote,omitempty"`
1329-
MaxDecimals *int `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
1330-
JmesPath *string `json:"jmespath,omitempty" yaml:"jmespath,omitempty"`
1331-
Sheet *string `json:"sheet,omitempty" yaml:"sheet,omitempty"`
1332-
Range *string `json:"range,omitempty" yaml:"range,omitempty"`
1333-
Limit *int `json:"limit,omitempty" yaml:"limit,omitempty"`
1334-
Offset *int `json:"offset,omitempty" yaml:"offset,omitempty"`
1368+
// SkipIncrementalCheckpoint, when true, prevents Sling from fetching the
1369+
// current max(update_key) from the target before an incremental run. This
1370+
// is used for restore scenarios (e.g. file->Proton) where we need to
1371+
// re-insert older records that would otherwise be skipped by the
1372+
// incremental watermark.
1373+
SkipIncrementalCheckpoint *bool `json:"skip_incremental_checkpoint,omitempty" yaml:"skip_incremental_checkpoint,omitempty"`
1374+
FieldsPerRec *int `json:"fields_per_rec,omitempty" yaml:"fields_per_rec,omitempty"`
1375+
Compression *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
1376+
Format *dbio.FileType `json:"format,omitempty" yaml:"format,omitempty"`
1377+
NullIf *string `json:"null_if,omitempty" yaml:"null_if,omitempty"`
1378+
DatetimeFormat string `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
1379+
SkipBlankLines *bool `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"`
1380+
Delimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
1381+
Escape string `json:"escape,omitempty" yaml:"escape,omitempty"`
1382+
Quote string `json:"quote,omitempty" yaml:"quote,omitempty"`
1383+
MaxDecimals *int `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
1384+
JmesPath *string `json:"jmespath,omitempty" yaml:"jmespath,omitempty"`
1385+
Sheet *string `json:"sheet,omitempty" yaml:"sheet,omitempty"`
1386+
Range *string `json:"range,omitempty" yaml:"range,omitempty"`
1387+
Limit *int `json:"limit,omitempty" yaml:"limit,omitempty"`
1388+
Offset *int `json:"offset,omitempty" yaml:"offset,omitempty"`
13351389

13361390
// columns & transforms were moved out of source_options
13371391
// https://github.com/slingdata-io/sling-cli/issues/348

core/sling/config_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sling
22

33
import (
44
"math"
5+
"os"
56
"testing"
67
"time"
78

@@ -58,6 +59,110 @@ func TestConfig(t *testing.T) {
5859

5960
}
6061

62+
func TestSkipIncrementalCheckpointFlag(t *testing.T) {
63+
t.Run("config flag only", func(t *testing.T) {
64+
cfgStr := `
65+
mode: incremental
66+
source:
67+
conn: LOCAL
68+
stream: /tmp/file.csv
69+
options:
70+
skip_incremental_checkpoint: true
71+
target:
72+
conn: PROTON_DB
73+
object: db.table
74+
`
75+
cfg, err := NewConfig(cfgStr)
76+
assert.NoError(t, err)
77+
assert.NotNil(t, cfg.Source.Options)
78+
assert.NotNil(t, cfg.Source.Options.SkipIncrementalCheckpoint)
79+
assert.True(t, *cfg.Source.Options.SkipIncrementalCheckpoint)
80+
assert.True(t, cfg.SkipIncrementalCheckpoint())
81+
})
82+
83+
t.Run("env var only", func(t *testing.T) {
84+
orig := os.Getenv("SLING_SKIP_INCREMENTAL_CHECKPOINT")
85+
defer os.Setenv("SLING_SKIP_INCREMENTAL_CHECKPOINT", orig)
86+
87+
_ = os.Setenv("SLING_SKIP_INCREMENTAL_CHECKPOINT", "TRUE")
88+
89+
cfgStr := `
90+
mode: incremental
91+
source:
92+
conn: LOCAL
93+
stream: /tmp/file.csv
94+
target:
95+
conn: PROTON_DB
96+
object: db.table
97+
`
98+
cfg, err := NewConfig(cfgStr)
99+
assert.NoError(t, err)
100+
// no explicit flag in config
101+
if cfg.Source.Options != nil {
102+
assert.Nil(t, cfg.Source.Options.SkipIncrementalCheckpoint)
103+
}
104+
assert.True(t, cfg.SkipIncrementalCheckpoint())
105+
})
106+
}
107+
108+
func TestFileUpdateKeyShorthandDot(t *testing.T) {
109+
t.Run("normalize dot to loaded_at", func(t *testing.T) {
110+
cfgStr := `
111+
mode: incremental
112+
source:
113+
conn: LOCAL
114+
stream: /tmp/file.csv
115+
update_key: "."
116+
target:
117+
conn: PROTON_DB
118+
object: db.table
119+
`
120+
cfg, err := NewConfig(cfgStr)
121+
assert.NoError(t, err)
122+
assert.Equal(t, slingLoadedAtColumn, cfg.Source.UpdateKey)
123+
if assert.NotNil(t, cfg.MetadataLoadedAt) {
124+
assert.True(t, *cfg.MetadataLoadedAt)
125+
}
126+
})
127+
128+
t.Run("dot shorthand not allowed for db source", func(t *testing.T) {
129+
cfgStr := `
130+
mode: incremental
131+
source:
132+
conn: POSTGRES
133+
stream: public.tbl
134+
update_key: "."
135+
target:
136+
conn: PROTON_DB
137+
object: db.table
138+
`
139+
_, err := NewConfig(cfgStr)
140+
assert.Error(t, err)
141+
})
142+
143+
t.Run("normalize when skipping checkpoint", func(t *testing.T) {
144+
cfgStr := `
145+
mode: incremental
146+
source:
147+
conn: LOCAL
148+
stream: /tmp/file.csv
149+
update_key: "."
150+
options:
151+
skip_incremental_checkpoint: true
152+
target:
153+
conn: PROTON_DB
154+
object: db.table
155+
`
156+
cfg, err := NewConfig(cfgStr)
157+
assert.NoError(t, err)
158+
assert.Equal(t, slingLoadedAtColumn, cfg.Source.UpdateKey)
159+
if assert.NotNil(t, cfg.MetadataLoadedAt) {
160+
assert.True(t, *cfg.MetadataLoadedAt)
161+
}
162+
assert.True(t, cfg.SkipIncrementalCheckpoint())
163+
})
164+
}
165+
61166
func TestColumnCasing(t *testing.T) {
62167
df := iop.NewDataflow(0)
63168

core/sling/task_run.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -397,15 +397,23 @@ func (t *TaskExecution) runFileToDB() (err error) {
397397
}
398398

399399
if t.isIncrementalWithUpdateKey() {
400-
t.SetProgress("getting checkpoint value")
400+
// Normalize shorthand for loaded_at before deciding whether to fetch the checkpoint
401401
if t.Config.Source.UpdateKey == "." {
402402
t.Config.Source.UpdateKey = slingLoadedAtColumn
403403
}
404404

405-
template, _ := dbio.TypeDbDuckDb.Template()
406-
if err = getIncrementalValue(t.Config, tgtConn, template.Variable); err != nil {
407-
err = g.Error(err, "Could not get incremental value")
408-
return err
405+
// For file->Proton restore use-cases we may want to skip the checkpoint
406+
// (max(update_key)) so that older rows can be re-inserted. In that case
407+
// we honor the Config flag and do not call getIncrementalValue.
408+
if t.Config.SkipIncrementalCheckpoint() && t.Config.TgtConn.Type == dbio.TypeDbProton && t.Config.SrcConn.Type.Kind() == dbio.KindFile {
409+
g.Debug("skipping incremental checkpoint for file->proton (skip_incremental_checkpoint=true)")
410+
} else {
411+
t.SetProgress("getting checkpoint value")
412+
template, _ := dbio.TypeDbDuckDb.Template()
413+
if err = getIncrementalValue(t.Config, tgtConn, template.Variable); err != nil {
414+
err = g.Error(err, "Could not get incremental value")
415+
return err
416+
}
409417
}
410418
}
411419

@@ -551,10 +559,14 @@ func (t *TaskExecution) runDbToDb() (err error) {
551559

552560
// get watermark
553561
if t.isIncrementalWithUpdateKey() {
554-
t.SetProgress("getting checkpoint value")
555-
if err = getIncrementalValue(t.Config, tgtConn, srcConn.Template().Variable); err != nil {
556-
err = g.Error(err, "Could not get incremental value")
557-
return err
562+
if t.Config.SkipIncrementalCheckpoint() && t.Config.TgtConn.Type == dbio.TypeDbProton && t.Config.SrcConn.Type == dbio.TypeDbProton {
563+
g.Debug("skipping incremental checkpoint for proton->proton (skip_incremental_checkpoint=true)")
564+
} else {
565+
t.SetProgress("getting checkpoint value")
566+
if err = getIncrementalValue(t.Config, tgtConn, srcConn.Template().Variable); err != nil {
567+
err = g.Error(err, "Could not get incremental value")
568+
return err
569+
}
558570
}
559571
}
560572

0 commit comments

Comments
 (0)