fix: corroborate stale distributed run repair with worker heartbeats#2041
Conversation
📝 Walkthrough

This PR implements stale distributed run detection and repair using worker heartbeat validation. When a DAG run lease becomes stale, the system now checks whether the associated worker's heartbeat still reflects that attempt; if not, the run is marked failed. This adds heartbeat store integration to the coordinator's stale lease reconciliation flow and the API's read paths.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Coord as Coordinator
    participant LStore as DAGRunLeaseStore
    participant HBStore as WorkerHeartbeatStore
    participant DStore as DAGRunStore
    participant Runtime as Runtime

    Coord->>LStore: Fetch lease for run
    alt Lease found
        LStore-->>Coord: Lease record
        Coord->>HBStore: Get heartbeat for worker
        alt Heartbeat found & fresh
            HBStore-->>Coord: Heartbeat record
            Coord->>Coord: Compare attempt keys/names
            alt Attempt matches in heartbeat
                Coord-->>Coord: Lease valid, skip repair
            else Attempt absent in heartbeat
                Coord->>Runtime: Call ConfirmAndRepairStaleDistributedRun
                Runtime->>DStore: CAS run status to Failed
                DStore-->>Runtime: Swap success/failure
                Runtime-->>Coord: Repair result
                Coord->>LStore: Delete stale lease
            end
        else Heartbeat missing/stale
            HBStore-->>Coord: Error or stale record
            Coord->>Runtime: Call ConfirmAndRepairStaleDistributedRun
            Runtime->>DStore: CAS run status to Failed
            DStore-->>Runtime: Swap success/failure
            Runtime-->>Coord: Repair result
            Coord->>LStore: Delete stale lease
        end
    else Lease not found
        LStore-->>Coord: Not found
        Coord-->>Coord: Treat as absent, skip repair
    end
```
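The corroboration step at the heart of the diagram reduces to a small predicate. The sketch below is illustrative only; the lease and heartbeat record types are simplified stand-ins, not the repository's actual `DAGRunLease` and `WorkerHeartbeatStore` definitions:

```go
package heartbeat

import "time"

// Simplified stand-ins for the real lease and heartbeat records.
type DAGRunLease struct {
	AttemptKey string
	WorkerID   string
}

type HeartbeatRecord struct {
	UpdatedAt   time.Time
	AttemptKeys []string // attempts the worker currently reports as running
}

// leaseStillBacked reports whether a stale-looking lease is still corroborated
// by a fresh worker heartbeat that lists the same attempt. When it returns
// false, the coordinator confirms the run as stale, CAS-fails it, and deletes
// the lease.
func leaseStillBacked(rec *HeartbeatRecord, lease DAGRunLease, staleAfter time.Duration) bool {
	if rec == nil || time.Since(rec.UpdatedAt) > staleAfter {
		return false // heartbeat missing or stale: proceed with repair
	}
	for _, key := range rec.AttemptKeys {
		if key == lease.AttemptKey {
			return true // worker still owns this attempt: lease valid, skip repair
		}
	}
	return false // attempt absent from a fresh heartbeat: repair the run
}
```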
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/service/scheduler/zombie_detector.go (1)
274-287: ⚠️ Potential issue | 🟡 Minor

Silent removal on `ErrCorruptedStatusFile` may hide data-integrity issues.

`ErrCorruptedStatusFile` is semantically different from `ErrDAGRunIDNotFound`/`ErrNoStatusData`: the status file exists but is unparsable (see `internal/persis/filedagrun/attempt.go` returning it on `io.EOF`). Elsewhere in the codebase this condition is logged at error level (e.g., `internal/service/scheduler/queue_processor.go` — "Status file is corrupted, marking as invalid"), whereas here it is collapsed under the same Info-level "missing persisted DAG run state" message and the proc entry is silently removed. Two suggestions:

- Log the corrupted-file case at `Warn`/`Error` (or branch the message) so operators see an actionable signal instead of an info line that misattributes the cause as "missing".
- Confirm that silently dropping the proc entry without surfacing a stronger signal is the desired behavior for a corrupted (not absent) status file — if so, a comment justifying it would help future readers.
📝 Possible refactor
```diff
 func (z *ZombieDetector) cleanupOrphanedStaleEntry(ctx context.Context, entry exec.ProcEntry, attemptKey string, findErr error) error {
-	if !errors.Is(findErr, exec.ErrDAGRunIDNotFound) &&
-		!errors.Is(findErr, exec.ErrNoStatusData) &&
-		!errors.Is(findErr, exec.ErrCorruptedStatusFile) {
+	switch {
+	case errors.Is(findErr, exec.ErrDAGRunIDNotFound), errors.Is(findErr, exec.ErrNoStatusData):
+		logger.Info(ctx, "Removing orphaned stale proc entry with missing persisted DAG run state", tag.Error(findErr))
+	case errors.Is(findErr, exec.ErrCorruptedStatusFile):
+		logger.Warn(ctx, "Removing orphaned stale proc entry with corrupted persisted DAG run state", tag.Error(findErr))
+	default:
 		return fmt.Errorf("find attempt: %w", findErr)
 	}
-
-	logger.Info(ctx, "Removing orphaned stale proc entry with missing persisted DAG run state", tag.Error(findErr))
 	z.clearAttemptState(attemptKey)
 	if err := z.procStore.RemoveIfStale(ctx, entry); err != nil {
 		return fmt.Errorf("remove orphaned stale proc: %w", err)
 	}
 	return nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/scheduler/zombie_detector.go` around lines 274 - 287: in `cleanupOrphanedStaleEntry`, treat `errors.Is(findErr, exec.ErrCorruptedStatusFile)` separately from the missing-state cases. Instead of collapsing it under the Info log, branch on `ErrCorruptedStatusFile` and emit a higher-severity log (`logger.Warn` or `logger.Error`) with the error and context, then decide whether to proceed with `z.clearAttemptState(attemptKey)` and `z.procStore.RemoveIfStale(ctx, entry)` or bail with a wrapped error; if you choose to keep removing the proc entry despite corruption, add a brief comment explaining that choice and why silent removal is acceptable, referencing the function `cleanupOrphanedStaleEntry` and the calls to `z.clearAttemptState` and `z.procStore.RemoveIfStale` so reviewers can find the logic easily.

internal/service/frontend/api/v1/dags.go (1)
891-895: ⚠️ Potential issue | 🔴 Critical

Inconsistent nil-checking for `repairConfirmedStaleDistributedRunOnRead` return value across call sites.

The `"latest"` branch at lines 860–862 stores the return value in a pointer and guards against nil before dereferencing. The `dagRunId` branch at line 891 overwrites `dagStatus` unconditionally and dereferences it at line 894 without a nil check. Since `repairConfirmedStaleDistributedRunOnRead` explicitly returns the input status (which can be nil) when certain conditions are met, both branches should apply consistent nil-checking for defensive safety.

Align both branches on the same contract — either guard both or ensure the function's return contract precludes nil.
Suggested fix
```diff
-	dagStatus = a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus)
+	if repaired := a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus); repaired != nil {
+		dagStatus = repaired
+	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/frontend/api/v1/dags.go` around lines 891 - 895, The call to repairConfirmedStaleDistributedRunOnRead can return the original dagStatus (which may be nil) but the dagRunId branch overwrites dagStatus and passes it to toDAGRunDetailsWithSpecSource without a nil check; change the dagRunId branch to mirror the "latest" branch by assigning the result to a pointer (dagStatus = a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus)), then check for nil before dereferencing or calling toDAGRunDetailsWithSpecSource, and return an appropriate response/error if nil; alternatively, adjust repairConfirmedStaleDistributedRunOnRead to guarantee a non-nil return and update both call sites to the same contract—ensure you reference repairConfirmedStaleDistributedRunOnRead, dagStatus, and toDAGRunDetailsWithSpecSource when making the change.
🧹 Nitpick comments (1)
internal/service/scheduler/zombie_detector_test.go (1)
230-248: Consider also covering the `ErrCorruptedStatusFile` branch.

`cleanupOrphanedStaleEntry` now ignores three distinct errors (`ErrDAGRunIDNotFound`, `ErrNoStatusData`, `ErrCorruptedStatusFile`), but only the first two are tested. A test asserting that `ErrCorruptedStatusFile` triggers `RemoveIfStale` (rather than propagating) would lock in the new behavior and prevent regressions if the ignore-list is later narrowed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/scheduler/zombie_detector_test.go` around lines 230 - 248, Add a new test mirroring TestZombieDetectorDetectAndCleanZombies_StaleEntryWithMissingStatusIsRemoved but make dagRunStore.FindAttempt return exec.ErrCorruptedStatusFile instead of exec.ErrNoStatusData and assert procStore.RemoveIfStale is called; specifically, in the test for cleanupOrphanedStaleEntry behavior create the same entry via testRootProcEntry, set up procStore.ListAllEntries to return it, set dagRunStore.FindAttempt to return (nil, exec.ErrCorruptedStatusFile), expect procStore.RemoveIfStale(entry) to be invoked once, call detector.detectAndCleanZombies(ctx) (created via NewZombieDetector) and assert expectations on procStore and dagRunStore to lock in the ignored ErrCorruptedStatusFile branch.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/service/coordinator/handler_test.go`:
- Around line 1954-1957: The test is asserting NotNil on a value-type return
(exec.DAGRunLease) which is always non-nil; update the assertion to check
meaningful fields instead: after calling leaseStore.Get(ctx, attemptKey) in the
test, replace assert.NotNil(t, lease) with assertions that verify
lease.AttemptKey (or lease.WorkerID) equals the expected value that was
upserted; reference the leaseStore.Get call and the
exec.DAGRunLease.AttemptKey/WorkerID fields when making the replacement so the
test validates actual contents.
In `@internal/service/coordinator/handler.go`:
- Around line 2308-2335: The code path in handler.go where
confirmAndRepairStaleDistributedRun returns repaired==false leaves the lease
missing and causes healthy workers to be cancelled; before returning from the
no-repair branch you should upsert/recreate the distributed lease using the
reconciled status and worker info (use the AttemptID/WorkerID and
reconciledStatus.DAGRun() as the source) so the lease exists for subsequent
RunHeartbeat.Touch calls; locate the block around
confirmAndRepairStaleDistributedRun and add a call to the existing lease upsert
logic (or implement an upsert helper) before the continue, and apply the same
change at the similar block around lines flagged (the other occurrence
mentioned: 2404-2432) to ensure both paths restore the lease when the heartbeat
still reports the attempt.
In `@internal/service/frontend/api/v1/dagruns.go`:
- Around line 1879-1889: In repairConfirmedStaleDistributedRunOnRead, the
fallback worker passed to runtime.ConfirmAndRepairStaleDistributedRun uses
status.WorkerID which can be empty for acked-but-never-reported runs; before
calling ConfirmAndRepairStaleDistributedRun, if status.WorkerID is empty, query
the claim lease from a.dagRunLeaseStore for this run/attempt (use the store's
read/get method) and derive the fallback worker ID from the lease record (e.g.,
lease.Owner or lease.WorkerID), then pass that derived worker ID instead of the
empty status.WorkerID; keep using status.AttemptID and existing parameters and
only fall back to the lease when status.WorkerID is missing.
- Around line 1852-1854: The repair function
repairConfirmedStaleDistributedRunOnRead can persist state changes and must only
run after enforcing visibility, so move the requireWorkspaceVisible(ctx,
statusWorkspaceName(status)) check to occur before any call to
repairConfirmedStaleDistributedRunOnRead(ctx, status), returning the error if
visibility fails and only then invoking the repair; apply the same reorder for
every other place in this file where repairConfirmedStaleDistributedRunOnRead is
called (ensure both branches that load status perform requireWorkspaceVisible
first) so unauthorized callers cannot trigger state changes.
---
Outside diff comments:
In `@internal/service/frontend/api/v1/dags.go`:
- Around line 891-895: The call to repairConfirmedStaleDistributedRunOnRead can
return the original dagStatus (which may be nil) but the dagRunId branch
overwrites dagStatus and passes it to toDAGRunDetailsWithSpecSource without a
nil check; change the dagRunId branch to mirror the "latest" branch by assigning
the result to a pointer (dagStatus =
a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus)), then check for nil
before dereferencing or calling toDAGRunDetailsWithSpecSource, and return an
appropriate response/error if nil; alternatively, adjust
repairConfirmedStaleDistributedRunOnRead to guarantee a non-nil return and
update both call sites to the same contract—ensure you reference
repairConfirmedStaleDistributedRunOnRead, dagStatus, and
toDAGRunDetailsWithSpecSource when making the change.
In `@internal/service/scheduler/zombie_detector.go`:
- Around line 274-287: In cleanupOrphanedStaleEntry, treat errors.Is(findErr,
exec.ErrCorruptedStatusFile) separately from the missing-state cases: instead of
collapsing it under the Info log, branch on ErrCorruptedStatusFile and emit a
higher-severity log (logger.Warn or logger.Error) with the error and context,
then decide whether to proceed with z.clearAttemptState(attemptKey) and
z.procStore.RemoveIfStale(ctx, entry) or bail with a wrapped error; if you
choose to keep removing the proc entry despite corruption, add a brief comment
explaining that choice and why silent removal is acceptable, referencing the
function cleanupOrphanedStaleEntry and the calls to z.clearAttemptState and
z.procStore.RemoveIfStale so reviewers can find the logic easily.
---
Nitpick comments:
In `@internal/service/scheduler/zombie_detector_test.go`:
- Around line 230-248: Add a new test mirroring
TestZombieDetectorDetectAndCleanZombies_StaleEntryWithMissingStatusIsRemoved but
make dagRunStore.FindAttempt return exec.ErrCorruptedStatusFile instead of
exec.ErrNoStatusData and assert procStore.RemoveIfStale is called; specifically,
in the test for cleanupOrphanedStaleEntry behavior create the same entry via
testRootProcEntry, set up procStore.ListAllEntries to return it, set
dagRunStore.FindAttempt to return (nil, exec.ErrCorruptedStatusFile), expect
procStore.RemoveIfStale(entry) to be invoked once, call
detector.detectAndCleanZombies(ctx) (created via NewZombieDetector) and assert
expectations on procStore and dagRunStore to lock in the ignored
ErrCorruptedStatusFile branch.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a264d583-b751-491d-b1c0-92ca436e7e1c
📒 Files selected for processing (14)
- internal/cmd/context.go
- internal/core/exec/distributed.go
- internal/persis/filedistributed/stores_test.go
- internal/persis/filedistributed/worker_heartbeat_store.go
- internal/runtime/distributed_stale_run.go
- internal/service/coordinator/handler.go
- internal/service/coordinator/handler_test.go
- internal/service/frontend/api/v1/api.go
- internal/service/frontend/api/v1/dagruns.go
- internal/service/frontend/api/v1/dags.go
- internal/service/frontend/api/v1/proc_liveness_test.go
- internal/service/scheduler/zombie_detector.go
- internal/service/scheduler/zombie_detector_test.go
- internal/test/server.go
```go
lease, err := leaseStore.Get(ctx, attemptKey)
require.NoError(t, err)
assert.NotNil(t, lease)
})
```
Nit: assert.NotNil on a struct value is a no-op.
leaseStore.Get returns exec.DAGRunLease by value (see reconcileDistributedLease's parameter shape), so assert.NotNil(t, lease) always succeeds regardless of contents. The preceding require.NoError already proves retrieval succeeded — either drop the line or assert a meaningful field (e.g., the lease's AttemptKey/WorkerID matches what was upserted).
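A tiny self-contained illustration of why the assertion is vacuous; the struct here is generic, not the repository's `exec.DAGRunLease`:

```go
package demo

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

type lease struct{ AttemptKey string }

func TestNotNilOnValueIsVacuous(t *testing.T) {
	var l lease                       // zero value; nothing was actually retrieved
	assert.NotNil(t, l)               // always passes: a non-pointer value is never nil
	assert.Equal(t, "", l.AttemptKey) // field assertions actually inspect contents
}
```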
♻️ Suggested tweak
```diff
 lease, err := leaseStore.Get(ctx, attemptKey)
 require.NoError(t, err)
-assert.NotNil(t, lease)
+assert.Equal(t, attemptKey, lease.AttemptKey)
+assert.Equal(t, "worker-1", lease.WorkerID)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
lease, err := leaseStore.Get(ctx, attemptKey)
require.NoError(t, err)
assert.Equal(t, attemptKey, lease.AttemptKey)
assert.Equal(t, "worker-1", lease.WorkerID)
})
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/coordinator/handler_test.go` around lines 1954 - 1957, The
test is asserting NotNil on a value-type return (exec.DAGRunLease) which is
always non-nil; update the assertion to check meaningful fields instead: after
calling leaseStore.Get(ctx, attemptKey) in the test, replace assert.NotNil(t,
lease) with assertions that verify lease.AttemptKey (or lease.WorkerID) equals
the expected value that was upserted; reference the leaseStore.Get call and the
exec.DAGRunLease.AttemptKey/WorkerID fields when making the replacement so the
test validates actual contents.
```go
reconciledStatus, repaired, err := h.confirmAndRepairStaleDistributedRun(ctx, status, leaseState.attemptID, status.WorkerID)
if err != nil {
	logger.Error(ctx, "Failed to confirm stale orphaned distributed run",
		tag.DAG(status.Name),
		tag.RunID(status.DAGRunID),
		tag.AttemptKey(leaseState.attemptKey),
		tag.Error(err),
	)
	continue
}
if repaired {
	h.deleteDistributedTracking(ctx, context.WithoutCancel(ctx), status.DAGRun(), leaseState.attemptKey,
		"Failed to delete orphaned distributed lease after confirmed failure",
		"Failed to delete orphaned active distributed run after confirmed failure",
	)
	continue
}
if reconciledStatus == nil {
	continue
}
if reconciledStatus.AttemptID != leaseState.attemptID || (!reconciledStatus.Status.IsActive() && reconciledStatus.Status != core.NotStarted) {
	h.deleteDistributedTracking(ctx, context.WithoutCancel(ctx), status.DAGRun(), leaseState.attemptKey,
		"Failed to delete superseded orphaned distributed lease after reconciliation",
		"Failed to delete superseded orphaned active distributed run after reconciliation",
	)
	continue
}
```
Recreate the lease when heartbeat confirmation keeps the run alive.
When `confirmAndRepairStaleDistributedRun` returns `repaired=false` because the worker heartbeat still reports the attempt, these branches leave the already-missing lease unrestored. The next `RunHeartbeat` will still fail `Touch` and add the attempt to `CancelledRuns`, so a healthy worker gets told to cancel itself. Please upsert the lease from the reconciled status/worker before leaving the no-repair path.
Also applies to: 2404-2432
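For illustration, a minimal sketch of the shape such a restore could take in the no-repair branch. The `Upsert` method and the handler's `leaseStore` field are assumptions; the comment only points at "the existing lease upsert logic" without naming it:

```go
if !repaired && reconciledStatus != nil {
	// Heartbeat still reports the attempt: recreate the lease so the worker's
	// next RunHeartbeat.Touch succeeds instead of landing in CancelledRuns.
	lease := exec.DAGRunLease{
		AttemptKey: leaseState.attemptKey,
		WorkerID:   status.WorkerID,
	}
	// h.leaseStore.Upsert is a hypothetical name for the existing upsert helper.
	if err := h.leaseStore.Upsert(ctx, reconciledStatus.DAGRun(), lease); err != nil {
		logger.Error(ctx, "Failed to restore distributed lease",
			tag.AttemptKey(leaseState.attemptKey),
			tag.Error(err),
		)
	}
	continue
}
```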
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/coordinator/handler.go` around lines 2308 - 2335, The code
path in handler.go where confirmAndRepairStaleDistributedRun returns
repaired==false leaves the lease missing and causes healthy workers to be
cancelled; before returning from the no-repair branch you should upsert/recreate
the distributed lease using the reconciled status and worker info (use the
AttemptID/WorkerID and reconciledStatus.DAGRun() as the source) so the lease
exists for subsequent RunHeartbeat.Touch calls; locate the block around
confirmAndRepairStaleDistributedRun and add a call to the existing lease upsert
logic (or implement an upsert helper) before the continue, and apply the same
change at the similar block around lines flagged (the other occurrence
mentioned: 2404-2432) to ensure both paths restore the lease when the heartbeat
still reports the attempt.
```go
status = a.repairConfirmedStaleDistributedRunOnRead(ctx, status)
if err := a.requireWorkspaceVisible(ctx, statusWorkspaceName(status)); err != nil {
	return api.GetDAGRunDetails200JSONResponse{}, err
```
Authorize before running read-time repair.
repairConfirmedStaleDistributedRunOnRead can persist a FAILED status, so calling it before requireWorkspaceVisible lets a caller who cannot view this DAG run still trigger state changes just by hitting the details endpoint. Please gate the repair behind the visibility check in both branches.
Suggested change
```diff
-	status = a.repairConfirmedStaleDistributedRunOnRead(ctx, status)
-	if err := a.requireWorkspaceVisible(ctx, statusWorkspaceName(status)); err != nil {
+	if err := a.requireWorkspaceVisible(ctx, statusWorkspaceName(status)); err != nil {
 		return api.GetDAGRunDetails200JSONResponse{}, err
 	}
+	status = a.repairConfirmedStaleDistributedRunOnRead(ctx, status)
@@
-	dagStatus = a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus)
-	if err := a.requireWorkspaceVisible(ctx, statusWorkspaceName(dagStatus)); err != nil {
+	if err := a.requireWorkspaceVisible(ctx, statusWorkspaceName(dagStatus)); err != nil {
 		return api.GetDAGRunDetails200JSONResponse{}, err
 	}
+	dagStatus = a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus)
```

Based on learnings: single-resource DAG and DAG-run endpoints intentionally do not accept a workspace query parameter; the backend infers workspace from the loaded target and enforces visibility/permissions server-side.
Also applies to: 1870-1872, 1879-1903
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/frontend/api/v1/dagruns.go` around lines 1852 - 1854, The
repair function repairConfirmedStaleDistributedRunOnRead can persist state
changes and must only run after enforcing visibility, so move the
requireWorkspaceVisible(ctx, statusWorkspaceName(status)) check to occur before
any call to repairConfirmedStaleDistributedRunOnRead(ctx, status), returning the
error if visibility fails and only then invoking the repair; apply the same
reorder for every other place in this file where
repairConfirmedStaleDistributedRunOnRead is called (ensure both branches that
load status perform requireWorkspaceVisible first) so unauthorized callers
cannot trigger state changes.
```go
func (a *API) repairConfirmedStaleDistributedRunOnRead(ctx context.Context, status *exec.DAGRunStatus) *exec.DAGRunStatus {
	if status == nil || a.dagRunLeaseStore == nil || a.workerHeartbeatStore == nil {
		return status
	}
	// ...
	reconciled, _, err := runtime.ConfirmAndRepairStaleDistributedRun(ctx, runtime.DistributedRunRepairConfig{
		DAGRunStore:          a.dagRunStore,
		DAGRunLeaseStore:     a.dagRunLeaseStore,
		WorkerHeartbeatStore: a.workerHeartbeatStore,
		StaleLeaseThreshold:  a.leaseStaleThreshold,
	}, status, status.AttemptID, status.WorkerID)
```
This read path still skips acked runs that never recorded WorkerID.
Here the fallback worker is status.WorkerID, but the initial distributed NotStarted status can be persisted without any worker ID while the claim lease already knows the remote worker. In that state ConfirmAndRepairStaleDistributedRun exits before it even reads the lease, so the details endpoint cannot repair the "acked but never reported status" case. Please derive the fallback worker from the lease when the saved status is still missing it.
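One possible shape for that fallback, sketched under assumptions: `cfg` stands for the `DistributedRunRepairConfig` shown above, `attemptKey` identifies this run's attempt, and the lease store's `Get` signature mirrors the one used in the coordinator tests; none of these are confirmed by the diff:

```go
// Fall back to the claim lease's worker when the persisted status never
// recorded one (acked-but-never-reported runs).
workerID := status.WorkerID
if workerID == "" {
	if lease, err := a.dagRunLeaseStore.Get(ctx, attemptKey); err == nil {
		workerID = lease.WorkerID // the claim lease already knows the remote worker
	}
}
reconciled, _, err := runtime.ConfirmAndRepairStaleDistributedRun(ctx, cfg,
	status, status.AttemptID, workerID)
```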
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/frontend/api/v1/dagruns.go` around lines 1879 - 1889, In
repairConfirmedStaleDistributedRunOnRead, the fallback worker passed to
runtime.ConfirmAndRepairStaleDistributedRun uses status.WorkerID which can be
empty for acked-but-never-reported runs; before calling
ConfirmAndRepairStaleDistributedRun, if status.WorkerID is empty, query the
claim lease from a.dagRunLeaseStore for this run/attempt (use the store's
read/get method) and derive the fallback worker ID from the lease record (e.g.,
lease.Owner or lease.WorkerID), then pass that derived worker ID instead of the
empty status.WorkerID; keep using status.AttemptID and existing parameters and
only fall back to the lease when status.WorkerID is missing.
Summary
Testing
Closes #2040
Summary by CodeRabbit
New Features
Bug Fixes