fix: route local dag-run stop correctly #1993

Merged
yottahmd merged 2 commits into main from fix/local-stop-routing
Apr 12, 2026
@yottahmd yottahmd commented Apr 12, 2026

Summary

  • determine whether a DAG run should stop locally or through the coordinator from the DAG definition and execution mode
  • keep coordinator cancellation for distributed DAGs and route local stop requests through the local DAG run manager

Why

A local DAG run could still have a non-empty WorkerID in its saved status. The stop endpoint used that field as the distributed/local switch, so local stop requests attempted coordinator cancellation and failed with a 500 response: "error requesting cancel: no coordinators available".
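
To make the failure mode concrete, here is a minimal sketch of the two routing strategies. The types and function names (runStatus, dagDef, routeByWorkerID, routeByDefinition) are hypothetical stand-ins, not the project's code, and the Distributed flag is an assumed simplification of whatever the DAG definition and execution mode actually encode.

```go
package main

import "fmt"

// runStatus is a hypothetical stand-in for the saved DAG-run status.
type runStatus struct {
	WorkerID string // can be non-empty even when the run executed locally
}

// dagDef is a hypothetical stand-in for the DAG definition.
// Distributed is an assumed flag: true when the definition asks for worker execution.
type dagDef struct {
	Distributed bool
}

// routeByWorkerID mirrors the old switch: any non-empty WorkerID selects the coordinator.
func routeByWorkerID(s runStatus) string {
	if s.WorkerID != "" {
		return "coordinator"
	}
	return "local"
}

// routeByDefinition ignores the saved status and decides from the definition
// plus whether a coordinator is available at all.
func routeByDefinition(d dagDef, hasCoordinator bool) string {
	if hasCoordinator && d.Distributed {
		return "coordinator"
	}
	return "local"
}

func main() {
	status := runStatus{WorkerID: "some-worker"} // local run with a WorkerID recorded
	dag := dagDef{Distributed: false}

	fmt.Println("old routing:", routeByWorkerID(status))
	fmt.Println("new routing:", routeByDefinition(dag, false))
}
```

With the old switch, the local run above is sent to the coordinator path even though no coordinator exists, which matches the error described in this section.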

Test plan

  • go test ./internal/service/frontend/api/v1 -count=1

Summary by CodeRabbit

Release Notes

  • Bug Fixes
    • Improved DAG run termination handling to correctly identify and route cancellation requests based on execution environment, ensuring proper termination behavior in both distributed and local deployment scenarios.

coderabbitai Bot commented Apr 12, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Walkthrough

The TerminateDAGRun function was refactored to determine coordinator dispatch using core.ShouldDispatchToCoordinator(dag, a.coordinatorCli != nil, a.defaultExecMode) instead of checking savedStatus.WorkerID != "". The DAG-read operation was relocated from the inner local branch to the outer decision path, with cancellation routed through coordinatorCli.RequestCancel or a.dagRunMgr.Stop based on the dispatch decision.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Termination Logic Refactor: internal/service/frontend/api/v1/dagruns.go | Updated TerminateDAGRun to use ShouldDispatchToCoordinator for deciding between distributed and local termination paths instead of WorkerID check; relocated DAG reading to outer control flow branch and adjusted cancellation routing accordingly. |

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title directly describes the main change: fixing the routing logic for local DAG-run termination, which is the core purpose of this fix. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (1)
internal/service/frontend/api/v1/dagruns.go (1)

2207-2254: Routing logic change looks correct for the PR objective.

The switch from WorkerID != "" to ShouldDispatchToCoordinator(dag, ...) correctly determines the stop path based on DAG definition properties (ForceLocal, WorkerSelector) and execution mode rather than a potentially stale WorkerID in saved status.

However, lines 2222-2227 contain dead code:

if a.coordinatorCli == nil {
    return nil, &Error{
        HTTPStatus: http.StatusServiceUnavailable,
        Code:       api.ErrorCodeInternalError,
        Message:    "coordinator not configured for distributed DAG cancellation",
    }
}

Per the ShouldDispatchToCoordinator implementation (context snippet 1, lines 11-13), the function returns false when !hasCoordinator. Since you pass a.coordinatorCli != nil as hasCoordinator, a true result from ShouldDispatchToCoordinator guarantees that a.coordinatorCli is non-nil, so the condition a.coordinatorCli == nil can never hold inside this branch.
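
The guarantee can be checked exhaustively on a simplified model. The sketch below is not the real core.ShouldDispatchToCoordinator: only the early return on !hasCoordinator is taken from this review, while the handling of ForceLocal, WorkerSelector, and the default mode, along with all type and constant names, is assumed for illustration.

```go
package main

import "fmt"

// execMode and its constants are hypothetical names for the execution mode.
type execMode string

const (
	modeLocal       execMode = "local"
	modeDistributed execMode = "distributed"
)

// dag is a hypothetical, reduced DAG definition.
type dag struct {
	ForceLocal     bool
	WorkerSelector map[string]string
}

// shouldDispatch models the dispatch decision. Only the first check is
// confirmed by the review; the remaining rules are assumptions.
func shouldDispatch(d dag, hasCoordinator bool, defaultMode execMode) bool {
	if !hasCoordinator {
		return false // confirmed: no coordinator means never dispatch
	}
	if d.ForceLocal {
		return false // assumed: an explicit local override wins
	}
	if len(d.WorkerSelector) > 0 {
		return true // assumed: a worker selector implies distributed execution
	}
	return defaultMode == modeDistributed // assumed fallback
}

// dispatchImpliesCoordinator tries every input combination and reports whether
// a true result always coincides with hasCoordinator being true.
func dispatchImpliesCoordinator() bool {
	selectors := []map[string]string{nil, {"gpu": "true"}}
	for _, hasCoord := range []bool{false, true} {
		for _, force := range []bool{false, true} {
			for _, sel := range selectors {
				for _, mode := range []execMode{modeLocal, modeDistributed} {
					d := dag{ForceLocal: force, WorkerSelector: sel}
					if shouldDispatch(d, hasCoord, mode) && !hasCoord {
						return false
					}
				}
			}
		}
	}
	return true
}

func main() {
	fmt.Println(dispatchImpliesCoordinator())
}
```

Whatever the later rules look like, the early return alone is enough to make the invariant hold, which is why the nil check in the coordinator branch is unreachable.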

🔧 Optional: Remove dead code
 		if core.ShouldDispatchToCoordinator(dag, a.coordinatorCli != nil, a.defaultExecMode) {
 			// For distributed DAGs, use saved status for running check
 			if savedStatus.Status != core.Running {
 				return nil, &Error{
 					HTTPStatus: http.StatusBadRequest,
 					Code:       api.ErrorCodeNotRunning,
 					Message:    "DAG is not running",
 				}
 			}
-			// Send cancel request via coordinator
-			if a.coordinatorCli == nil {
-				return nil, &Error{
-					HTTPStatus: http.StatusServiceUnavailable,
-					Code:       api.ErrorCodeInternalError,
-					Message:    "coordinator not configured for distributed DAG cancellation",
-				}
-			}
 			if err := a.coordinatorCli.RequestCancel(ctx, request.Name, request.DagRunId, nil); err != nil {
 				return nil, fmt.Errorf("error requesting cancel: %w", err)
 			}
 		} else {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/dagruns.go` around lines 2207 - 2254, The
nil-check block that returns ServiceUnavailable is dead code because
ShouldDispatchToCoordinator(dag, a.coordinatorCli != nil, a.defaultExecMode)
guarantees a.coordinatorCli is non-nil when it returns true; remove the if
a.coordinatorCli == nil { ... } block (the ServiceUnavailable Error return) from
the coordinator branch so the code proceeds directly to calling
a.coordinatorCli.RequestCancel(ctx, request.Name, request.DagRunId, nil) (keep
RequestCancel error handling as-is).
ℹ️ Review info
📥 Commits

Reviewing files that changed from the base of the PR and between ddcabd5 and b721496.

📒 Files selected for processing (1)
  • internal/service/frontend/api/v1/dagruns.go

@yottahmd yottahmd merged commit d48a0b7 into main Apr 12, 2026
3 checks passed
@yottahmd yottahmd deleted the fix/local-stop-routing branch April 12, 2026 02:37