fix: recover stale queued distributed runs stuck after worker ack #1857
Conversation
🚥 Pre-merge checks: 4 passed, 1 failed (warning)
🧹 Nitpick comments (1)
internal/intg/distr/zombie_recovery_test.go (1)
54-54: Increased scheduler timeout to 90 seconds. This change increases the test timeout significantly. While this may be necessary for CI environments with variable performance, consider whether this indicates an underlying issue with the test or whether the original 30-second timeout was too aggressive.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/intg/distr/zombie_recovery_test.go` at line 54, The test bumped the scheduler timeout from 30s to 90s by calling f.startScheduler(90 * time.Second), which masks flakiness; change this to a deterministic fix: restore a reasonable default (e.g., 30 * time.Second) and either (a) make the timeout a named constant (SchedulerStartTimeout) that can be overridden by CI via an env var or test flag, or (b) avoid a large static sleep by replacing the single long timeout usage in the test with a polling/wait helper that retries until the scheduler is confirmed running (use existing wait-for-condition helper or implement a short-interval loop with context deadline). Update the call to f.startScheduler(...) to use the constant or the wait helper and ensure any CI override is documented in the test setup.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f25b0ef4-4e68-4c3c-9417-6476bef710c7
📒 Files selected for processing (5)
internal/intg/distr/fixtures_test.go
internal/intg/distr/zombie_recovery_test.go
internal/service/coordinator/handler.go
internal/service/coordinator/handler_test.go
internal/service/worker/worker.go
@coderabbitai full review

✅ Actions performed: Full review triggered.
Actionable comments posted: 7
🧹 Nitpick comments (2)
internal/runtime/manager.go (1)
234-242: Avoid the unconditional DAG read on root status lookups.
`attempt.ReadDAG` now runs for every root `GetSavedStatus` call, but the result is only consumed when resolving a local `Running` status. Gating that read behind `st.Status == core.Running && isLocalWorkerID(st.WorkerID)` removes extra filesystem I/O from the saved-status path and avoids silently discarding unrelated DAG-read failures.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/manager.go` around lines 234 - 242, The code unconditionally calls attempt.ReadDAG even when its result is only used for local running status checks; move the ReadDAG call so it only executes when st.Status == core.Running && isLocalWorkerID(st.WorkerID) and dagRun.ID == st.DAGRunID, so change the block around dagRun.ID == st.DAGRunID to first check st.Status and isLocalWorkerID before invoking attempt.ReadDAG, then pass the obtained dag into m.resolveRunningStatus; this avoids unnecessary filesystem I/O and prevents unrelated DAG-read errors from occurring on non-local or non-running saved-status lookups.
internal/persis/filedistributed/dispatch_task_store.go (1)
174-225: These outstanding checks now serialize on a full store scan. Both helpers grab `s.mu`, recycle claims, then sort and JSON-decode every file under `pending/` and `claims/`. If callers invoke them repeatedly during queue admission, scheduling latency will grow linearly with backlog and `ClaimNext` will contend behind the same mutex. Consider maintaining per-queue / per-attempt indexes, or reusing a single scanned snapshot across callers.
Also applies to: 288-312
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/persis/filedistributed/dispatch_task_store.go` around lines 174 - 225, Both CountOutstandingByQueue and HasOutstandingAttempt hold s.mu and call recycleExpiredClaims then scanOutstandingLocked which JSON-decodes every file under pending/ and claims/, causing full-store serialization; change the implementation to avoid long-held lock by either (a) create a short-locked snapshot: while holding s.mu only call recycleExpiredClaims and collect the list of filenames/metadata (not full decode) or copy necessary in-memory indexes, then release s.mu and perform the expensive JSON-decode/scan on that snapshot to compute counts/flags; or (b) maintain and update lightweight in-memory indexes (per-queue outstanding counts and attemptKey map) inside mutating methods (e.g., ClaimNext, Add/Remove helpers) so CountOutstandingByQueue and HasOutstandingAttempt can read those indexes under s.mu cheaply; update references to scanOutstandingLocked, recycleExpiredClaims, CountOutstandingByQueue, HasOutstandingAttempt and ClaimNext accordingly so scans are no longer performed while holding the main mutex.
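Option (b) from the prompt, lightweight in-memory indexes updated on every mutation, can be sketched as follows. The `taskIndex` type and its method names are illustrative, not the store's actual API:

```go
package main

import (
	"fmt"
	"sync"
)

// taskIndex keeps per-queue counters that are updated on every mutation,
// so outstanding checks read a map entry under the mutex instead of
// re-scanning and JSON-decoding every file under pending/ and claims/.
type taskIndex struct {
	mu          sync.Mutex
	outstanding map[string]int // queue name -> pending + claimed tasks
}

func newTaskIndex() *taskIndex {
	return &taskIndex{outstanding: make(map[string]int)}
}

// add is called when a task is enqueued.
func (i *taskIndex) add(queue string) {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.outstanding[queue]++
}

// remove is called when a task's claim is deleted or the task is dropped.
func (i *taskIndex) remove(queue string) {
	i.mu.Lock()
	defer i.mu.Unlock()
	if i.outstanding[queue] > 0 {
		i.outstanding[queue]--
	}
}

// countOutstanding is the cheap read path: O(1) under the mutex.
func (i *taskIndex) countOutstanding(queue string) int {
	i.mu.Lock()
	defer i.mu.Unlock()
	return i.outstanding[queue]
}

func main() {
	idx := newTaskIndex()
	idx.add("queue-a")
	idx.add("queue-a")
	idx.remove("queue-a") // e.g. a claim completed and was deleted
	fmt.Println(idx.countOutstanding("queue-a")) // 1
}
```

The trade-off is that every mutating path (enqueue, claim, delete, claim recycling) must keep the index in sync, which is why the prompt lists the mutators that would need updating.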
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/core/exec/distributed.go`:
- Around line 195-200: The current code fabricates an attempt key by falling
back from status.Root to status.DAGRun() which produces incorrect keys for
legacy sub-DAG statuses; update the logic in the block that computes the attempt
key (the lines using root, status.Root, status.DAGRun() and GenerateAttemptKey)
so that if status.Root is zero/unknown you return an empty string instead of
using status.DAGRun(); this prevents LeaseMatchesStatus from rejecting valid
leases on attempt-key mismatch—alternatively, thread an explicit root reference
from the caller into the function if callers can supply the true root instead of
constructing it here.
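The guard this comment asks for can be sketched as below. The `dagRunRef` type, the key format, and the function name are stand-ins for illustration; only the behavior (return an empty key when the root is unknown rather than fabricating one from the sub-DAG's own run) follows the comment:

```go
package main

import "fmt"

// dagRunRef is a stand-in for the run reference used to build attempt keys.
type dagRunRef struct {
	Name string
	ID   string
}

func (r dagRunRef) Zero() bool { return r.Name == "" && r.ID == "" }

// attemptKeyForStatus returns an attempt key only when the true root run is
// known. Returning "" for a legacy status with no root avoids fabricating a
// key from the sub-DAG's own run, which would never match the lease's key
// and cause LeaseMatchesStatus to reject a valid lease.
func attemptKeyForStatus(root dagRunRef, attemptID string) string {
	if root.Zero() {
		return "" // unknown root: let the caller skip the attempt-key check
	}
	return fmt.Sprintf("%s/%s/%s", root.Name, root.ID, attemptID)
}

func main() {
	fmt.Println(attemptKeyForStatus(dagRunRef{Name: "etl", ID: "run-1"}, "a1")) // etl/run-1/a1
	fmt.Println(attemptKeyForStatus(dagRunRef{}, "a1") == "")                   // true
}
```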
In `@internal/persis/filedistributed/active_distributed_run_store.go`:
- Around line 99-104: In ListAll (active_distributed_run_store.go) the error
path that calls readJSONFile(path, &record) currently returns on any non-ENOENT
error; change this to log or collect the decode/read error and continue the loop
so a single corrupted/truncated JSON file doesn't abort the entire
scan—specifically, inside the block handling readJSONFile errors, replace the
return nil, err with a non-fatal handling (e.g., processLogger.Warnf or similar)
that records path and err and then continue iterating, ensuring ListAll still
returns the successfully read records.
In `@internal/persis/filedistributed/stores_test.go`:
- Around line 156-208: The test is flaky because ClaimNext(ctx,
exec.DispatchTaskClaim{...}) has no queue filter and may claim either enqueued
task; fix by making the claim deterministic: add a queue filter to the
DispatchTaskClaim (e.g. set QueueName: "queue-a" or the appropriate queue filter
field on the exec.DispatchTaskClaim passed into store.ClaimNext) so ClaimNext
will deterministically select the task from "queue-a" and the subsequent
CountOutstandingByQueue / DeleteClaim / HasOutstandingAttempt assertions remain
valid.
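A queue-filtered claim, which makes the test deterministic, can be sketched as below; the `task` type, `claimNext` signature, and filter semantics are illustrative stand-ins, not the store's actual API:

```go
package main

import "fmt"

// task is a stand-in for a queued dispatch task.
type task struct {
	ID    string
	Queue string
}

// claimNext returns the first pending task matching the optional queue
// filter. With the filter set, the claim is deterministic even when several
// queues hold pending tasks; an empty filter keeps the old "any task"
// behavior.
func claimNext(pending []task, queue string) (task, bool) {
	for _, t := range pending {
		if queue == "" || t.Queue == queue {
			return t, true
		}
	}
	return task{}, false
}

func main() {
	pending := []task{{ID: "t1", Queue: "queue-b"}, {ID: "t2", Queue: "queue-a"}}
	got, ok := claimNext(pending, "queue-a")
	fmt.Println(ok, got.ID) // true t2
}
```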
In `@internal/service/coordinator/handler_test.go`:
- Around line 511-513: The test calls dispatchStore.CountOutstandingByQueue with
an empty string which is queue-scoped; change the call to use the actual test
queue name (e.g., "test-queue" or the variable used to create tasks) so it
asserts outstanding tasks for that queue specifically. Update the invocation of
CountOutstandingByQueue(...) and the subsequent require/assert to check the
count for "test-queue" (referencing CountOutstandingByQueue and the test
function in handler_test.go) instead of "".
In `@internal/service/coordinator/handler.go`:
- Around line 1127-1137: The current early-return treats any identical Status
enum as a noop and skips post-write side effects; change the logic in the
ReportStatus handling around h.remoteStatusDecision so that identical terminal
statuses are only considered noop when the full incoming payload (including
node/chat fields and any metadata used by persistChatMessages, lease/index sync
and Write) is unchanged, or alternatively perform the required post-write side
effects even on the noop path; update the block that currently returns on noop
(after calling h.remoteStatusDecision and h.logRejectedRemoteStatusUpdate) to
compare the full payload (not just latest.Status == incoming.Status) or invoke
the same Write/persistChatMessages and lease/index sync routines when status is
terminal but other payload pieces differ, ensuring follow-up terminal reports
with extra node/chat data still persist and trigger cleanup retries.
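The "compare the full payload, not just the status enum" check can be sketched with a deep comparison. `statusReport` and `isNoop` are hypothetical stand-ins for the handler's real payload type and noop decision:

```go
package main

import (
	"fmt"
	"reflect"
)

// statusReport is a stand-in for the incoming remote status payload.
type statusReport struct {
	Status string
	Nodes  []string
	Chat   []string
}

// isNoop treats a repeated terminal report as a noop only when the whole
// payload matches what was last persisted, not just the status enum, so a
// follow-up report carrying extra node/chat data is still written.
func isNoop(latest, incoming statusReport) bool {
	return reflect.DeepEqual(latest, incoming)
}

func main() {
	persisted := statusReport{Status: "failed"}
	retry := statusReport{Status: "failed", Chat: []string{"error detail"}}
	fmt.Println(isNoop(persisted, persisted), isNoop(persisted, retry)) // true false
}
```

The alternative the prompt mentions, running the post-write side effects even on the noop path, trades an extra write for a simpler comparison.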
- Around line 1784-1789: The current early return when
h.activeDistributedRunStore != nil prevents the orphaned-run detector from
running; change the logic in the handler so that after calling
detectIndexedDistributedStatuses(ctx, now) you still invoke
detectOrphanedDistributedStatuses(ctx, now) as a fallback (i.e., remove the
return or call detectOrphanedDistributedStatuses unconditionally after
detectIndexedDistributedStatuses) so runs that lost their lease or whose index
rows failed to materialize (see syncActiveDistributedRunFromStatus) are still
detected; update the block that references h.activeDistributedRunStore,
detectIndexedDistributedStatuses, and detectOrphanedDistributedStatuses
accordingly.
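The control flow the comment asks for, no early return so the orphan scan always runs as a fallback, can be modeled minimally; the `detector` type here is a stand-in for the handler, not its real structure:

```go
package main

import "fmt"

// detector models the coordinator's stale-run sweep: the fast indexed pass
// runs when the index store exists, and the orphan scan always runs as a
// fallback for runs whose index rows failed to materialize or whose lease
// was lost.
type detector struct {
	hasIndexStore bool
	passes        []string
}

func (d *detector) sweep() {
	if d.hasIndexStore {
		d.passes = append(d.passes, "indexed")
	}
	// No early return: the orphan scan is unconditional.
	d.passes = append(d.passes, "orphaned")
}

func main() {
	d := &detector{hasIndexStore: true}
	d.sweep()
	fmt.Println(d.passes) // [indexed orphaned]
}
```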
- Around line 592-629: markPreparedAttemptDispatchFailed currently marks
prepared.attempt as Failed but leaves the opened handle cached in
h.openAttempts, which can leak resources; update
markPreparedAttemptDispatchFailed to always evict and close the prepared attempt
handle (prepared.attempt) from the coordinator's cache (h.openAttempts) whenever
prepared != nil && prepared.newlyCreated && prepared.attempt != nil and you are
exiting the function (both on early returns and after successfully writing the
failed status), by removing the entry keyed by task.AttemptKey from
h.openAttempts and calling the appropriate close/release method on
prepared.attempt; ensure you use the same mutex/lock that protects
h.openAttempts (e.g., h.openAttemptsMu) when mutating the map so there are no
races.
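The evict-and-close pattern described above, mutating the cache under its mutex and closing the handle outside the lock, can be sketched as follows. The `coordinator` and `attemptHandle` types are simplified stand-ins for the real handler state:

```go
package main

import (
	"fmt"
	"sync"
)

// attemptHandle is a stand-in for an open attempt file handle.
type attemptHandle struct{ closed bool }

func (a *attemptHandle) Close() { a.closed = true }

type coordinator struct {
	mu           sync.Mutex
	openAttempts map[string]*attemptHandle
}

// evictAttempt removes the cached handle under the same mutex that guards
// the map, then closes it outside the lock. Calling this via defer on the
// dispatch-failure path means no return statement can leak the handle.
func (c *coordinator) evictAttempt(key string) {
	c.mu.Lock()
	h := c.openAttempts[key]
	delete(c.openAttempts, key)
	c.mu.Unlock()
	if h != nil {
		h.Close()
	}
}

func main() {
	h := &attemptHandle{}
	c := &coordinator{openAttempts: map[string]*attemptHandle{"attempt-1": h}}
	c.evictAttempt("attempt-1")
	fmt.Println(h.closed, len(c.openAttempts)) // true 0
}
```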
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8940f45b-c3a8-4db6-be8c-b8e0efb4e5a4
📒 Files selected for processing (41)
internal/cmd/context.go, internal/cmd/coord.go, internal/cmd/exec.go, internal/cmd/retry.go, internal/cmd/start.go, internal/cmd/startall.go, internal/cmd/worker_attempt.go, internal/core/exec/dagrun.go, internal/core/exec/dagrun_test.go, internal/core/exec/distributed.go, internal/intg/distr/fixtures_test.go, internal/intg/distr/zombie_recovery_test.go, internal/persis/filedagrun/store.go, internal/persis/filedagrun/store_test.go, internal/persis/filedistributed/active_distributed_run_store.go, internal/persis/filedistributed/dispatch_task_store.go, internal/persis/filedistributed/stores_test.go, internal/runtime/agent/agent.go, internal/runtime/manager.go, internal/runtime/manager_test.go, internal/runtime/remote/status_pusher.go, internal/runtime/remote/status_pusher_test.go, internal/runtime/stale_run.go, internal/runtime/subcmd.go, internal/runtime/subcmd_test.go, internal/service/coordinator/handler.go, internal/service/coordinator/handler_test.go, internal/service/scheduler/queue_processor.go, internal/service/scheduler/queue_processor_test.go, internal/service/scheduler/retry_scanner.go, internal/service/scheduler/retry_scanner_test.go, internal/service/scheduler/scheduler.go, internal/service/scheduler/suspension.go, internal/service/scheduler/tick_planner.go, internal/service/scheduler/tick_planner_test.go, internal/service/worker/poller.go, internal/service/worker/poller_test.go, internal/service/worker/worker.go, internal/service/worker/worker_test.go, internal/test/coordinator.go, internal/test/helper.go
Codecov Report
❌ Patch coverage is … Additional details and impacted files:
@@ Coverage Diff @@
## main #1857 +/- ##
==========================================
- Coverage 69.26% 69.06% -0.20%
==========================================
Files 440 443 +3
Lines 53457 54478 +1021
==========================================
+ Hits 37027 37627 +600
- Misses 13206 13525 +319
- Partials 3224 3326 +102
... and 15 files with indirect coverage changes. Continue to review the full report in Codecov by Sentry.
Summary
- Recover runs left in `Queued` status when a worker acknowledges a task claim but crashes or abandons execution before writing an initial status
- Update `markLeaseRunFailed` to loop over `Running`, `NotStarted`, and `Queued` expected statuses when performing compare-and-swap, so the coordinator's zombie detector can recover runs in any active state
- Add `SetAfterTaskAckHook` on the worker to simulate post-ack abandonment in tests
- Cover the `Queued` recovery path alongside existing `Running` and `NotStarted` cases

Testing
- make test TEST_TARGET=./internal/service/coordinator/... -count=1
- make test TEST_TARGET=./internal/intg/distr/... -count=1 -run TestDistributedRun_AckedTaskWithoutInitialStatus
- make test TEST_TARGET=./internal/service/worker/... -count=1

Closes #1837 #1858
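The compare-and-swap loop the summary describes can be sketched like this. The `store` type and its CAS semantics are simplified stand-ins for the real status store; only the loop over expected active states reflects the change in `markLeaseRunFailed`:

```go
package main

import "fmt"

// store models a status record with compare-and-swap semantics.
type store struct{ status map[string]string }

// compareAndSwap moves runID from expected to next only when the current
// status matches expected, returning whether the swap happened.
func (s *store) compareAndSwap(runID, expected, next string) bool {
	if s.status[runID] != expected {
		return false
	}
	s.status[runID] = next
	return true
}

// markLeaseRunFailed tries each active state in turn, so a zombie run is
// recovered whether it was Running, NotStarted, or Queued when its worker
// acked the task and then died. Terminal states never match, so finished
// runs are left untouched.
func markLeaseRunFailed(s *store, runID string) bool {
	for _, expected := range []string{"running", "not_started", "queued"} {
		if s.compareAndSwap(runID, expected, "failed") {
			return true
		}
	}
	return false
}

func main() {
	s := &store{status: map[string]string{"run-1": "queued"}}
	fmt.Println(markLeaseRunFailed(s, "run-1"), s.status["run-1"]) // true failed
}
```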