Fix reschedule for inline DAG run snapshots #1965
📝 Walkthrough
The changes fix rescheduling of DAG runs created from inline YAML specs by restoring DAGs from persisted YAML snapshots instead of requiring the original file path to exist. New helper functions reconstruct in-memory DAGs from stored YAML and runtime parameters. The reschedule flow is updated to use snapshot restoration. Integration and unit tests validate the fix for both directly started and enqueued inline runs.
Sequence Diagram
sequenceDiagram
participant Client
participant API
participant Store as DAGRun Store
participant Exec as DAG Executor
participant Coord as Coordinator
rect rgba(200, 100, 100, 0.5)
Note over Client,Coord: Original Flow (Broken for Inline DAGs)
Client->>API: POST /api/v1/dag-runs/reschedule
API->>Store: Fetch existing DAGRun
API->>API: Load DAG by name/path (temp file no longer exists)
API-->>Client: ✗ Error: File not found
end
rect rgba(100, 200, 100, 0.5)
Note over Client,Coord: New Flow (Using Snapshot)
Client->>API: POST /api/v1/dag-runs/reschedule
API->>Store: Fetch existing DAGRun + persisted YAML
API->>API: restoreDAGRunSnapshot(yaml, params)
API->>API: rebuildDAGRunSnapshotFromYAML(reconstruct DAG in-memory)
API->>API: startRescheduledDAGRunWithOptions(preserved params)
alt Local Execution
API->>Exec: Execute with restored DAG
Exec-->>API: Run completed
else Coordinator Dispatch
API->>Coord: Dispatch with restored DAG
Coord->>Exec: Execute
Exec-->>Coord: Run completed
Coord-->>API: Status updated
end
API-->>Client: ✓ Success: New run created
end
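The new flow in the diagram can be sketched in Go as follows. This is a minimal illustration only, not the PR's code: `DAG`, `RunRecord`, `parseDAG`, and `restoreFromSnapshot` are hypothetical names, the parser is a placeholder that reads a single `name:` line rather than real YAML, and the fallback to the original path is an assumption added to show why the old flow failed.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// DAG is a hypothetical, heavily simplified stand-in for the real DAG type.
type DAG struct {
	Name   string
	Params []string
}

// RunRecord is a hypothetical view of what the run store keeps for a run.
type RunRecord struct {
	SourcePath   string   // temp file used when the inline run was created; may be gone
	YAMLSnapshot []byte   // YAML spec persisted alongside the run
	Params       []string // resolved params from the prior run
}

// parseDAG is a placeholder for a real YAML loader; it only reads a "name:" line.
func parseDAG(spec []byte) (*DAG, error) {
	for _, line := range strings.Split(string(spec), "\n") {
		line = strings.TrimSpace(line)
		if strings.HasPrefix(line, "name:") {
			return &DAG{Name: strings.TrimSpace(strings.TrimPrefix(line, "name:"))}, nil
		}
	}
	return nil, errors.New("spec has no name")
}

// restoreFromSnapshot rebuilds the DAG in memory from the persisted snapshot.
// Only when no snapshot exists does it fall back to the original path
// (an assumption made for this sketch).
func restoreFromSnapshot(rec RunRecord) (*DAG, error) {
	spec := rec.YAMLSnapshot
	if len(spec) == 0 {
		// For inline runs this read fails once the temp file is cleaned up.
		data, err := os.ReadFile(rec.SourcePath)
		if err != nil {
			return nil, fmt.Errorf("load DAG from %q: %w", rec.SourcePath, err)
		}
		spec = data
	}
	dag, err := parseDAG(spec)
	if err != nil {
		return nil, err
	}
	// Copy the preserved params so the restored DAG does not alias the record.
	dag.Params = append([]string(nil), rec.Params...)
	return dag, nil
}

func main() {
	rec := RunRecord{
		SourcePath:   "/tmp/inline-spec-that-no-longer-exists.yaml",
		YAMLSnapshot: []byte("name: inline_reschedule\nsteps: []\n"),
		Params:       []string{"KEY=hello world", "COUNT=3"},
	}
	dag, err := restoreFromSnapshot(rec)
	if err != nil {
		fmt.Println("restore failed:", err)
		return
	}
	fmt.Println(dag.Name, dag.Params)
}
```

The point of the sketch is that the snapshot bytes, not the file path, are the source of truth for a reschedule, so a missing temp file no longer matters.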
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/service/frontend/api/v1/dags.go (1)
1128-1177: ⚠️ Potential issue | 🔴 Critical
Local execution path doesn't receive preserved params for rescheduled DAG runs.
The coordinator dispatch correctly uses `dispatchParams` (line 1140), but the local start path (line 1149) uses `opts.params` from `runtime.StartOptions`. For rescheduled runs, `opts.params` is empty since `startRescheduledDAGRunWithOptions` never sets it. The subprocess will spawn with no `-p` params flag, causing it to execute the DAG with the original unresolved YAML file rather than the preserved params from the prior run.
The reconstructed `dag.Params` from `restoreDAGRunSnapshot` is only in-memory and not passed to the subprocess. The local execution path should use `dispatchParams` instead of `opts.params`, consistent with the coordinator path.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/frontend/api/v1/dags.go` around lines 1128 - 1177, In startPreparedDAGRunWithOptions, the local execution builds runtime.StartOptions with opts.params which is empty for rescheduled runs; change the Params field passed to subCmdBuilder.Start / runtime.Start from opts.params to dispatchParams so the spawned subprocess receives the preserved reconstructed params (keep all other fields the same), mirroring the coordinator dispatch path that already uses dispatchParams.
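To make the finding concrete, here is a small Go sketch of the difference between the two choices on the local path. It is not the code in `dags.go`: `startOptions`, `localParamsBefore`, and `localParamsAfter` are hypothetical names, and `dispatchParams` is simply passed in as a string because the source does not show how it is computed.

```go
package main

import "fmt"

// startOptions is a hypothetical stand-in for the options passed to the starter.
// For rescheduled runs its params field is left empty.
type startOptions struct {
	params string
}

// localParamsBefore mirrors the reported behavior: the local path reads
// opts.params, which is empty for rescheduled runs.
func localParamsBefore(opts startOptions, dispatchParams string) string {
	return opts.params
}

// localParamsAfter mirrors the suggested change: the local path uses the same
// dispatchParams value as the coordinator path.
func localParamsAfter(_ startOptions, dispatchParams string) string {
	return dispatchParams
}

func main() {
	opts := startOptions{} // rescheduled run: params never set
	preserved := `KEY="hello world" COUNT=3`

	fmt.Printf("before: %q\n", localParamsBefore(opts, preserved))
	fmt.Printf("after:  %q\n", localParamsAfter(opts, preserved))
}
```

With the first function the subprocess would be started with an empty params value; with the second, both execution paths see the same preserved value.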
🧹 Nitpick comments (2)
internal/service/frontend/api/v1/dagruns_test.go (1)
1291-1312: Test helper creates ad-hoc queue processor with minimal config.
The helper correctly instantiates a `QueueProcessor` and `DAGExecutor` for processing the queued run. The queue config uses `queueName` as the queue name, which correctly matches the DAG name used in the test (`inline_reschedule_enqueue`).
However, note that this test setup manually constructs the coordinator and queue processor, which may diverge from production initialization patterns. Consider adding a comment explaining this is intentional for test isolation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/frontend/api/v1/dagruns_test.go` around lines 1291 - 1312, The test helper processQueuedInlineRun constructs a scheduler.NewQueueProcessor and scheduler.NewDAGExecutor with an ad-hoc coordinator (coordinator.New) and minimal config rather than using production initialization; add a concise inline comment above processQueuedInlineRun (or inside it) stating this is intentional to isolate the test environment and avoid coupling to production initialization so future readers understand the divergence and don't refactor it away.
internal/intg/reschedule_inline_api_test.go (1)
44-191: Consider extracting shared test helpers to reduce duplication.
The helper functions in this file are nearly identical to those in `dagruns_test.go`:
- `createInlineServerRunForReschedule` ≈ `createInlineDAGRunForReschedule`
- `waitForServerAttempt` ≈ `waitForAttemptSnapshot`
- `processQueuedServerInlineRun` ≈ `processQueuedInlineRun`
Consider extracting these to a shared test helper package (e.g., `internal/test/helpers`) to reduce maintenance burden and ensure consistency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/intg/reschedule_inline_api_test.go` around lines 44 - 191, The file duplicates several test helper functions already present in dagruns_test.go; extract the shared helpers into a new package (e.g., internal/test/helpers) and replace the local copies with calls to that package. Move functions createInlineServerRunForReschedule, waitForServerAttempt, waitForServerAttemptWithDAG, processQueuedServerInlineRun, requireServerRunParams and expectedServerInlineTempPath into the helpers package (keeping signatures the same), update imports in this file and dagruns_test.go to use internal/test/helpers, and adjust any references (e.g., call helpers.CreateInlineServerRunForReschedule, helpers.WaitForServerAttempt, helpers.ProcessQueuedServerInlineRun, etc.); ensure helper package imports test.Server, scheduler, coordinator and other dependencies and run tests to verify no broken references.
📒 Files selected for processing (4)
- internal/intg/reschedule_inline_api_test.go
- internal/service/frontend/api/v1/dagruns.go
- internal/service/frontend/api/v1/dagruns_test.go
- internal/service/frontend/api/v1/dags.go
func requireServerRunParams(t *testing.T, server test.Server, dagName, runID string) {
	t.Helper()

	attempt := waitForServerAttempt(t, server, dagName, runID)
	require.Eventually(t, func() bool {
		status, err := attempt.ReadStatus(server.Context)
		require.NoError(t, err)
		return status.Status == core.Succeeded
	}, 10*time.Second, 200*time.Millisecond)

	status, err := attempt.ReadStatus(server.Context)
	require.NoError(t, err)
	require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
}
require.NoError inside require.Eventually will fail immediately instead of retrying.
Same issue as in dagruns_test.go: using require.NoError(t, err) at line 129 inside the require.Eventually callback causes immediate test failure on transient errors rather than retrying.
Proposed fix
 func requireServerRunParams(t *testing.T, server test.Server, dagName, runID string) {
 	t.Helper()
 	attempt := waitForServerAttempt(t, server, dagName, runID)
 	require.Eventually(t, func() bool {
 		status, err := attempt.ReadStatus(server.Context)
-		require.NoError(t, err)
+		if err != nil {
+			return false
+		}
 		return status.Status == core.Succeeded
 	}, 10*time.Second, 200*time.Millisecond)
 	status, err := attempt.ReadStatus(server.Context)
 	require.NoError(t, err)
 	require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/intg/reschedule_inline_api_test.go` around lines 123 - 136, The
callback passed to require.Eventually in requireServerRunParams uses
require.NoError(t, err) which will fail the test immediately on transient
errors; change the callback so it calls attempt.ReadStatus(server.Context),
returns false if err != nil (so Eventually can retry) and only returns true when
status.Status == core.Succeeded, then after require.Eventually completes call
attempt.ReadStatus once more and use require.NoError(t, err) and the
require.Equal assertion on status.ParamsList; reference functions/variables:
requireServerRunParams, waitForServerAttempt, attempt.ReadStatus,
require.Eventually, status.ParamsList.
func assertInlineRescheduledRunParams(t *testing.T, server test.Server, dagName, dagRunID string) {
	t.Helper()

	attempt := waitForAttemptSnapshot(t, server, dagName, dagRunID)
	require.Eventually(t, func() bool {
		status, err := attempt.ReadStatus(server.Context)
		require.NoError(t, err)
		return status.Status == core.Succeeded
	}, 10*time.Second, 200*time.Millisecond)

	status, err := attempt.ReadStatus(server.Context)
	require.NoError(t, err)
	require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
}
require.NoError inside require.Eventually will fail immediately instead of retrying.
Using require.NoError(t, err) inside the require.Eventually callback (line 1250) causes the test to fail immediately on any transient error, rather than retrying until the timeout. This defeats the purpose of Eventually for handling timing-related failures.
Proposed fix
 func assertInlineRescheduledRunParams(t *testing.T, server test.Server, dagName, dagRunID string) {
 	t.Helper()
 	attempt := waitForAttemptSnapshot(t, server, dagName, dagRunID)
 	require.Eventually(t, func() bool {
 		status, err := attempt.ReadStatus(server.Context)
-		require.NoError(t, err)
+		if err != nil {
+			return false
+		}
 		return status.Status == core.Succeeded
 	}, 10*time.Second, 200*time.Millisecond)
 	status, err := attempt.ReadStatus(server.Context)
 	require.NoError(t, err)
 	require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/frontend/api/v1/dagruns_test.go` around lines 1244 - 1257,
The callback passed to require.Eventually in assertInlineRescheduledRunParams
uses require.NoError(t, err) which will fail the test immediately on a transient
error; change the callback to handle errors by returning false when
attempt.ReadStatus(server.Context) returns an error (i.e., if err != nil {
return false }) and only check status.Status == core.Succeeded otherwise,
leaving the existing require.NoError and require.Equal after the Eventually call
to assert the final state; references: function
assertInlineRescheduledRunParams, helper waitForAttemptSnapshot, and method
attempt.ReadStatus.
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1965      +/-   ##
==========================================
- Coverage   68.09%   68.03%   -0.06%
==========================================
  Files         475      475
  Lines       61381    61389       +8
==========================================
- Hits        41795    41769      -26
- Misses      15609    15641      +32
- Partials     3977     3979       +2
see 21 files with indirect coverage changes
Summary
Testing
Closes #1963
Summary by CodeRabbit
Release Notes