
fix: corroborate stale distributed run repair with worker heartbeats#2041

Merged
yottahmd merged 6 commits into main from fix/scheduler-stale-proc-missing-status
Apr 27, 2026

Conversation


@yottahmd yottahmd commented Apr 26, 2026

Summary

  • require corroborating worker-heartbeat evidence before failing stale distributed runs
  • repair confirmed stale distributed runs on single-run details reads without adding list-endpoint repair work
  • add regression coverage for worker heartbeat lookup, coordinator zombie reconciliation, and frontend read-time repair

Testing

  • go test ./internal/persis/filedistributed -count=1
  • go test ./internal/service/coordinator -count=1
  • go test ./internal/service/frontend/api/v1 -count=1
  • go test ./internal/intg/distr -run 'TestDistributedRun_AckedTaskWithoutInitialStatus_MarkedFailedAndCleansLease|TestZombieRecovery|TestCoordinatorAPI' -count=1

Closes #2040

Summary by CodeRabbit

  • New Features

    • Introduced automatic reconciliation of stale distributed run statuses, checking worker heartbeat data to determine if repair is needed.
  • Bug Fixes

    • Improved detection of truly abandoned distributed runs by validating worker heartbeat activity before marking runs as failed, reducing false-positive failures from stale lease records.


coderabbitai Bot commented Apr 26, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

📝 Walkthrough

This PR implements stale distributed run detection and repair using worker heartbeat validation. When a DAG run lease becomes stale, the system now checks whether the associated worker's heartbeat still reflects that attempt; if the heartbeat is missing, stale, or no longer reports the attempt, the run is marked failed. This adds heartbeat store integration to the coordinator's stale lease reconciliation flow and to the API's read paths.

Changes

  • Worker Heartbeat Storage (internal/core/exec/distributed.go, internal/persis/filedistributed/worker_heartbeat_store.go, internal/persis/filedistributed/stores_test.go)
    Introduces a Get(ctx, workerID) method to fetch worker heartbeat records by ID, returning ErrWorkerHeartbeatNotFound when unavailable. Tests validate single-record retrieval after stale entries are deleted.
  • Distributed Run Repair Logic (internal/runtime/distributed_stale_run.go)
    New module implementing ConfirmAndRepairStaleDistributedRun, which validates stale lease claims against worker heartbeat state, fetching lease and heartbeat records before deciding whether to atomically mark the run as failed with a distributed-lease-expired reason.
  • Coordinator Stale Lease Reconciliation (internal/service/coordinator/handler.go, internal/service/coordinator/handler_test.go)
    Extends stale lease detection to optionally call repair logic when a heartbeat store is configured, branching between direct failure (prior behavior) and confirmation via worker heartbeat data. Tests verify that fresh heartbeats prevent failure and stale heartbeats trigger repair.
  • Frontend API Heartbeat Integration (internal/service/frontend/api/v1/api.go)
    Adds a workerHeartbeatStore dependency and a WithWorkerHeartbeatStore option constructor for injecting the heartbeat store into the API instance.
  • API Read-time Repair (internal/service/frontend/api/v1/dagruns.go, internal/service/frontend/api/v1/dags.go, internal/service/frontend/api/v1/proc_liveness_test.go)
    Embeds repair logic into the GetDAGRunDetails and GetDAGDAGRunDetails read paths via repairConfirmedStaleDistributedRunOnRead, conditionally reconciling stale records at read time. An integration test validates that the API response reflects the repaired failed status.
  • Zombie Detection Improvements (internal/service/scheduler/zombie_detector.go, internal/service/scheduler/zombie_detector_test.go)
    Extends error handling to treat ErrNoStatusData and ErrCorruptedStatusFile as ignorable (like ErrDAGRunIDNotFound) and improves log message clarity. Adds test coverage for the missing-status-data scenario.
  • Server Configuration (internal/cmd/context.go, internal/test/server.go)
    Wires the worker heartbeat store into frontend API options during server initialization for both production and test contexts.

Sequence Diagram

