Skip to content

refactor: make scheduler proc-authoritative#1824

Merged
yottahmd merged 14 commits intomainfrom
fix/zombie-detector
Mar 22, 2026
Merged

refactor: make scheduler proc-authoritative#1824
yottahmd merged 14 commits intomainfrom
fix/zombie-detector

Conversation

@yottahmd
Copy link
Copy Markdown
Collaborator

@yottahmd yottahmd commented Mar 22, 2026

Summary

  • refactor scheduler/runtime hot paths to use proc-authoritative liveness and latest-attempt lookups instead of running-history scans
  • reorder local execution so attempts are prepared before proc acquisition, then harden zombie detection, latest-status reads, and retry scanning around exact attempt identity
  • tighten proc/fileproc helpers and shared test harnesses so subprocess-based API and distributed tests always exercise the current binary

Testing

  • make fmt
  • go test ./internal/persis/fileproc ./internal/service/scheduler ./internal/runtime ./internal/cmd -count=1
  • go test ./internal/service/frontend/api/v1 -count=1
  • go test ./internal/intg/distr/... -count=1

Refs #546

Summary by CodeRabbit

  • Bug Fixes

    • Improved process lifecycle management with enhanced validation and stale file cleanup.
    • Refined retry scheduling logic to better handle failed DAG runs.
    • Fixed zombie process detection and repair for local executions.
  • Improvements

    • Strengthened process state tracking with per-attempt liveness verification.
    • Enhanced status resolution to prevent reading stale process sockets.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Mar 22, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 78d0fee4-f638-4282-9482-33b87264f628

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This pull request introduces a "prepared local execution" pattern that pre-creates DAG run attempts and acquires process handles before passing them into execution agents. It refactors process-group metadata handling from DAG-run-level to attempt-level tracking, replaces stale-file-cleanup APIs with entry-based query/removal operations, updates command paths (start/restart/retry) to leverage the new pattern, refactors the retry scanner to query latest attempts, and adjusts the zombie detector to work with proc entries and per-attempt liveness.

Changes

Cohort / File(s) Summary
Context and Scheduler Initialization
internal/cmd/context.go
Added proc directory validation in NewContext and refactored NewScheduler to create a dedicated statusCache and schedulerRunStore with new constructor arguments and a schedulerRunMgr using the cached store.
Local Execution Orchestration
internal/cmd/local_execution.go
New file introducing prepareLocalExecution to validate inputs, acquire process handles with expanded metadata, and withPreparedLocalExecution to manage the lifecycle of prepared execution contexts with early-failure callbacks and attempt persistence.
Command Execution Paths
internal/cmd/start.go, internal/cmd/restart.go, internal/cmd/retry.go
Refactored to use withPreparedLocalExecution helper, added preparedAttempt parameter to executeDAGRun and executeRetry, and moved process-group lock/acquire/stop handling into the new orchestration layer.
Process Metadata and Store Interface
internal/core/exec/proc.go
Expanded ProcMeta to include AttemptID, RootName, RootDAGRunID; updated Acquire signature to accept ProcMeta; added IsAttemptAlive for attempt-level liveness; replaced CleanStaleFiles with ListEntries, LatestFreshEntryByDAGName, ListAllEntries, RemoveIfStale; added ProcEntry type with metadata and freshness state.
Persistent Proc File Handling
internal/persis/fileproc/procfile.go, internal/persis/fileproc/handle.go
New procfile.go module with on-disk proc format (8-byte heartbeat + JSON metadata), filename encoding/parsing for timestamps and attempt IDs, validation routines, and helper functions. Updated handle.go to validate metadata at heartbeat startup and delegate file writes to writeProcFile.
Proc Group and Store Operations
internal/persis/fileproc/procgrp.go, internal/persis/fileproc/store.go
Replaced regex-based file discovery with directory traversal returning ProcEntry objects; updated liveness checks to use entry metadata; added LatestFreshEntryByDAGName, ListEntries, and RemoveIfStale methods; changed Acquire to accept and validate ProcMeta.
Proc Test Infrastructure
internal/persis/fileproc/test_helpers_test.go, internal/persis/fileproc/handle_test.go, internal/persis/fileproc/procgrp_test.go, internal/persis/fileproc/store_test.go
Added testProcMetaFromRun helper; updated tests to use proc metadata instead of bare DAGRunRef; added TestStore_IsAttemptAlive and TestStore_LatestFreshEntryByDAGName test cases; changed stale-file cleanup to explicit RemoveIfStale calls.
Runtime Agent and Manager
internal/runtime/agent/agent.go, internal/runtime/manager.go
Added PreparedAttempt field to Options in agent; agent now reuses pre-created attempts when provided. Manager refactored GetCurrentStatus and GetLatestStatus to use LatestFreshEntryByDAGName and IsAttemptAlive with centralized running-status resolution logic.
Scheduler Retry and Zombie Detection
internal/service/scheduler/retry_scanner.go, internal/service/scheduler/scheduler.go, internal/service/scheduler/zombie_detector.go
Retry scanner now requires listTargets provider callback and queries LatestAttempt per DAG instead of listing all failed runs. Zombie detector replaced goroutine-based concurrency with direct ticker invocation and switched from DAG-run-status listing to proc-entry iteration with per-attempt stale counters.
Service and Test Endpoints
internal/service/scheduler/...\_test.go, internal/service/frontend/api/v1/proc_liveness_test.go, internal/test/proc.go, internal/test/helper.go, internal/test/coordinator.go, internal/test/server.go
Updated test helpers to create stale proc files with attempt identifiers using CreateStaleProcFileWithAttempt; changed proc file pattern matching from DAG-ID-specific glob to generic proc_*.proc; updated agent cancel and coordinator/server setup to use ProcMeta and built executables; added mock support for new proc store APIs.
Manager Test Coverage
internal/runtime/manager_test.go
Added two new test subtests for distributed DAG runs to verify that persisted local status takes precedence over remote socket status when WorkerID is non-local.

