
fix(cmd): refactor start command not to prevent DAG from running unexpectedly#1497

Merged
yottahmd merged 10 commits into main from not-enqueue
Dec 21, 2025

Conversation

@yottahmd
Collaborator

@yottahmd yottahmd commented Dec 21, 2025

Summary by CodeRabbit

  • Behavior Changes

    • Removed explicit no-queue and disable-max-active-runs options; start/retry flows simplified to default local execution and streamlined retry semantics.
  • New Features

    • Singleton mode now returns clear HTTP 409 conflicts when a singleton DAG is already running or queued.
    • Added top-level run Error field and an API to record early failures for quicker failure visibility.
  • Bug Fixes

    • Improved process acquisition and restart/retry error handling; tests updated to reflect local execution behavior.


@coderabbitai

coderabbitai Bot commented Dec 21, 2025

Walkthrough

This PR removes queue-related flags and enqueue branches, simplifies DAG execution to direct/local runs, inlines singleton checks in API handlers, adds Context.RecordEarlyFailure and tests, exposes a top-level DAGRunStatus Error field and status option, and updates many tests and subcommand builders accordingly.

Changes

  • IDE debug (.vscode/launch.json): Removed --no-queue from Start debug args.
  • CLI flags (internal/cmd/flags.go): Removed the --no-queue and --disable-max-active-runs flag definitions.
  • Exec command (internal/cmd/exec.go, internal/cmd/exec_spec.go): Deleted queue/no-queue handling and enqueue branches; removed the ExecOptions fields (Queue, NoQueue, Singleton) and the execSpec queue/max fields; execution always proceeds inline.
  • Start / Retry / Restart handlers (internal/cmd/start.go, internal/cmd/retry.go, internal/cmd/restart.go): Removed queue/distribution and max-active-runs logic; simplified the tryExecuteDAG signature and locking; restart now generates a new run ID and uses executeDAGWithRunID; retry/start flows no longer enqueue.
  • Context & early failure (internal/cmd/context.go, internal/cmd/record_early_failure_test.go): Added Context.RecordEarlyFailure(dag, dagRunID, err) and comprehensive tests exercising early-failure recording and its interplay with retry.
  • Runtime subcommands (internal/runtime/subcmd.go, internal/runtime/subcmd_test.go): Removed StartOptions.NoQueue and Retry's disableMaxActiveRuns parameter; Start/Retry/TaskStart no longer emit --no-queue/--disable-max-active-runs; tests updated.
  • Executor / DAG runner (internal/runtime/executor/dag_runner.go, internal/runtime/executor/dag_runner_test.go): Sub-DAG start no longer appends --no-queue/--disable-max-active-runs; tests updated to expect the omission.
  • Scheduler / local executor (internal/service/scheduler/dag_executor.go): Local retry builds the retry spec without the boolean flag (Retry(dag, runID, "")); the distributed path is unchanged.
  • API v1 / v2 handlers (internal/service/frontend/api/v1/dags.go, internal/service/frontend/api/v2/dagruns.go, internal/service/frontend/api/v2/dags.go): Retry calls updated to the new signature (no boolean); Execute/Enqueue handlers inline singleton checks and use ensureDAGRunIDUnique for uniqueness; removed helper-based capacity checks.
  • Singleton API tests (internal/service/frontend/api/v2/singleton_test.go): New tests asserting HTTP 409 on concurrent/queued singleton DAG runs.
  • Worker & handler tests (internal/service/worker/handler_test.go): Removed assertions for --no-queue/--disable-max-active-runs in command args.
  • Integration / E2E (internal/integration/distributed_e2e_test.go, internal/integration/distributed_test.go, internal/integration/gha_test.go): Tests adjusted for local execution semantics and Enqueue usage; GH Actions runner image changed to node:22-bookworm.
  • Status model & transform (internal/core/execution/runstatus.go, internal/runtime/transform/status.go): Added a top-level Error string to DAGRunStatus and included it in Errors() output; added a WithError(err string) status option.
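The new WithError status option follows Go's functional-option style. As a hedged sketch of that pattern (the types and constructor here are illustrative stand-ins, not the repo's actual runstatus/transform definitions):

```go
package main

import "fmt"

// DAGRunStatus is a simplified stand-in with the PR's new top-level Error field.
type DAGRunStatus struct {
	DAGRunID string
	Error    string // records a run-level (early) failure
}

// StatusOption mutates a status under construction (functional-option style).
type StatusOption func(*DAGRunStatus)

// WithError sets the top-level Error string, mirroring the PR's new option.
func WithError(err string) StatusOption {
	return func(s *DAGRunStatus) { s.Error = err }
}

// NewStatus builds a status and applies any options in order.
func NewStatus(runID string, opts ...StatusOption) *DAGRunStatus {
	s := &DAGRunStatus{DAGRunID: runID}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// Errors returns all error strings, now including the top-level one.
func (s *DAGRunStatus) Errors() []string {
	var out []string
	if s.Error != "" {
		out = append(out, s.Error)
	}
	return out
}

func main() {
	st := NewStatus("run-1", WithError("proc acquisition failed"))
	fmt.Println(st.Errors())
}
```

The option keeps the constructor signature stable while letting callers such as an early-failure path attach the error only when one exists.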

Sequence Diagram(s)

sequenceDiagram
    participant Client as HTTP Client
    participant API as Frontend API (v2)
    participant Proc as ProcStore
    participant Scheduler as Scheduler
    participant Executor as Local Executor

    Client->>API: POST /dag_runs (Execute/Enqueue)
    API->>Proc: Check alive runs / queued runs (singleton)
    alt conflict found
        Proc-->>API: found active/queued
        API-->>Client: 409 Conflict
    else unique
        API->>Scheduler: build spec & request start/enqueue
        Scheduler->>Proc: acquire process handle
        alt acquisition fails
            Proc-->>Scheduler: error
            Scheduler->>API: RecordEarlyFailure(dag, runID, err)
            API-->>Client: error response
        else acquired
            Scheduler->>Executor: tryExecuteDAG (local execution)
            Executor-->>Scheduler: start/status updates
            Scheduler-->>Client: 202/200 with run id
        end
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Areas needing close review:
    • internal/service/frontend/api/v2/dags.go / internal/service/frontend/api/v2/dagruns.go — correctness of inline singleton/uniqueness checks and HTTP 409 handling.
    • internal/cmd/context.go (RecordEarlyFailure) and its tests — status construction, log path handling, and concurrency semantics.
    • internal/cmd/start.go, retry.go, restart.go, exec.go — locking/unlocking changes, new run ID handling in restart, and removal of enqueue paths.
    • Integration/e2e test changes to ensure they reflect intended local vs enqueue behavior.

Possibly related PRs

Poem

🐰 I nudged the flags and woke the run,
Queues tucked in, local starts begun.
If starts stumble, I mark the tale —
An Error field, a log, a failed trail.
Hop—singleton guards keep the burrow well.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 54.17%, below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit’s high-level summary is enabled.
  • Title Check: ✅ Passed. The title accurately reflects the main objective: refactoring the start command to remove queue-based execution that was preventing DAGs from running directly.


@yottahmd changed the title from "fix(cmd): refactor start command not to prevent DAG running" to "fix(cmd): refactor start command not to prevent DAG from running unexpectedly" on Dec 21, 2025

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/cmd/retry.go (1)

154-212: Remove the dead enqueueRetry function.

The enqueueRetry function at lines 154-212 is not called anywhere in the codebase and should be deleted. This function implemented queue-based retry logic that has been removed from the retry command execution paths.

internal/service/frontend/api/v2/dagruns.go (1)

43-71: Remove unused singleton parameter from startDAGRun chain or add enforcement logic.

The singleton variable is extracted at line 58 and passed to startDAGRun, but it's never used in startDAGRunWithOptions. Singleton enforcement happens at the handler level (in CreateDAGRun and RetriggerDAGRun), not within startDAGRun. Either remove the unused singleton parameter from the function signature and startDAGRunOptions struct, or add singleton enforcement logic within startDAGRunWithOptions if it should be enforced at that level.
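For context, the handler-level singleton enforcement amounts to a check-then-act guard over the process and queue stores. A minimal sketch, with hypothetical store stand-ins (the real interfaces live in internal/core/execution and differ in detail, e.g. the queue store exposes ListByDAGName):

```go
package main

import (
	"fmt"
	"net/http"
)

// procStore and queueStore are illustrative fakes, not the repo's interfaces.
type procStore struct{ alive map[string]int }
type queueStore struct{ queued map[string]int }

func (p *procStore) CountAliveByDAGName(name string) int { return p.alive[name] }
func (q *queueStore) CountByDAGName(name string) int     { return q.queued[name] }

// checkSingleton mirrors the inlined handler logic: reject with 409 Conflict
// if the DAG is already running or already sitting in the queue.
func checkSingleton(p *procStore, q *queueStore, dagName string) (int, string) {
	if p.CountAliveByDAGName(dagName) > 0 {
		return http.StatusConflict, "already running"
	}
	if q.CountByDAGName(dagName) > 0 {
		return http.StatusConflict, "already in queue"
	}
	return http.StatusOK, ""
}

func main() {
	p := &procStore{alive: map[string]int{"etl": 1}}
	q := &queueStore{queued: map[string]int{}}
	code, msg := checkSingleton(p, q, "etl")
	fmt.Println(code, msg)
}
```

Because the check and the subsequent start are separate steps, a race window (TOCTOU) remains unless the stores serialize the two, which is the caveat raised elsewhere in this review.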

🧹 Nitpick comments (3)
internal/cmd/exec_spec.go (1)

66-66: Consider eliminating the intermediate variable.

The queueValue variable is now a direct assignment from opts.Queue and is only used once at line 75. Consider inlining it directly.

🔎 Proposed simplification
-	queueValue := opts.Queue
-
 	specDoc := execSpec{
 		Name:           name,
 		Type:           core.TypeChain,
 		WorkingDir:     opts.WorkingDir,
 		Env:            opts.Env,
 		Dotenv:         opts.DotenvFiles,
 		MaxActiveRuns:  maxActiveRuns,
-		Queue:          queueValue,
+		Queue:          opts.Queue,
 		WorkerSelector: opts.WorkerLabels,
internal/cmd/restart.go (1)

103-113: Inconsistent error handling pattern.

Lines 105 and 113 handle errProcAcquisitionFailed differently:

  • Line 105: Returns the sentinel error directly
  • Line 113: Wraps it with additional context using fmt.Errorf

Consider using a consistent approach. If the additional context at line 113 is valuable, consider adding similar context at line 105.

🔎 Proposed consistency fix
 	if err := ctx.ProcStore.Lock(ctx, d.ProcGroup()); err != nil {
 		logger.Debug(ctx, "Failed to lock process group", tag.Error(err))
-		return errProcAcquisitionFailed
+		return fmt.Errorf("failed to lock process group: %w", errProcAcquisitionFailed)
 	}
internal/integration/distributed_test.go (1)

59-63: Consider removing dead code.

This block logs queue item data if queueItems is non-empty, but line 57 requires the queue to be empty. This code path is unreachable after the require.Len(t, queueItems, 0, ...) assertion.

🔎 Proposed fix
 		require.Len(t, queueItems, 0, "DAG should NOT be enqueued (dagu start runs locally)")
 
-		if len(queueItems) > 0 {
-			data, err := queueItems[0].Data()
-			require.NoError(t, err, "Should be able to get queue item data")
-			t.Logf("DAG enqueued: dag=%s runId=%s", data.Name, data.ID)
-		}
-
 		// Verify the DAG status is "succeeded" (executed locally)
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5d90d27 and 1607c53.

📒 Files selected for processing (20)
  • .vscode/launch.json (1 hunks)
  • internal/cmd/exec.go (1 hunks)
  • internal/cmd/exec_spec.go (2 hunks)
  • internal/cmd/flags.go (0 hunks)
  • internal/cmd/restart.go (1 hunks)
  • internal/cmd/retry.go (2 hunks)
  • internal/cmd/start.go (2 hunks)
  • internal/integration/distributed_e2e_test.go (4 hunks)
  • internal/integration/distributed_test.go (5 hunks)
  • internal/integration/gha_test.go (1 hunks)
  • internal/runtime/executor/dag_runner.go (0 hunks)
  • internal/runtime/executor/dag_runner_test.go (0 hunks)
  • internal/runtime/subcmd.go (5 hunks)
  • internal/runtime/subcmd_test.go (5 hunks)
  • internal/service/frontend/api/v1/dags.go (1 hunks)
  • internal/service/frontend/api/v2/dagruns.go (3 hunks)
  • internal/service/frontend/api/v2/dags.go (2 hunks)
  • internal/service/frontend/api/v2/singleton_test.go (1 hunks)
  • internal/service/scheduler/dag_executor.go (1 hunks)
  • internal/service/worker/handler_test.go (0 hunks)
💤 Files with no reviewable changes (4)
  • internal/service/worker/handler_test.go
  • internal/runtime/executor/dag_runner_test.go
  • internal/cmd/flags.go
  • internal/runtime/executor/dag_runner.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/service/scheduler/dag_executor.go
  • internal/cmd/restart.go
  • internal/service/frontend/api/v2/singleton_test.go
  • internal/cmd/start.go
  • internal/integration/gha_test.go
  • internal/cmd/exec.go
  • internal/runtime/subcmd_test.go
  • internal/cmd/exec_spec.go
  • internal/service/frontend/api/v2/dags.go
  • internal/service/frontend/api/v2/dagruns.go
  • internal/cmd/retry.go
  • internal/integration/distributed_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/service/frontend/api/v1/dags.go
  • internal/runtime/subcmd.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks

Files:

  • internal/service/frontend/api/v2/singleton_test.go
  • internal/integration/gha_test.go
  • internal/runtime/subcmd_test.go
  • internal/integration/distributed_test.go
  • internal/integration/distributed_e2e_test.go
🧠 Learnings (1)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)

Applied to files:

  • internal/integration/distributed_e2e_test.go
🧬 Code graph analysis (10)
internal/service/scheduler/dag_executor.go (2)
internal/cmd/retry.go (1)
  • Retry (19-37)
internal/common/backoff/retry.go (1)
  • Retry (31-83)
internal/service/frontend/api/v2/singleton_test.go (3)
internal/test/server.go (1)
  • SetupServer (26-63)
api/v2/api.gen.go (3)
  • ExecuteDAG200JSONResponse (6172-6175)
  • EnqueueDAGDAGRunJSONRequestBody (1621-1621)
  • EnqueueDAGDAGRun200JSONResponse (6008-6011)
internal/core/status.go (1)
  • Running (8-8)
internal/runtime/subcmd_test.go (4)
internal/cmd/retry.go (1)
  • Retry (19-37)
internal/common/backoff/retry.go (1)
  • Retry (31-83)
internal/runtime/subcmd.go (1)
  • Run (247-269)
internal/runtime/data.go (1)
  • Parallel (69-73)
internal/service/frontend/api/v2/dags.go (2)
internal/persistence/fileproc/procgrp.go (1)
  • ProcGroup (20-27)
api/v2/api.gen.go (2)
  • Error (465-474)
  • ErrorCodeAlreadyExists (39-39)
internal/service/frontend/api/v2/dagruns.go (2)
internal/cmd/retry.go (1)
  • Retry (19-37)
internal/runtime/subcmd.go (1)
  • Start (272-298)
internal/cmd/retry.go (5)
internal/cmd/context.go (1)
  • Context (40-55)
internal/core/execution/context.go (1)
  • Context (16-27)
internal/core/execution/proc.go (1)
  • ProcStore (8-27)
internal/persistence/fileproc/procgrp.go (1)
  • ProcGroup (20-27)
internal/common/logger/tag/tag.go (1)
  • Error (20-22)
internal/integration/distributed_test.go (4)
internal/core/status.go (2)
  • Succeeded (11-11)
  • Failed (9-9)
internal/core/execution/queue.go (1)
  • QueueStore (16-35)
internal/runtime/subcmd.go (2)
  • Run (247-269)
  • Start (272-298)
internal/common/backoff/retry.go (1)
  • Retry (31-83)
internal/integration/distributed_e2e_test.go (3)
internal/runtime/subcmd.go (5)
  • NewSubCmdBuilder (27-33)
  • EnqueueOptions (233-239)
  • Start (272-298)
  • Run (247-269)
  • StartOptions (222-230)
internal/cmd/enqueue.go (1)
  • Enqueue (17-30)
internal/cmd/start.go (1)
  • Start (35-57)
internal/service/frontend/api/v1/dags.go (2)
internal/cmd/retry.go (1)
  • Retry (19-37)
internal/runtime/subcmd.go (1)
  • Start (272-298)
internal/runtime/subcmd.go (1)
internal/cmd/retry.go (1)
  • Retry (19-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (31)
.vscode/launch.json (1)

45-45: LGTM!

The removal of the --no-queue flag aligns with the PR's objective to simplify command-line arguments by removing queue-related flags.

internal/cmd/exec_spec.go (1)

126-136: LGTM!

The queue and singleton handling logic is clear and correctly sets dag.Queue only when a non-empty value is provided. The MaxActiveRuns logic correctly handles the singleton case.

internal/service/frontend/api/v2/dagruns.go (2)

630-644: LGTM!

The simplified Retry signature aligns with the broader PR changes removing queue-related parameters. Both paths (with and without StepName) now consistently use the three-parameter signature.


797-799: LGTM!

The validation change to ensureDAGRunIDUnique is consistent with the approach in ExecuteDAGRunFromSpec. The singleton enforcement is appropriately delegated to startDAGRunWithOptions at line 807.

internal/service/scheduler/dag_executor.go (1)

145-147: LGTM!

The simplified Retry call signature aligns with the PR's objective of removing queue-related parameters. The local retry path now uses the consistent three-parameter signature Retry(dag, runID, "").

internal/service/frontend/api/v1/dags.go (1)

599-611: LGTM!

The Retry invocations are correctly updated to use the simplified three-parameter signature, consistent with the changes across the codebase. The distinction between step-specific retry (with *request.Body.Step) and full DAG retry (with empty string) is properly maintained.

internal/cmd/retry.go (2)

39-44: LGTM!

The simplified flags and early parameter extraction improve code clarity. The comment at line 42 helpfully documents the purpose of the extraction.


78-90: Good resource management pattern.

The explicit unlock at line 81 before returning on error prevents leaving the process group locked. The overall pattern (lock → acquire → unlock on failure → defer stop → unlock before execute) correctly manages the lock lifecycle.
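The lock lifecycle praised here (lock → acquire → unlock on failure → defer stop → unlock before execute) can be sketched in isolation. The fake store below is illustrative, not the repo's ProcStore interface:

```go
package main

import (
	"errors"
	"fmt"
)

// fakeProcStore stands in for the real ProcStore; it tracks lock state
// and can be told to fail acquisition.
type fakeProcStore struct {
	locked      bool
	acquireFail bool
}

func (s *fakeProcStore) Lock() error { s.locked = true; return nil }
func (s *fakeProcStore) Unlock()     { s.locked = false }

// Acquire returns a stop func on success, an error on failure.
func (s *fakeProcStore) Acquire() (func(), error) {
	if s.acquireFail {
		return nil, errors.New("no capacity")
	}
	return func() { fmt.Println("proc stopped") }, nil
}

var errProcAcquisitionFailed = errors.New("proc acquisition failed")

// run follows the reviewed lifecycle: the group lock is held only around
// acquisition, never across the (possibly long) DAG execution.
func run(store *fakeProcStore) error {
	if err := store.Lock(); err != nil {
		return err
	}
	stop, err := store.Acquire()
	if err != nil {
		store.Unlock() // explicit unlock so the group is not left locked
		return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed)
	}
	defer stop()
	store.Unlock() // release before executing
	fmt.Println("executing DAG run")
	return nil
}

func main() {
	if err := run(&fakeProcStore{acquireFail: true}); err != nil {
		fmt.Println("error:", err)
	}
	_ = run(&fakeProcStore{})
}
```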

internal/runtime/subcmd.go (5)

36-69: LGTM - Start method simplified correctly.

The Start method no longer handles NoQueue option, aligning with the PR's goal to remove queue-related flags. The method cleanly constructs arguments for direct execution.


142-159: LGTM - Retry signature simplified.

The Retry method signature correctly removes the disableMaxActiveRuns parameter. The method now focuses on core retry functionality with --run-id and optional --step flags only.


162-189: LGTM - TaskStart cleaned up.

The TaskStart method no longer emits --no-queue or --disable-max-active-runs flags, consistent with the direct execution model.


191-210: LGTM - TaskRetry simplified.

The TaskRetry method correctly removes the --disable-max-active-runs flag and now relies solely on --run-id and optional --step for retry operations.


221-230: LGTM - StartOptions struct simplified.

The NoQueue field has been correctly removed from StartOptions, aligning with the removal of queue-bypass logic from the start command flow.

internal/service/frontend/api/v2/singleton_test.go (4)

15-17: Consider using t.Parallel() and subtests with unique resources.

Since all subtests share the same server instance created before the subtests run, this is fine. However, ensure tests don't interfere with each other if run in parallel in the future. The unique DAG names per test case (singleton_exec_dag, singleton_enq_run_dag, singleton_enq_q_dag) help prevent this.


18-56: Well-structured singleton execution conflict test.

The test correctly:

  1. Creates a DAG with a long-running step (sleep 10)
  2. Starts it with Singleton: true
  3. Waits for running state using require.Eventually
  4. Verifies second start returns 409 Conflict
  5. Cleans up by deleting the DAG

58-94: LGTM - Enqueue conflict while running test.

The test properly validates that enqueuing with Singleton: true while a DAG is already running returns 409 Conflict.


96-123: LGTM - Enqueue conflict while queued test.

The test correctly validates that enqueuing with Singleton: true while a DAG is already queued returns 409 Conflict.

internal/integration/distributed_test.go (3)

48-68: LGTM - Test correctly updated for local execution semantics.

The test now:

  1. Waits for DAG completion via DAGRunMgr.GetLatestStatus
  2. Verifies the DAG was NOT enqueued (confirming local execution)
  3. Asserts final status is Succeeded

This aligns with the PR's goal of making dagu start always execute locally.


71-105: LGTM - Local execution test correctly structured.

The test StartCommand_WorkerSelector_ShouldExecuteLocally properly verifies that dagu start executes locally even when workerSelector is present, with appropriate assertions on queue state and final status.


134-168: LGTM - Retry test updated for local execution.

The retry test correctly:

  1. Starts DAG locally (not enqueued)
  2. Waits for failure via status check
  3. Retries locally using simplified Retry signature (3 args, no disableMaxActiveRuns)
  4. Verifies no queue entries after retry
internal/cmd/exec.go (2)

27-37: LGTM - Exec flags simplified.

The execFlags array correctly no longer includes the removed noQueueFlag, aligning with the simplified execution model.


193-224: LGTM - Exec now uses direct execution path.

The ExecOptions construction no longer sets NoQueue, and execution now routes directly through tryExecuteDAG. This simplification aligns with the PR's objective to remove queue-bypass logic.

internal/cmd/start.go (3)

60-60: LGTM - Start flags simplified.

The startFlags array correctly removes queue-related flags (noQueueFlag, disableMaxActiveRunsFlag), leaving only the essential flags for direct execution.


175-176: LGTM - Start now uses direct execution.

The runStart function now routes directly to tryExecuteDAG, eliminating the previous conditional logic for enqueuing or queue bypass.


178-206: Verify the immediate unlock behavior is intentional.

The tryExecuteDAG function now unlocks the process group immediately after acquiring the handle (lines 201-203). This allows multiple instances of the same DAG to start concurrently.

Previously, the max-active-runs check would have prevented this. The comment states this is intentional: "to allow other instances of the same DAG to start."

Please confirm this behavioral change aligns with the desired semantics for the start command after this refactor.

internal/runtime/subcmd_test.go (2)

408-423: LGTM! TaskStart tests properly cleaned up.

The removal of --no-queue assertions aligns with the PR objective. The remaining assertions adequately verify command construction including run-id, config, and hierarchy flags.


354-364: LGTM! Retry API signature updated correctly.

The test correctly reflects the simplified Retry(dag, runID, stepName) signature after removing the disableMaxActiveRuns boolean parameter. The assertions remain comprehensive for verifying the command spec construction.

internal/service/frontend/api/v2/dags.go (2)

761-788: Singleton check covers both running and queued states — LGTM with same TOCTOU caveat.

The implementation correctly checks:

  1. Active processes via procStore.CountAliveByDAGName (line 764)
  2. Queued items via queueStore.ListByDAGName (line 777)

Error messages clearly distinguish between "already running" and "already in queue" scenarios. The same TOCTOU consideration from ExecuteDAG applies here.


610-612: LGTM! Correct ordering of validations.

The singleton check precedes the DAG run ID uniqueness check, which is the right order — broader constraints first, then finer-grained validations.

internal/integration/distributed_e2e_test.go (2)

53-61: LGTM! Test correctly demonstrates the enqueue path.

The test now explicitly uses subCmdBuilder.Enqueue() to demonstrate the distributed execution flow via the queue system. This accurately reflects the intended behavior where DAGs with workerSelector should be enqueued for scheduler dispatch rather than executed locally.


141-173: LGTM! Test clearly validates local execution semantics.

The renamed test E2E_StartCommand_WorkerSelector_ShouldExecuteLocally now accurately describes its purpose: verifying that dagu start always executes locally, even when workerSelector is defined. The assertion at line 172 correctly confirms the DAG is not enqueued.

This provides good documentation of the behavioral distinction between start (local) and enqueue (distributed) commands.

Comment thread internal/integration/gha_test.go
Comment thread internal/service/frontend/api/v2/dags.go
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/cmd/restart.go (1)

108-122: Potential double unlock of process group.

The deferred Unlock at line 108 will execute after the explicit Unlock at line 122, resulting in two unlock calls for the same process group. Depending on the ProcStore implementation, this could be a no-op or cause issues.

Consider removing the defer at line 108 since you're explicitly unlocking at line 122, or guard the defer with a flag.

🔎 Proposed fix
 	if err := ctx.ProcStore.Lock(ctx, d.ProcGroup()); err != nil {
 		logger.Debug(ctx, "Failed to lock process group", tag.Error(err))
 		_ = ctx.RecordEarlyFailure(d, dagRunID, err)
 		return errProcAcquisitionFailed
 	}
-	defer ctx.ProcStore.Unlock(ctx, d.ProcGroup())

 	// Acquire process handle
 	proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), execution.NewDAGRunRef(d.Name, dagRunID))
 	if err != nil {
 		logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err))
 		_ = ctx.RecordEarlyFailure(d, dagRunID, err)
+		ctx.ProcStore.Unlock(ctx, d.ProcGroup())
 		return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed)
 	}
internal/cmd/exec.go (1)

181-185: Correct worker label validation to check for coordinator configuration, not queues.

Worker labels are functional for distributed execution through the coordinator service, which is independent of the queue system. The validation at lines 181-185 incorrectly requires Queues.Enabled, but should instead verify that a coordinator is configured. Users can enable queues without a coordinator and worker labels still won't function; conversely, users can use worker labels without queues if a coordinator is configured.

Update the check to validate coordinator availability rather than queue enablement, and correct the error message to reflect the actual requirement.

🧹 Nitpick comments (4)
internal/cmd/start.go (1)

191-196: Consider preserving the original error context.

The actual error from Acquire is logged but not included in the returned error. The caller only sees errProcAcquisitionFailed without knowing the root cause (e.g., resource exhaustion vs. permission issue).

🔎 Proposed improvement
 	proc, err := ctx.ProcStore.Acquire(ctx, dag.ProcGroup(), execution.NewDAGRunRef(dag.Name, dagRunID))
 	if err != nil {
 		ctx.ProcStore.Unlock(ctx, dag.ProcGroup())
 		logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err))
 		_ = ctx.RecordEarlyFailure(dag, dagRunID, err)
-		return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed)
+		return fmt.Errorf("failed to acquire process handle (%v): %w", err, errProcAcquisitionFailed)
 	}
internal/cmd/exec.go (1)

61-68: Outdated function documentation.

The docstring still references "enqueueing" and "queues/worker labels" decision logic (lines 64-65), but this PR removes queue-based execution paths. The function now always executes directly via tryExecuteDAG.

🔎 Proposed fix
 // runExec parses flags and arguments and executes the provided command as an inline DAG run,
-// either enqueueing it for distributed execution or running it immediately in-process.
+// running it immediately in-process.
 // It validates inputs (run-id, working directory, base and dotenv files, env vars, worker labels,
-// singleton flags), builds the DAG for the inline command, and chooses between enqueueing
-// (when queues/worker labels require it or when max runs are reached) or direct execution.
+// singleton flags), builds the DAG for the inline command, and executes it directly.
 // ctx provides CLI and application context; args are the command and its arguments.
-// Returns an error for validation failures, when a dag-run with the same run-id already exists,
-// or if enqueueing/execution fails.
+// Returns an error for validation failures, when a dag-run with the same run-id already exists,
+// or if execution fails.
internal/cmd/context.go (1)

517-523: Consider logging if log path generation fails.

The error from GenLogFileName is silently ignored at line 518. While this is acceptable for a best-effort failure recording, logging a warning could help with debugging.

🔎 Proposed improvement
 	statusBuilder := transform.NewStatusBuilder(dag)
-	logPath, _ := c.GenLogFileName(dag, dagRunID)
+	logPath, logErr := c.GenLogFileName(dag, dagRunID)
+	if logErr != nil {
+		logger.Debug(c, "Failed to generate log path for early failure", tag.Error(logErr))
+	}
 	status := statusBuilder.Create(dagRunID, core.Failed, 0, time.Now(),
internal/cmd/retry.go (1)

78-81: Consider logging if RecordEarlyFailure fails.

The error from RecordEarlyFailure is silently ignored. If recording the failure itself fails, this could make debugging harder. Consider logging the error to aid troubleshooting.

🔎 Proposed enhancement
 	ctx.ProcStore.Unlock(ctx, dag.ProcGroup())
 	logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err))
-	_ = ctx.RecordEarlyFailure(dag, dagRunID, err)
+	if recordErr := ctx.RecordEarlyFailure(dag, dagRunID, err); recordErr != nil {
+		logger.Debug(ctx, "Failed to record early failure", tag.Error(recordErr))
+	}
 	return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed)
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1607c53 and 6789951.

📒 Files selected for processing (9)
  • internal/cmd/context.go (3 hunks)
  • internal/cmd/exec.go (3 hunks)
  • internal/cmd/exec_spec.go (0 hunks)
  • internal/cmd/record_early_failure_test.go (1 hunks)
  • internal/cmd/restart.go (1 hunks)
  • internal/cmd/retry.go (2 hunks)
  • internal/cmd/start.go (2 hunks)
  • internal/core/execution/runstatus.go (2 hunks)
  • internal/runtime/transform/status.go (1 hunks)
💤 Files with no reviewable changes (1)
  • internal/cmd/exec_spec.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/core/execution/runstatus.go
  • internal/runtime/transform/status.go
  • internal/cmd/record_early_failure_test.go
  • internal/cmd/start.go
  • internal/cmd/retry.go
  • internal/cmd/restart.go
  • internal/cmd/exec.go
  • internal/cmd/context.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks

Files:

  • internal/cmd/record_early_failure_test.go
🧠 Learnings (1)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)

Applied to files:

  • internal/cmd/start.go
  • internal/cmd/retry.go
🧬 Code graph analysis (6)
internal/core/execution/runstatus.go (3)
api/v2/api.gen.go (1)
  • Error (465-474)
api/v1/api.gen.go (1)
  • Error (343-352)
internal/common/logger/context.go (1)
  • Errorf (75-77)
internal/runtime/transform/status.go (1)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (35-58)
internal/cmd/record_early_failure_test.go (2)
internal/core/execution/dagrun.go (1)
  • DAGRunStore (23-48)
internal/core/status.go (1)
  • Failed (9-9)
internal/cmd/retry.go (5)
internal/cmd/context.go (1)
  • Context (42-57)
internal/core/execution/proc.go (1)
  • ProcStore (8-27)
internal/persistence/fileproc/procgrp.go (1)
  • ProcGroup (20-27)
internal/common/logger/context.go (3)
  • Debug (35-37)
  • Error (50-52)
  • Errorf (75-77)
internal/common/logger/tag/tag.go (1)
  • Error (20-22)
internal/cmd/restart.go (1)
internal/core/execution/proc.go (1)
  • ProcStore (8-27)
internal/cmd/context.go (4)
internal/core/execution/context.go (1)
  • Context (16-27)
internal/core/dag.go (1)
  • DAG (33-135)
internal/core/execution/dagrun.go (3)
  • DAGRunStore (23-48)
  • ErrDAGRunIDNotFound (14-14)
  • NewDAGRunAttemptOptions (123-128)
internal/runtime/transform/status.go (4)
  • NewStatusBuilder (17-19)
  • WithLogFilePath (119-123)
  • WithFinishedAt (67-71)
  • WithError (126-130)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (12)
internal/cmd/start.go (1)

182-208: LGTM!

The simplified tryExecuteDAG function correctly handles the lock/unlock lifecycle:

  • Unlocks explicitly on acquire failure (line 193)
  • Unlocks after successful acquire to allow concurrent DAG starts (lines 203-205)
  • Process handle cleanup via defer (lines 198-200)

The removal of queue-related logic streamlines the execution path.
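The lock/acquire/unlock lifecycle the comment describes can be sketched with a hypothetical in-memory process store (the `procStore` type and `tryExecute` helper below are illustrative, not the actual dagu interfaces): lock the group, acquire a per-run handle, then unlock the group immediately so other runs can start concurrently, with handle cleanup deferred.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical in-memory process store, illustrating only the sequencing.
type procStore struct {
	mu      sync.Mutex
	handles map[string]bool // dag-run ID -> handle held
}

func (s *procStore) Lock()   { s.mu.Lock() }
func (s *procStore) Unlock() { s.mu.Unlock() }

func (s *procStore) Acquire(runID string) error {
	if s.handles[runID] {
		return errors.New("handle already held")
	}
	s.handles[runID] = true
	return nil
}

func (s *procStore) Release(runID string) { delete(s.handles, runID) }

func tryExecute(s *procStore, runID string, run func() error) error {
	s.Lock()
	if err := s.Acquire(runID); err != nil {
		s.Unlock() // unlock explicitly on acquire failure
		return err
	}
	defer s.Release(runID) // process handle cleanup via defer
	s.Unlock()             // unlock right after acquiring, allowing concurrent starts
	return run()
}

func main() {
	s := &procStore{handles: map[string]bool{}}
	err := tryExecute(s, "run-1", func() error { return nil })
	fmt.Println("first run:", err)
}
```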

internal/core/execution/runstatus.go (2)

54-54: LGTM!

The new Error field enables capturing top-level DAG run errors (e.g., process acquisition failures) separately from per-node errors. The omitempty JSON tag maintains backward compatibility.


66-70: LGTM!

The Errors() method correctly prepends the top-level error to the error slice, ensuring it appears first in any error aggregation. This provides a clear hierarchy: DAG-level error → node errors → handler errors.

internal/runtime/transform/status.go (1)

125-130: LGTM!

The WithError option follows the established functional option pattern and integrates cleanly with the StatusBuilder. This enables setting the top-level error when constructing DAG run statuses.

internal/cmd/context.go (1)

493-498: LGTM!

The RecordEarlyFailure method provides a clean way to record infrastructure failures (like process acquisition errors) before the DAG agent starts. The input validation at lines 496-498 ensures required parameters are present.

internal/cmd/record_early_failure_test.go (5)

1-12: LGTM!

Package declaration and imports are appropriate for testing the RecordEarlyFailure functionality.


14-48: LGTM!

Test correctly validates that RecordEarlyFailure creates a new DAG run attempt with Failed status and preserves the error message.


50-91: LGTM!

Test correctly validates that RecordEarlyFailure updates an existing DAG run attempt's status to Failed and preserves the new error message.


93-105: LGTM!

Test correctly validates error handling when DAG is nil, ensuring required field validation.


107-125: LGTM!

Test correctly validates error handling when dag-run ID is empty, ensuring required field validation.

internal/cmd/retry.go (2)

36-36: LGTM!

Removal of queue-related flags simplifies the retry command interface and aligns with the PR objective to eliminate enqueue paths.


39-41: LGTM!

Early extraction of retry parameters improves code organization and readability.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
internal/cmd/restart.go (1)

103-116: Critical: Using old dag-run ID for new run's lock/acquire operations and failure recording.

This issue was previously flagged and remains unresolved. The dagRunID parameter (line 90) refers to the old run being restarted, but:

  1. Line 105: RecordEarlyFailure associates the lock failure with the old run, but this failure prevents the new run from starting
  2. Line 110: Acquire creates a process handle with execution.NewDAGRunRef(d.Name, dagRunID) using the old run ID, but the new run needs its own handle
  3. Line 114: RecordEarlyFailure again uses the old run ID for an acquire failure that belongs to the new run

The new dagRunID is only generated at line 129 inside executeDAG(), after these operations complete.

Fix: Generate the new dag-run ID before attempting to lock/acquire, then use that new ID consistently:

🔎 Suggested refactoring
 func handleRestartProcess(ctx *Context, d *core.DAG, dagRunID string) error {
 	// Stop if running
 	if err := stopDAGIfRunning(ctx, ctx.DAGRunMgr, d, dagRunID); err != nil {
 		return err
 	}
 
 	// Wait before restart if configured
 	if d.RestartWait > 0 {
 		logger.Info(ctx, "Waiting for restart", tag.Duration(d.RestartWait))
 		time.Sleep(d.RestartWait)
 	}
 
+	// Generate new dag-run ID for the restart attempt
+	newDagRunID, err := genRunID()
+	if err != nil {
+		return fmt.Errorf("failed to generate dag-run ID: %w", err)
+	}
+
 	// Execute the exact same DAG with the same parameters but a new dag-run ID
 	if err := ctx.ProcStore.Lock(ctx, d.ProcGroup()); err != nil {
 		logger.Debug(ctx, "Failed to lock process group", tag.Error(err))
-		_ = ctx.RecordEarlyFailure(d, dagRunID, err)
+		_ = ctx.RecordEarlyFailure(d, newDagRunID, err)
 		return errProcAcquisitionFailed
 	}
 
 	// Acquire process handle
-	proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), execution.NewDAGRunRef(d.Name, dagRunID))
+	proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), execution.NewDAGRunRef(d.Name, newDagRunID))
 	if err != nil {
 		ctx.ProcStore.Unlock(ctx, d.ProcGroup())
 		logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err))
-		_ = ctx.RecordEarlyFailure(d, dagRunID, err)
+		_ = ctx.RecordEarlyFailure(d, newDagRunID, err)
 		return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed)
 	}
 	defer func() {
 		_ = proc.Stop(ctx)
 	}()
 
 	// Unlock the process group immediately after acquiring the handle
 	ctx.ProcStore.Unlock(ctx, d.ProcGroup())
 
-	return executeDAG(ctx, ctx.DAGRunMgr, d)
+	return executeDAGWithRunID(ctx, ctx.DAGRunMgr, d, newDagRunID)
 }

Then refactor executeDAG to accept the pre-generated run ID:

-func executeDAG(ctx *Context, cli runtime.Manager, dag *core.DAG) error {
-	dagRunID, err := genRunID()
-	if err != nil {
-		return fmt.Errorf("failed to generate dag-run ID: %w", err)
-	}
+func executeDAGWithRunID(ctx *Context, cli runtime.Manager, dag *core.DAG, dagRunID string) error {
 	logFile, err := ctx.OpenLogFile(dag, dagRunID)
 	...
 }

Based on learnings from past review: This issue was identified in a previous review and requires resolution before merging.

🧹 Nitpick comments (2)
internal/cmd/record_early_failure_test.go (2)

94-126: Consider refactoring validation tests to table-driven format.

Per the coding guidelines to "favour table-driven cases," these two validation tests could be combined into a single table-driven test since they follow the same pattern with different inputs.

🔎 Proposed refactor to table-driven test
-	t.Run("ReturnsErrorForNilDAG", func(t *testing.T) {
-		th := test.SetupCommand(t)
-
-		ctx := &cmd.Context{
-			Context:     th.Context,
-			Config:      th.Config,
-			DAGRunStore: th.DAGRunStore,
-		}
-
-		err := ctx.RecordEarlyFailure(nil, "some-run-id", errors.New("test error"))
-		require.Error(t, err)
-		require.Contains(t, err.Error(), "DAG and dag-run ID are required")
-	})
-
-	t.Run("ReturnsErrorForEmptyDAGRunID", func(t *testing.T) {
-		th := test.SetupCommand(t)
-
-		dag := th.DAG(t, `
-steps:
-  - name: step1
-    command: echo hello
-`)
-
-		ctx := &cmd.Context{
-			Context:     th.Context,
-			Config:      th.Config,
-			DAGRunStore: th.DAGRunStore,
-		}
-
-		err := ctx.RecordEarlyFailure(dag.DAG, "", errors.New("test error"))
-		require.Error(t, err)
-		require.Contains(t, err.Error(), "DAG and dag-run ID are required")
-	})
+	t.Run("ReturnsErrorForInvalidInputs", func(t *testing.T) {
+		th := test.SetupCommand(t)
+
+		dag := th.DAG(t, `
+steps:
+  - name: step1
+    command: echo hello
+`)
+
+		ctx := &cmd.Context{
+			Context:     th.Context,
+			Config:      th.Config,
+			DAGRunStore: th.DAGRunStore,
+		}
+
+		testCases := []struct {
+			name      string
+			dag       *core.DAG
+			dagRunID  string
+			expectErr string
+		}{
+			{
+				name:      "nil DAG",
+				dag:       nil,
+				dagRunID:  "some-run-id",
+				expectErr: "DAG and dag-run ID are required",
+			},
+			{
+				name:      "empty dagRunID",
+				dag:       dag.DAG,
+				dagRunID:  "",
+				expectErr: "DAG and dag-run ID are required",
+			},
+		}
+
+		for _, tc := range testCases {
+			t.Run(tc.name, func(t *testing.T) {
+				err := ctx.RecordEarlyFailure(tc.dag, tc.dagRunID, errors.New("test error"))
+				require.Error(t, err)
+				require.Contains(t, err.Error(), tc.expectErr)
+			})
+		}
+	})

Based on learnings, favour table-driven cases.


29-33: Optional: Extract Context creation to helper.

The Context initialization pattern appears 5 times across the test suite. While the duplication is minimal, extracting it to a helper function could improve maintainability.

💡 Example helper function
func newTestContext(th *test.CommandTestHelper) *cmd.Context {
	return &cmd.Context{
		Context:     th.Context,
		Config:      th.Config,
		DAGRunStore: th.DAGRunStore,
	}
}

Then use it as:

ctx := newTestContext(th)

Also applies to: 73-77, 97-101, 117-121, 141-145

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 6789951 and 8a13a42.

📒 Files selected for processing (2)
  • internal/cmd/record_early_failure_test.go (1 hunks)
  • internal/cmd/restart.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)


Files:

  • internal/cmd/record_early_failure_test.go
  • internal/cmd/restart.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)


Files:

  • internal/cmd/record_early_failure_test.go
🧠 Learnings (1)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths

Applied to files:

  • internal/cmd/record_early_failure_test.go
🧬 Code graph analysis (2)
internal/cmd/record_early_failure_test.go (3)
internal/core/execution/dagrun.go (1)
  • DAGRunStore (23-48)
internal/core/status.go (2)
  • Failed (9-9)
  • Succeeded (11-11)
internal/test/command.go (1)
  • CmdTest (15-19)
internal/cmd/restart.go (2)
internal/core/execution/proc.go (1)
  • ProcStore (8-27)
internal/persistence/fileproc/procgrp.go (1)
  • ProcGroup (20-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (3)
internal/cmd/record_early_failure_test.go (3)

1-13: LGTM: Clean package structure and imports.

The external test package cmd_test is a good practice, and the imports are appropriate for the test scenarios.


128-181: Excellent comprehensive retry flow coverage.

This test thoroughly validates the complete lifecycle: early failure recording → status verification → DAG retrieval → retry → success. The use of require.Eventually with appropriate timeout (5s) and polling interval (100ms) properly handles the asynchronous retry operation.


15-182: Strong test coverage for RecordEarlyFailure functionality.

The test suite provides comprehensive coverage:

  • ✅ New DAG run failure recording
  • ✅ Existing attempt failure update
  • ✅ Validation error paths (nil DAG, empty run ID)
  • ✅ End-to-end retry flow after early failure

The tests properly use the shared test harness and testify/require assertions as specified in the coding guidelines.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/cmd/restart.go (1)

196-214: Infinite loop lacks timeout and context cancellation check.

stopRunningDAG polls indefinitely with no timeout or ctx.Done() check. If the DAG fails to stop (e.g., stuck process or unresponsive socket), the restart command hangs forever and ignores cancellation signals.

🔎 Suggested fix with timeout and context check
 func stopRunningDAG(ctx context.Context, cli runtime.Manager, dag *core.DAG, dagRunID string) error {
 	const stopPollInterval = 100 * time.Millisecond
+	const stopTimeout = 5 * time.Minute
+	deadline := time.Now().Add(stopTimeout)
 	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		if time.Now().After(deadline) {
+			return fmt.Errorf("timed out waiting for DAG %s to stop", dag.Name)
+		}
 		dagStatus, err := cli.GetCurrentStatus(ctx, dag, dagRunID)
 		if err != nil {
 			return fmt.Errorf("failed to get current status: %w", err)
 		}
internal/cmd/exec.go (1)

194-199: Ignored error from FindAttempt may mask failures.

Line 196 discards the error from FindAttempt. If the store returns an error due to connectivity issues or corruption, the code proceeds as if no attempt exists, potentially allowing duplicate runs or masking underlying problems.

🔎 Suggested fix
-	attempt, _ := ctx.DAGRunStore.FindAttempt(ctx, dagRunRef)
-	if attempt != nil {
+	attempt, err := ctx.DAGRunStore.FindAttempt(ctx, dagRunRef)
+	if err != nil {
+		logger.Warn(ctx, "Failed to check existing dag-run", tag.Error(err))
+	}
+	if attempt != nil {
 		return fmt.Errorf("dag-run ID %s already exists for DAG %s", runID, dag.Name)
 	}
🧹 Nitpick comments (1)
internal/cmd/restart.go (1)

169-176: Consider flushing/closing log file before os.Exit(1).

When ctx.Quiet is true and execution fails, os.Exit(1) at line 171 bypasses the deferred logFile.Close() at line 141. While the OS reclaims the handle, any buffered log data may be lost.

🔎 Suggested fix
 	if err := agentInstance.Run(ctx); err != nil {
 		if ctx.Quiet {
+			_ = logFile.Close()
 			os.Exit(1)
 		} else {
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 8a13a42 and 57c64f3.

📒 Files selected for processing (3)
  • internal/cmd/exec.go (3 hunks)
  • internal/cmd/exec_spec.go (1 hunks)
  • internal/cmd/restart.go (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)


Files:

  • internal/cmd/exec.go
  • internal/cmd/exec_spec.go
  • internal/cmd/restart.go
🧬 Code graph analysis (1)
internal/cmd/restart.go (3)
internal/cmd/context.go (1)
  • Context (42-57)
internal/core/execution/proc.go (1)
  • ProcStore (8-27)
internal/runtime/manager.go (2)
  • Manager (32-36)
  • Manager (264-278)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (3)
internal/cmd/restart.go (1)

90-131: LGTM on the refactored restart process with new run ID handling.

The fix correctly addresses the previous review concern about recording early failures with the wrong run ID. The flow now:

  1. Generates newDagRunID upfront (lines 102-106)
  2. Uses newDagRunID for both RecordEarlyFailure calls and the Acquire call
  3. Properly unlocks the process group on failure paths (line 118)

The immediate unlock after acquire (line 128) followed by deferred proc.Stop (lines 123-125) is the correct pattern since the process handle is now held independently.

internal/cmd/exec.go (1)

59-64: LGTM on simplified execution flow.

The updated documentation accurately reflects the removal of queue-related logic, and the call to tryExecuteDAG aligns with the simplified direct execution path.

Also applies to: 207-207

internal/cmd/exec_spec.go (1)

116-117: Setting MaxActiveRuns = -1 is the correct semantic for inline exec DAGs. This value enables "unlimited instances" by disabling concurrency limits. Downstream enforcement logic correctly handles this through checks that only apply limits when maxActiveRuns > 0, so no integer underflow or invalid-value issues will occur.
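The sentinel semantics can be sketched in a few lines (the enforcement check below is illustrative of the described behavior, not the actual downstream code): a limit applies only when the configured value is positive, so `-1` simply disables the comparison.

```go
package main

import "fmt"

// canStart applies a concurrency limit only when maxActiveRuns > 0;
// zero or negative values mean "unlimited", so -1 can never underflow
// or wrongly block a run.
func canStart(active, maxActiveRuns int) bool {
	if maxActiveRuns > 0 && active >= maxActiveRuns {
		return false
	}
	return true
}

func main() {
	fmt.Println(canStart(5, -1)) // unlimited: limit disabled
	fmt.Println(canStart(1, 1))  // at the limit: blocked
}
```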

@yottahmd yottahmd merged commit f3d4577 into main Dec 21, 2025
5 checks passed
@yottahmd yottahmd deleted the not-enqueue branch December 21, 2025 09:42
@codecov

codecov Bot commented Dec 21, 2025

Codecov Report

❌ Patch coverage is 53.22581% with 29 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.09%. Comparing base (5d90d27) to head (57c64f3).
⚠️ Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| internal/cmd/context.go | 70.37% | 4 Missing and 4 partials ⚠️ |
| internal/cmd/restart.go | 38.46% | 6 Missing and 2 partials ⚠️ |
| internal/cmd/start.go | 28.57% | 5 Missing ⚠️ |
| internal/cmd/retry.go | 0.00% | 3 Missing ⚠️ |
| internal/runtime/transform/status.go | 0.00% | 3 Missing ⚠️ |
| internal/core/execution/runstatus.go | 0.00% | 2 Missing ⚠️ |
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1497      +/-   ##
==========================================
+ Coverage   59.91%   60.09%   +0.18%     
==========================================
  Files         196      196              
  Lines       22023    21925      -98     
==========================================
- Hits        13194    13175      -19     
+ Misses       7413     7351      -62     
+ Partials     1416     1399      -17     
| Files with missing lines | Coverage Δ |
| --- | --- |
| internal/cmd/exec.go | 60.36% <100.00%> (+11.34%) ⬆️ |
| internal/cmd/exec_spec.go | 61.66% <100.00%> (+7.00%) ⬆️ |
| internal/cmd/flags.go | 100.00% <ø> (ø) |
| internal/runtime/executor/dag_runner.go | 32.69% <ø> (-0.43%) ⬇️ |
| internal/runtime/subcmd.go | 78.01% <100.00%> (-0.61%) ⬇️ |
| internal/service/scheduler/dag_executor.go | 76.05% <100.00%> (ø) |
| internal/core/execution/runstatus.go | 4.54% <0.00%> (-0.15%) ⬇️ |
| internal/cmd/retry.go | 71.95% <0.00%> (+22.33%) ⬆️ |
| internal/runtime/transform/status.go | 89.23% <0.00%> (-4.32%) ⬇️ |
| internal/cmd/start.go | 48.63% <28.57%> (+1.52%) ⬆️ |

... and 2 more

... and 4 files with indirect coverage changes



Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 5d90d27...57c64f3.


yottahmd added a commit that referenced this pull request Dec 21, 2025
…pectedly (#1497)

* **Behavior Changes**
* Removed explicit no-queue and disable-max-active-runs options;
start/retry flows simplified to default local execution and streamlined
retry semantics.

* **New Features**
* Singleton mode now returns clear HTTP 409 conflicts when a singleton
DAG is already running or queued.
* Added top-level run Error field and an API to record early failures
for quicker failure visibility.

* **Bug Fixes**
* Improved process acquisition and restart/retry error handling; tests
updated to reflect local execution behavior.
@coderabbitai coderabbitai Bot mentioned this pull request Jan 27, 2026
