Skip to content

Fix reschedule for inline DAG run snapshots#1965

Merged
yottahmd merged 2 commits intomainfrom
issue-1963-fix-reschedule-selected-for-dag-runs-cre
Apr 4, 2026
Merged

Fix reschedule for inline DAG run snapshots#1965
yottahmd merged 2 commits intomainfrom
issue-1963-fix-reschedule-selected-for-dag-runs-cre

Conversation

@yottahmd
Copy link
Copy Markdown
Collaborator

@yottahmd yottahmd commented Apr 3, 2026

Summary

  • restore inline DAG run snapshots from persisted YAML before rescheduling so reruns still work after the temp spec file has been removed
  • preserve stored runtime params and rebuild the DAG with the original build env and base config before dispatching the rescheduled run
  • add API and integration coverage for rescheduling inline DAG runs created through both start and enqueue flows

Testing

  • go test ./internal/service/frontend/api/v1 -run TestRescheduleDAGRunFromInline -count=1
  • go test ./internal/intg -run TestAPIRescheduleInline -count=1

Closes #1963

Summary by CodeRabbit

Release Notes

  • New Features
    • Added support for rescheduling inline DAG runs with automatic preservation of original specifications and runtime parameters.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 3, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1dbed96a-c1ef-4926-a67b-ab492ece2ffd

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

The changes implement fix for rescheduling DAG runs created from inline YAML specs by restoring DAGs from persisted YAML snapshots instead of requiring the original file path to exist. New helper functions reconstruct in-memory DAGs from stored YAML and runtime parameters. The reschedule flow updated to use snapshot restoration. Integration and unit tests validate the fix for both directly-started and enqueued inline runs.

Changes

Cohort / File(s) Summary
Inline Reschedule API Logic
internal/service/frontend/api/v1/dagruns.go, internal/service/frontend/api/v1/dags.go
Added restoreDAGRunSnapshot and rebuildDAGRunSnapshotFromYAML functions to reconstruct DAGs from persisted YAML and runtime params. Updated rescheduleDAGRun to use snapshot restoration instead of file-based loading. Refactored DAG start logic by introducing startPreparedDAGRunWithOptions and startRescheduledDAGRunWithOptions to decouple parameter handling for dispatched vs local runs.
Test Coverage
internal/intg/reschedule_inline_api_test.go, internal/service/frontend/api/v1/dagruns_test.go
Added integration tests for inline DAG reschedule via API (both direct start and enqueue scenarios) and unit tests with helpers to verify rescheduled runs complete successfully and preserve stored YAML snapshots and runtime parameters. Validates missing inline temp files and correctness of parameter restoration.

Sequence Diagram

sequenceDiagram
    participant Client
    participant API
    participant Store as DAGRun Store
    participant Exec as DAG Executor
    participant Coord as Coordinator

    rect rgba(200, 100, 100, 0.5)
    Note over Client,Coord: Original Flow (Broken for Inline DAGs)
    Client->>API: POST /api/v1/dag-runs/reschedule
    API->>Store: Fetch existing DAGRun
    API->>API: Load DAG by name/path (temp file no longer exists)
    API-->>Client: ✗ Error: File not found
    end

    rect rgba(100, 200, 100, 0.5)
    Note over Client,Coord: New Flow (Using Snapshot)
    Client->>API: POST /api/v1/dag-runs/reschedule
    API->>Store: Fetch existing DAGRun + persisted YAML
    API->>API: restoreDAGRunSnapshot(yaml, params)
    API->>API: rebuildDAGRunSnapshotFromYAML(reconstruct DAG in-memory)
    API->>API: startRescheduledDAGRunWithOptions(preserved params)
    alt Local Execution
        API->>Exec: Execute with restored DAG
        Exec-->>API: Run completed
    else Coordinator Dispatch
        API->>Coord: Dispatch with restored DAG
        Coord->>Exec: Execute
        Exec-->>Coord: Run completed
        Coord-->>API: Status updated
    end
    API-->>Client: ✓ Success: New run created
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Fix reschedule for inline DAG run snapshots' clearly and concisely describes the main objective: restoring rescheduling functionality for DAG runs created from inline YAML specs by using persisted snapshots.
Linked Issues check ✅ Passed All coding requirements from issue #1963 are addressed: inline DAG snapshots are restored from persisted YAML before rescheduling, original runtime parameters are preserved, and comprehensive integration tests are included covering both start and enqueue flows.
Out of Scope Changes check ✅ Passed All changes directly support the reschedule fix for inline DAG runs: helper functions for snapshot restoration, refactored start methods to support preserved parameters, and API/integration tests validating the fix—nothing extraneous is present.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch issue-1963-fix-reschedule-selected-for-dag-runs-cre

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/service/frontend/api/v1/dags.go (1)

1128-1177: ⚠️ Potential issue | 🔴 Critical

Local execution path doesn't receive preserved params for rescheduled DAG runs.

The coordinator dispatch correctly uses dispatchParams (line 1140), but the local start path (line 1149) uses opts.params from runtime.StartOptions. For rescheduled runs, opts.params is empty since startRescheduledDAGRunWithOptions never sets it. The subprocess will spawn with no -p params flag, causing it to execute the DAG with the original unresolved YAML file rather than the preserved params from the prior run.

The reconstructed dag.Params from restoreDAGRunSnapshot is only in-memory and not passed to the subprocess. The local execution path should use dispatchParams instead of opts.params, consistent with the coordinator path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/dags.go` around lines 1128 - 1177, In
startPreparedDAGRunWithOptions, the local execution builds runtime.StartOptions
with opts.params which is empty for rescheduled runs; change the Params field
passed to subCmdBuilder.Start / runtime.Start from opts.params to dispatchParams
so the spawned subprocess receives the preserved reconstructed params (keep all
other fields the same), mirroring the coordinator dispatch path that already
uses dispatchParams.
🧹 Nitpick comments (2)
internal/service/frontend/api/v1/dagruns_test.go (1)

1291-1312: Test helper creates ad-hoc queue processor with minimal config.

The helper correctly instantiates a QueueProcessor and DAGExecutor for processing the queued run. The queue config uses queueName as the queue name, which correctly matches the DAG name used in the test (inline_reschedule_enqueue).

However, note that this test setup manually constructs the coordinator and queue processor, which may diverge from production initialization patterns. Consider adding a comment explaining this is intentional for test isolation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/dagruns_test.go` around lines 1291 - 1312,
The test helper processQueuedInlineRun constructs a scheduler.NewQueueProcessor
and scheduler.NewDAGExecutor with an ad-hoc coordinator (coordinator.New) and
minimal config rather than using production initialization; add a concise inline
comment above processQueuedInlineRun (or inside it) stating this is intentional
to isolate the test environment and avoid coupling to production initialization
so future readers understand the divergence and don't refactor it away.
internal/intg/reschedule_inline_api_test.go (1)

44-191: Consider extracting shared test helpers to reduce duplication.

The helper functions in this file are nearly identical to those in dagruns_test.go:

  • createInlineServerRunForReschedulecreateInlineDAGRunForReschedule
  • waitForServerAttemptwaitForAttemptSnapshot
  • processQueuedServerInlineRunprocessQueuedInlineRun

Consider extracting these to a shared test helper package (e.g., internal/test/helpers) to reduce maintenance burden and ensure consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/intg/reschedule_inline_api_test.go` around lines 44 - 191, The file
duplicates several test helper functions already present in dagruns_test.go;
extract the shared helpers into a new package (e.g., internal/test/helpers) and
replace the local copies with calls to that package. Move functions
createInlineServerRunForReschedule, waitForServerAttempt,
waitForServerAttemptWithDAG, processQueuedServerInlineRun,
requireServerRunParams and expectedServerInlineTempPath into the helpers package
(keeping signatures the same), update imports in this file and dagruns_test.go
to use internal/test/helpers, and adjust any references (e.g., call
helpers.CreateInlineServerRunForReschedule, helpers.WaitForServerAttempt,
helpers.ProcessQueuedServerInlineRun, etc.); ensure helper package imports
test.Server, scheduler, coordinator and other dependencies and run tests to
verify no broken references.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/intg/reschedule_inline_api_test.go`:
- Around line 123-136: The callback passed to require.Eventually in
requireServerRunParams uses require.NoError(t, err) which will fail the test
immediately on transient errors; change the callback so it calls
attempt.ReadStatus(server.Context), returns false if err != nil (so Eventually
can retry) and only returns true when status.Status == core.Succeeded, then
after require.Eventually completes call attempt.ReadStatus once more and use
require.NoError(t, err) and the require.Equal assertion on status.ParamsList;
reference functions/variables: requireServerRunParams, waitForServerAttempt,
attempt.ReadStatus, require.Eventually, status.ParamsList.

In `@internal/service/frontend/api/v1/dagruns_test.go`:
- Around line 1244-1257: The callback passed to require.Eventually in
assertInlineRescheduledRunParams uses require.NoError(t, err) which will fail
the test immediately on a transient error; change the callback to handle errors
by returning false when attempt.ReadStatus(server.Context) returns an error
(i.e., if err != nil { return false }) and only check status.Status ==
core.Succeeded otherwise, leaving the existing require.NoError and require.Equal
after the Eventually call to assert the final state; references: function
assertInlineRescheduledRunParams, helper waitForAttemptSnapshot, and method
attempt.ReadStatus.

---

Outside diff comments:
In `@internal/service/frontend/api/v1/dags.go`:
- Around line 1128-1177: In startPreparedDAGRunWithOptions, the local execution
builds runtime.StartOptions with opts.params which is empty for rescheduled
runs; change the Params field passed to subCmdBuilder.Start / runtime.Start from
opts.params to dispatchParams so the spawned subprocess receives the preserved
reconstructed params (keep all other fields the same), mirroring the coordinator
dispatch path that already uses dispatchParams.

---

Nitpick comments:
In `@internal/intg/reschedule_inline_api_test.go`:
- Around line 44-191: The file duplicates several test helper functions already
present in dagruns_test.go; extract the shared helpers into a new package (e.g.,
internal/test/helpers) and replace the local copies with calls to that package.
Move functions createInlineServerRunForReschedule, waitForServerAttempt,
waitForServerAttemptWithDAG, processQueuedServerInlineRun,
requireServerRunParams and expectedServerInlineTempPath into the helpers package
(keeping signatures the same), update imports in this file and dagruns_test.go
to use internal/test/helpers, and adjust any references (e.g., call
helpers.CreateInlineServerRunForReschedule, helpers.WaitForServerAttempt,
helpers.ProcessQueuedServerInlineRun, etc.); ensure helper package imports
test.Server, scheduler, coordinator and other dependencies and run tests to
verify no broken references.

In `@internal/service/frontend/api/v1/dagruns_test.go`:
- Around line 1291-1312: The test helper processQueuedInlineRun constructs a
scheduler.NewQueueProcessor and scheduler.NewDAGExecutor with an ad-hoc
coordinator (coordinator.New) and minimal config rather than using production
initialization; add a concise inline comment above processQueuedInlineRun (or
inside it) stating this is intentional to isolate the test environment and avoid
coupling to production initialization so future readers understand the
divergence and don't refactor it away.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 09c18209-44e1-4383-b384-8049bcd3e43b

📥 Commits

Reviewing files that changed from the base of the PR and between c4e6ce9 and 41623e4.

📒 Files selected for processing (4)
  • internal/intg/reschedule_inline_api_test.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/dagruns_test.go
  • internal/service/frontend/api/v1/dags.go

Comment on lines +123 to +136
func requireServerRunParams(t *testing.T, server test.Server, dagName, runID string) {
t.Helper()

attempt := waitForServerAttempt(t, server, dagName, runID)
require.Eventually(t, func() bool {
status, err := attempt.ReadStatus(server.Context)
require.NoError(t, err)
return status.Status == core.Succeeded
}, 10*time.Second, 200*time.Millisecond)

status, err := attempt.ReadStatus(server.Context)
require.NoError(t, err)
require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

require.NoError inside require.Eventually will fail immediately instead of retrying.

Same issue as in dagruns_test.go: using require.NoError(t, err) at line 129 inside the require.Eventually callback causes immediate test failure on transient errors rather than retrying.

Proposed fix
 func requireServerRunParams(t *testing.T, server test.Server, dagName, runID string) {
 	t.Helper()
 
 	attempt := waitForServerAttempt(t, server, dagName, runID)
 	require.Eventually(t, func() bool {
 		status, err := attempt.ReadStatus(server.Context)
-		require.NoError(t, err)
+		if err != nil {
+			return false
+		}
 		return status.Status == core.Succeeded
 	}, 10*time.Second, 200*time.Millisecond)
 
 	status, err := attempt.ReadStatus(server.Context)
 	require.NoError(t, err)
 	require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func requireServerRunParams(t *testing.T, server test.Server, dagName, runID string) {
t.Helper()
attempt := waitForServerAttempt(t, server, dagName, runID)
require.Eventually(t, func() bool {
status, err := attempt.ReadStatus(server.Context)
require.NoError(t, err)
return status.Status == core.Succeeded
}, 10*time.Second, 200*time.Millisecond)
status, err := attempt.ReadStatus(server.Context)
require.NoError(t, err)
require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
}
func requireServerRunParams(t *testing.T, server test.Server, dagName, runID string) {
t.Helper()
attempt := waitForServerAttempt(t, server, dagName, runID)
require.Eventually(t, func() bool {
status, err := attempt.ReadStatus(server.Context)
if err != nil {
return false
}
return status.Status == core.Succeeded
}, 10*time.Second, 200*time.Millisecond)
status, err := attempt.ReadStatus(server.Context)
require.NoError(t, err)
require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/intg/reschedule_inline_api_test.go` around lines 123 - 136, The
callback passed to require.Eventually in requireServerRunParams uses
require.NoError(t, err) which will fail the test immediately on transient
errors; change the callback so it calls attempt.ReadStatus(server.Context),
returns false if err != nil (so Eventually can retry) and only returns true when
status.Status == core.Succeeded, then after require.Eventually completes call
attempt.ReadStatus once more and use require.NoError(t, err) and the
require.Equal assertion on status.ParamsList; reference functions/variables:
requireServerRunParams, waitForServerAttempt, attempt.ReadStatus,
require.Eventually, status.ParamsList.

Comment on lines +1244 to +1257
func assertInlineRescheduledRunParams(t *testing.T, server test.Server, dagName, dagRunID string) {
t.Helper()

attempt := waitForAttemptSnapshot(t, server, dagName, dagRunID)
require.Eventually(t, func() bool {
status, err := attempt.ReadStatus(server.Context)
require.NoError(t, err)
return status.Status == core.Succeeded
}, 10*time.Second, 200*time.Millisecond)

status, err := attempt.ReadStatus(server.Context)
require.NoError(t, err)
require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

require.NoError inside require.Eventually will fail immediately instead of retrying.

Using require.NoError(t, err) inside the require.Eventually callback (line 1250) causes the test to fail immediately on any transient error, rather than retrying until the timeout. This defeats the purpose of Eventually for handling timing-related failures.

Proposed fix
 func assertInlineRescheduledRunParams(t *testing.T, server test.Server, dagName, dagRunID string) {
 	t.Helper()
 
 	attempt := waitForAttemptSnapshot(t, server, dagName, dagRunID)
 	require.Eventually(t, func() bool {
 		status, err := attempt.ReadStatus(server.Context)
-		require.NoError(t, err)
+		if err != nil {
+			return false
+		}
 		return status.Status == core.Succeeded
 	}, 10*time.Second, 200*time.Millisecond)
 
 	status, err := attempt.ReadStatus(server.Context)
 	require.NoError(t, err)
 	require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func assertInlineRescheduledRunParams(t *testing.T, server test.Server, dagName, dagRunID string) {
t.Helper()
attempt := waitForAttemptSnapshot(t, server, dagName, dagRunID)
require.Eventually(t, func() bool {
status, err := attempt.ReadStatus(server.Context)
require.NoError(t, err)
return status.Status == core.Succeeded
}, 10*time.Second, 200*time.Millisecond)
status, err := attempt.ReadStatus(server.Context)
require.NoError(t, err)
require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
}
func assertInlineRescheduledRunParams(t *testing.T, server test.Server, dagName, dagRunID string) {
t.Helper()
attempt := waitForAttemptSnapshot(t, server, dagName, dagRunID)
require.Eventually(t, func() bool {
status, err := attempt.ReadStatus(server.Context)
if err != nil {
return false
}
return status.Status == core.Succeeded
}, 10*time.Second, 200*time.Millisecond)
status, err := attempt.ReadStatus(server.Context)
require.NoError(t, err)
require.Equal(t, []string{"KEY=hello world", "COUNT=3"}, status.ParamsList)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/dagruns_test.go` around lines 1244 - 1257,
The callback passed to require.Eventually in assertInlineRescheduledRunParams
uses require.NoError(t, err) which will fail the test immediately on a transient
error; change the callback to handle errors by returning false when
attempt.ReadStatus(server.Context) returns an error (i.e., if err != nil {
return false }) and only check status.Status == core.Succeeded otherwise,
leaving the existing require.NoError and require.Equal after the Eventually call
to assert the final state; references: function
assertInlineRescheduledRunParams, helper waitForAttemptSnapshot, and method
attempt.ReadStatus.

@yottahmd yottahmd merged commit 8c9106d into main Apr 4, 2026
5 checks passed
@yottahmd yottahmd deleted the issue-1963-fix-reschedule-selected-for-dag-runs-cre branch April 4, 2026 02:29
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 4, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 68.03%. Comparing base (c4e6ce9) to head (95b9bfe).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1965      +/-   ##
==========================================
- Coverage   68.09%   68.03%   -0.06%     
==========================================
  Files         475      475              
  Lines       61381    61389       +8     
==========================================
- Hits        41795    41769      -26     
- Misses      15609    15641      +32     
- Partials     3977     3979       +2     

see 21 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c4e6ce9...95b9bfe. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Fix "Reschedule selected" for DAG runs created from inline specs

1 participant