Sequence Diagram(s)

sequenceDiagram
    participant Cmd as Command Handler
    participant Prep as Local Execution<br/>Preparation
    participant ProcStore as ProcStore
    participant Agent as Execution Agent
    participant DAGRunStore as DAGRunStore

    Cmd->>Prep: withPreparedLocalExecution(ctx, dag, callbacks)
    Prep->>Prep: prepareLocalExecution(ctx, dag)
    Prep->>Prep: validate inputs, derive DAGRunRef
    Prep->>ProcStore: lock process group
    Prep->>ProcStore: Acquire(ctx, groupName, ProcMeta)
    Prep->>DAGRunStore: CreateAttempt(ctx, dagRun, ...) or reuse
    Prep->>Prep: set DAG on attempt
    Prep->>Agent: swap ctx.Proc to prepared handle
    Prep->>Agent: New(ctx with PreparedAttempt)
    Agent->>Agent: setupDAGRunAttempt()
    Agent->>Agent: return preparedAttempt (via SetDAG)
    Agent->>Agent: Execute DAG with prepared attempt
    Prep->>Prep: restore ctx.Proc
    Prep->>ProcStore: Stop prepared handle
    Cmd->>Cmd: Continue with result
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~55 minutes

Possibly related PRs

  • PR #1773: Related through shared modifications to proc file handling, ProcStore interface, fileproc group/store operations, and zombie detector liveness-checking logic.
  • PR #1774: Related through direct changes to scheduler retry scanner signature (NewRetryScanner callback parameter), retry-target computation, and EnqueueRetry enqueue flow.
  • PR #1564: Related through overlapping implementation of shared-nothing worker feature including proc file format changes, ProcStore API updates, NewContext/NewScheduler wiring modifications, and coordinator/scheduler integration.
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 19.40% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'refactor: make scheduler proc-authoritative' directly summarizes the main refactoring objective shown across all files: shifting scheduler and runtime to rely on proc-authoritative liveness and latest-attempt lookups.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/zombie-detector

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/service/scheduler/zombie_detector_test.go (1)

327-444: ⚠️ Potential issue | 🟠 Major

