refactor: make scheduler proc-authoritative#1824
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis pull request introduces a "prepared local execution" pattern that pre-creates DAG run attempts and acquires process handles before passing them into execution agents. It refactors process-group metadata handling from DAG-run-level to attempt-level tracking, replaces stale-file-cleanup APIs with entry-based query/removal operations, updates command paths (start/restart/retry) to leverage the new pattern, refactors the retry scanner to query latest attempts, and adjusts the zombie detector to work with proc entries and per-attempt liveness. Changes
Sequence Diagram(s)sequenceDiagram
participant Cmd as Command Handler
participant Prep as Local Execution<br/>Preparation
participant ProcStore as ProcStore
participant Agent as Execution Agent
participant DAGRunStore as DAGRunStore
Cmd->>Prep: withPreparedLocalExecution(ctx, dag, callbacks)
Prep->>Prep: prepareLocalExecution(ctx, dag)
Prep->>Prep: validate inputs, derive DAGRunRef
Prep->>ProcStore: lock process group
Prep->>ProcStore: Acquire(ctx, groupName, ProcMeta)
Prep->>DAGRunStore: CreateAttempt(ctx, dagRun, ...) or reuse
Prep->>Prep: set DAG on attempt
Prep->>Agent: swap ctx.Proc to prepared handle
Prep->>Agent: New(ctx with PreparedAttempt)
Agent->>Agent: setupDAGRunAttempt()
Agent->>Agent: return preparedAttempt (via SetDAG)
Agent->>Agent: Execute DAG with prepared attempt
Prep->>Prep: restore ctx.Proc
Prep->>ProcStore: Stop prepared handle
Cmd->>Cmd: Continue with result
Estimated code review effort🎯 4 (Complex) | ⏱️ ~55 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/service/scheduler/zombie_detector_test.go (1)
327-444:⚠️ Potential issue | 🟠 MajorDon't let the proc-store mock swallow unexpected calls.
The
hasExpectedCallpattern returns zero values without callingm.Called(...), which masks unexpected method calls. IfdetectAndCleanZombiesstarts callingRemoveIfStale,IsAttemptAlive, or other methods, tests like line 62 and line 204 will silently pass because these methods return nil by default instead of failing. Use standard testify/mock behavior: explicitly mock calls with.Once(), mark truly optional interactions with.Maybe(), or useAssertNotCalledfor methods that must not be invoked.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/scheduler/zombie_detector_test.go` around lines 327 - 444, The mockProcStore currently uses hasExpectedCall to short-circuit and return zero values instead of delegating to testify/mock, which hides unexpected calls; update mockProcStore methods (e.g., RemoveIfStale, IsAttemptAlive, IsRunAlive, ListAlive, ListEntries, LatestFreshEntryByDAGName, ListAllEntries, Acquire, CountAlive, CountAliveByDAGName, ListAllAlive) to always call m.Called(...) and return its results so testify can assert unexpected calls, and in tests mark optional interactions with .Maybe() or set explicit expectations with .Once() (or use AssertNotCalled) for methods that must not be invoked by detectAndCleanZombies. Ensure hasExpectedCall is removed or only used to toggle deliberate optional behavior but does not suppress m.Called(...) invocations.
🧹 Nitpick comments (2)
internal/persis/fileproc/handle.go (1)
177-178: Verify partial heartbeat update behavior.The
writeHeartbeathelper only writes the 8-byte timestamp at offset 0, leaving the JSON metadata unchanged. This is efficient for periodic updates, but relies on the metadata portion remaining intact.Given the file was initially created with
writeProcFile(which writes timestamp + metadata), and only the timestamp is updated periodically, this should work correctly. However, if the file is truncated or corrupted between writes, the metadata could be lost while the timestamp appears valid.Consider adding a length check in
readProcEntryerror messages to distinguish between "too short" (no data) vs "truncated" (has timestamp but incomplete metadata) for easier debugging of rare corruption scenarios.Also applies to: 227-232
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/persis/fileproc/handle.go` around lines 177 - 178, readProcEntry currently treats any undersized file as a generic "too short" error but writeHeartbeat updates only the 8-byte timestamp leaving JSON metadata after offset 8, so add a length-aware check in readProcEntry to distinguish three cases: file length < 8 (no timestamp), file length >= 8 but < expectedHeader+metadataStart (timestamp present but metadata truncated), and full length OK; update the error messages accordingly to report "no timestamp (file too short)", "timestamp present but metadata truncated", or the existing parse error, and apply the same length-differentiated checks/messages to the other read path referenced around the second block (the code handling the alternate read at the other site) so corruption vs empty-file cases are clear.internal/persis/fileproc/procfile.go (1)
199-202: Consider documenting dual freshness check rationale.The freshness logic checks both
modTimeandheartbeatTime. If modTime indicates staleness but heartbeatTime is fresh, the entry is considered fresh. This handles cases where file metadata updates lag behind content writes.A brief comment explaining why both are checked would help future maintainers understand this defensive pattern.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/persis/fileproc/procfile.go` around lines 199 - 202, The freshness check currently sets fresh based on now.Sub(info.ModTime()) < staleTime and then rechecks using heartbeatTime if modTime is stale; add a concise inline comment above these lines (referencing fresh, info.ModTime(), heartbeatTime, staleTime) explaining that both modTime and heartbeatTime are checked because filesystem modTime may lag behind application heartbeats, so a recent heartbeat should mark the entry fresh even if the file mtime appears stale; keep the comment short and focused on the defensive rationale for future maintainers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/persis/fileproc/procgrp.go`:
- Around line 198-229: The scan currently fails if a DAG directory or proc file
disappears mid-walk; update the loops around os.ReadDir(dagDir) and the call to
readProcEntry(file, pg.groupName, pg.staleTime, now) to treat missing-file/dir
errors as non-fatal: when os.ReadDir(dagDir) returns an error, if errors.Is(err,
fs.ErrNotExist) (or os.IsNotExist(err)) then continue to the next dagEntry
instead of returning the error; similarly, when readProcEntry returns an error,
if it indicates ENOENT (errors.Is(err, fs.ErrNotExist) / os.IsNotExist(err))
just skip that file and continue, otherwise return the error—this yields a
best-effort scan used by ListAllAlive and zombie detection while still surfacing
real errors.
In `@internal/runtime/manager_test.go`:
- Around line 369-377: The handler currently calls w.WriteHeader(http.StatusOK)
before json.Marshal, causing a second
WriteHeader(http.StatusInternalServerError) to be ignored; change the order in
the anonymous http handler so that you call json.Marshal(status) first, handle
marshalErr (write a 500 response and return) and only after successful
marshaling call w.WriteHeader(http.StatusOK) and w.Write(jsonData); reference
the existing json.Marshal(status), w.WriteHeader, and w.Write calls to locate
and reorder the logic.
In `@internal/test/helper.go`:
- Around line 753-760: Replace the hardcoded AttemptID ("attempt_"+a.dagRunID)
used when calling a.ProcStore.Acquire with the production-consistent format:
call the same generator used in production (genAttemptID()) or replicate it by
hex-encoding 3 random bytes to produce a 6-character hex string, and use that
value for AttemptID; if the test intentionally uses the synthetic format, add a
one-line comment next to AttemptID explaining why the simplified value is used
and that it intentionally diverges from genAttemptID().
---
Outside diff comments:
In `@internal/service/scheduler/zombie_detector_test.go`:
- Around line 327-444: The mockProcStore currently uses hasExpectedCall to
short-circuit and return zero values instead of delegating to testify/mock,
which hides unexpected calls; update mockProcStore methods (e.g., RemoveIfStale,
IsAttemptAlive, IsRunAlive, ListAlive, ListEntries, LatestFreshEntryByDAGName,
ListAllEntries, Acquire, CountAlive, CountAliveByDAGName, ListAllAlive) to
always call m.Called(...) and return its results so testify can assert
unexpected calls, and in tests mark optional interactions with .Maybe() or set
explicit expectations with .Once() (or use AssertNotCalled) for methods that
must not be invoked by detectAndCleanZombies. Ensure hasExpectedCall is removed
or only used to toggle deliberate optional behavior but does not suppress
m.Called(...) invocations.
---
Nitpick comments:
In `@internal/persis/fileproc/handle.go`:
- Around line 177-178: readProcEntry currently treats any undersized file as a
generic "too short" error but writeHeartbeat updates only the 8-byte timestamp
leaving JSON metadata after offset 8, so add a length-aware check in
readProcEntry to distinguish three cases: file length < 8 (no timestamp), file
length >= 8 but < expectedHeader+metadataStart (timestamp present but metadata
truncated), and full length OK; update the error messages accordingly to report
"no timestamp (file too short)", "timestamp present but metadata truncated", or
the existing parse error, and apply the same length-differentiated
checks/messages to the other read path referenced around the second block (the
code handling the alternate read at the other site) so corruption vs empty-file
cases are clear.
In `@internal/persis/fileproc/procfile.go`:
- Around line 199-202: The freshness check currently sets fresh based on
now.Sub(info.ModTime()) < staleTime and then rechecks using heartbeatTime if
modTime is stale; add a concise inline comment above these lines (referencing
fresh, info.ModTime(), heartbeatTime, staleTime) explaining that both modTime
and heartbeatTime are checked because filesystem modTime may lag behind
application heartbeats, so a recent heartbeat should mark the entry fresh even
if the file mtime appears stale; keep the comment short and focused on the
defensive rationale for future maintainers.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c14ff232-becb-4107-893b-c7db84418ed9
📒 Files selected for processing (28)
internal/cmd/context.gointernal/cmd/local_execution.gointernal/cmd/restart.gointernal/cmd/retry.gointernal/cmd/start.gointernal/core/exec/proc.gointernal/intg/queue/proc_liveness_test.gointernal/persis/fileproc/handle.gointernal/persis/fileproc/handle_test.gointernal/persis/fileproc/procfile.gointernal/persis/fileproc/procgrp.gointernal/persis/fileproc/procgrp_test.gointernal/persis/fileproc/store.gointernal/persis/fileproc/store_test.gointernal/persis/fileproc/test_helpers_test.gointernal/runtime/agent/agent.gointernal/runtime/manager.gointernal/runtime/manager_test.gointernal/service/frontend/api/v1/proc_liveness_test.gointernal/service/scheduler/retry_scanner.gointernal/service/scheduler/retry_scanner_test.gointernal/service/scheduler/scheduler.gointernal/service/scheduler/zombie_detector.gointernal/service/scheduler/zombie_detector_test.gointernal/test/coordinator.gointernal/test/helper.gointernal/test/proc.gointernal/test/server.go
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1824 +/- ##
==========================================
+ Coverage 69.18% 69.19% +0.01%
==========================================
Files 426 432 +6
Lines 51421 52033 +612
==========================================
+ Hits 35576 36005 +429
- Misses 12827 12944 +117
- Partials 3018 3084 +66
... and 30 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
Summary
Testing
Refs #546
Summary by CodeRabbit
Bug Fixes
Improvements