sequenceDiagram
    participant Coord as Coordinator
    participant LStore as DAGRunLeaseStore
    participant HBStore as WorkerHeartbeatStore
    participant DStore as DAGRunStore
    participant Runtime as Runtime
    
    Coord->>LStore: Fetch lease for run
    alt Lease found
        LStore-->>Coord: Lease record
        Coord->>HBStore: Get heartbeat for worker
        alt Heartbeat found & fresh
            HBStore-->>Coord: Heartbeat record
            Coord->>Coord: Compare attempt keys/names
            alt Attempt matches in heartbeat
                Coord-->>Coord: Lease valid, skip repair
            else Attempt absent in heartbeat
                Coord->>Runtime: Call ConfirmAndRepairStaleDistributedRun
                Runtime->>DStore: CAS run status to Failed
                DStore-->>Runtime: Swap success/failure
                Runtime-->>Coord: Repair result
                Coord->>LStore: Delete stale lease
            end
        else Heartbeat missing/stale
            HBStore-->>Coord: Error or stale record
            Coord->>Runtime: Call ConfirmAndRepairStaleDistributedRun
            Runtime->>DStore: CAS run status to Failed
            DStore-->>Runtime: Swap success/failure
            Runtime-->>Coord: Repair result
            Coord->>LStore: Delete stale lease
        end
    else Lease not found
        LStore-->>Coord: Not found
        Coord-->>Coord: Treat as absent, skip repair
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • #2031 — Modifies coordinator stale-lease reconciliation and distributed-run identity matching logic with overlapping handler and lease-tracking changes.
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 21.05%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Description Check (✅ Passed): check skipped because CodeRabbit’s high-level summary is enabled.
  • Title check (✅ Passed): the PR title directly summarizes the main change, adding worker heartbeat corroboration to stale distributed run repair logic, which is the primary theme across all modified files.
  • Linked Issues check (✅ Passed): the PR implements all key objectives from issue #2040: it adds worker heartbeat corroboration for stale distributed run detection, includes repair logic in read paths, handles missing status data errors, and prevents repeated stale marking of zombie processes.
  • Out of Scope Changes check (✅ Passed): all changes are scoped to fixing stale distributed run repair and zombie detection as specified in issue #2040. The PR adds heartbeat validation, repair logic, new error handling, and integration tests, all directly addressing the linked issue objectives.




@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/service/scheduler/zombie_detector.go (1)

274-287: ⚠️ Potential issue | 🟡 Minor

Silent removal on ErrCorruptedStatusFile may hide data-integrity issues.

ErrCorruptedStatusFile is semantically different from ErrDAGRunIDNotFound / ErrNoStatusData: the status file exists but is unparsable (see internal/persis/filedagrun/attempt.go returning it on io.EOF). Elsewhere in the codebase this condition is logged at error level (e.g., internal/service/scheduler/queue_processor.go — "Status file is corrupted, marking as invalid"), whereas here it is collapsed under the same Info-level "missing persisted DAG run state" message and the proc entry is silently removed.

Two suggestions:

  • Log the corrupted-file case at Warn/Error (or branch the message) so operators see an actionable signal instead of an info line that misattributes the cause as "missing".
  • Confirm that silently dropping the proc entry without surfacing a stronger signal is the desired behavior for a corrupted (not absent) status file — if so, a comment justifying it would help future readers.