Don't let the proc-store mock swallow unexpected calls.

The hasExpectedCall pattern returns zero values without calling m.Called(...), which masks unexpected method calls. If detectAndCleanZombies starts calling RemoveIfStale, IsAttemptAlive, or other methods, tests like line 62 and line 204 will silently pass because these methods return nil by default instead of failing. Use standard testify/mock behavior: explicitly mock calls with .Once(), mark truly optional interactions with .Maybe(), or use AssertNotCalled for methods that must not be invoked.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/scheduler/zombie_detector_test.go` around lines 327 - 444,
The mockProcStore currently uses hasExpectedCall to short-circuit and return
zero values instead of delegating to testify/mock, which hides unexpected calls;
update mockProcStore methods (e.g., RemoveIfStale, IsAttemptAlive, IsRunAlive,
ListAlive, ListEntries, LatestFreshEntryByDAGName, ListAllEntries, Acquire,
CountAlive, CountAliveByDAGName, ListAllAlive) to always call m.Called(...) and
return its results so testify can assert unexpected calls, and in tests mark
optional interactions with .Maybe() or set explicit expectations with .Once()
(or use AssertNotCalled) for methods that must not be invoked by
detectAndCleanZombies. Ensure hasExpectedCall is removed or only used to toggle
deliberate optional behavior but does not suppress m.Called(...) invocations.
🧹 Nitpick comments (2)
internal/persis/fileproc/handle.go (1)

177-178: Verify partial heartbeat update behavior.

The writeHeartbeat helper only writes the 8-byte timestamp at offset 0, leaving the JSON metadata unchanged. This is efficient for periodic updates, but relies on the metadata portion remaining intact.

Given the file was initially created with writeProcFile (which writes timestamp + metadata), and only the timestamp is updated periodically, this should work correctly. However, if the file is truncated or corrupted between writes, the metadata could be lost while the timestamp appears valid.

Consider adding a length check in readProcEntry error messages to distinguish between "too short" (no data) vs "truncated" (has timestamp but incomplete metadata) for easier debugging of rare corruption scenarios.

Also applies to: 227-232

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/fileproc/handle.go` around lines 177 - 178, readProcEntry
currently treats any undersized file as a generic "too short" error but
writeHeartbeat updates only the 8-byte timestamp leaving JSON metadata after
offset 8, so add a length-aware check in readProcEntry to distinguish three
cases: file length < 8 (no timestamp), file length >= 8 but <
expectedHeader+metadataStart (timestamp present but metadata truncated), and
full length OK; update the error messages accordingly to report "no timestamp
(file too short)", "timestamp present but metadata truncated", or the existing
parse error, and apply the same length-differentiated checks/messages to the
other read path referenced around the second block (the code handling the
alternate read at the other site) so corruption vs empty-file cases are clear.
internal/persis/fileproc/procfile.go (1)

199-202: Consider documenting dual freshness check rationale.

The freshness logic checks both modTime and heartbeatTime. If modTime indicates staleness but heartbeatTime is fresh, the entry is considered fresh. This handles cases where file metadata updates lag behind content writes.

A brief comment explaining why both are checked would help future maintainers understand this defensive pattern.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/fileproc/procfile.go` around lines 199 - 202, The freshness
check currently sets fresh based on now.Sub(info.ModTime()) < staleTime and then
rechecks using heartbeatTime if modTime is stale; add a concise inline comment
above these lines (referencing fresh, info.ModTime(), heartbeatTime, staleTime)
explaining that both modTime and heartbeatTime are checked because filesystem
modTime may lag behind application heartbeats, so a recent heartbeat should mark
the entry fresh even if the file mtime appears stale; keep the comment short and
focused on the defensive rationale for future maintainers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/persis/fileproc/procgrp.go`:
- Around line 198-229: The scan currently fails if a DAG directory or proc file
disappears mid-walk; update the loops around os.ReadDir(dagDir) and the call to
readProcEntry(file, pg.groupName, pg.staleTime, now) to treat missing-file/dir
errors as non-fatal: when os.ReadDir(dagDir) returns an error, if errors.Is(err,
fs.ErrNotExist) (or os.IsNotExist(err)) then continue to the next dagEntry
instead of returning the error; similarly, when readProcEntry returns an error,
if it indicates ENOENT (errors.Is(err, fs.ErrNotExist) / os.IsNotExist(err))
just skip that file and continue, otherwise return the error—this yields a
best-effort scan used by ListAllAlive and zombie detection while still surfacing
real errors.

