
fix: recover stale queued distributed runs stuck after worker ack #1857

Merged

yottahmd merged 11 commits into main from fix/1837-queued-run-stuck on Mar 26, 2026

Conversation

@yottahmd (Collaborator) commented Mar 26, 2026

Summary

  • Fix distributed runs that get stuck in Queued status when a worker acknowledges a task claim but crashes or abandons execution before writing an initial status
  • Extend markLeaseRunFailed to loop over Running, NotStarted, and Queued expected statuses when performing its compare-and-swap (see the sketch after this list), so the coordinator's zombie detector can recover runs in any active state
  • Improve the error message to clarify the failure context ("accepted the task claim but stopped reporting")
  • Add SetAfterTaskAckHook on the worker to simulate post-ack abandonment in tests
  • Add integration and unit tests covering the Queued recovery path alongside existing Running and NotStarted cases
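
To make the recovery loop concrete, here is a minimal, self-contained Go sketch. The Status names mirror the PR description, but the run type and compareAndSwapStatus helper are invented stand-ins for the real store primitives:

```go
package main

import (
	"errors"
	"fmt"
)

// Status mirrors the run states named in this PR; the real enum lives in the codebase.
type Status int

const (
	Queued Status = iota
	NotStarted
	Running
	Failed
)

// errCASMismatch signals that the run was not in the expected status.
var errCASMismatch = errors.New("status mismatch")

// run and compareAndSwapStatus are simplified stand-ins for the store's
// compare-and-swap primitive: the transition applies only if the current
// status equals expected.
type run struct{ status Status }

func (r *run) compareAndSwapStatus(expected, next Status) error {
	if r.status != expected {
		return errCASMismatch
	}
	r.status = next
	return nil
}

// markLeaseRunFailed tries each active state in turn, so a zombie run is
// recovered whether the worker died after starting (Running), after claim
// preparation (NotStarted), or right after the ack (Queued).
func markLeaseRunFailed(r *run) error {
	for _, expected := range []Status{Running, NotStarted, Queued} {
		if err := r.compareAndSwapStatus(expected, Failed); err == nil {
			return nil
		} else if !errors.Is(err, errCASMismatch) {
			return err
		}
	}
	return fmt.Errorf("run is not in an active state; nothing to recover")
}

func main() {
	r := &run{status: Queued} // worker acked the claim but never wrote a status
	fmt.Println(markLeaseRunFailed(r), r.status) // <nil> 3 (Failed)
}
```

Before this change the loop covered only the existing Running and NotStarted cases, so a run abandoned immediately after the ack stayed Queued forever.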

Testing

  • make test TEST_TARGET=./internal/service/coordinator/... -count=1
  • make test TEST_TARGET=./internal/intg/distr/... -count=1 -run TestDistributedRun_AckedTaskWithoutInitialStatus
  • make test TEST_TARGET=./internal/service/worker/... -count=1

Closes #1837 #1858

Summary by CodeRabbit

  • Bug Fixes

    • More reliable detection and cleanup of stale distributed task leases; failed runs are cleaned up and clearer failure messages are shown.
    • Improved handling of late or superseded status updates to avoid executing after stale cleanup.
  • New Features

    • Hidden attempt-id support added to the CLI start/retry flows so executions can target a specific attempt.
    • Scheduler now respects DAG suspension: scheduler-managed catchups/starts are skipped and suspended catchups advance watermarks.
  • User-facing CLI

    • start/retry commands accept an attempt identifier to bind execution to a prepared attempt.
  • Other

    • Option to list the full DAG run history (bypassing the default “today” window) when querying run statuses.

@coderabbitai (bot) commented Mar 26, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 236d1f8b-1370-4fba-bb6d-c1452674025c

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 warning

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 26.96%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.

✅ Passed checks (4 passed)

  • Description Check ✅ Passed: check skipped; CodeRabbit’s high-level summary is enabled.
  • Title Check ✅ Passed: the title directly describes the main change, fixing recovery of stale queued distributed runs stuck after worker acknowledgment, which is the core objective of this PR.
  • Linked Issues Check ✅ Passed: the PR addresses all coding requirements from #1837; it extends zombie detection to recover Queued runs via the improved markLeaseRunFailed, adds SetAfterTaskAckHook for test simulation, and implements comprehensive integration and unit tests.
  • Out of Scope Changes Check ✅ Passed: all changes are directly related to fixing stale queued distributed run recovery; distributed run indexing, attempt preparation refactoring, lease-aware status handling, stale detection, suspension handling, and supporting test infrastructure are all necessary for the fix.


@coderabbitai (bot) left a review comment

🧹 Nitpick comments (1)
internal/intg/distr/zombie_recovery_test.go (1)

54-54: Increased scheduler timeout to 90 seconds.

This change increases the test timeout significantly. While this may be necessary for CI environments with variable performance, consider if this indicates an underlying issue with the test or if the original 30-second timeout was too aggressive.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `internal/intg/distr/zombie_recovery_test.go` at line 54: the test bumped the scheduler timeout from 30s to 90s by calling f.startScheduler(90 * time.Second), which masks flakiness. Prefer a deterministic fix: restore a reasonable default (e.g., 30 * time.Second), then either (a) make the timeout a named constant (SchedulerStartTimeout) that CI can override via an env var or test flag, or (b) replace the single long timeout with a polling/wait helper that retries until the scheduler is confirmed running (reuse the existing wait-for-condition helper, or loop at a short interval under a context deadline; a sketch follows). Update the call to f.startScheduler(...) to use the constant or the wait helper, and document any CI override in the test setup.
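
For option (b), a minimal sketch of such a polling helper in Go; waitForScheduler and the ready callback are hypothetical names, not part of the existing test fixtures:

```go
package distr

import (
	"context"
	"fmt"
	"time"
)

// waitForScheduler polls ready() at a short interval until it reports true
// or the context deadline expires, replacing a single large static timeout
// with a deterministic wait-for-condition loop.
func waitForScheduler(ctx context.Context, ready func() bool) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		if ready() {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("scheduler did not become ready: %w", ctx.Err())
		case <-ticker.C:
			// re-check on the next tick
		}
	}
}
```

A test would wrap the call in context.WithTimeout (e.g., 30 * time.Second) and pass a closure that checks whatever signal confirms the scheduler is running.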

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f25b0ef4-4e68-4c3c-9417-6476bef710c7

📥 Commits

Reviewing files that changed from the base of the PR and between d15a214 and aeb62fc.

📒 Files selected for processing (5)
  • internal/intg/distr/fixtures_test.go
  • internal/intg/distr/zombie_recovery_test.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/worker/worker.go

@yottahmd (Collaborator, Author) commented:

@coderabbitai full review

@coderabbitai (bot) commented Mar 26, 2026

✅ Actions performed

Full review triggered.

@coderabbitai (bot) left a review comment

Actionable comments posted: 7

🧹 Nitpick comments (2)
internal/runtime/manager.go (1)

234-242: Avoid the unconditional DAG read on root status lookups.

attempt.ReadDAG now runs for every root GetSavedStatus call, but the result is only consumed when resolving a local Running status. Gating that read behind st.Status == core.Running && isLocalWorkerID(st.WorkerID) removes extra filesystem I/O from the saved-status path and avoids silently discarding unrelated DAG-read failures.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `internal/runtime/manager.go` around lines 234-242: the code unconditionally calls attempt.ReadDAG even though its result is only used for local running-status checks. Move the ReadDAG call so it executes only when st.Status == core.Running && isLocalWorkerID(st.WorkerID) and dagRun.ID == st.DAGRunID; that is, check st.Status and isLocalWorkerID before invoking attempt.ReadDAG, then pass the obtained dag into m.resolveRunningStatus. This avoids unnecessary filesystem I/O and keeps unrelated DAG-read errors from surfacing on non-local or non-running saved-status lookups. A sketch follows.
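
A self-contained sketch of the suggested gating; the types and helper names below are simplified stand-ins for the manager's real ones:

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the manager's types; names follow the review
// comment, not the real code.
type runStatus struct {
	Status   string // "running", "queued", ...
	WorkerID string
	DAGRunID string
}

type dag struct{ Name string }

func isLocalWorkerID(id string) bool { return id == "local" }

func resolveRunningStatus(d *dag, st *runStatus) *runStatus {
	// ...reconcile the saved status against the live local process...
	return st
}

// getSavedStatus gates the expensive DAG read behind the only condition
// that consumes its result: a local Running status for the requested run.
func getSavedStatus(runID string, st *runStatus, readDAG func() (*dag, error)) (*runStatus, error) {
	if st.Status == "running" && isLocalWorkerID(st.WorkerID) && runID == st.DAGRunID {
		d, err := readDAG() // filesystem I/O happens only on this path
		if err != nil {
			return nil, err
		}
		return resolveRunningStatus(d, st), nil
	}
	return st, nil // non-local / non-running lookups skip the read entirely
}

func main() {
	st := &runStatus{Status: "queued", WorkerID: "remote-1", DAGRunID: "r1"}
	got, err := getSavedStatus("r1", st, func() (*dag, error) {
		return nil, errors.New("should not be called on this path")
	})
	fmt.Println(got.Status, err) // queued <nil>
}
```
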
internal/persis/filedistributed/dispatch_task_store.go (1)

174-225: These outstanding checks now serialize on a full store scan.

Both helpers grab s.mu, recycle claims, then sort and JSON-decode every file under pending/ and claims/. If callers invoke them repeatedly during queue admission, scheduling latency will grow linearly with backlog and ClaimNext will contend behind the same mutex. Consider maintaining per-queue / per-attempt indexes, or reusing a single scanned snapshot across the caller.

Also applies to: 288-312

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `internal/persis/filedistributed/dispatch_task_store.go` around lines 174-225: both CountOutstandingByQueue and HasOutstandingAttempt hold s.mu while calling recycleExpiredClaims and then scanOutstandingLocked, which JSON-decodes every file under pending/ and claims/, serializing the whole store behind one lock. Avoid the long-held lock in one of two ways: (a) take a short-locked snapshot, i.e., while holding s.mu call recycleExpiredClaims and collect only filenames/metadata (or copy the needed in-memory indexes), then release s.mu and run the expensive JSON-decode/scan on that snapshot; or (b) maintain lightweight in-memory indexes (per-queue outstanding counts and an attemptKey map) updated inside the mutating methods (ClaimNext and the Add/Remove helpers) so the two read paths can consult them cheaply under s.mu. Update the references to scanOutstandingLocked, recycleExpiredClaims, CountOutstandingByQueue, HasOutstandingAttempt, and ClaimNext so that full scans are no longer performed while holding the main mutex. A sketch of option (b) follows.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `internal/core/exec/distributed.go`:
- Around lines 195-200: the code fabricates an attempt key by falling back from status.Root to status.DAGRun(), which produces incorrect keys for legacy sub-DAG statuses. Update the block that computes the attempt key (the lines using root, status.Root, status.DAGRun(), and GenerateAttemptKey) so that when status.Root is zero/unknown it returns an empty string instead of using status.DAGRun(); this prevents LeaseMatchesStatus from rejecting valid leases on an attempt-key mismatch. Alternatively, thread an explicit root reference from the caller into the function, if callers can supply the true root instead of constructing it here.

In `internal/persis/filedistributed/active_distributed_run_store.go`:
- Around lines 99-104: in ListAll, the error path for readJSONFile(path, &record) currently returns on any non-ENOENT error, so a single corrupted or truncated JSON file aborts the entire scan. Instead, log or collect the decode/read error and continue the loop: inside the block handling readJSONFile errors, replace the `return nil, err` with non-fatal handling (e.g., a warning log recording path and err) and continue iterating, so ListAll still returns the successfully read records (sketched below).
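
A minimal sketch of the suggested skip-and-log behavior; listAll, readJSON, and runRecord are hypothetical stand-ins for the store's actual helpers:

```go
package filedistributed

import (
	"errors"
	"io/fs"
	"log/slog"
	"path/filepath"
)

type runRecord struct {
	AttemptKey string `json:"attemptKey"`
	Status     string `json:"status"`
}

// listAll scans dir for JSON records, skipping files that fail to read or
// decode instead of aborting the whole scan. readJSON is a stand-in for the
// store's readJSONFile helper.
func listAll(dir string, readJSON func(path string, v any) error) ([]runRecord, error) {
	paths, err := filepath.Glob(filepath.Join(dir, "*.json"))
	if err != nil {
		return nil, err
	}
	var records []runRecord
	for _, path := range paths {
		var rec runRecord
		if err := readJSON(path, &rec); err != nil {
			if errors.Is(err, fs.ErrNotExist) {
				continue // file was removed between listing and reading
			}
			// A corrupted or truncated file must not hide the healthy ones.
			slog.Warn("skipping unreadable distributed run record", "path", path, "err", err)
			continue
		}
		records = append(records, rec)
	}
	return records, nil
}
```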

In `internal/persis/filedistributed/stores_test.go`:
- Around lines 156-208: the test is flaky because ClaimNext(ctx, exec.DispatchTaskClaim{...}) has no queue filter and may claim either enqueued task. Make the claim deterministic by adding a queue filter to the DispatchTaskClaim (e.g., set QueueName: "queue-a", or whichever filter field exec.DispatchTaskClaim provides) so ClaimNext reliably selects the task from "queue-a" and the subsequent CountOutstandingByQueue / DeleteClaim / HasOutstandingAttempt assertions remain valid.

In `internal/service/coordinator/handler_test.go`:
- Around lines 511-513: the test calls dispatchStore.CountOutstandingByQueue with an empty string, but the method is queue-scoped. Change the call to use the actual test queue name (e.g., "test-queue", or whichever variable the tasks were created with) so it asserts the outstanding count for that queue specifically, and update the subsequent require/assert to match.

In `internal/service/coordinator/handler.go`:
- Around lines 1127-1137: the early return treats any identical Status enum as a noop and skips the post-write side effects. Change the ReportStatus logic around h.remoteStatusDecision so that an identical terminal status is only treated as a noop when the full incoming payload is unchanged, including the node/chat fields and any metadata used by persistChatMessages, the lease/index sync, and Write; alternatively, perform the required post-write side effects even on the noop path. Concretely, the block that currently returns after calling h.remoteStatusDecision and h.logRejectedRemoteStatusUpdate should compare the full payload rather than just latest.Status == incoming.Status, or invoke the same Write/persistChatMessages and lease/index sync routines when the status is terminal but other payload pieces differ, so follow-up terminal reports carrying extra node/chat data still persist and trigger cleanup retries (see the first sketch after these comments).
- Around lines 1784-1789: the early return when h.activeDistributedRunStore != nil prevents the orphaned-run detector from running. Change the handler so that after calling detectIndexedDistributedStatuses(ctx, now) it still invokes detectOrphanedDistributedStatuses(ctx, now) as a fallback (i.e., remove the return, or call detectOrphanedDistributedStatuses unconditionally after detectIndexedDistributedStatuses), so that runs that lost their lease, or whose index rows failed to materialize (see syncActiveDistributedRunFromStatus), are still detected.
- Around lines 592-629: markPreparedAttemptDispatchFailed marks prepared.attempt as Failed but leaves the opened handle cached in h.openAttempts, which can leak resources. Update it to always evict and close the prepared attempt handle whenever prepared != nil && prepared.newlyCreated && prepared.attempt != nil and the function is exiting (on early returns as well as after successfully writing the failed status): remove the entry keyed by task.AttemptKey from h.openAttempts and call the appropriate close/release method on prepared.attempt, taking the same mutex that protects h.openAttempts (e.g., h.openAttemptsMu) while mutating the map so there are no races (see the second sketch after these comments).
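
For the terminal-status comment above (lines 1127-1137), a minimal sketch of the full-payload noop check; statusReport and its fields are hypothetical stand-ins, not the handler's real types:

```go
package coordinator

import "reflect"

// statusReport is a simplified stand-in for the incoming report payload;
// the real handler also carries lease and index metadata.
type statusReport struct {
	Status       string
	Nodes        []string
	ChatMessages []string
}

// isNoopUpdate reports whether an incoming terminal report adds nothing
// over the latest persisted one. Comparing only the Status enum would drop
// follow-up reports that carry extra node or chat data.
func isNoopUpdate(latest, incoming statusReport) bool {
	return latest.Status == incoming.Status &&
		reflect.DeepEqual(latest.Nodes, incoming.Nodes) &&
		reflect.DeepEqual(latest.ChatMessages, incoming.ChatMessages)
}
```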
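
And for the handle-leak comment (lines 592-629), a sketch of evict-and-close under the cache mutex; openAttempt, handler, and evictOpenAttempt are illustrative names only:

```go
package coordinator

import "sync"

// openAttempt is a stand-in for the coordinator's cached attempt handle.
type openAttempt struct{ closed bool }

func (a *openAttempt) Close() { a.closed = true }

type handler struct {
	openAttemptsMu sync.Mutex
	openAttempts   map[string]*openAttempt
}

// evictOpenAttempt removes and closes a cached attempt handle. Calling this
// from every exit path of markPreparedAttemptDispatchFailed (early returns
// included, e.g. via defer) keeps a failed dispatch from leaking the handle.
func (h *handler) evictOpenAttempt(attemptKey string) {
	h.openAttemptsMu.Lock()
	a, ok := h.openAttempts[attemptKey]
	if ok {
		delete(h.openAttempts, attemptKey)
	}
	h.openAttemptsMu.Unlock()
	if ok {
		a.Close() // close outside the lock to avoid holding it during I/O
	}
}
```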

---

Nitpick comments:
In `@internal/persis/filedistributed/dispatch_task_store.go`:
- Around line 174-225: Both CountOutstandingByQueue and HasOutstandingAttempt
hold s.mu and call recycleExpiredClaims then scanOutstandingLocked which
JSON-decodes every file under pending/ and claims/, causing full-store
serialization; change the implementation to avoid long-held lock by either (a)
create a short-locked snapshot: while holding s.mu only call
recycleExpiredClaims and collect the list of filenames/metadata (not full
decode) or copy necessary in-memory indexes, then release s.mu and perform the
expensive JSON-decode/scan on that snapshot to compute counts/flags; or (b)
maintain and update lightweight in-memory indexes (per-queue outstanding counts
and attemptKey map) inside mutating methods (e.g., ClaimNext, Add/Remove
helpers) so CountOutstandingByQueue and HasOutstandingAttempt can read those
indexes under s.mu cheaply; update references to scanOutstandingLocked,
recycleExpiredClaims, CountOutstandingByQueue, HasOutstandingAttempt and
ClaimNext accordingly so scans are no longer performed while holding the main
mutex.

In `@internal/runtime/manager.go`:
- Around line 234-242: The code unconditionally calls attempt.ReadDAG even when
its result is only used for local running status checks; move the ReadDAG call
so it only executes when st.Status == core.Running &&
isLocalWorkerID(st.WorkerID) and dagRun.ID == st.DAGRunID, so change the block
around dagRun.ID == st.DAGRunID to first check st.Status and isLocalWorkerID
before invoking attempt.ReadDAG, then pass the obtained dag into
m.resolveRunningStatus; this avoids unnecessary filesystem I/O and prevents
unrelated DAG-read errors from occurring on non-local or non-running
saved-status lookups.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8940f45b-c3a8-4db6-be8c-b8e0efb4e5a4

📥 Commits

Reviewing files that changed from the base of the PR and between d15a214 and ab7329b.

📒 Files selected for processing (41)
  • internal/cmd/context.go
  • internal/cmd/coord.go
  • internal/cmd/exec.go
  • internal/cmd/retry.go
  • internal/cmd/start.go
  • internal/cmd/startall.go
  • internal/cmd/worker_attempt.go
  • internal/core/exec/dagrun.go
  • internal/core/exec/dagrun_test.go
  • internal/core/exec/distributed.go
  • internal/intg/distr/fixtures_test.go
  • internal/intg/distr/zombie_recovery_test.go
  • internal/persis/filedagrun/store.go
  • internal/persis/filedagrun/store_test.go
  • internal/persis/filedistributed/active_distributed_run_store.go
  • internal/persis/filedistributed/dispatch_task_store.go
  • internal/persis/filedistributed/stores_test.go
  • internal/runtime/agent/agent.go
  • internal/runtime/manager.go
  • internal/runtime/manager_test.go
  • internal/runtime/remote/status_pusher.go
  • internal/runtime/remote/status_pusher_test.go
  • internal/runtime/stale_run.go
  • internal/runtime/subcmd.go
  • internal/runtime/subcmd_test.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/scheduler/queue_processor.go
  • internal/service/scheduler/queue_processor_test.go
  • internal/service/scheduler/retry_scanner.go
  • internal/service/scheduler/retry_scanner_test.go
  • internal/service/scheduler/scheduler.go
  • internal/service/scheduler/suspension.go
  • internal/service/scheduler/tick_planner.go
  • internal/service/scheduler/tick_planner_test.go
  • internal/service/worker/poller.go
  • internal/service/worker/poller_test.go
  • internal/service/worker/worker.go
  • internal/service/worker/worker_test.go
  • internal/test/coordinator.go
  • internal/test/helper.go

yottahmd merged commit d127f42 into main on Mar 26, 2026
5 checks passed
yottahmd deleted the fix/1837-queued-run-stuck branch on March 26, 2026 at 15:49
@codecov (bot) commented Mar 26, 2026

Codecov Report

❌ Patch coverage is 59.39249% with 508 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.06%. Comparing base (d15a214) to head (26c324e).
⚠️ Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
internal/service/coordinator/handler.go 55.68% 210 Missing and 67 partials ⚠️
internal/service/scheduler/queue_processor.go 59.50% 32 Missing and 17 partials ⚠️
internal/cmd/worker_attempt.go 14.00% 42 Missing and 1 partial ⚠️
internal/cmd/start.go 16.21% 28 Missing and 3 partials ⚠️
...rnal/persis/filedistributed/dispatch_task_store.go 54.16% 13 Missing and 9 partials ⚠️
internal/cmd/retry.go 16.66% 19 Missing and 1 partial ⚠️
internal/core/exec/distributed.go 53.48% 10 Missing and 10 partials ⚠️
...is/filedistributed/active_distributed_run_store.go 71.42% 10 Missing and 8 partials ⚠️
internal/runtime/agent/agent.go 53.84% 5 Missing and 1 partial ⚠️
internal/service/scheduler/suspension.go 73.91% 4 Missing and 2 partials ⚠️
... and 5 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1857      +/-   ##
==========================================
- Coverage   69.26%   69.06%   -0.20%     
==========================================
  Files         440      443       +3     
  Lines       53457    54478    +1021     
==========================================
+ Hits        37027    37627     +600     
- Misses      13206    13525     +319     
- Partials     3224     3326     +102     
Files with missing lines Coverage Δ
internal/cmd/context.go 70.55% <100.00%> (+0.30%) ⬆️
internal/cmd/coord.go 77.20% <100.00%> (+0.34%) ⬆️
internal/cmd/exec.go 64.53% <100.00%> (ø)
internal/cmd/startall.go 51.04% <100.00%> (+0.25%) ⬆️
internal/core/exec/dagrun.go 86.20% <100.00%> (+0.75%) ⬆️
internal/persis/filedagrun/store.go 73.46% <100.00%> (+0.55%) ⬆️
internal/runtime/remote/status_pusher.go 93.75% <100.00%> (+0.89%) ⬆️
internal/runtime/subcmd.go 93.08% <100.00%> (+0.12%) ⬆️
internal/service/scheduler/retry_scanner.go 58.73% <100.00%> (-2.73%) ⬇️
internal/service/worker/poller.go 87.70% <100.00%> (+0.63%) ⬆️
... and 15 more

... and 15 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d15a214...26c324e.




Development

Successfully merging this pull request may close these issues.

Bug: some scheduled tasks are queued but not run

1 participant