📝 Possible refactor
 func (z *ZombieDetector) cleanupOrphanedStaleEntry(ctx context.Context, entry exec.ProcEntry, attemptKey string, findErr error) error {
-	if !errors.Is(findErr, exec.ErrDAGRunIDNotFound) &&
-		!errors.Is(findErr, exec.ErrNoStatusData) &&
-		!errors.Is(findErr, exec.ErrCorruptedStatusFile) {
+	switch {
+	case errors.Is(findErr, exec.ErrDAGRunIDNotFound), errors.Is(findErr, exec.ErrNoStatusData):
+		logger.Info(ctx, "Removing orphaned stale proc entry with missing persisted DAG run state", tag.Error(findErr))
+	case errors.Is(findErr, exec.ErrCorruptedStatusFile):
+		logger.Warn(ctx, "Removing orphaned stale proc entry with corrupted persisted DAG run state", tag.Error(findErr))
+	default:
 		return fmt.Errorf("find attempt: %w", findErr)
 	}
-
-	logger.Info(ctx, "Removing orphaned stale proc entry with missing persisted DAG run state", tag.Error(findErr))
 	z.clearAttemptState(attemptKey)
 	if err := z.procStore.RemoveIfStale(ctx, entry); err != nil {
 		return fmt.Errorf("remove orphaned stale proc: %w", err)
 	}
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/scheduler/zombie_detector.go` around lines 274 - 287, In
cleanupOrphanedStaleEntry, treat errors.Is(findErr, exec.ErrCorruptedStatusFile)
separately from the missing-state cases: instead of collapsing it under the Info
log, branch on ErrCorruptedStatusFile and emit a higher-severity log
(logger.Warn or logger.Error) with the error and context, then decide whether to
proceed with z.clearAttemptState(attemptKey) and z.procStore.RemoveIfStale(ctx,
entry) or bail with a wrapped error; if you choose to keep removing the proc
entry despite corruption, add a brief comment explaining that choice and why
silent removal is acceptable, referencing the function cleanupOrphanedStaleEntry
and the calls to z.clearAttemptState and z.procStore.RemoveIfStale so reviewers
can find the logic easily.
internal/service/frontend/api/v1/dags.go (1)

891-895: ⚠️ Potential issue | 🔴 Critical

Inconsistent nil-checking for repairConfirmedStaleDistributedRunOnRead return value across call sites.

The "latest" branch at lines 860–862 stores the return value in a pointer and guards against nil before dereferencing. The dagRunId branch at line 891 overwrites dagStatus unconditionally and dereferences it at line 894 without a nil check. Since repairConfirmedStaleDistributedRunOnRead explicitly returns the input status (which can be nil) when certain conditions are met, both branches should apply consistent nil-checking for defensive safety.

Align both branches on the same contract—either guard both or ensure the function's return contract precludes nil.

Suggested fix
-	dagStatus = a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus)
+	if repaired := a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus); repaired != nil {
+		dagStatus = repaired
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/dags.go` around lines 891 - 895, The call to
repairConfirmedStaleDistributedRunOnRead can return the original dagStatus
(which may be nil) but the dagRunId branch overwrites dagStatus and passes it to
toDAGRunDetailsWithSpecSource without a nil check; change the dagRunId branch to
mirror the "latest" branch by assigning the result to a pointer (dagStatus =
a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus)), then check for nil
before dereferencing or calling toDAGRunDetailsWithSpecSource, and return an
appropriate response/error if nil; alternatively, adjust
repairConfirmedStaleDistributedRunOnRead to guarantee a non-nil return and
update both call sites to the same contract—ensure you reference
repairConfirmedStaleDistributedRunOnRead, dagStatus, and
toDAGRunDetailsWithSpecSource when making the change.
🧹 Nitpick comments (1)
internal/service/scheduler/zombie_detector_test.go (1)

230-248: Consider also covering the ErrCorruptedStatusFile branch.

cleanupOrphanedStaleEntry now ignores three distinct errors (ErrDAGRunIDNotFound, ErrNoStatusData, ErrCorruptedStatusFile), but only the first two are tested. A test asserting that ErrCorruptedStatusFile triggers RemoveIfStale (rather than propagating) would lock in the new behavior and prevent regressions if the ignore-list is later narrowed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/scheduler/zombie_detector_test.go` around lines 230 - 248,
Add a new test mirroring
TestZombieDetectorDetectAndCleanZombies_StaleEntryWithMissingStatusIsRemoved but
make dagRunStore.FindAttempt return exec.ErrCorruptedStatusFile instead of
exec.ErrNoStatusData and assert procStore.RemoveIfStale is called; specifically,
in the test for cleanupOrphanedStaleEntry behavior create the same entry via
testRootProcEntry, set up procStore.ListAllEntries to return it, set
dagRunStore.FindAttempt to return (nil, exec.ErrCorruptedStatusFile), expect
procStore.RemoveIfStale(entry) to be invoked once, call
detector.detectAndCleanZombies(ctx) (created via NewZombieDetector) and assert
expectations on procStore and dagRunStore to lock in the ignored
ErrCorruptedStatusFile branch.

📥 Commits

Reviewing files that changed from the base of the PR and between dabaa11 and 92ff0f6.

📒 Files selected for processing (14)
  • internal/cmd/context.go
  • internal/core/exec/distributed.go
  • internal/persis/filedistributed/stores_test.go
  • internal/persis/filedistributed/worker_heartbeat_store.go
  • internal/runtime/distributed_stale_run.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/frontend/api/v1/api.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/dags.go
  • internal/service/frontend/api/v1/proc_liveness_test.go
  • internal/service/scheduler/zombie_detector.go
  • internal/service/scheduler/zombie_detector_test.go
  • internal/test/server.go

Comment on lines +1954 to +1957
lease, err := leaseStore.Get(ctx, attemptKey)
require.NoError(t, err)
assert.NotNil(t, lease)
})

⚠️ Potential issue | 🟡 Minor

Nit: assert.NotNil on a struct value is a no-op.

leaseStore.Get returns exec.DAGRunLease by value (see reconcileDistributedLease's parameter shape), so assert.NotNil(t, lease) always succeeds regardless of contents. The preceding require.NoError already proves retrieval succeeded — either drop the line or assert a meaningful field (e.g., the lease's AttemptKey/WorkerID matches what was upserted).

♻️ Suggested tweak
 		lease, err := leaseStore.Get(ctx, attemptKey)
 		require.NoError(t, err)
-		assert.NotNil(t, lease)
+		assert.Equal(t, attemptKey, lease.AttemptKey)
+		assert.Equal(t, "worker-1", lease.WorkerID)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/coordinator/handler_test.go` around lines 1954 - 1957, The
test is asserting NotNil on a value-type return (exec.DAGRunLease) which is
always non-nil; update the assertion to check meaningful fields instead: after
calling leaseStore.Get(ctx, attemptKey) in the test, replace assert.NotNil(t,
lease) with assertions that verify lease.AttemptKey (or lease.WorkerID) equals
the expected value that was upserted; reference the leaseStore.Get call and the
exec.DAGRunLease.AttemptKey/WorkerID fields when making the replacement so the
test validates actual contents.

Comment on lines +2308 to +2335
reconciledStatus, repaired, err := h.confirmAndRepairStaleDistributedRun(ctx, status, leaseState.attemptID, status.WorkerID)
if err != nil {
logger.Error(ctx, "Failed to confirm stale orphaned distributed run",
tag.DAG(status.Name),
tag.RunID(status.DAGRunID),
tag.AttemptKey(leaseState.attemptKey),
tag.Error(err),
)
continue
}
if repaired {
h.deleteDistributedTracking(ctx, context.WithoutCancel(ctx), status.DAGRun(), leaseState.attemptKey,
"Failed to delete orphaned distributed lease after confirmed failure",
"Failed to delete orphaned active distributed run after confirmed failure",
)
continue
}
if reconciledStatus == nil {
continue
}
if reconciledStatus.AttemptID != leaseState.attemptID || (!reconciledStatus.Status.IsActive() && reconciledStatus.Status != core.NotStarted) {
h.deleteDistributedTracking(ctx, context.WithoutCancel(ctx), status.DAGRun(), leaseState.attemptKey,
"Failed to delete superseded orphaned distributed lease after reconciliation",
"Failed to delete superseded orphaned active distributed run after reconciliation",
)
continue
}


⚠️ Potential issue | 🟠 Major

Recreate the lease when heartbeat confirmation keeps the run alive.

When confirmAndRepairStaleDistributedRun returns repaired=false because the worker heartbeat still reports the attempt, these branches leave the missing lease unrestored. The next RunHeartbeat will still fail Touch and add the attempt to CancelledRuns, so a healthy worker gets told to cancel itself. Please upsert the lease from the reconciled status/worker before leaving the no-repair path.

Also applies to: 2404-2432

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/coordinator/handler.go` around lines 2308 - 2335, The code
path in handler.go where confirmAndRepairStaleDistributedRun returns
repaired==false leaves the lease missing and causes healthy workers to be
cancelled; before returning from the no-repair branch you should upsert/recreate
the distributed lease using the reconciled status and worker info (use the
AttemptID/WorkerID and reconciledStatus.DAGRun() as the source) so the lease
exists for subsequent RunHeartbeat.Touch calls; locate the block around
confirmAndRepairStaleDistributedRun and add a call to the existing lease upsert
logic (or implement an upsert helper) before the continue, and apply the same
change at the similar block around lines flagged (the other occurrence
mentioned: 2404-2432) to ensure both paths restore the lease when the heartbeat
still reports the attempt.

Comment on lines 1852 to 1854
status = a.repairConfirmedStaleDistributedRunOnRead(ctx, status)
if err := a.requireWorkspaceVisible(ctx, statusWorkspaceName(status)); err != nil {
return api.GetDAGRunDetails200JSONResponse{}, err

⚠️ Potential issue | 🟠 Major

Authorize before running read-time repair.

repairConfirmedStaleDistributedRunOnRead can persist a FAILED status, so calling it before requireWorkspaceVisible lets a caller who cannot view this DAG run still trigger state changes just by hitting the details endpoint. Please gate the repair behind the visibility check in both branches.

Suggested change
-		status = a.repairConfirmedStaleDistributedRunOnRead(ctx, status)
-		if err := a.requireWorkspaceVisible(ctx, statusWorkspaceName(status)); err != nil {
+		if err := a.requireWorkspaceVisible(ctx, statusWorkspaceName(status)); err != nil {
 			return api.GetDAGRunDetails200JSONResponse{}, err
 		}
+		status = a.repairConfirmedStaleDistributedRunOnRead(ctx, status)
@@
-	dagStatus = a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus)
-	if err := a.requireWorkspaceVisible(ctx, statusWorkspaceName(dagStatus)); err != nil {
+	if err := a.requireWorkspaceVisible(ctx, statusWorkspaceName(dagStatus)); err != nil {
 		return api.GetDAGRunDetails200JSONResponse{}, err
 	}
+	dagStatus = a.repairConfirmedStaleDistributedRunOnRead(ctx, dagStatus)

Based on learnings: Single‑resource DAG and DAG‑run endpoints intentionally do not accept a workspace query parameter; the backend infers workspace from the loaded target and enforces visibility/permissions server‑side.

Also applies to: 1870-1872, 1879-1903

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/dagruns.go` around lines 1852 - 1854, The
repair function repairConfirmedStaleDistributedRunOnRead can persist state
changes and must only run after enforcing visibility, so move the
requireWorkspaceVisible(ctx, statusWorkspaceName(status)) check to occur before
any call to repairConfirmedStaleDistributedRunOnRead(ctx, status), returning the
error if visibility fails and only then invoking the repair; apply the same
reorder for every other place in this file where
repairConfirmedStaleDistributedRunOnRead is called (ensure both branches that
load status perform requireWorkspaceVisible first) so unauthorized callers
cannot trigger state changes.

Comment on lines +1879 to +1889
func (a *API) repairConfirmedStaleDistributedRunOnRead(ctx context.Context, status *exec.DAGRunStatus) *exec.DAGRunStatus {
if status == nil || a.dagRunLeaseStore == nil || a.workerHeartbeatStore == nil {
return status
}

reconciled, _, err := runtime.ConfirmAndRepairStaleDistributedRun(ctx, runtime.DistributedRunRepairConfig{
DAGRunStore: a.dagRunStore,
DAGRunLeaseStore: a.dagRunLeaseStore,
WorkerHeartbeatStore: a.workerHeartbeatStore,
StaleLeaseThreshold: a.leaseStaleThreshold,
}, status, status.AttemptID, status.WorkerID)

⚠️ Potential issue | 🟠 Major

This read path still skips acked runs that never recorded WorkerID.

Here the fallback worker is status.WorkerID, but the initial distributed NotStarted status can be persisted without any worker ID while the claim lease already knows the remote worker. In that state ConfirmAndRepairStaleDistributedRun exits before it even reads the lease, so the details endpoint cannot repair the "acked but never reported status" case. Please derive the fallback worker from the lease when the saved status is still missing it.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/dagruns.go` around lines 1879 - 1889, In
repairConfirmedStaleDistributedRunOnRead, the fallback worker passed to
runtime.ConfirmAndRepairStaleDistributedRun uses status.WorkerID which can be
empty for acked-but-never-reported runs; before calling
ConfirmAndRepairStaleDistributedRun, if status.WorkerID is empty, query the
claim lease from a.dagRunLeaseStore for this run/attempt (use the store's
read/get method) and derive the fallback worker ID from the lease record (e.g.,
lease.Owner or lease.WorkerID), then pass that derived worker ID instead of the
empty status.WorkerID; keep using status.AttemptID and existing parameters and
only fall back to the lease when status.WorkerID is missing.

@yottahmd yottahmd merged commit a2055d9 into main Apr 27, 2026
10 checks passed
@yottahmd yottahmd deleted the fix/scheduler-stale-proc-missing-status branch April 27, 2026 10:56


Development

Successfully merging this pull request may close these issues.

Unable to stop/cancel running jobs after worker terminated by k8s
