@@ -219,6 +219,9 @@ func (cfg *Config) SetDefault() {
219219 cfg .MetadataLoadedAt = g .Bool (false )
220220 }
221221
222+ // Normalize loaded_at shorthand and enforce metadata when needed
223+ cfg .normalizeLoadedAtShorthand ()
224+
222225 // set vars
223226 for k , v := range cfg .Env {
224227 os .Setenv (k , v )
@@ -329,6 +332,22 @@ func (cfg *Config) sourceIsFile() bool {
329332 return cfg .Options .StdIn || cfg .SrcConn .Info ().Type .IsFile ()
330333}
331334
335+ // normalizeLoadedAtShorthand ensures the "." shorthand maps to the loaded_at column
336+ // and enables metadata generation when required for file incrementals.
337+ func (cfg * Config ) normalizeLoadedAtShorthand () {
338+ if cfg .Source .UpdateKey == "." {
339+ if cfg .sourceIsFile () {
340+ cfg .Source .UpdateKey = slingLoadedAtColumn
341+ } else {
342+ g .Warn ("update_key shorthand '.' is only supported for file sources; leaving as '.'" )
343+ }
344+ }
345+
346+ if cfg .Source .UpdateKey == slingLoadedAtColumn && cfg .Mode == IncrementalMode && cfg .sourceIsFile () {
347+ cfg .MetadataLoadedAt = g .Bool (true )
348+ }
349+ }
350+
332351func (cfg * Config ) DetermineType () (Type JobType , err error ) {
333352
334353 srcFileProvided := cfg .sourceIsFile ()
@@ -364,8 +383,14 @@ func (cfg *Config) DetermineType() (Type JobType, err error) {
364383 if cfg .Source .UpdateKey == "" {
365384 cfg .Source .UpdateKey = "_bigtable_timestamp"
366385 }
367- } else if srcFileProvided && cfg .Source .UpdateKey == slingLoadedAtColumn {
368- // need to loaded_at column for file incremental
386+ } else if cfg .Source .UpdateKey == "." && ! srcFileProvided {
387+ err = g .Error ("update_key shorthand '.' is only supported when source is file" )
388+ return
389+ } else if srcFileProvided && g .In (cfg .Source .UpdateKey , slingLoadedAtColumn , "." ) {
390+ // need the loaded_at column for file incremental when using the default shorthand
391+ if cfg .Source .UpdateKey == "." {
392+ cfg .Source .UpdateKey = slingLoadedAtColumn
393+ }
369394 cfg .MetadataLoadedAt = g .Bool (true )
370395 } else if cfg .Source .UpdateKey == "" && len (cfg .Source .PrimaryKey ()) == 0 {
371396 err = g .Error ("must specify value for 'update_key' and/or 'primary_key' for incremental mode. See docs for more details: https://docs.slingdata.io/sling-cli/run/configuration" )
@@ -720,6 +745,11 @@ func (cfg *Config) Prepare() (err error) {
720745 }
721746 }
722747
748+ // shorthand validation: "." only supported for file sources
749+ if cfg .Source .UpdateKey == "." && ! cfg .sourceIsFile () {
750+ return g .Error ("update_key shorthand '.' is only supported when source is file" )
751+ }
752+
723753 // compile pre and post sql
724754 if cfg .TgtConn .Type .IsDb () {
725755
@@ -748,6 +778,9 @@ func (cfg *Config) Prepare() (err error) {
748778 }
749779 }
750780
781+ // Normalize loaded_at shorthand and enforce metadata when needed
782+ cfg .normalizeLoadedAtShorthand ()
783+
751784 // done
752785 cfg .Prepared = true
753786 return
@@ -1053,6 +1086,21 @@ func (cfg *Config) IgnoreExisting() bool {
10531086 return cfg .Target .Options .IgnoreExisting != nil && * cfg .Target .Options .IgnoreExisting
10541087}
10551088
1089+ // SkipIncrementalCheckpoint returns true when this run should *not* fetch the
1090+ // incremental watermark (max(update_key)) from the target before an
1091+ // incremental run. This is primarily used for file->Proton restore scenarios
1092+ // where we want to re-insert older records (e.g. older _tp_time values) even
1093+ // if the table already contains newer rows.
1094+ func (cfg * Config ) SkipIncrementalCheckpoint () bool {
1095+ if cfg .Source .Options != nil && cfg .Source .Options .SkipIncrementalCheckpoint != nil && * cfg .Source .Options .SkipIncrementalCheckpoint {
1096+ return true
1097+ }
1098+ if cast .ToBool (os .Getenv ("SLING_SKIP_INCREMENTAL_CHECKPOINT" )) {
1099+ return true
1100+ }
1101+ return false
1102+ }
1103+
10561104// ColumnsPrepared returns the prepared columns
10571105func (cfg * Config ) ColumnsPrepared () (columns iop.Columns ) {
10581106
@@ -1317,21 +1365,27 @@ type SourceOptions struct {
13171365 EmptyAsNull * bool `json:"empty_as_null,omitempty" yaml:"empty_as_null,omitempty"`
13181366 Header * bool `json:"header,omitempty" yaml:"header,omitempty"`
13191367 Flatten * bool `json:"flatten,omitempty" yaml:"flatten,omitempty"`
1320- FieldsPerRec * int `json:"fields_per_rec,omitempty" yaml:"fields_per_rec,omitempty"`
1321- Compression * iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
1322- Format * dbio.FileType `json:"format,omitempty" yaml:"format,omitempty"`
1323- NullIf * string `json:"null_if,omitempty" yaml:"null_if,omitempty"`
1324- DatetimeFormat string `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
1325- SkipBlankLines * bool `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"`
1326- Delimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
1327- Escape string `json:"escape,omitempty" yaml:"escape,omitempty"`
1328- Quote string `json:"quote,omitempty" yaml:"quote,omitempty"`
1329- MaxDecimals * int `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
1330- JmesPath * string `json:"jmespath,omitempty" yaml:"jmespath,omitempty"`
1331- Sheet * string `json:"sheet,omitempty" yaml:"sheet,omitempty"`
1332- Range * string `json:"range,omitempty" yaml:"range,omitempty"`
1333- Limit * int `json:"limit,omitempty" yaml:"limit,omitempty"`
1334- Offset * int `json:"offset,omitempty" yaml:"offset,omitempty"`
1368+ // SkipIncrementalCheckpoint, when true, prevents Sling from fetching the
1369+ // current max(update_key) from the target before an incremental run. This
1370+ // is used for restore scenarios (e.g. file->Proton) where we need to
1371+ // re-insert older records that would otherwise be skipped by the
1372+ // incremental watermark.
1373+ SkipIncrementalCheckpoint * bool `json:"skip_incremental_checkpoint,omitempty" yaml:"skip_incremental_checkpoint,omitempty"`
1374+ FieldsPerRec * int `json:"fields_per_rec,omitempty" yaml:"fields_per_rec,omitempty"`
1375+ Compression * iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
1376+ Format * dbio.FileType `json:"format,omitempty" yaml:"format,omitempty"`
1377+ NullIf * string `json:"null_if,omitempty" yaml:"null_if,omitempty"`
1378+ DatetimeFormat string `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
1379+ SkipBlankLines * bool `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"`
1380+ Delimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
1381+ Escape string `json:"escape,omitempty" yaml:"escape,omitempty"`
1382+ Quote string `json:"quote,omitempty" yaml:"quote,omitempty"`
1383+ MaxDecimals * int `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
1384+ JmesPath * string `json:"jmespath,omitempty" yaml:"jmespath,omitempty"`
1385+ Sheet * string `json:"sheet,omitempty" yaml:"sheet,omitempty"`
1386+ Range * string `json:"range,omitempty" yaml:"range,omitempty"`
1387+ Limit * int `json:"limit,omitempty" yaml:"limit,omitempty"`
1388+ Offset * int `json:"offset,omitempty" yaml:"offset,omitempty"`
13351389
13361390 // columns & transforms were moved out of source_options
13371391 // https://github.com/slingdata-io/sling-cli/issues/348
0 commit comments