In `@internal/runtime/manager_test.go`:
- Around line 369-377: The handler currently calls w.WriteHeader(http.StatusOK)
before json.Marshal, causing a second
WriteHeader(http.StatusInternalServerError) to be ignored; change the order in
the anonymous http handler so that you call json.Marshal(status) first, handle
marshalErr (write a 500 response and return) and only after successful
marshaling call w.WriteHeader(http.StatusOK) and w.Write(jsonData); reference
the existing json.Marshal(status), w.WriteHeader, and w.Write calls to locate
and reorder the logic.

In `@internal/test/helper.go`:
- Around line 753-760: Replace the hardcoded AttemptID ("attempt_"+a.dagRunID)
used when calling a.ProcStore.Acquire with the production-consistent format:
call the same generator used in production (genAttemptID()) or replicate it by
hex-encoding 3 random bytes to produce a 6-character hex string, and use that
value for AttemptID; if the test intentionally uses the synthetic format, add a
one-line comment next to AttemptID explaining why the simplified value is used
and that it intentionally diverges from genAttemptID().

---

Outside diff comments:
In `@internal/service/scheduler/zombie_detector_test.go`:
- Around line 327-444: The mockProcStore currently uses hasExpectedCall to
short-circuit and return zero values instead of delegating to testify/mock,
which hides unexpected calls; update mockProcStore methods (e.g., RemoveIfStale,
IsAttemptAlive, IsRunAlive, ListAlive, ListEntries, LatestFreshEntryByDAGName,
ListAllEntries, Acquire, CountAlive, CountAliveByDAGName, ListAllAlive) to
always call m.Called(...) and return its results so testify can assert
unexpected calls, and in tests mark optional interactions with .Maybe() or set
explicit expectations with .Once() (or use AssertNotCalled) for methods that
must not be invoked by detectAndCleanZombies. Ensure hasExpectedCall is removed
or only used to toggle deliberate optional behavior but does not suppress
m.Called(...) invocations.

---

Nitpick comments:
In `@internal/persis/fileproc/handle.go`:
- Around line 177-178: readProcEntry currently treats any undersized file as a
generic "too short" error but writeHeartbeat updates only the 8-byte timestamp
leaving JSON metadata after offset 8, so add a length-aware check in
readProcEntry to distinguish three cases: file length < 8 (no timestamp), file
length >= 8 but < expectedHeader+metadataStart (timestamp present but metadata
truncated), and full length OK; update the error messages accordingly to report
"no timestamp (file too short)", "timestamp present but metadata truncated", or
the existing parse error, and apply the same length-differentiated
checks/messages to the other read path referenced around the second block (the
code handling the alternate read at the other site) so corruption vs empty-file
cases are clear.

In `@internal/persis/fileproc/procfile.go`:
- Around line 199-202: The freshness check currently sets fresh based on
now.Sub(info.ModTime()) < staleTime and then rechecks using heartbeatTime if
modTime is stale; add a concise inline comment above these lines (referencing
fresh, info.ModTime(), heartbeatTime, staleTime) explaining that both modTime
and heartbeatTime are checked because filesystem modTime may lag behind
application heartbeats, so a recent heartbeat should mark the entry fresh even
if the file mtime appears stale; keep the comment short and focused on the
defensive rationale for future maintainers.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c14ff232-becb-4107-893b-c7db84418ed9

📥 Commits

Reviewing files that changed from the base of the PR and between c33cb46 and a8bb575.

