fix(cmd): refactor start command not to prevent DAG from running unexpectedly #1497
Conversation
Walkthrough

This PR removes queue-related flags and enqueue branches, simplifies DAG execution to direct/local runs, inlines singleton checks in API handlers, adds Context.RecordEarlyFailure and tests, exposes a top-level DAGRunStatus Error field and status option, and updates many tests and subcommand builders accordingly.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as HTTP Client
    participant API as Frontend API (v2)
    participant Proc as ProcStore
    participant Scheduler as Scheduler
    participant Executor as Local Executor
    Client->>API: POST /dag_runs (Execute/Enqueue)
    API->>Proc: Check alive runs / queued runs (singleton)
    alt conflict found
        Proc-->>API: found active/queued
        API-->>Client: 409 Conflict
    else unique
        API->>Scheduler: build spec & request start/enqueue
        Scheduler->>Proc: acquire process handle
        alt acquisition fails
            Proc-->>Scheduler: error
            Scheduler->>API: RecordEarlyFailure(dag, runID, err)
            API-->>Client: error response
        else acquired
            Scheduler->>Executor: tryExecuteDAG (local execution)
            Executor-->>Scheduler: start/status updates
            Scheduler-->>Client: 202/200 with run id
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/cmd/retry.go (1)
154-212: Remove the dead `enqueueRetry` function.

The `enqueueRetry` function at lines 154-212 is not called anywhere in the codebase and should be deleted. This function implemented queue-based retry logic that has been removed from the retry command execution paths.

internal/service/frontend/api/v2/dagruns.go (1)
43-71: Remove the unused singleton parameter from the startDAGRun chain or add enforcement logic.

The `singleton` variable is extracted at line 58 and passed to `startDAGRun`, but it's never used in `startDAGRunWithOptions`. Singleton enforcement happens at the handler level (in `CreateDAGRun` and `RetriggerDAGRun`), not within `startDAGRun`. Either remove the unused `singleton` parameter from the function signature and the `startDAGRunOptions` struct, or add singleton enforcement logic within `startDAGRunWithOptions` if it should be enforced at that level.
🧹 Nitpick comments (3)
internal/cmd/exec_spec.go (1)
66-66: Consider eliminating the intermediate variable.

The `queueValue` variable is now a direct assignment from `opts.Queue` and is only used once at line 75. Consider inlining it directly.

🔎 Proposed simplification

```diff
-	queueValue := opts.Queue
 	specDoc := execSpec{
 		Name:          name,
 		Type:          core.TypeChain,
 		WorkingDir:    opts.WorkingDir,
 		Env:           opts.Env,
 		Dotenv:        opts.DotenvFiles,
 		MaxActiveRuns: maxActiveRuns,
-		Queue:         queueValue,
+		Queue:         opts.Queue,
 		WorkerSelector: opts.WorkerLabels,
```

internal/cmd/restart.go (1)
103-113: Inconsistent error handling pattern.

Lines 105 and 113 handle `errProcAcquisitionFailed` differently:

- Line 105: Returns the sentinel error directly
- Line 113: Wraps it with additional context using `fmt.Errorf`

Consider using a consistent approach. If the additional context at line 113 is valuable, consider adding similar context at line 105.

🔎 Proposed consistency fix

```diff
 	if err := ctx.ProcStore.Lock(ctx, d.ProcGroup()); err != nil {
 		logger.Debug(ctx, "Failed to lock process group", tag.Error(err))
-		return errProcAcquisitionFailed
+		return fmt.Errorf("failed to lock process group: %w", errProcAcquisitionFailed)
 	}
```

internal/integration/distributed_test.go (1)
59-63: Consider removing dead code.

This block logs queue item data if `queueItems` is non-empty, but line 57 requires the queue to be empty. This code path is unreachable after the `require.Len(t, queueItems, 0, ...)` assertion.

🔎 Proposed fix

```diff
 	require.Len(t, queueItems, 0, "DAG should NOT be enqueued (dagu start runs locally)")
-	if len(queueItems) > 0 {
-		data, err := queueItems[0].Data()
-		require.NoError(t, err, "Should be able to get queue item data")
-		t.Logf("DAG enqueued: dag=%s runId=%s", data.Name, data.ID)
-	}
-
 	// Verify the DAG status is "succeeded" (executed locally)
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
- .vscode/launch.json (1 hunks)
- internal/cmd/exec.go (1 hunks)
- internal/cmd/exec_spec.go (2 hunks)
- internal/cmd/flags.go (0 hunks)
- internal/cmd/restart.go (1 hunks)
- internal/cmd/retry.go (2 hunks)
- internal/cmd/start.go (2 hunks)
- internal/integration/distributed_e2e_test.go (4 hunks)
- internal/integration/distributed_test.go (5 hunks)
- internal/integration/gha_test.go (1 hunks)
- internal/runtime/executor/dag_runner.go (0 hunks)
- internal/runtime/executor/dag_runner_test.go (0 hunks)
- internal/runtime/subcmd.go (5 hunks)
- internal/runtime/subcmd_test.go (5 hunks)
- internal/service/frontend/api/v1/dags.go (1 hunks)
- internal/service/frontend/api/v2/dagruns.go (3 hunks)
- internal/service/frontend/api/v2/dags.go (2 hunks)
- internal/service/frontend/api/v2/singleton_test.go (1 hunks)
- internal/service/scheduler/dag_executor.go (1 hunks)
- internal/service/worker/handler_test.go (0 hunks)
💤 Files with no reviewable changes (4)
- internal/service/worker/handler_test.go
- internal/runtime/executor/dag_runner_test.go
- internal/cmd/flags.go
- internal/runtime/executor/dag_runner.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)
Keep Go files `gofmt`/`goimports` clean; use tabs, PascalCase for exported symbols (`SchedulerClient`), lowerCamelCase for locals, and `Err...` names for package-level errors
Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`
Files:
- internal/service/scheduler/dag_executor.go
- internal/cmd/restart.go
- internal/service/frontend/api/v2/singleton_test.go
- internal/cmd/start.go
- internal/integration/gha_test.go
- internal/cmd/exec.go
- internal/runtime/subcmd_test.go
- internal/cmd/exec_spec.go
- internal/service/frontend/api/v2/dags.go
- internal/service/frontend/api/v2/dagruns.go
- internal/cmd/retry.go
- internal/integration/distributed_test.go
- internal/integration/distributed_e2e_test.go
- internal/service/frontend/api/v1/dags.go
- internal/runtime/subcmd.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*_test.go: Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths
Use `stretchr/testify/require` and shared fixtures from `internal/test` instead of duplicating mocks
Files:
- internal/service/frontend/api/v2/singleton_test.go
- internal/integration/gha_test.go
- internal/runtime/subcmd_test.go
- internal/integration/distributed_test.go
- internal/integration/distributed_e2e_test.go
🧠 Learnings (1)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)
Applied to files:
internal/integration/distributed_e2e_test.go
🧬 Code graph analysis (10)
internal/service/scheduler/dag_executor.go (2)
- internal/cmd/retry.go (1): Retry (19-37)
- internal/common/backoff/retry.go (1): Retry (31-83)

internal/service/frontend/api/v2/singleton_test.go (3)
- internal/test/server.go (1): SetupServer (26-63)
- api/v2/api.gen.go (3): ExecuteDAG200JSONResponse (6172-6175), EnqueueDAGDAGRunJSONRequestBody (1621-1621), EnqueueDAGDAGRun200JSONResponse (6008-6011)
- internal/core/status.go (1): Running (8-8)

internal/runtime/subcmd_test.go (4)
- internal/cmd/retry.go (1): Retry (19-37)
- internal/common/backoff/retry.go (1): Retry (31-83)
- internal/runtime/subcmd.go (1): Run (247-269)
- internal/runtime/data.go (1): Parallel (69-73)

internal/service/frontend/api/v2/dags.go (2)
- internal/persistence/fileproc/procgrp.go (1): ProcGroup (20-27)
- api/v2/api.gen.go (2): Error (465-474), ErrorCodeAlreadyExists (39-39)

internal/service/frontend/api/v2/dagruns.go (2)
- internal/cmd/retry.go (1): Retry (19-37)
- internal/runtime/subcmd.go (1): Start (272-298)

internal/cmd/retry.go (5)
- internal/cmd/context.go (1): Context (40-55)
- internal/core/execution/context.go (1): Context (16-27)
- internal/core/execution/proc.go (1): ProcStore (8-27)
- internal/persistence/fileproc/procgrp.go (1): ProcGroup (20-27)
- internal/common/logger/tag/tag.go (1): Error (20-22)

internal/integration/distributed_test.go (4)
- internal/core/status.go (2): Succeeded (11-11), Failed (9-9)
- internal/core/execution/queue.go (1): QueueStore (16-35)
- internal/runtime/subcmd.go (2): Run (247-269), Start (272-298)
- internal/common/backoff/retry.go (1): Retry (31-83)

internal/integration/distributed_e2e_test.go (3)
- internal/runtime/subcmd.go (5): NewSubCmdBuilder (27-33), EnqueueOptions (233-239), Start (272-298), Run (247-269), StartOptions (222-230)
- internal/cmd/enqueue.go (1): Enqueue (17-30)
- internal/cmd/start.go (1): Start (35-57)

internal/service/frontend/api/v1/dags.go (2)
- internal/cmd/retry.go (1): Retry (19-37)
- internal/runtime/subcmd.go (1): Start (272-298)

internal/runtime/subcmd.go (1)
- internal/cmd/retry.go (1): Retry (19-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test on ubuntu-latest
🔇 Additional comments (31)
.vscode/launch.json (1)
45-45: LGTM! The removal of the `--no-queue` flag aligns with the PR's objective to simplify command-line arguments by removing queue-related flags.

internal/cmd/exec_spec.go (1)
126-136: LGTM! The queue and singleton handling logic is clear and correctly sets `dag.Queue` only when a non-empty value is provided. The `MaxActiveRuns` logic correctly handles the singleton case.

internal/service/frontend/api/v2/dagruns.go (2)
630-644: LGTM! The simplified `Retry` signature aligns with the broader PR changes removing queue-related parameters. Both paths (with and without `StepName`) now consistently use the three-parameter signature.
797-799: LGTM! The validation change to `ensureDAGRunIDUnique` is consistent with the approach in `ExecuteDAGRunFromSpec`. The singleton enforcement is appropriately delegated to `startDAGRunWithOptions` at line 807.

internal/service/scheduler/dag_executor.go (1)
145-147: LGTM! The simplified `Retry` call signature aligns with the PR's objective of removing queue-related parameters. The local retry path now uses the consistent three-parameter signature `Retry(dag, runID, "")`.

internal/service/frontend/api/v1/dags.go (1)
599-611: LGTM! The `Retry` invocations are correctly updated to use the simplified three-parameter signature, consistent with the changes across the codebase. The distinction between step-specific retry (with `*request.Body.Step`) and full DAG retry (with empty string) is properly maintained.

internal/cmd/retry.go (2)
39-44: LGTM! The simplified flags and early parameter extraction improve code clarity. The comment at line 42 helpfully documents the purpose of the extraction.
78-90: Good resource management pattern. The explicit unlock at line 81 before returning on error prevents leaving the process group locked. The overall pattern (lock → acquire → unlock on failure → defer stop → unlock before execute) correctly manages the lock lifecycle.
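The lock lifecycle praised here can be sketched as a standalone toy. Everything below (the `toyProcStore` type and its methods) is hypothetical and much simpler than the real `ProcStore` interface; it only illustrates the ordering: lock, acquire, unlock on failure, defer stop, unlock before executing.

```go
package main

import (
	"errors"
	"fmt"
)

// toyProcStore is a hypothetical stand-in for the real ProcStore:
// a short-lived group lock serializes handle acquisition only.
type toyProcStore struct {
	locked  map[string]bool
	handles map[string]int
}

var errAcquire = errors.New("proc acquisition failed")

func (s *toyProcStore) Lock(group string) error {
	if s.locked[group] {
		return errors.New("group already locked")
	}
	s.locked[group] = true
	return nil
}

func (s *toyProcStore) Unlock(group string) { s.locked[group] = false }

func (s *toyProcStore) Acquire(group string) (func(), error) {
	s.handles[group]++
	stop := func() { s.handles[group]-- }
	return stop, nil
}

// runOnce mirrors the ordering described in the review comment.
func runOnce(s *toyProcStore, group string, execute func() error) error {
	if err := s.Lock(group); err != nil {
		return fmt.Errorf("%w: %v", errAcquire, err)
	}
	stop, err := s.Acquire(group)
	if err != nil {
		s.Unlock(group) // explicit unlock so the group is not left locked
		return fmt.Errorf("%w: %v", errAcquire, err)
	}
	defer stop() // handle released when execution finishes
	s.Unlock(group) // release early so other runs of the same DAG can start
	return execute()
}

func main() {
	s := &toyProcStore{locked: map[string]bool{}, handles: map[string]int{}}
	err := runOnce(s, "g", func() error { return nil })
	fmt.Println("err:", err, "locked:", s.locked["g"], "handles:", s.handles["g"])
}
```

After a successful run the group is unlocked and the handle count is back to zero, which is the invariant the comment is checking for.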
internal/runtime/subcmd.go (5)
36-69: LGTM - Start method simplified correctly. The `Start` method no longer handles the `NoQueue` option, aligning with the PR's goal to remove queue-related flags. The method cleanly constructs arguments for direct execution.
142-159: LGTM - Retry signature simplified. The `Retry` method signature correctly removes the `disableMaxActiveRuns` parameter. The method now focuses on core retry functionality with `--run-id` and optional `--step` flags only.
162-189: LGTM - TaskStart cleaned up. The `TaskStart` method no longer emits `--no-queue` or `--disable-max-active-runs` flags, consistent with the direct execution model.
191-210: LGTM - TaskRetry simplified. The `TaskRetry` method correctly removes the `--disable-max-active-runs` flag and now relies solely on `--run-id` and optional `--step` for retry operations.
221-230: LGTM - StartOptions struct simplified. The `NoQueue` field has been correctly removed from `StartOptions`, aligning with the removal of queue-bypass logic from the start command flow.

internal/service/frontend/api/v2/singleton_test.go (4)
15-17: Consider using `t.Parallel()` and subtests with unique resources. Since all subtests share the same `server` instance created before the subtests run, this is fine. However, ensure tests don't interfere with each other if run in parallel in the future. The unique DAG names per test case (`singleton_exec_dag`, `singleton_enq_run_dag`, `singleton_enq_q_dag`) help prevent this.
18-56: Well-structured singleton execution conflict test. The test correctly:

- Creates a DAG with a long-running step (`sleep 10`)
- Starts it with `Singleton: true`
- Waits for running state using `require.Eventually`
- Verifies second start returns 409 Conflict
- Cleans up by deleting the DAG
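`require.Eventually` used in this test polls a condition until a deadline. A minimal standalone sketch of that polling pattern — illustrative only, not the testify implementation, and without the test-failure reporting that `require` adds:

```go
package main

import (
	"fmt"
	"time"
)

// eventually polls condition every tick until it returns true or the
// deadline passes, roughly the loop require.Eventually runs before
// failing the test.
func eventually(condition func() bool, waitFor, tick time.Duration) bool {
	deadline := time.Now().Add(waitFor)
	for time.Now().Before(deadline) {
		if condition() {
			return true
		}
		time.Sleep(tick)
	}
	return condition() // one last check at the deadline
}

func main() {
	start := time.Now()
	// Stand-in for "DAG reached running state": true after ~20ms.
	running := func() bool { return time.Since(start) > 20*time.Millisecond }
	fmt.Println(eventually(running, time.Second, 5*time.Millisecond))
}
```

The wait budget should be generous relative to the tick so a slow CI machine does not turn a passing condition into a flaky timeout.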
58-94: LGTM - Enqueue conflict while running test. The test properly validates that enqueuing with `Singleton: true` while a DAG is already running returns 409 Conflict.
96-123: LGTM - Enqueue conflict while queued test. The test correctly validates that enqueuing with `Singleton: true` while a DAG is already queued returns 409 Conflict.

internal/integration/distributed_test.go (3)
48-68: LGTM - Test correctly updated for local execution semantics. The test now:

- Waits for DAG completion via `DAGRunMgr.GetLatestStatus`
- Verifies the DAG was NOT enqueued (confirming local execution)
- Asserts final status is `Succeeded`

This aligns with the PR's goal of making `dagu start` always execute locally.
71-105: LGTM - Local execution test correctly structured. The test `StartCommand_WorkerSelector_ShouldExecuteLocally` properly verifies that `dagu start` executes locally even when `workerSelector` is present, with appropriate assertions on queue state and final status.
134-168: LGTM - Retry test updated for local execution. The retry test correctly:

- Starts the DAG locally (not enqueued)
- Waits for failure via status check
- Retries locally using the simplified `Retry` signature (3 args, no `disableMaxActiveRuns`)
- Verifies no queue entries after retry
internal/cmd/exec.go (2)
27-37: LGTM - Exec flags simplified. The `execFlags` array correctly no longer includes the removed `noQueueFlag`, aligning with the simplified execution model.
193-224: LGTM - Exec now uses direct execution path. The `ExecOptions` construction no longer sets `NoQueue`, and execution now routes directly through `tryExecuteDAG`. This simplification aligns with the PR's objective to remove queue-bypass logic.

internal/cmd/start.go (3)
60-60: LGTM - Start flags simplified. The `startFlags` array correctly removes queue-related flags (`noQueueFlag`, `disableMaxActiveRunsFlag`), leaving only the essential flags for direct execution.
175-176: LGTM - Start now uses direct execution. The `runStart` function now routes directly to `tryExecuteDAG`, eliminating the previous conditional logic for enqueuing or queue bypass.
178-206: Verify the immediate unlock behavior is intentional.

The `tryExecuteDAG` function now unlocks the process group immediately after acquiring the handle (lines 201-203). This allows multiple instances of the same DAG to start concurrently. Previously, the max-active-runs check would have prevented this. The comment states this is intentional: "to allow other instances of the same DAG to start."

Please confirm this behavioral change aligns with the desired semantics for the `start` command after this refactor.

internal/runtime/subcmd_test.go (2)
408-423: LGTM! TaskStart tests properly cleaned up. The removal of `--no-queue` assertions aligns with the PR objective. The remaining assertions adequately verify command construction including run-id, config, and hierarchy flags.
354-364: LGTM! Retry API signature updated correctly. The test correctly reflects the simplified `Retry(dag, runID, stepName)` signature after removing the `disableMaxActiveRuns` boolean parameter. The assertions remain comprehensive for verifying the command spec construction.

internal/service/frontend/api/v2/dags.go (2)
761-788: Singleton check covers both running and queued states — LGTM with same TOCTOU caveat.

The implementation correctly checks:

- Active processes via `procStore.CountAliveByDAGName` (line 764)
- Queued items via `queueStore.ListByDAGName` (line 777)

Error messages clearly distinguish between "already running" and "already in queue" scenarios. The same TOCTOU consideration from `ExecuteDAG` applies here.
internal/integration/distributed_e2e_test.go (2)
53-61: LGTM! Test correctly demonstrates the enqueue path. The test now explicitly uses `subCmdBuilder.Enqueue()` to demonstrate the distributed execution flow via the queue system. This accurately reflects the intended behavior where DAGs with `workerSelector` should be enqueued for scheduler dispatch rather than executed locally.
141-173: LGTM! Test clearly validates local execution semantics. The renamed test `E2E_StartCommand_WorkerSelector_ShouldExecuteLocally` now accurately describes its purpose: verifying that `dagu start` always executes locally, even when `workerSelector` is defined. The assertion at line 172 correctly confirms the DAG is not enqueued.

This provides good documentation of the behavioral distinction between `start` (local) and `enqueue` (distributed) commands.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/cmd/restart.go (1)
108-122: Potential double unlock of process group.

The deferred `Unlock` at line 108 will execute after the explicit `Unlock` at line 122, resulting in two unlock calls for the same process group. Depending on the `ProcStore` implementation, this could be a no-op or cause issues. Consider removing the defer at line 108 since you're explicitly unlocking at line 122, or guard the defer with a flag.

🔎 Proposed fix

```diff
 	if err := ctx.ProcStore.Lock(ctx, d.ProcGroup()); err != nil {
 		logger.Debug(ctx, "Failed to lock process group", tag.Error(err))
 		_ = ctx.RecordEarlyFailure(d, dagRunID, err)
 		return errProcAcquisitionFailed
 	}
-	defer ctx.ProcStore.Unlock(ctx, d.ProcGroup())

 	// Acquire process handle
 	proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), execution.NewDAGRunRef(d.Name, dagRunID))
 	if err != nil {
 		logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err))
 		_ = ctx.RecordEarlyFailure(d, dagRunID, err)
+		ctx.ProcStore.Unlock(ctx, d.ProcGroup())
 		return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed)
 	}
```

internal/cmd/exec.go (1)
181-185: Correct worker label validation to check for coordinator configuration, not queues.

Worker labels are functional for distributed execution through the coordinator service, which is independent of the queue system. The validation at lines 181-185 incorrectly requires `Queues.Enabled`, but should instead verify that a coordinator is configured. Users can enable queues without a coordinator and worker labels still won't function; conversely, users can use worker labels without queues if a coordinator is configured.

Update the check to validate coordinator availability rather than queue enablement, and correct the error message to reflect the actual requirement.
🧹 Nitpick comments (4)
internal/cmd/start.go (1)
191-196: Consider preserving the original error context.

The actual error from `Acquire` is logged but not included in the returned error. The caller only sees `errProcAcquisitionFailed` without knowing the root cause (e.g., resource exhaustion vs. permission issue).

🔎 Proposed improvement

```diff
 	proc, err := ctx.ProcStore.Acquire(ctx, dag.ProcGroup(), execution.NewDAGRunRef(dag.Name, dagRunID))
 	if err != nil {
 		ctx.ProcStore.Unlock(ctx, dag.ProcGroup())
 		logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err))
 		_ = ctx.RecordEarlyFailure(dag, dagRunID, err)
-		return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed)
+		return fmt.Errorf("failed to acquire process handle (%v): %w", err, errProcAcquisitionFailed)
 	}
```

internal/cmd/exec.go (1)
61-68: Outdated function documentation.

The docstring still references "enqueueing" and "queues/worker labels" decision logic (lines 64-65), but this PR removes queue-based execution paths. The function now always executes directly via `tryExecuteDAG`.

🔎 Proposed fix

```diff
 // runExec parses flags and arguments and executes the provided command as an inline DAG run,
-// either enqueueing it for distributed execution or running it immediately in-process.
+// running it immediately in-process.
 // It validates inputs (run-id, working directory, base and dotenv files, env vars, worker labels,
-// singleton flags), builds the DAG for the inline command, and chooses between enqueueing
-// (when queues/worker labels require it or when max runs are reached) or direct execution.
+// singleton flags), builds the DAG for the inline command, and executes it directly.
 // ctx provides CLI and application context; args are the command and its arguments.
 // Returns an error for validation failures, when a dag-run with the same run-id already exists,
-// or if enqueueing/execution fails.
+// or if execution fails.
```

internal/cmd/context.go (1)
517-523: Consider logging if log path generation fails.

The error from `GenLogFileName` is silently ignored at line 518. While this is acceptable for a best-effort failure recording, logging a warning could help with debugging.

🔎 Proposed improvement

```diff
 	statusBuilder := transform.NewStatusBuilder(dag)
-	logPath, _ := c.GenLogFileName(dag, dagRunID)
+	logPath, logErr := c.GenLogFileName(dag, dagRunID)
+	if logErr != nil {
+		logger.Debug(c, "Failed to generate log path for early failure", tag.Error(logErr))
+	}
 	status := statusBuilder.Create(dagRunID, core.Failed, 0, time.Now(),
```

internal/cmd/retry.go (1)
78-81: Consider logging if RecordEarlyFailure fails.

The error from `RecordEarlyFailure` is silently ignored. If recording the failure itself fails, this could make debugging harder. Consider logging the error to aid troubleshooting.

🔎 Proposed enhancement

```diff
 	ctx.ProcStore.Unlock(ctx, dag.ProcGroup())
 	logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err))
-	_ = ctx.RecordEarlyFailure(dag, dagRunID, err)
+	if recordErr := ctx.RecordEarlyFailure(dag, dagRunID, err); recordErr != nil {
+		logger.Debug(ctx, "Failed to record early failure", tag.Error(recordErr))
+	}
 	return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed)
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- internal/cmd/context.go (3 hunks)
- internal/cmd/exec.go (3 hunks)
- internal/cmd/exec_spec.go (0 hunks)
- internal/cmd/record_early_failure_test.go (1 hunks)
- internal/cmd/restart.go (1 hunks)
- internal/cmd/retry.go (2 hunks)
- internal/cmd/start.go (2 hunks)
- internal/core/execution/runstatus.go (2 hunks)
- internal/runtime/transform/status.go (1 hunks)
💤 Files with no reviewable changes (1)
- internal/cmd/exec_spec.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)
Keep Go files `gofmt`/`goimports` clean; use tabs, PascalCase for exported symbols (`SchedulerClient`), lowerCamelCase for locals, and `Err...` names for package-level errors
Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`
Files:
- internal/core/execution/runstatus.go
- internal/runtime/transform/status.go
- internal/cmd/record_early_failure_test.go
- internal/cmd/start.go
- internal/cmd/retry.go
- internal/cmd/restart.go
- internal/cmd/exec.go
- internal/cmd/context.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*_test.go: Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths
Use `stretchr/testify/require` and shared fixtures from `internal/test` instead of duplicating mocks
Files:
internal/cmd/record_early_failure_test.go
🧠 Learnings (1)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)
Applied to files:
- internal/cmd/start.go
- internal/cmd/retry.go
🧬 Code graph analysis (6)
internal/core/execution/runstatus.go (3)
- api/v2/api.gen.go (1): Error (465-474)
- api/v1/api.gen.go (1): Error (343-352)
- internal/common/logger/context.go (1): Errorf (75-77)

internal/runtime/transform/status.go (1)
- internal/core/execution/runstatus.go (1): DAGRunStatus (35-58)

internal/cmd/record_early_failure_test.go (2)
- internal/core/execution/dagrun.go (1): DAGRunStore (23-48)
- internal/core/status.go (1): Failed (9-9)

internal/cmd/retry.go (5)
- internal/cmd/context.go (1): Context (42-57)
- internal/core/execution/proc.go (1): ProcStore (8-27)
- internal/persistence/fileproc/procgrp.go (1): ProcGroup (20-27)
- internal/common/logger/context.go (3): Debug (35-37), Error (50-52), Errorf (75-77)
- internal/common/logger/tag/tag.go (1): Error (20-22)

internal/cmd/restart.go (1)
- internal/core/execution/proc.go (1): ProcStore (8-27)

internal/cmd/context.go (4)
- internal/core/execution/context.go (1): Context (16-27)
- internal/core/dag.go (1): DAG (33-135)
- internal/core/execution/dagrun.go (3): DAGRunStore (23-48), ErrDAGRunIDNotFound (14-14), NewDAGRunAttemptOptions (123-128)
- internal/runtime/transform/status.go (4): NewStatusBuilder (17-19), WithLogFilePath (119-123), WithFinishedAt (67-71), WithError (126-130)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test on ubuntu-latest
🔇 Additional comments (12)
internal/cmd/start.go (1)
182-208: LGTM! The simplified `tryExecuteDAG` function correctly handles the lock/unlock lifecycle:
- Unlocks explicitly on acquire failure (line 193)
- Unlocks after successful acquire to allow concurrent DAG starts (lines 203-205)
- Process handle cleanup via defer (lines 198-200)
The removal of queue-related logic streamlines the execution path.
internal/core/execution/runstatus.go (2)
54-54: LGTM! The new `Error` field enables capturing top-level DAG run errors (e.g., process acquisition failures) separately from per-node errors. The `omitempty` JSON tag maintains backward compatibility.
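The backward-compatibility claim rests on `omitempty`: when the new field is empty, the serialized payload is unchanged. A small self-contained sketch (`toyStatus` is a toy struct, not the real `DAGRunStatus`):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toyStatus approximates the relevant shape: the new top-level Error
// field carries `omitempty`, so old payloads stay byte-identical when
// no DAG-level error occurred.
type toyStatus struct {
	Name  string `json:"name"`
	Error string `json:"error,omitempty"`
}

func marshal(s toyStatus) string {
	b, _ := json.Marshal(s)
	return string(b)
}

func main() {
	// Without an error, the "error" key is omitted entirely.
	fmt.Println(marshal(toyStatus{Name: "daily-etl"}))
	// With an error, it serializes alongside the existing fields.
	fmt.Println(marshal(toyStatus{Name: "daily-etl", Error: "proc acquisition failed"}))
}
```

Existing consumers that never look for an `error` key therefore see no change until a top-level failure is actually recorded.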
66-70: LGTM! The `Errors()` method correctly prepends the top-level error to the error slice, ensuring it appears first in any error aggregation. This provides a clear hierarchy: DAG-level error → node errors → handler errors.

internal/runtime/transform/status.go (1)
125-130: LGTM! The `WithError` option follows the established functional option pattern and integrates cleanly with the `StatusBuilder`. This enables setting the top-level error when constructing DAG run statuses.

internal/cmd/context.go (1)
493-498: LGTM! The `RecordEarlyFailure` method provides a clean way to record infrastructure failures (like process acquisition errors) before the DAG agent starts. The input validation at lines 496-498 ensures required parameters are present.

internal/cmd/record_early_failure_test.go (5)
1-12: LGTM! Package declaration and imports are appropriate for testing the RecordEarlyFailure functionality.

14-48: LGTM! Test correctly validates that RecordEarlyFailure creates a new DAG run attempt with Failed status and preserves the error message.

50-91: LGTM! Test correctly validates that RecordEarlyFailure updates an existing DAG run attempt's status to Failed and preserves the new error message.

93-105: LGTM! Test correctly validates error handling when DAG is nil, ensuring required field validation.

107-125: LGTM! Test correctly validates error handling when dag-run ID is empty, ensuring required field validation.
internal/cmd/retry.go (2)
36-36: LGTM! Removal of queue-related flags simplifies the retry command interface and aligns with the PR objective to eliminate enqueue paths.

39-41: LGTM! Early extraction of retry parameters improves code organization and readability.
Actionable comments posted: 0
♻️ Duplicate comments (1)
internal/cmd/restart.go (1)
103-116: Critical: Using old dag-run ID for new run's lock/acquire operations and failure recording.

This issue was previously flagged and remains unresolved. The `dagRunID` parameter (line 90) refers to the old run being restarted, but:

- Line 105: `RecordEarlyFailure` associates the lock failure with the old run, but this failure prevents the new run from starting
- Line 110: `Acquire` creates a process handle with `execution.NewDAGRunRef(d.Name, dagRunID)` using the old run ID, but the new run needs its own handle
- Line 114: `RecordEarlyFailure` again uses the old run ID for an acquire failure that belongs to the new run

The new `dagRunID` is only generated at line 129 inside `executeDAG()`, after these operations complete.

Fix: Generate the new dag-run ID before attempting to lock/acquire, then use that new ID consistently:

🔎 Suggested refactoring

```diff
 func handleRestartProcess(ctx *Context, d *core.DAG, dagRunID string) error {
 	// Stop if running
 	if err := stopDAGIfRunning(ctx, ctx.DAGRunMgr, d, dagRunID); err != nil {
 		return err
 	}

 	// Wait before restart if configured
 	if d.RestartWait > 0 {
 		logger.Info(ctx, "Waiting for restart", tag.Duration(d.RestartWait))
 		time.Sleep(d.RestartWait)
 	}

+	// Generate new dag-run ID for the restart attempt
+	newDagRunID, err := genRunID()
+	if err != nil {
+		return fmt.Errorf("failed to generate dag-run ID: %w", err)
+	}
+
 	// Execute the exact same DAG with the same parameters but a new dag-run ID
 	if err := ctx.ProcStore.Lock(ctx, d.ProcGroup()); err != nil {
 		logger.Debug(ctx, "Failed to lock process group", tag.Error(err))
-		_ = ctx.RecordEarlyFailure(d, dagRunID, err)
+		_ = ctx.RecordEarlyFailure(d, newDagRunID, err)
 		return errProcAcquisitionFailed
 	}

 	// Acquire process handle
-	proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), execution.NewDAGRunRef(d.Name, dagRunID))
+	proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), execution.NewDAGRunRef(d.Name, newDagRunID))
 	if err != nil {
 		ctx.ProcStore.Unlock(ctx, d.ProcGroup())
 		logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err))
-		_ = ctx.RecordEarlyFailure(d, dagRunID, err)
+		_ = ctx.RecordEarlyFailure(d, newDagRunID, err)
 		return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed)
 	}
 	defer func() { _ = proc.Stop(ctx) }()

 	// Unlock the process group immediately after acquiring the handle
 	ctx.ProcStore.Unlock(ctx, d.ProcGroup())

-	return executeDAG(ctx, ctx.DAGRunMgr, d)
+	return executeDAGWithRunID(ctx, ctx.DAGRunMgr, d, newDagRunID)
 }
```

Then refactor `executeDAG` to accept the pre-generated run ID:

```diff
-func executeDAG(ctx *Context, cli runtime.Manager, dag *core.DAG) error {
-	dagRunID, err := genRunID()
-	if err != nil {
-		return fmt.Errorf("failed to generate dag-run ID: %w", err)
-	}
+func executeDAGWithRunID(ctx *Context, cli runtime.Manager, dag *core.DAG, dagRunID string) error {
 	logFile, err := ctx.OpenLogFile(dag, dagRunID)
 	...
 }
```

Based on learnings from past review: This issue was identified in a previous review and requires resolution before merging.
🧹 Nitpick comments (2)
internal/cmd/record_early_failure_test.go (2)
94-126: Consider refactoring validation tests to table-driven format.

Per the coding guideline to "favour table-driven cases," these two validation tests could be combined into a single table-driven test, since they follow the same pattern with different inputs.
🔎 Proposed refactor to table-driven test
```diff
-	t.Run("ReturnsErrorForNilDAG", func(t *testing.T) {
-		th := test.SetupCommand(t)
-
-		ctx := &cmd.Context{
-			Context:     th.Context,
-			Config:      th.Config,
-			DAGRunStore: th.DAGRunStore,
-		}
-
-		err := ctx.RecordEarlyFailure(nil, "some-run-id", errors.New("test error"))
-		require.Error(t, err)
-		require.Contains(t, err.Error(), "DAG and dag-run ID are required")
-	})
-
-	t.Run("ReturnsErrorForEmptyDAGRunID", func(t *testing.T) {
-		th := test.SetupCommand(t)
-
-		dag := th.DAG(t, `
-steps:
-  - name: step1
-    command: echo hello
-`)
-
-		ctx := &cmd.Context{
-			Context:     th.Context,
-			Config:      th.Config,
-			DAGRunStore: th.DAGRunStore,
-		}
-
-		err := ctx.RecordEarlyFailure(dag.DAG, "", errors.New("test error"))
-		require.Error(t, err)
-		require.Contains(t, err.Error(), "DAG and dag-run ID are required")
-	})
+	t.Run("ReturnsErrorForInvalidInputs", func(t *testing.T) {
+		th := test.SetupCommand(t)
+
+		dag := th.DAG(t, `
+steps:
+  - name: step1
+    command: echo hello
+`)
+
+		ctx := &cmd.Context{
+			Context:     th.Context,
+			Config:      th.Config,
+			DAGRunStore: th.DAGRunStore,
+		}
+
+		testCases := []struct {
+			name      string
+			dag       *core.DAG
+			dagRunID  string
+			expectErr string
+		}{
+			{
+				name:      "nil DAG",
+				dag:       nil,
+				dagRunID:  "some-run-id",
+				expectErr: "DAG and dag-run ID are required",
+			},
+			{
+				name:      "empty dagRunID",
+				dag:       dag.DAG,
+				dagRunID:  "",
+				expectErr: "DAG and dag-run ID are required",
+			},
+		}
+
+		for _, tc := range testCases {
+			t.Run(tc.name, func(t *testing.T) {
+				err := ctx.RecordEarlyFailure(tc.dag, tc.dagRunID, errors.New("test error"))
+				require.Error(t, err)
+				require.Contains(t, err.Error(), tc.expectErr)
+			})
+		}
+	})
```

Based on learnings, favour table-driven cases.
29-33: Optional: Extract Context creation to a helper.

The Context initialization pattern appears 5 times across the test suite. While the duplication is minimal, extracting it to a helper function could improve maintainability.
💡 Example helper function
```go
func newTestContext(th *test.CommandTestHelper) *cmd.Context {
	return &cmd.Context{
		Context:     th.Context,
		Config:      th.Config,
		DAGRunStore: th.DAGRunStore,
	}
}
```

Then use it as:

```go
ctx := newTestContext(th)
```

Also applies to: 73-77, 97-101, 117-121, 141-145
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `internal/cmd/record_early_failure_test.go` (1 hunks)
- `internal/cmd/restart.go` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
`**/*.go`: Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)

Keep Go files `gofmt`/`goimports` clean; use tabs, PascalCase for exported symbols (`SchedulerClient`), lowerCamelCase for locals, and `Err...` names for package-level errors

Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`
Files:
- `internal/cmd/record_early_failure_test.go`
- `internal/cmd/restart.go`
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
`**/*_test.go`: Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths

Use `stretchr/testify/require` and shared fixtures from `internal/test` instead of duplicating mocks
Files:
internal/cmd/record_early_failure_test.go
🧠 Learnings (1)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths
Applied to files:
internal/cmd/record_early_failure_test.go
🧬 Code graph analysis (2)
internal/cmd/record_early_failure_test.go (3)
- `internal/core/execution/dagrun.go` (1): `DAGRunStore` (23-48)
- `internal/core/status.go` (2): `Failed` (9-9), `Succeeded` (11-11)
- `internal/test/command.go` (1): `CmdTest` (15-19)
internal/cmd/restart.go (2)
- `internal/core/execution/proc.go` (1): `ProcStore` (8-27)
- `internal/persistence/fileproc/procgrp.go` (1): `ProcGroup` (20-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test on ubuntu-latest
🔇 Additional comments (3)
internal/cmd/record_early_failure_test.go (3)
1-13: LGTM: Clean package structure and imports.

The external test package `cmd_test` is a good practice, and the imports are appropriate for the test scenarios.
128-181: Excellent comprehensive retry flow coverage.

This test thoroughly validates the complete lifecycle: early failure recording → status verification → DAG retrieval → retry → success. The use of `require.Eventually` with an appropriate timeout (5s) and polling interval (100ms) properly handles the asynchronous retry operation.
15-182: Strong test coverage for `RecordEarlyFailure` functionality.

The test suite provides comprehensive coverage:
- ✅ New DAG run failure recording
- ✅ Existing attempt failure update
- ✅ Validation error paths (nil DAG, empty run ID)
- ✅ End-to-end retry flow after early failure
The tests properly use the shared test harness and testify/require assertions as specified in the coding guidelines.
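As a toy illustration of the lifecycle these tests exercise — record an early failure, then retry to success — the following sketch uses purely illustrative names (`runStore`, `recordEarlyFailure`, `retry`), not Dagu's actual API:

```go
package main

import "fmt"

type status string

const (
	statusFailed    status = "failed"
	statusSucceeded status = "succeeded"
)

// runStore maps a dag-run ID to its latest recorded status.
type runStore map[string]status

// recordEarlyFailure marks a run as failed before any step executed.
func (s runStore) recordEarlyFailure(runID string) { s[runID] = statusFailed }

// retry re-executes the run; in this sketch it always succeeds.
func (s runStore) retry(runID string) { s[runID] = statusSucceeded }

func main() {
	store := runStore{}
	store.recordEarlyFailure("run-1")
	fmt.Println(store["run-1"]) // failed
	store.retry("run-1")
	fmt.Println(store["run-1"]) // succeeded
}
```

The point the tests verify is exactly this transition: an early failure leaves a retryable record behind, and a subsequent retry overwrites it with a successful status.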
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/cmd/restart.go (1)
196-214: Infinite loop lacks timeout and context cancellation check.
`stopRunningDAG` polls indefinitely with no timeout or `ctx.Done()` check. If the DAG fails to stop (e.g., a stuck process or an unresponsive socket), the restart command hangs forever and ignores cancellation signals.

🔎 Suggested fix with timeout and context check
```diff
 func stopRunningDAG(ctx context.Context, cli runtime.Manager, dag *core.DAG, dagRunID string) error {
 	const stopPollInterval = 100 * time.Millisecond
+	const stopTimeout = 5 * time.Minute
+	deadline := time.Now().Add(stopTimeout)
 	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		if time.Now().After(deadline) {
+			return fmt.Errorf("timed out waiting for DAG %s to stop", dag.Name)
+		}
 		dagStatus, err := cli.GetCurrentStatus(ctx, dag, dagRunID)
 		if err != nil {
 			return fmt.Errorf("failed to get current status: %w", err)
 		}
```

internal/cmd/exec.go (1)
194-199: Ignored error from `FindAttempt` may mask failures.

Line 196 discards the error from `FindAttempt`. If the store returns an error due to connectivity issues or corruption, the code proceeds as if no attempt exists, potentially allowing duplicate runs or masking underlying problems.

🔎 Suggested fix
```diff
-	attempt, _ := ctx.DAGRunStore.FindAttempt(ctx, dagRunRef)
-	if attempt != nil {
+	attempt, err := ctx.DAGRunStore.FindAttempt(ctx, dagRunRef)
+	if err != nil {
+		logger.Warn(ctx, "Failed to check existing dag-run", tag.Error(err))
+	}
+	if attempt != nil {
 		return fmt.Errorf("dag-run ID %s already exists for DAG %s", runID, dag.Name)
 	}
```
🧹 Nitpick comments (1)
internal/cmd/restart.go (1)
169-176: Consider flushing/closing the log file before `os.Exit(1)`.

When `ctx.Quiet` is true and execution fails, `os.Exit(1)` at line 171 bypasses the deferred `logFile.Close()` at line 141. While the OS reclaims the handle, any buffered log data may be lost.

🔎 Suggested fix
```diff
 	if err := agentInstance.Run(ctx); err != nil {
 		if ctx.Quiet {
+			_ = logFile.Close()
 			os.Exit(1)
 		} else {
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- `internal/cmd/exec.go` (3 hunks)
- `internal/cmd/exec_spec.go` (1 hunks)
- `internal/cmd/restart.go` (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
`**/*.go`: Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)

Keep Go files `gofmt`/`goimports` clean; use tabs, PascalCase for exported symbols (`SchedulerClient`), lowerCamelCase for locals, and `Err...` names for package-level errors

Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`
Files:
- `internal/cmd/exec.go`
- `internal/cmd/exec_spec.go`
- `internal/cmd/restart.go`
🧬 Code graph analysis (1)
internal/cmd/restart.go (3)
- `internal/cmd/context.go` (1): `Context` (42-57)
- `internal/core/execution/proc.go` (1): `ProcStore` (8-27)
- `internal/runtime/manager.go` (2): `Manager` (32-36), `Manager` (264-278)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test on ubuntu-latest
🔇 Additional comments (3)
internal/cmd/restart.go (1)
90-131: LGTM on the refactored restart process with new run ID handling.

The fix correctly addresses the previous review concern about recording early failures with the wrong run ID. The flow now:
- Generates `newDagRunID` upfront (lines 102-106)
- Uses `newDagRunID` for both `RecordEarlyFailure` calls and the `Acquire` call
- Properly unlocks the process group on failure paths (line 118)

The immediate unlock after acquire (line 128) followed by the deferred `proc.Stop` (lines 123-125) is the correct pattern, since the process handle is now held independently.

internal/cmd/exec.go (1)
59-64: LGTM on simplified execution flow.

The updated documentation accurately reflects the removal of queue-related logic, and the call to `tryExecuteDAG` aligns with the simplified direct execution path.

Also applies to: 207-207
internal/cmd/exec_spec.go (1)
116-117: Setting `MaxActiveRuns = -1` is the correct semantic for inline exec DAGs. This value enables "unlimited instances" by disabling concurrency limits. Downstream enforcement logic correctly handles this through checks that only apply limits when `maxActiveRuns > 0`, so no integer underflow or invalid-value issues will occur.
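The guard described above can be sketched as a small predicate — a hypothetical helper for illustration, not Dagu's actual enforcement code: the limit only applies when it is strictly positive, so `-1` falls through to "allowed":

```go
package main

import "fmt"

// allowStart applies the concurrency limit only when maxActiveRuns > 0;
// zero or negative values (such as -1 for inline exec DAGs) mean unlimited.
func allowStart(activeRuns, maxActiveRuns int) bool {
	if maxActiveRuns > 0 && activeRuns >= maxActiveRuns {
		return false
	}
	return true
}

func main() {
	fmt.Println(allowStart(3, -1)) // unlimited: always allowed
	fmt.Println(allowStart(1, 1))  // at the limit: rejected
	fmt.Println(allowStart(0, 1))  // below the limit: allowed
}
```

Because the comparison is only reached when `maxActiveRuns > 0`, a sentinel like `-1` never participates in arithmetic, which is why no underflow or invalid-value handling is needed downstream.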
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #1497      +/-   ##
==========================================
+ Coverage   59.91%   60.09%   +0.18%
==========================================
  Files         196      196
  Lines       22023    21925      -98
==========================================
- Hits        13194    13175      -19
+ Misses       7413     7351      -62
+ Partials     1416     1399      -17
```

... and 4 files with indirect coverage changes

Continue to review full report in Codecov by Sentry.
…pectedly (#1497)

* **Behavior Changes**
  * Removed explicit no-queue and disable-max-active-runs options; start/retry flows simplified to default local execution and streamlined retry semantics.
* **New Features**
  * Singleton mode now returns clear HTTP 409 conflicts when a singleton DAG is already running or queued.
  * Added top-level run Error field and an API to record early failures for quicker failure visibility.
* **Bug Fixes**
  * Improved process acquisition and restart/retry error handling; tests updated to reflect local execution behavior.
Summary by CodeRabbit
Behavior Changes
New Features
Bug Fixes