@@ -35,6 +35,8 @@ type DAGChangeEvent struct {
3535 DAGName string // always set (needed for delete)
3636}
3737
// deletedWatermarkGrace is how long a deleted DAG's watermark is kept before
// Plan prunes it. A delete that is followed by a re-add within this window
// (e.g. a file rewrite) cancels the pending prune, so the re-added DAG still
// sees its previous watermark.
38+ const deletedWatermarkGrace = 2 * time .Minute
39+
3840// PlannedRun represents a run that the TickPlanner has decided should be dispatched.
3941type PlannedRun struct {
4042 DAG * core.DAG
@@ -106,8 +108,8 @@ type TickPlannerConfig struct {
106108// tracks progress via watermarks, and reacts to DAG lifecycle changes.
107109//
108110// Thread safety:
109- // - entries and buffers are protected by entryMu (accessed from drainEvents
110- // goroutine and cronLoop's Plan).
111+ // - entries, buffers, and deletedGrace are protected by entryMu (accessed
112+ // from drainEvents goroutine and cronLoop's Plan).
111113// - watermarkState is shared with the flusher goroutine and protected by mu.
112114// - Plan() holds entryMu during I/O calls (IsSuspended, IsRunning,
113115// GetLatestStatus, GenRunID). This is intentional: the lock prevents
@@ -124,9 +126,10 @@ type TickPlanner struct {
124126 watermarkDirty atomic.Bool
125127
126128 // per-DAG tracking (protected by entryMu)
127- entryMu sync.Mutex
128- entries map [string ]* plannerEntry
129- buffers map [string ]* ScheduleBuffer
129+ entryMu sync.Mutex
130+ entries map [string ]* plannerEntry
131+ buffers map [string ]* ScheduleBuffer
132+ deletedGrace map [string ]time.Time
130133
131134 // lastPlanResult holds the runs from the most recent Plan() call.
132135 // It is written by Plan() and read by Advance(). Both are called
@@ -204,9 +207,10 @@ func NewTickPlanner(cfg TickPlannerConfig) *TickPlanner {
204207 }
205208 }
206209 return & TickPlanner {
207- cfg : cfg ,
208- entries : make (map [string ]* plannerEntry ),
209- buffers : make (map [string ]* ScheduleBuffer ),
210+ cfg : cfg ,
211+ entries : make (map [string ]* plannerEntry ),
212+ buffers : make (map [string ]* ScheduleBuffer ),
213+ deletedGrace : make (map [string ]time.Time ),
210214 }
211215}
212216
@@ -383,6 +387,8 @@ func (tp *TickPlanner) Plan(ctx context.Context, now time.Time) []PlannedRun {
383387 tp .entryMu .Lock ()
384388 defer tp .entryMu .Unlock ()
385389
390+ tp .pruneExpiredDeletedWatermarks (now )
391+
386392 var candidates []PlannedRun
387393
388394 for dagName , entry := range tp .entries {
@@ -649,6 +655,9 @@ func (tp *TickPlanner) shouldRun(ctx context.Context, dag *core.DAG, scheduledTi
649655 // The latest run belongs to a removed/edited slot. Do not let its runtime
650656 // timestamps suppress the current schedule.
651657 return true
658+ case latestScheduledSlotUnknown :
659+ // Fall back to runtime-based suppression when the latest run does not carry
660+ // a trustworthy scheduled slot identity.
652661 }
653662
654663 // Guard 2 fallback: legacy/manual runs without an authoritative schedule slot.
@@ -1047,6 +1056,7 @@ func (tp *TickPlanner) handleEvent(ctx context.Context, event DAGChangeEvent) {
10471056 if event .DAG == nil {
10481057 return
10491058 }
1059+ delete (tp .deletedGrace , event .DAGName )
10501060 tp .entries [event .DAGName ] = & plannerEntry {dag : event .DAG }
10511061 // Set watermark to now (new DAGs have no catchup)
10521062 flushNow = tp .advanceDAGWatermark (event .DAGName , tp .cfg .Clock ())
@@ -1058,6 +1068,7 @@ func (tp *TickPlanner) handleEvent(ctx context.Context, event DAGChangeEvent) {
10581068 if event .DAG == nil {
10591069 return
10601070 }
1071+ delete (tp .deletedGrace , event .DAGName )
10611072 tp .entries [event .DAGName ] = & plannerEntry {dag : event .DAG }
10621073 // Remove existing buffer and recompute if catchupWindow > 0
10631074 delete (tp .buffers , event .DAGName )
@@ -1071,9 +1082,11 @@ func (tp *TickPlanner) handleEvent(ctx context.Context, event DAGChangeEvent) {
10711082 case DAGChangeDeleted :
10721083 delete (tp .entries , event .DAGName )
10731084 delete (tp .buffers , event .DAGName )
1074- // Preserve persisted watermark state across delete+add rewrite cycles so
1075- // the re-added DAG can detect schedule edits before its next slot. Truly
1076- // deleted DAG watermarks are pruned on the next Init.
1085+ if tp .hasDAGWatermark (event .DAGName ) {
1086+ tp .deletedGrace [event .DAGName ] = tp .cfg .Clock ().Add (deletedWatermarkGrace )
1087+ }
1088+ // Preserve watermark state briefly across delete+add rewrite cycles so
1089+ // the re-added DAG can detect schedule edits before its next slot.
10771090 logger .Info (ctx , "Planner: DAG deleted" , tag .DAG (event .DAGName ))
10781091 }
10791092
@@ -1128,6 +1141,50 @@ func (tp *TickPlanner) reconcileStartScheduleState(dag *core.DAG) bool {
11281141 return true
11291142}
11301143
1144+ func (tp * TickPlanner ) hasDAGWatermark (dagName string ) bool {
1145+ tp .mu .RLock ()
1146+ defer tp .mu .RUnlock ()
1147+
1148+ if tp .watermarkState == nil {
1149+ return false
1150+ }
1151+ _ , ok := tp .watermarkState .DAGs [dagName ]
1152+ return ok
1153+ }
1154+
1155+ func (tp * TickPlanner ) pruneExpiredDeletedWatermarks (now time.Time ) {
1156+ if len (tp .deletedGrace ) == 0 {
1157+ return
1158+ }
1159+
1160+ tp .mu .Lock ()
1161+ defer tp .mu .Unlock ()
1162+
1163+ if tp .watermarkState == nil || tp .watermarkState .DAGs == nil {
1164+ return
1165+ }
1166+
1167+ changed := false
1168+ for dagName , expiresAt := range tp .deletedGrace {
1169+ if now .Before (expiresAt ) {
1170+ continue
1171+ }
1172+ delete (tp .deletedGrace , dagName )
1173+ if _ , active := tp .entries [dagName ]; active {
1174+ continue
1175+ }
1176+ if _ , ok := tp .watermarkState .DAGs [dagName ]; ! ok {
1177+ continue
1178+ }
1179+ delete (tp .watermarkState .DAGs , dagName )
1180+ changed = true
1181+ }
1182+
1183+ if changed {
1184+ tp .watermarkDirty .Store (true )
1185+ }
1186+ }
1187+
11311188// reinsertCatchupItem puts a failed catchup run back at the front of the
11321189// DAG's schedule buffer so it retries on the next tick. If the buffer was
11331190// already cleaned up, a new one is created.
0 commit comments