Skip to content

Commit 2b70cb8

Browse files
committed
fix: prune deleted watermark state after rewrite grace
1 parent 4437b49 commit 2b70cb8

3 files changed

Lines changed: 143 additions & 12 deletions

File tree

internal/service/scheduler/tick_planner.go

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type DAGChangeEvent struct {
3535
DAGName string // always set (needed for delete)
3636
}
3737

38+
const deletedWatermarkGrace = 2 * time.Minute
39+
3840
// PlannedRun represents a run that the TickPlanner has decided should be dispatched.
3941
type PlannedRun struct {
4042
DAG *core.DAG
@@ -106,8 +108,8 @@ type TickPlannerConfig struct {
106108
// tracks progress via watermarks, and reacts to DAG lifecycle changes.
107109
//
108110
// Thread safety:
109-
// - entries and buffers are protected by entryMu (accessed from drainEvents
110-
// goroutine and cronLoop's Plan).
111+
// - entries, buffers, and deletedGrace are protected by entryMu (accessed
112+
// from drainEvents goroutine and cronLoop's Plan).
111113
// - watermarkState is shared with the flusher goroutine and protected by mu.
112114
// - Plan() holds entryMu during I/O calls (IsSuspended, IsRunning,
113115
// GetLatestStatus, GenRunID). This is intentional: the lock prevents
@@ -124,9 +126,10 @@ type TickPlanner struct {
124126
watermarkDirty atomic.Bool
125127

126128
// per-DAG tracking (protected by entryMu)
127-
entryMu sync.Mutex
128-
entries map[string]*plannerEntry
129-
buffers map[string]*ScheduleBuffer
129+
entryMu sync.Mutex
130+
entries map[string]*plannerEntry
131+
buffers map[string]*ScheduleBuffer
132+
deletedGrace map[string]time.Time
130133

131134
// lastPlanResult holds the runs from the most recent Plan() call.
132135
// It is written by Plan() and read by Advance(). Both are called
@@ -204,9 +207,10 @@ func NewTickPlanner(cfg TickPlannerConfig) *TickPlanner {
204207
}
205208
}
206209
return &TickPlanner{
207-
cfg: cfg,
208-
entries: make(map[string]*plannerEntry),
209-
buffers: make(map[string]*ScheduleBuffer),
210+
cfg: cfg,
211+
entries: make(map[string]*plannerEntry),
212+
buffers: make(map[string]*ScheduleBuffer),
213+
deletedGrace: make(map[string]time.Time),
210214
}
211215
}
212216

@@ -383,6 +387,8 @@ func (tp *TickPlanner) Plan(ctx context.Context, now time.Time) []PlannedRun {
383387
tp.entryMu.Lock()
384388
defer tp.entryMu.Unlock()
385389

390+
tp.pruneExpiredDeletedWatermarks(now)
391+
386392
var candidates []PlannedRun
387393

388394
for dagName, entry := range tp.entries {
@@ -649,6 +655,9 @@ func (tp *TickPlanner) shouldRun(ctx context.Context, dag *core.DAG, scheduledTi
649655
// The latest run belongs to a removed/edited slot. Do not let its runtime
650656
// timestamps suppress the current schedule.
651657
return true
658+
case latestScheduledSlotUnknown:
659+
// Fall back to runtime-based suppression when the latest run does not carry
660+
// a trustworthy scheduled slot identity.
652661
}
653662

654663
// Guard 2 fallback: legacy/manual runs without an authoritative schedule slot.
@@ -1047,6 +1056,7 @@ func (tp *TickPlanner) handleEvent(ctx context.Context, event DAGChangeEvent) {
10471056
if event.DAG == nil {
10481057
return
10491058
}
1059+
delete(tp.deletedGrace, event.DAGName)
10501060
tp.entries[event.DAGName] = &plannerEntry{dag: event.DAG}
10511061
// Set watermark to now (new DAGs have no catchup)
10521062
flushNow = tp.advanceDAGWatermark(event.DAGName, tp.cfg.Clock())
@@ -1058,6 +1068,7 @@ func (tp *TickPlanner) handleEvent(ctx context.Context, event DAGChangeEvent) {
10581068
if event.DAG == nil {
10591069
return
10601070
}
1071+
delete(tp.deletedGrace, event.DAGName)
10611072
tp.entries[event.DAGName] = &plannerEntry{dag: event.DAG}
10621073
// Remove existing buffer and recompute if catchupWindow > 0
10631074
delete(tp.buffers, event.DAGName)
@@ -1071,9 +1082,11 @@ func (tp *TickPlanner) handleEvent(ctx context.Context, event DAGChangeEvent) {
10711082
case DAGChangeDeleted:
10721083
delete(tp.entries, event.DAGName)
10731084
delete(tp.buffers, event.DAGName)
1074-
// Preserve persisted watermark state across delete+add rewrite cycles so
1075-
// the re-added DAG can detect schedule edits before its next slot. Truly
1076-
// deleted DAG watermarks are pruned on the next Init.
1085+
if tp.hasDAGWatermark(event.DAGName) {
1086+
tp.deletedGrace[event.DAGName] = tp.cfg.Clock().Add(deletedWatermarkGrace)
1087+
}
1088+
// Preserve watermark state briefly across delete+add rewrite cycles so
1089+
// the re-added DAG can detect schedule edits before its next slot.
10771090
logger.Info(ctx, "Planner: DAG deleted", tag.DAG(event.DAGName))
10781091
}
10791092

@@ -1128,6 +1141,50 @@ func (tp *TickPlanner) reconcileStartScheduleState(dag *core.DAG) bool {
11281141
return true
11291142
}
11301143

1144+
func (tp *TickPlanner) hasDAGWatermark(dagName string) bool {
1145+
tp.mu.RLock()
1146+
defer tp.mu.RUnlock()
1147+
1148+
if tp.watermarkState == nil {
1149+
return false
1150+
}
1151+
_, ok := tp.watermarkState.DAGs[dagName]
1152+
return ok
1153+
}
1154+
1155+
func (tp *TickPlanner) pruneExpiredDeletedWatermarks(now time.Time) {
1156+
if len(tp.deletedGrace) == 0 {
1157+
return
1158+
}
1159+
1160+
tp.mu.Lock()
1161+
defer tp.mu.Unlock()
1162+
1163+
if tp.watermarkState == nil || tp.watermarkState.DAGs == nil {
1164+
return
1165+
}
1166+
1167+
changed := false
1168+
for dagName, expiresAt := range tp.deletedGrace {
1169+
if now.Before(expiresAt) {
1170+
continue
1171+
}
1172+
delete(tp.deletedGrace, dagName)
1173+
if _, active := tp.entries[dagName]; active {
1174+
continue
1175+
}
1176+
if _, ok := tp.watermarkState.DAGs[dagName]; !ok {
1177+
continue
1178+
}
1179+
delete(tp.watermarkState.DAGs, dagName)
1180+
changed = true
1181+
}
1182+
1183+
if changed {
1184+
tp.watermarkDirty.Store(true)
1185+
}
1186+
}
1187+
11311188
// reinsertCatchupItem puts a failed catchup run back at the front of the
11321189
// DAG's schedule buffer so it retries on the next tick. If the buffer was
11331190
// already cleaned up, a new one is created.

internal/service/scheduler/tick_planner_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,80 @@ func TestTickPlanner_HandleEvent_Deleted(t *testing.T) {
459459
// Verify entry was removed
460460
_, ok = tp.entries["del-dag"]
461461
assert.False(t, ok, "del-dag should be removed from entries")
462+
463+
tp.mu.RLock()
464+
_, hasWM := tp.watermarkState.DAGs["del-dag"]
465+
tp.mu.RUnlock()
466+
assert.True(t, hasWM, "del-dag watermark should be retained during the rewrite grace window")
467+
}
468+
469+
func TestTickPlanner_DeletedWatermarkExpiresAfterGraceWindow(t *testing.T) {
470+
t.Parallel()
471+
472+
const graceWindow = 2 * time.Minute
473+
474+
now := time.Date(2026, 2, 7, 12, 0, 0, 0, time.UTC)
475+
clock := now
476+
eventCh := make(chan DAGChangeEvent, 256)
477+
store := &mockWatermarkStore{}
478+
tp := NewTickPlanner(TickPlannerConfig{
479+
WatermarkStore: store,
480+
QueuesEnabled: true,
481+
IsSuspended: func(_ context.Context, _ string) bool {
482+
return false
483+
},
484+
GetLatestStatus: func(_ context.Context, _ *core.DAG) (exec.DAGRunStatus, error) {
485+
return exec.DAGRunStatus{}, nil
486+
},
487+
Dispatch: func(_ context.Context, _ *core.DAG, _ string, _ core.TriggerType, _ time.Time) error {
488+
return nil
489+
},
490+
GenRunID: func(_ context.Context) (string, error) {
491+
return "test-run-id", nil
492+
},
493+
IsRunning: func(_ context.Context, _ *core.DAG) (bool, error) {
494+
return false, nil
495+
},
496+
RunExists: func(_ context.Context, _ *core.DAG, _ string) (bool, error) {
497+
return false, nil
498+
},
499+
Clock: func() time.Time {
500+
return clock
501+
},
502+
Events: eventCh,
503+
})
504+
505+
dag := &core.DAG{
506+
Name: "deleted-after-grace",
507+
Schedule: []core.Schedule{mustParseSchedule(t, "0 * * * *")},
508+
}
509+
require.NoError(t, tp.Init(context.Background(), []*core.DAG{dag}))
510+
511+
tp.entryMu.Lock()
512+
tp.handleEvent(context.Background(), DAGChangeEvent{
513+
Type: DAGChangeDeleted,
514+
DAGName: dag.Name,
515+
})
516+
tp.entryMu.Unlock()
517+
518+
tp.mu.RLock()
519+
_, hasWM := tp.watermarkState.DAGs[dag.Name]
520+
tp.mu.RUnlock()
521+
require.True(t, hasWM, "watermark should remain available during the rewrite grace window")
522+
523+
clock = now.Add(graceWindow + time.Second)
524+
tp.Plan(context.Background(), clock)
525+
tp.Flush(context.Background())
526+
527+
tp.mu.RLock()
528+
_, hasWM = tp.watermarkState.DAGs[dag.Name]
529+
tp.mu.RUnlock()
530+
assert.False(t, hasWM, "watermark should be pruned after the grace window expires")
531+
532+
saved := store.lastSaved()
533+
require.NotNil(t, saved)
534+
_, hasPersisted := saved.DAGs[dag.Name]
535+
assert.False(t, hasPersisted, "expired deleted watermark should not be persisted")
462536
}
463537

464538
func TestTickPlanner_HandleEvent_Updated(t *testing.T) {

internal/service/scheduler/watermark.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type SchedulerState struct {
2020
type DAGWatermark struct {
2121
LastScheduledTime time.Time `json:"lastScheduledTime"`
2222
StartScheduleFingerprint string `json:"startScheduleFingerprint,omitempty"`
23-
SkipSuccessResetAt time.Time `json:"skipSuccessResetAt,omitempty"`
23+
SkipSuccessResetAt time.Time `json:"skipSuccessResetAt"`
2424
OneOffs map[string]OneOffScheduleState `json:"oneOffs,omitempty"`
2525
}
2626

0 commit comments

Comments
 (0)