Fix worker selector for sub-DAG calls #2029

Merged
yottahmd merged 3 commits into main from fix-call-step-worker-selector
Apr 22, 2026

Conversation

@yottahmd (Collaborator) commented Apr 22, 2026

Summary

  • pass step-level worker_selector through sub-DAG and parallel sub-DAG execution
  • use the effective per-invocation selector for sub-DAG dispatch decisions and coordinator task construction
  • add a distributed integration regression test for a call step whose child DAG has no DAG-level selector

Root Cause

worker_selector on call steps was parsed and validated, but SubDAGExecutor only consulted the child DAG's own WorkerSelector. When the selector existed only on the parent step, the child DAG was treated as local and the coordinator task was never created with the requested worker labels.
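
The difference between the old and new dispatch checks can be sketched as follows. This is a simplified illustration, not the code in dag_runner.go: the helper names `effectiveSelector`, `dispatchBeforeFix`, and `dispatchAfterFix` and the `type: gpu` label are hypothetical, and the sketch ignores the default execution mode that the real `shouldDispatchToCoordinator` also takes into account. It also assumes that an empty step-level selector falls back to the child DAG's own selector.

```go
package main

import "fmt"

// effectiveSelector returns the step-level selector when it is non-empty,
// otherwise the child DAG's own selector. Hypothetical helper; the fallback
// on an empty override is an assumption.
func effectiveSelector(stepSelector, dagSelector map[string]string) map[string]string {
	if len(stepSelector) > 0 {
		return stepSelector
	}
	return dagSelector
}

// dispatchBeforeFix mirrors the behavior described in the root cause:
// only the child DAG's selector is consulted, the step's is ignored.
func dispatchBeforeFix(_, dagSelector map[string]string) bool {
	return len(dagSelector) > 0
}

// dispatchAfterFix consults the effective per-invocation selector.
func dispatchAfterFix(stepSelector, dagSelector map[string]string) bool {
	return len(effectiveSelector(stepSelector, dagSelector)) > 0
}

func main() {
	stepSelector := map[string]string{"type": "gpu"} // set on the parent call step
	var childSelector map[string]string              // child DAG declares no selector

	fmt.Println("before fix, dispatch:", dispatchBeforeFix(stepSelector, childSelector))
	fmt.Println("after fix, dispatch: ", dispatchAfterFix(stepSelector, childSelector))
}
```

With a selector only on the call step, the first check treats the child as local while the second routes it to the coordinator, which is the case the regression test covers.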

Testing

  • go test ./internal/intg/distr -run 'TestSubDAG_CallStepWorkerSelector/immediateParentDispatchesChildUsingCallStepSelector' -count=1 -v
  • go test ./internal/intg/distr -run 'TestSubDAG' -count=1
  • go test ./internal/runtime/executor ./internal/runtime/builtin/dag -count=1
  • go test ./internal/intg/distr -count=1

Closes #1638

Summary by CodeRabbit

Release Notes

  • New Features

    • A worker_selector set on a call step in a parent DAG now applies to the sub-DAG it invokes, including parallel sub-DAG execution, so the child run is routed to a matching worker even when the child DAG declares no selector of its own.
  • Tests

    • Added integration test validating worker selection propagation for sub-DAG execution.

@coderabbitai (Bot) commented Apr 22, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6b5ec9cb-b82c-4064-b51e-6decc4c86ef0

Walkthrough

This PR implements worker selector propagation from parent DAGs to sub-DAGs during execution. A new SetWorkerSelector method on SubDAGExecutor allows runtime override of the child DAG's worker selector. The executor instantiation logic in both standard and parallel DAG builders now passes the parent step's worker selector to children. A new integration test validates that sub-DAGs inherit the parent's worker selection configuration.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Worker Selector Propagation: internal/runtime/builtin/dag/dag.go, internal/runtime/builtin/dag/parallel.go | Both newDAGExecutor and newChildExecutor now call child.SetWorkerSelector(step.WorkerSelector) to propagate the parent step's worker selector to sub-DAG executors. |
| SubDAGExecutor Enhancement: internal/runtime/executor/dag_runner.go | Added public method SetWorkerSelector(selector map[string]string) and updated dispatch logic; shouldDispatchToCoordinator now considers the effective worker selector (override or DAG-defined) when deciding to dispatch to coordinator, replacing the prior static check. |
| Integration Test: internal/intg/distr/subdag_test.go | New test TestSubDAG_CallStepWorkerSelector validates that a parent DAG dispatches a sub-DAG with the specified worker selector (serverA) and verifies execution occurs on the selected worker. |
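
The wiring summarized above can be illustrated with a small stand-alone sketch. The types `step`, `childDAG`, and `subDAGExecutor` are simplified stand-ins rather than the project's real definitions, `taskSelector` is a hypothetical helper for the labels a coordinator task would carry, and the `name: serverA` label is only an example. Copying the map inside `SetWorkerSelector` is a design choice of this sketch, not something the PR description states.

```go
package main

import "fmt"

// step is a stand-in for a parent call step (hypothetical type).
type step struct {
	Name           string
	WorkerSelector map[string]string
}

// childDAG is a stand-in for the called DAG definition (hypothetical type).
type childDAG struct {
	Name           string
	WorkerSelector map[string]string
}

// subDAGExecutor is a simplified stand-in for SubDAGExecutor.
type subDAGExecutor struct {
	dag      childDAG
	override map[string]string
}

// SetWorkerSelector records a per-invocation selector. The map is copied so
// that later changes by the caller do not affect the executor (an assumption
// of this sketch).
func (e *subDAGExecutor) SetWorkerSelector(selector map[string]string) {
	if len(selector) == 0 {
		e.override = nil
		return
	}
	e.override = make(map[string]string, len(selector))
	for k, v := range selector {
		e.override[k] = v
	}
}

// taskSelector returns the labels a coordinator task would be built with:
// the override when present, otherwise the DAG-defined selector.
func (e *subDAGExecutor) taskSelector() map[string]string {
	if len(e.override) > 0 {
		return e.override
	}
	return e.dag.WorkerSelector
}

// newChildExecutor mirrors the builder change: the parent step's selector is
// handed to the child executor at construction time.
func newChildExecutor(s step, dag childDAG) *subDAGExecutor {
	child := &subDAGExecutor{dag: dag}
	child.SetWorkerSelector(s.WorkerSelector)
	return child
}

func main() {
	s := step{Name: "call-child", WorkerSelector: map[string]string{"name": "serverA"}}
	child := newChildExecutor(s, childDAG{Name: "child"})
	fmt.Println(child.taskSelector())
}
```

Keeping the override on the executor rather than mutating the child DAG definition means the same child DAG can be called from different steps with different selectors.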

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and concisely summarizes the main change: fixing worker selector propagation for sub-DAG calls. |
| Linked Issues check | ✅ Passed | Changes fully address issue #1638 by propagating step-level worker_selector through sub-DAG execution paths and fixing dispatch logic to respect the effective selector. |
| Out of Scope Changes check | ✅ Passed | All changes are directly related to fixing worker selector propagation in sub-DAG calls and adding supporting integration tests. |

@coderabbitai coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/runtime/executor/dag_runner.go (1)

439-444: ⚠️ Potential issue | 🟠 Major

Track distributed retry runs before waiting.

Kill requests coordinator cancellation only for run IDs recorded in distributedRuns. The distributed Execute path records the run before dispatch, but Retry does not, so canceling a parent while a distributed step retry is in flight can leave the coordinator task uncanceled and the parent waiting until timeout.

Proposed fix
 	if e.shouldDispatchToCoordinator(rCtx.DefaultExecMode) {
 		logger.Info(ctx, "Retrying sub DAG via distributed execution", tag.Step(stepName))
+		e.mu.Lock()
+		e.distributedRuns[runParams.RunID] = true
+		e.mu.Unlock()
 		if err := e.dispatchRetryToCoordinator(ctx, runParams, stepName); err != nil {
 			return nil, fmt.Errorf("distributed step retry failed: %w", err)
 		}
 		return e.waitCompletion(ctx, runParams.RunID)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/runtime/executor/dag_runner.go` around lines 439 - 444, The retry
path doesn't register the distributed run ID before dispatching, so the Kill
logic (which cancels IDs in distributedRuns) can miss it; modify the retry flow
in the block guarded by e.shouldDispatchToCoordinator to record the run into the
same distributed run tracking used by the Execute path (the distributedRuns
registry/collection on the executor) prior to calling
e.dispatchRetryToCoordinator(ctx, runParams, stepName), and ensure any cleanup
or removal remains paired (e.g., after waitCompletion or on error) so the
coordinator cancellation logic sees the retry run ID.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 26d170d0-16f0-41ab-a2c0-babdc226ed30

📥 Commits

Reviewing files that changed from the base of the PR and between 852cecb and 66d6e0d.

📒 Files selected for processing (4)
  • internal/intg/distr/subdag_test.go
  • internal/runtime/builtin/dag/dag.go
  • internal/runtime/builtin/dag/parallel.go
  • internal/runtime/executor/dag_runner.go

@yottahmd yottahmd merged commit 67c4663 into main Apr 22, 2026
10 checks passed
@yottahmd yottahmd deleted the fix-call-step-worker-selector branch April 22, 2026 12:25
Development

Successfully merging this pull request may close these issues.

immediate execution does not respect workerSelector