📒 Files selected for processing (28)
  • internal/cmd/context.go
  • internal/cmd/local_execution.go
  • internal/cmd/restart.go
  • internal/cmd/retry.go
  • internal/cmd/start.go
  • internal/core/exec/proc.go
  • internal/intg/queue/proc_liveness_test.go
  • internal/persis/fileproc/handle.go
  • internal/persis/fileproc/handle_test.go
  • internal/persis/fileproc/procfile.go
  • internal/persis/fileproc/procgrp.go
  • internal/persis/fileproc/procgrp_test.go
  • internal/persis/fileproc/store.go
  • internal/persis/fileproc/store_test.go
  • internal/persis/fileproc/test_helpers_test.go
  • internal/runtime/agent/agent.go
  • internal/runtime/manager.go
  • internal/runtime/manager_test.go
  • internal/service/frontend/api/v1/proc_liveness_test.go
  • internal/service/scheduler/retry_scanner.go
  • internal/service/scheduler/retry_scanner_test.go
  • internal/service/scheduler/scheduler.go
  • internal/service/scheduler/zombie_detector.go
  • internal/service/scheduler/zombie_detector_test.go
  • internal/test/coordinator.go
  • internal/test/helper.go
  • internal/test/proc.go
  • internal/test/server.go

Comment thread internal/persis/fileproc/procgrp.go
Comment thread internal/runtime/manager_test.go
Comment thread internal/test/helper.go
@yottahmd yottahmd merged commit c335040 into main Mar 22, 2026
4 checks passed
@yottahmd yottahmd deleted the fix/zombie-detector branch March 22, 2026 12:06
@codecov
Copy link
Copy Markdown

codecov Bot commented Mar 22, 2026

Codecov Report

❌ Patch coverage is 65.73427% with 294 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.19%. Comparing base (c33fb51) to head (0c37fb4).
⚠️ Report is 13 commits behind head on main.

Files with missing lines Patch % Lines
internal/persis/fileproc/procfile.go 62.42% 32 Missing and 30 partials ⚠️
internal/cmd/local_execution.go 47.05% 38 Missing and 7 partials ⚠️
internal/cmd/start.go 30.76% 36 Missing ⚠️
internal/persis/fileproc/procgrp.go 68.69% 23 Missing and 13 partials ⚠️
internal/service/scheduler/zombie_detector.go 66.66% 18 Missing and 11 partials ⚠️
internal/persis/fileproc/handle.go 31.70% 19 Missing and 9 partials ⚠️
internal/cmn/sock/server.go 72.09% 7 Missing and 5 partials ⚠️
internal/persis/fileproc/store.go 82.14% 5 Missing and 5 partials ⚠️
internal/runtime/manager.go 80.95% 5 Missing and 3 partials ⚠️
internal/runtime/stale_run.go 61.90% 4 Missing and 4 partials ⚠️
... and 7 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1824      +/-   ##
==========================================
+ Coverage   69.18%   69.19%   +0.01%     
==========================================
  Files         426      432       +6     
  Lines       51421    52033     +612     
==========================================
+ Hits        35576    36005     +429     
- Misses      12827    12944     +117     
- Partials     3018     3084      +66     
Files with missing lines Coverage Δ
internal/cmd/restart.go 67.88% <100.00%> (+6.28%) ⬆️
internal/core/exec/dagrun.go 85.45% <ø> (ø)
internal/persis/filedagrun/store.go 72.82% <100.00%> (+0.56%) ⬆️
internal/runtime/agent/agent.go 69.14% <100.00%> (+0.29%) ⬆️
internal/runtime/env.go 64.73% <100.00%> (ø)
internal/runtime/executor/dag_runner.go 77.01% <100.00%> (-0.04%) ⬇️
internal/runtime/subcmd.go 92.68% <100.00%> (+0.22%) ⬆️
internal/service/scheduler/scheduler.go 85.37% <ø> (-0.30%) ⬇️
internal/service/scheduler/retry_scanner.go 61.45% <66.66%> (+2.63%) ⬆️
internal/cmd/context.go 69.89% <83.33%> (+0.91%) ⬆️
... and 15 more

... and 30 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c33fb51...0c37fb4. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant