fix: route reschedule through enqueue #1966

Merged
yottahmd merged 10 commits into main from reschedule-bugfix
Apr 4, 2026
yottahmd (Collaborator) commented Apr 4, 2026

Summary

  • route DAG-run reschedule through the existing enqueue flow so it creates a fresh queued run from the persisted historical snapshot
  • require queues for reschedule and preserve the existing retry path as separate behavior
  • reuse the inline enqueue temp-spec lifecycle so reschedule temp files are cleaned up after the request, and drop the unsupported reschedule UI field
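
The temp-spec lifecycle mentioned in the last bullet can be pictured with a small sketch. It assumes the persisted snapshot is available as YAML bytes and that enqueueing only needs the file for the duration of the request. `withTempSpec` and `exists` are hypothetical names used for illustration, not helpers taken from this PR.

```go
package main

import (
	"fmt"
	"os"
)

// withTempSpec writes a spec snapshot to a temporary file, hands the path to
// fn, and removes the file when fn returns, whether or not fn succeeded.
// Hypothetical helper: the PR reuses an existing lifecycle whose real name
// and signature are not shown in this conversation.
func withTempSpec(snapshot []byte, fn func(path string) error) error {
	f, err := os.CreateTemp("", "reschedule-*.yaml")
	if err != nil {
		return fmt.Errorf("create temp spec: %w", err)
	}
	path := f.Name()
	// Cleanup is scoped to the request, not to the lifetime of the queued run.
	defer os.Remove(path)

	if _, err := f.Write(snapshot); err != nil {
		f.Close()
		return fmt.Errorf("write temp spec: %w", err)
	}
	if err := f.Close(); err != nil {
		return fmt.Errorf("close temp spec: %w", err)
	}
	return fn(path)
}

// exists reports whether a file is still present on disk.
func exists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

func main() {
	var used string
	err := withTempSpec([]byte("steps:\n  - command: echo hi\n"), func(path string) error {
		used = path // a real handler would enqueue the run from this file here
		return nil
	})
	fmt.Println("error:", err)
	fmt.Println("temp spec still on disk:", exists(used))
}
```

Tying the removal to a `defer` in the request path means the file is removed on both the success and the error path, which is presumably the property the shared lifecycle is meant to guarantee.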

Testing

  • go test ./internal/service/frontend/api/v1 -run 'TestRescheduleDAGRun|TestRescheduleDAGRunResolvesLatest|TestRescheduleDAGRunFromInlineStartUsesPersistedSnapshot|TestRescheduleDAGRunFromInlineEnqueueUsesPersistedSnapshot|TestRescheduleDAGRunRequiresQueuesEnabled|TestRetryDAGRunQueuesRetryForQueuedDAGs' -count=1
  • pnpm -C ui test --run src/features/dag-runs/components/common/tests/DAGRunBatchActions.test.tsx

Summary by CodeRabbit

  • New Features

    • Option to reschedule a run using the current original DAG file (API, UI checkbox, and response flag indicating availability).
    • DAG-run details now indicate whether the run can reload its spec from an on-disk file.
    • Rescheduled runs explicitly report being queued and are processed accordingly.
  • Bug Fixes

    • Reschedule now rejects requests when queue support is disabled; improved snapshot validation during reschedule.

coderabbitai Bot commented Apr 4, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2031299b-a079-469e-b64d-c81522672c33

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough


Reschedule and retry flows were changed to support loading DAG specs from on-disk source files, propagate a new DAG.SourceFile provenance field through task dispatch, and enqueue rescheduled runs via the queue system with a config guard requiring queues enabled.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Frontend API — reschedule & details | internal/service/frontend/api/v1/dagruns.go, internal/service/frontend/api/v1/dagruns_test.go | Fetch DAG-run attempts (not just status), surface specFromFile in responses, add useCurrentDagFile option to reschedule, require queues enabled, validate/load snapshot or current file, enqueue rescheduled runs instead of directly starting, and update tests to enable queues and assert queued behavior. |
| API surface / docs | api/v1/api.gen.go, api/v1/api.yaml, ui/src/api/v1/schema.ts | Added specFromFile to DAGRunDetails and useCurrentDagFile to reschedule request schema; updated generated swagger/spec and UI schema types. |
| UI — retry/reschedule UX | ui/src/features/dags/components/common/DAGActions.tsx, ui/src/features/dag-runs/components/common/DAGRunActions.tsx | Added UI state and effect to fetch specFromFile, introduced Use original DAG file checkbox, wired useCurrentDagFile into reschedule requests, and reset state on dismiss/errors. |
| DAG provenance field & propagation | internal/core/dag.go, internal/core/spec/loader.go, internal/core/spec/loader_test.go | Added DAG.SourceFile field and ensured loader sets/preserves it for file-based loads; tests validate source tracking. |
| Rebuild/load preservation | internal/cmd/helper.go, internal/cmd/exec_spec.go | Preserve SourceFile across YAML rebuild/loading and clear SourceFile in exec loading where appropriate. |
| Start/dispatch/task plumbing | internal/service/frontend/api/v1/dags.go, internal/service/scheduler/dag_executor.go, internal/service/coordinator/handler.go, internal/service/worker/handler.go, internal/service/worker/remote_handler.go, internal/runtime/executor/dag_runner.go | Propagate SourceFile into coordinator/worker flows and set it on parsed/loaded DAGs; removed dedicated startRescheduledDAGRunWithOptions helper. |
| Task option / proto / CLI / subcommand | internal/runtime/executor/task.go, internal/runtime/executor/task_test.go, proto/coordinator/v1/coordinator.proto, internal/runtime/subcmd.go, internal/runtime/subcmd_test.go, internal/cmd/start.go | Added WithSourceFile TaskOption and Task.SourceFile proto field, tests for option/arg, coordinator --source-file CLI flag, and include source-file in dispatched coordinator task args. |
| Scheduler / executor enqueue changes | internal/service/frontend/api/v1/dagruns.go (enqueue call), related queue setup in tests | Reschedule now enqueues with manual trigger semantics (via enqueueDAGRun) and selects params based on useCurrentDagFile. |
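
The WithSourceFile TaskOption row describes a functional-option style of plumbing. The following is a minimal sketch of that pattern only, assuming a simplified `Task` struct; apart from the names `WithSourceFile`, `Task.SourceFile`, and the `--source-file` flag, which appear in the summary above, everything here (`NewTask`, `sourceFileArgs`, the field set) is hypothetical and may not match the real code.

```go
package main

import "fmt"

// Task stands in for the dispatched task message; only fields relevant to
// this sketch are included, and the real type may differ.
type Task struct {
	DAGRunID   string
	SourceFile string // on-disk DAG file the spec was loaded from, if any
}

// TaskOption mutates a Task while it is being built.
type TaskOption func(*Task)

// WithSourceFile records the provenance path on the task.
func WithSourceFile(path string) TaskOption {
	return func(t *Task) { t.SourceFile = path }
}

// NewTask applies options in order (hypothetical constructor).
func NewTask(dagRunID string, opts ...TaskOption) *Task {
	task := &Task{DAGRunID: dagRunID}
	for _, opt := range opts {
		opt(task)
	}
	return task
}

// sourceFileArgs returns the extra CLI arguments for a task. Omitting the
// flag for DAGs that were not loaded from a file is an assumed behavior.
func sourceFileArgs(task *Task) []string {
	if task.SourceFile == "" {
		return nil
	}
	return []string{"--source-file", task.SourceFile}
}

func main() {
	fileBacked := NewTask("run-1", WithSourceFile("/dags/etl.yaml"))
	inline := NewTask("run-2")
	fmt.Println(sourceFileArgs(fileBacked))
	fmt.Println(len(sourceFileArgs(inline)))
}
```

An option function keeps existing call sites unchanged, since callers that do not know about provenance simply pass no option.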

Sequence Diagram(s)

sequenceDiagram
    actor User
    participant API as API Server
    participant Config as Config
    participant Store as DagRunStore
    participant Loader as Spec Loader
    participant Queue as Queue System
    participant Executor as Executor

    User->>API: POST /api/v1/dag-runs/{name}/{id}/reschedule
    API->>Config: Check queues.enabled
    alt queues disabled
        Config-->>API: false
        API-->>User: 400 BadRequest
    else queues enabled
        API->>Store: FindAttempt / LatestAttempt
        Store-->>API: attempt (includes dag.SourceFile)
        API->>Loader: if useCurrentDagFile -> load from disk(sourceFile)
        alt useCurrentDagFile
            Loader-->>API: DAG (SourceFile set)
        else use snapshot
            Loader-->>API: DAG built from stored YAML
        end
        API->>Queue: enqueueDAGRun(ctx, trigger=manual, options include SourceFile)
        Queue-->>API: queued=true, dagRunId
        API-->>User: 200 OK + queued status
        Queue->>Executor: deliver queued task (includes SourceFile)
        Executor-->>Queue: execute run (uses SourceFile for provenance)
    end
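
The snapshot branch of the diagram can be sketched as a guard followed by a call into the enqueue flow. This is an illustration under stated assumptions, not the handler from the PR: `reschedule`, `Snapshot`, `EnqueueFunc`, and `ErrQueuesDisabled` are hypothetical stand-ins, and a real handler would translate the guard error into the 400 response shown above.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrQueuesDisabled is what a handler could map to a 400 response.
var ErrQueuesDisabled = errors.New("reschedule requires queues to be enabled")

// Snapshot is a stand-in for the spec persisted with a historical attempt.
type Snapshot struct {
	DAGName string
	YAML    []byte
}

// EnqueueFunc is a stand-in for the existing enqueue flow.
type EnqueueFunc func(dagName, dagRunID string, spec []byte) error

// reschedule creates a fresh queued run from the persisted snapshot.
// It never starts the run directly; it only enqueues it.
func reschedule(queuesEnabled bool, snap *Snapshot, newRunID string, enqueue EnqueueFunc) (bool, error) {
	if !queuesEnabled {
		return false, ErrQueuesDisabled
	}
	if snap == nil || len(snap.YAML) == 0 {
		return false, errors.New("persisted snapshot is missing or empty")
	}
	if err := enqueue(snap.DAGName, newRunID, snap.YAML); err != nil {
		return false, fmt.Errorf("enqueue rescheduled run: %w", err)
	}
	return true, nil
}

func main() {
	snap := &Snapshot{DAGName: "etl", YAML: []byte("steps: []\n")}
	enqueue := func(dagName, dagRunID string, spec []byte) error {
		fmt.Printf("queued %s as %s (%d bytes)\n", dagName, dagRunID, len(spec))
		return nil
	}

	_, err := reschedule(false, snap, "run-2", enqueue)
	fmt.Println("queues disabled:", err)

	queued, err := reschedule(true, snap, "run-2", enqueue)
	fmt.Println("queues enabled:", queued, err)
}
```

Checking the queue setting before touching the store or the snapshot keeps the rejection cheap and free of side effects.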

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 31.43% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately summarizes the main change: routing DAG-run reschedule through the enqueue flow instead of the previous separate reschedule path. |


yottahmd (Collaborator, Author) commented Apr 4, 2026

@coderabbitai full review

coderabbitai Bot commented Apr 4, 2026

✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/service/frontend/api/v1/dagruns.go (1)

1537-1542: ⚠️ Potential issue | 🔴 Critical

Guard the latest path against a nil status before dereferencing it.

ReadStatus() is already treated as nullable later in this file inside rescheduleDAGRun. If the latest attempt exists before its first status snapshot is written, *status will panic this handler.

💡 Suggested change
 		status, err := attempt.ReadStatus(ctx)
 		if err != nil {
 			return api.GetDAGRunDetails200JSONResponse{}, fmt.Errorf("error getting latest status: %w", err)
 		}
+		if status == nil {
+			return api.GetDAGRunDetails200JSONResponse{}, fmt.Errorf("latest dag-run status is unavailable for DAG %s", dagName)
+		}
 		return api.GetDAGRunDetails200JSONResponse{
 			DagRunDetails: a.toDAGRunDetailsWithSpecSource(ctx, attempt, *status),
 		}, nil
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/dagruns.go` around lines 1537 - 1542, The
handler currently dereferences status returned by attempt.ReadStatus and can
panic if that status is nil; add a nil check after calling ReadStatus() (which
may return nil) and handle the "latest" path without dereferencing: if status ==
nil, return api.GetDAGRunDetails200JSONResponse with DagRunDetails produced for
the attempt when no status exists (e.g., call or create the same DAG run details
path used elsewhere for nil statuses or pass an explicit empty/zero status),
otherwise continue to call a.toDAGRunDetailsWithSpecSource(ctx, attempt,
*status). Ensure you reference attempt.ReadStatus,
a.toDAGRunDetailsWithSpecSource, and the GetDAGRunDetails200JSONResponse return
to locate the change.
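
The failure mode in this finding can be reproduced in isolation. The sketch below assumes, as the review states, that `ReadStatus` can return a nil status with a nil error. `Attempt`, `Status`, and `latestStatus` are simplified stand-ins (the real `ReadStatus` takes a context), and returning an error is only one of the two resolutions discussed above.

```go
package main

import (
	"errors"
	"fmt"
)

// Status is a simplified stand-in for the persisted run status.
type Status struct{ Name string }

// Attempt is a stand-in; ReadStatus may return (nil, nil) before the first
// status snapshot has been written.
type Attempt struct{ status *Status }

func (a *Attempt) ReadStatus() (*Status, error) { return a.status, nil }

// ErrStatusUnavailable marks the "attempt exists, status not yet written" case.
var ErrStatusUnavailable = errors.New("latest dag-run status is unavailable")

// latestStatus checks for nil before dereferencing, so a missing status
// becomes an error instead of a panic.
func latestStatus(a *Attempt, dagName string) (Status, error) {
	status, err := a.ReadStatus()
	if err != nil {
		return Status{}, fmt.Errorf("error getting latest status: %w", err)
	}
	if status == nil {
		return Status{}, fmt.Errorf("%w for DAG %s", ErrStatusUnavailable, dagName)
	}
	return *status, nil
}

func main() {
	_, err := latestStatus(&Attempt{}, "etl")
	fmt.Println(err)

	st, err := latestStatus(&Attempt{status: &Status{Name: "etl"}}, "etl")
	fmt.Println(st.Name, err)
}
```

Wrapping a sentinel with `%w` lets a caller distinguish this case with `errors.Is` if it later needs a different HTTP status for it.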
🧹 Nitpick comments (3)
internal/cmd/helper.go (1)

116-116: Remove redundant fresh.SourceFile assignment.

fresh is not returned, and dag.SourceFile is not overwritten later, so this write has no effect.

♻️ Proposed cleanup
-	fresh.SourceFile = dag.SourceFile
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmd/helper.go` at line 116, Remove the redundant assignment to
fresh.SourceFile: delete the statement "fresh.SourceFile = dag.SourceFile" in
internal/cmd/helper.go (leave fresh and dag usages unchanged); if the original
intent was to propagate SourceFile into an object that will be returned, instead
ensure that fresh is returned or that the destination field is used—otherwise
simply remove the assignment since it has no effect.
ui/src/features/dag-runs/components/common/DAGRunActions.tsx (1)

98-148: Consider logging errors in the catch block for debuggability.

The error is silently swallowed, which could make troubleshooting difficult if the DAG run details fetch fails. Consider adding a console.error or similar logging.

The useEffect correctly handles cancellation to prevent state updates on unmounted components, and includes remoteNode in the API call as required.

♻️ Optional: Add error logging
       .catch(() => {
         if (cancelled) {
           return;
         }
+        console.error('Failed to fetch DAG run details for reschedule source');
         setSpecFromFile(false);
         setUseCurrentDagFile(false);
       })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/dag-runs/components/common/DAGRunActions.tsx` around lines 98
- 148, The catch block in the React.useEffect that calls client.GET for
'/dag-runs/{name}/{dagRunId}' silently swallows errors; update that catch
handler to log the error (e.g., via console.error or processLogger) while
preserving the existing cancelled checks and state updates
(setSpecFromFile(false), setUseCurrentDagFile(false),
setRescheduleSourceLoading(false) in finally); locate the effect by the
React.useEffect wrapper, the client.GET call, and the
setSpecFromFile/setUseCurrentDagFile state setters to add the logging without
changing the cancellation logic.
ui/src/features/dags/components/common/DAGActions.tsx (1)

119-150: Prefer the same async/await flow used elsewhere in this component.

This new request path is harder to compare with the async IIFE below because success, failure, and cleanup are split across .then/.catch/.finally. Converting it to async/await + try/catch/finally would keep the modal fetch logic consistent.

As per coding guidelines Prefer async/await over .then() for promise handling and Use error handling with try/catch for async operations.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/dags/components/common/DAGActions.tsx` around lines 119 -
150, Convert the client.GET(...).then/.catch/.finally chain into the same
async/await + try/catch/finally pattern used elsewhere: create an async function
or IIFE that awaits client.GET('/dag-runs/{name}/{dagRunId}', { params: { path:
{ name: status.name, dagRunId: retryDagRunId }, query: { remoteNode:
appBarContext.selectedRemoteNode || 'local' } } }), then inside try check
cancelled before using response.data to call setSpecFromFile and
setUseCurrentDagFile; in catch check cancelled before setting both to false; and
in finally check cancelled before calling setRescheduleSourceLoading(false).
Ensure all references to cancelled, setSpecFromFile, setUseCurrentDagFile, and
setRescheduleSourceLoading remain and preserve the same logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/service/frontend/api/v1/dagruns_test.go`:
- Around line 1331-1334: The test currently treats a nil SpecFromFile pointer as
equivalent to false; change it to assert the field is present first by requiring
body.DagRunDetails.SpecFromFile is not nil (use require.NotNil or
require.Assert) and then dereference it to compare its boolean value to want
(i.e., require.NotNil(t, body.DagRunDetails.SpecFromFile) followed by
require.Equal(t, want, *body.DagRunDetails.SpecFromFile)), referencing body and
DagRunDetails.SpecFromFile and the existing want/got comparison.

In `@ui/src/features/dags/components/common/DAGActions.tsx`:
- Around line 135-137: The code currently sets both setSpecFromFile and
setUseCurrentDagFile from data?.dagRunDetails?.specFromFile causing file-backed
DAGs to default to using the on-disk file; instead preserve the snapshot-default
behavior by only initializing the specFromFile state from
data?.dagRunDetails?.specFromFile and do NOT change useCurrentDagFile from this
value—remove the setUseCurrentDagFile(available) call (or explicitly
setUseCurrentDagFile(false) if you must initialize) so useCurrentDagFile remains
controlled by the user's checkbox rather than the specFromFile flag; update the
block that references data?.dagRunDetails?.specFromFile, setSpecFromFile, and
setUseCurrentDagFile accordingly.
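
The first inline comment above, about asserting that SpecFromFile is present before comparing it, comes down to the difference between a lenient and a strict check on a pointer field. A minimal sketch using only the standard library follows; `DagRunDetails` is reduced to the one field, and `lenient` and `checkSpecFromFile` are hypothetical helpers, not code from the test file.

```go
package main

import "fmt"

// DagRunDetails is reduced to the single optional field under discussion.
type DagRunDetails struct {
	SpecFromFile *bool
}

// lenient treats an absent field as false, which hides a missing field.
func lenient(d DagRunDetails) bool {
	return d.SpecFromFile != nil && *d.SpecFromFile
}

// checkSpecFromFile fails when the field is absent and only then compares
// the dereferenced value with the expectation.
func checkSpecFromFile(d DagRunDetails, want bool) error {
	if d.SpecFromFile == nil {
		return fmt.Errorf("specFromFile is missing from the response")
	}
	if got := *d.SpecFromFile; got != want {
		return fmt.Errorf("specFromFile = %v, want %v", got, want)
	}
	return nil
}

func main() {
	missing := DagRunDetails{}
	fmt.Println("lenient reads missing field as false:", !lenient(missing))
	fmt.Println("strict check on missing field:", checkSpecFromFile(missing, false))

	no := false
	explicit := DagRunDetails{SpecFromFile: &no}
	fmt.Println("strict check on explicit false:", checkSpecFromFile(explicit, false))
}
```

With the lenient form, a response that drops the field entirely still passes a test expecting `false`; the strict form turns that regression into a failure.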


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ab01d66d-0eea-4c87-8fff-e31087af3862

📥 Commits

Reviewing files that changed from the base of the PR and between 8c9106d and 3f45c19.

⛔ Files ignored due to path filters (1)
  • proto/coordinator/v1/coordinator.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (25)
  • api/v1/api.gen.go
  • api/v1/api.yaml
  • internal/cmd/exec_spec.go
  • internal/cmd/helper.go
  • internal/cmd/start.go
  • internal/core/dag.go
  • internal/core/spec/loader.go
  • internal/core/spec/loader_test.go
  • internal/intg/reschedule_inline_api_test.go
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/executor/task.go
  • internal/runtime/executor/task_test.go
  • internal/runtime/subcmd.go
  • internal/runtime/subcmd_test.go
  • internal/service/coordinator/handler.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/dagruns_test.go
  • internal/service/frontend/api/v1/dags.go
  • internal/service/scheduler/dag_executor.go
  • internal/service/worker/handler.go
  • internal/service/worker/remote_handler.go
  • proto/coordinator/v1/coordinator.proto
  • ui/src/api/v1/schema.ts
  • ui/src/features/dag-runs/components/common/DAGRunActions.tsx
  • ui/src/features/dags/components/common/DAGActions.tsx

Comment thread internal/service/frontend/api/v1/dagruns_test.go
Comment thread internal/service/frontend/api/v1/dags.go
Comment thread ui/src/features/dags/components/common/DAGActions.tsx
yottahmd merged commit 955b0bc into main on Apr 4, 2026 (6 checks passed)
yottahmd deleted the reschedule-bugfix branch on April 4, 2026 08:32

codecov Bot commented Apr 4, 2026

Codecov Report

❌ Patch coverage is 84.00000% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.10%. Comparing base (8c9106d) to head (1c86980).
⚠️ Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| internal/cmd/start.go | 0.00% | 3 Missing and 1 partial ⚠️ |
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1966      +/-   ##
==========================================
- Coverage   68.13%   68.10%   -0.03%     
==========================================
  Files         475      475              
  Lines       61389    61413      +24     
==========================================
- Hits        41829    41827       -2     
- Misses      15588    15612      +24     
- Partials     3972     3974       +2     
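
The percentages in the diff above can be checked from the Hits and Lines rows. The sketch assumes coverage is hits divided by total lines and that the report truncates to two decimals rather than rounding; the truncation is an inference from these particular numbers, not something the report states. `basisPoints` and `percent` are illustrative helpers.

```go
package main

import "fmt"

// basisPoints returns hits/lines in hundredths of a percent, truncated
// toward zero by integer division.
func basisPoints(hits, lines int) int {
	if lines == 0 {
		return 0
	}
	return hits * 10000 / lines
}

// percent formats a non-negative basis-point value such as 6810 as "68.10%".
func percent(bp int) string {
	return fmt.Sprintf("%d.%02d%%", bp/100, bp%100)
}

func main() {
	base := basisPoints(41829, 61389) // main before the PR
	head := basisPoints(41827, 61413) // head of the PR branch
	fmt.Println("base:", percent(base)) // base: 68.13%
	fmt.Println("head:", percent(head)) // head: 68.10%
	fmt.Println("delta in hundredths of a percent:", head-base) // -3
}
```

Rounding instead of truncating would give 68.14% and 68.11%, which is why the truncation assumption is needed to match the reported values.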
| Files with missing lines | Coverage Δ |
| --- | --- |
| internal/cmd/exec_spec.go | 61.66% <100.00%> (+0.64%) ⬆️ |
| internal/core/dag.go | 88.65% <ø> (ø) |
| internal/core/spec/loader.go | 76.68% <100.00%> (+0.10%) ⬆️ |
| internal/persis/filedagrun/pagination.go | 76.42% <100.00%> (+0.38%) ⬆️ |
| internal/runtime/executor/dag_runner.go | 78.76% <100.00%> (-1.37%) ⬇️ |
| internal/runtime/executor/task.go | 91.17% <100.00%> (+0.40%) ⬆️ |
| internal/runtime/subcmd.go | 89.49% <100.00%> (-1.49%) ⬇️ |
| internal/service/coordinator/handler.go | 66.62% <100.00%> (+0.04%) ⬆️ |
| internal/service/scheduler/dag_executor.go | 77.27% <100.00%> (+0.42%) ⬆️ |
| internal/service/worker/handler.go | 79.79% <100.00%> (+0.20%) ⬆️ |

... and 2 more

... and 13 files with indirect coverage changes


Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 8c9106d...1c86980. Read the comment docs.

