📝 Walkthrough

This PR introduces a new public embedded API.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Engine
    participant DAGStore
    participant Agent
    participant Steps
    Client->>Engine: RunFile/RunYAML (with RunOptions)
    activate Engine
    Engine->>DAGStore: Load DAG definition
    DAGStore-->>Engine: DAG metadata
    Engine->>Engine: Apply run overrides (params, tags, selector)
    Engine->>Agent: Create & start local execution agent
    deactivate Engine
    activate Agent
    Client->>Engine: Wait (on Run object)
    Agent->>Steps: Execute each step in DAG
    activate Steps
    Steps->>Steps: Run with params
    Steps-->>Agent: Result & logs
    deactivate Steps
    Agent->>Agent: Record attempt status
    Agent-->>Engine: Signal completion
    deactivate Agent
    Engine-->>Client: Return final Status
```
```mermaid
sequenceDiagram
    participant Client
    participant Engine
    participant Coordinator
    participant Worker
    participant Agent
    participant Steps
    Client->>Engine: RunFile/RunYAML (ExecutionModeDistributed)
    activate Engine
    Engine->>Coordinator: Submit task for scheduling
    deactivate Engine
    Coordinator->>Coordinator: Match worker by selector/tags
    Coordinator->>Worker: Dispatch to matching worker
    activate Worker
    Worker->>Agent: Create execution agent
    activate Agent
    Agent->>Steps: Execute each step in DAG
    activate Steps
    Steps->>Steps: Run with params
    Steps-->>Agent: Result & logs
    deactivate Steps
    Agent->>Coordinator: Report status updates
    deactivate Agent
    Worker-->>Coordinator: Execution complete
    deactivate Worker
    Client->>Engine: Poll Status(RunRef)
    Engine->>Coordinator: Query run status
    Coordinator-->>Engine: Return Status snapshot
    Engine-->>Client: Return final Status
```
```mermaid
sequenceDiagram
    participant User
    participant UserCode as User Code
    participant Engine
    participant Runtime
    participant ExecutorRegistry
    participant CustomExecutor as Custom Executor
    User->>UserCode: Call RegisterExecutor("custom_type", factory, opts)
    activate UserCode
    UserCode->>ExecutorRegistry: RegisterExecutor with factory & options
    ExecutorRegistry->>ExecutorRegistry: Add factory mapping
    ExecutorRegistry->>ExecutorRegistry: Register type name
    deactivate UserCode
    User->>Engine: RunYAML (DAG step uses custom_type)
    activate Engine
    Engine->>Runtime: Prepare & execute step
    Runtime->>ExecutorRegistry: Get executor for custom_type
    ExecutorRegistry-->>Runtime: Return custom executor instance
    activate CustomExecutor
    Runtime->>CustomExecutor: Run(Step)
    CustomExecutor->>CustomExecutor: Execute custom logic
    CustomExecutor-->>Runtime: Return result
    deactivate CustomExecutor
    deactivate Engine
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches: 🧪 Generate unit tests (beta)
Actionable comments posted: 10
🧹 Nitpick comments (8)
internal/core/spec/step_types.go (1)
86-98: Thread-safe registration looks correct.

Read/write locking around `builtinStepTypeNames` is properly paired across `RegisterExecutorTypeName` and `isBuiltinStepTypeName`. One optional consideration: `RegisterExecutorTypeName` does not validate the name against `customStepTypeNameRegexp` (the same validator applied to custom step types). Registering an arbitrary string here means any garbage value will silently be accepted by DAG loading. Consider rejecting names that don't match `customStepTypeNameRegexp` (or returning an error) to prevent malformed executor names from poisoning the registry.

Also applies to: 287-292
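The suggested guard can be sketched as a standalone program. The regexp pattern, mutex, and map here are stand-ins mirroring the names in the review, not the actual `spec` package internals:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
	"sync"
)

// Assumed pattern; the real customStepTypeNameRegexp lives in the spec package.
var customStepTypeNameRegexp = regexp.MustCompile(`^[a-z][a-z0-9_-]*$`)

var (
	stepTypeNamesMu      sync.RWMutex
	builtinStepTypeNames = map[string]struct{}{}
)

// RegisterExecutorTypeName rejects empty or malformed names instead of
// silently inserting them into the registry.
func RegisterExecutorTypeName(name string) {
	name = strings.TrimSpace(name)
	if name == "" || !customStepTypeNameRegexp.MatchString(name) {
		return // refuse garbage rather than poison the registry
	}
	stepTypeNamesMu.Lock()
	defer stepTypeNamesMu.Unlock()
	builtinStepTypeNames[name] = struct{}{}
}

func isBuiltinStepTypeName(name string) bool {
	stepTypeNamesMu.RLock()
	defer stepTypeNamesMu.RUnlock()
	_, ok := builtinStepTypeNames[name]
	return ok
}

func main() {
	RegisterExecutorTypeName("my_executor")
	RegisterExecutorTypeName("   ")
	RegisterExecutorTypeName("Not Valid!")
	fmt.Println(isBuiltinStepTypeName("my_executor")) // valid name registered
	fmt.Println(isBuiltinStepTypeName("Not Valid!"))  // malformed name rejected
}
```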
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/core/spec/step_types.go` around lines 86 - 98, RegisterExecutorTypeName currently accepts any non-empty string and inserts it into builtinStepTypeNames; change it to validate the trimmed name against customStepTypeNameRegexp and refuse to register invalid names (i.e., return early without inserting) to avoid poisoning the registry with garbage. Update the body of RegisterExecutorTypeName to: trim the input, if empty return, if !customStepTypeNameRegexp.MatchString(name) return (or optionally log/debug), otherwise acquire stepTypeNamesMu and insert into builtinStepTypeNames; apply the same validation logic to the other registration site that manipulates builtinStepTypeNames (the one around the other RegisterExecutorTypeName-like block).

executor.go (1)
14-51: GoDoc comments are a best practice for public APIs, but not currently enforced by your linter configuration.

This file defines the public API surface (`Step`, `Executor`, `ExecutorFactory`, `StepValidator`, `ExecutorCapabilities`, `ExecutorOption`, `RegisterExecutor`, `WithStepValidator`, `WithExecutorCapabilities`). While adding one-line doc comments to these exported identifiers would improve documentation, the `revive` linter's `exported` rule is not enabled in `.golangci.yml` (only specific rules are listed: unused-parameter, import-alias-naming, if-return, error-return, dot-imports, struct-tag, function-result-limit, atomic, use-any, redundant-import-alias, enforce-map-style, enforce-slice-style). Adding GoDoc is optional unless your team adopts a style guide that requires it.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@executor.go` around lines 14 - 51, Add concise GoDoc one-line comments for all exported identifiers in this file to improve public API documentation: document type aliases Step, Executor, ExecutorFactory, StepValidator, ExecutorCapabilities, ExecutorOption, the executorRegistration struct, and exported functions RegisterExecutor, WithStepValidator, and WithExecutorCapabilities by adding a brief comment above each declaration describing its purpose and usage (e.g., what RegisterExecutor does and what arguments it expects, and what the option helpers set). Ensure comments follow GoDoc style (start with the identifier name) and are placed immediately above the corresponding declarations.

internal/engine/logger.go (1)
41-58: Pass `slog.Attr` values directly to `With()` for efficiency, but the LogValuer preservation claim is inaccurate.

`slog.Logger.Log` (which `With` uses) explicitly handles `slog.Attr` arguments: "If an argument is an Attr, it is used as is." The suggested change is valid and more efficient: it avoids unpacking and reconstructing the Attr. However, the reasoning about preserving "lazy evaluation semantics" is incorrect. Both approaches preserve the `LogValuer` instance: calling `attr.Value.Any()` on a `LogValuer`-typed `Value` returns the `LogValuer` instance itself, not a resolved value. Neither approach "flattens" the structure; both maintain the typed `LogValuer` for handler processing.

♻️ Suggested change

```diff
 func (l *slogAdapter) With(attrs ...slog.Attr) logger.Logger {
-	args := make([]any, 0, len(attrs)*2)
-	for _, attr := range attrs {
-		args = append(args, attr.Key, attr.Value.Any())
-	}
-	return &slogAdapter{logger: l.logger.With(args...)}
+	args := make([]any, 0, len(attrs))
+	for _, attr := range attrs {
+		args = append(args, attr)
+	}
+	return &slogAdapter{logger: l.logger.With(args...)}
 }
```

The same applies to `log()` on line 53.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/engine/logger.go` around lines 41 - 58, The current slogAdapter.With and slogAdapter.log unpack slog.Attr into key/value pairs by calling attr.Value.Any(), which is unnecessary and less efficient; instead pass the slog.Attr values directly to the underlying logger (use l.logger.With(attrs...) in With and l.logger.Log(..., tags...) in log) so you stop constructing args slices and preserve Attrs unchanged; update the With method to forward attrs as-is and update log to forward tags as-is (remove attr.Value.Any() usage and the key/value unpacking).

engine_test.go (1)
30-56: Use testify assertions in the new tests.

These tests use manual `if`/`t.Fatalf` checks instead of `require`/`assert`. Per the coding guidelines for `**/*_test.go`: Use stretchr/testify assertions for testing in Go. Testify (v1.11.1) is already available as a dependency.

♻️ Example conversion pattern

```diff
 import (
 	"context"
 	"os"
 	"path/filepath"
 	"strings"
 	"testing"
 	"time"

 	"github.com/dagucloud/dagu"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
@@
 	engine, err := dagu.New(ctx, dagu.Options{HomeDir: home})
-	if err != nil {
-		t.Fatalf("New() error = %v", err)
-	}
+	require.NoError(t, err, "New()")
@@
 	run, err := engine.RunFile(ctx, dagFile, dagu.WithParams(map[string]string{"FOO": "bar"}))
-	if err != nil {
-		t.Fatalf("RunFile() error = %v", err)
-	}
+	require.NoError(t, err, "RunFile()")
 	status, err := run.Wait(ctx)
-	if err != nil {
-		t.Fatalf("Wait() error = %v", err)
-	}
-	if status.Status != "succeeded" {
-		t.Fatalf("status = %q, want succeeded", status.Status)
-	}
-	if status.Name != "embedded-file" {
-		t.Fatalf("name = %q, want embedded-file", status.Name)
-	}
-	if status.RunID == "" {
-		t.Fatal("run ID is empty")
-	}
+	require.NoError(t, err, "Wait()")
+	assert.Equal(t, "succeeded", status.Status)
+	assert.Equal(t, "embedded-file", status.Name)
+	assert.NotEmpty(t, status.RunID)
```

Also applies to: 63-90, 98-118, 122-126
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine_test.go` around lines 30 - 56, Replace the manual if/t.Fatalf style checks in engine_test.go with testify/require assertions: import "github.com/stretchr/testify/require" and use require.NoError(t, err) after New, RunFile and Wait calls, use require.Equal(t, "succeeded", status.Status) and require.Equal(t, "embedded-file", status.Name) for the expected values, and use require.NotEmpty(t, status.RunID) for the run ID; also replace the deferred Close() error check with require.NoError(t, engine.Close(context.Background())) inside the deferred function to keep failure reporting consistent (update the other similar test blocks at lines referenced in the comment as well).

engine.go (2)
236-257: `Run.Wait` (and `Run.Status` partly) can be simplified.

The `if err != nil` branch returns the same tuple as the success branch in `Wait`. Collapsing avoids the visual implication that the error case is special-cased.

♻️ Suggested simplification

```diff
-	status, err := r.inner.Wait(ctx)
-	if err != nil {
-		return publicStatus(status), err
-	}
-	return publicStatus(status), nil
+	status, err := r.inner.Wait(ctx)
+	return publicStatus(status), err
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine.go` around lines 236 - 257, The error branch in Run.Wait duplicates the success return; simplify both Wait (and mirror for Status) by removing the redundant if err != nil block and directly return publicStatus(status), err from the call to r.inner.Wait(ctx) (and similarly return publicStatus(status), err for r.inner.Status(ctx)), keeping the initial nil-check for r and r.inner and using publicStatus to convert the inner status.
307-318: Inconsistent empty-vs-nil normalization between maps and slices.

`WithParams`/`WithWorkerSelector`/`WithTags` go through `cloneMap` (which returns `nil` for an empty input), while `WithParamsList`/`WithTags` use `append([]string{}, …)`, which returns a non-nil empty slice. Then `internalRunOptions` re-clones again, doubling the work and re-introducing the same nil/empty mismatch. Consider:

- Normalizing to one helper (e.g., a `cloneSlice` that returns `nil` for empty input) for symmetry, and
- Skipping the second clone in `internalRunOptions`, since `applyRunOptions` already produced a defensive copy.
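A `cloneSlice` helper with `cloneMap`'s nil-for-empty semantics could look like this sketch (standalone for illustration; the real helpers live in `engine.go`):

```go
package main

import "fmt"

// cloneSlice mirrors cloneMap: it returns nil for empty input so that
// "no values" normalizes the same way for maps and slices.
func cloneSlice(in []string) []string {
	if len(in) == 0 {
		return nil
	}
	out := make([]string, len(in))
	copy(out, in)
	return out
}

func main() {
	fmt.Println(cloneSlice(nil) == nil)        // nil stays nil
	fmt.Println(cloneSlice([]string{}) == nil) // empty normalizes to nil
	src := []string{"a", "b"}
	dst := cloneSlice(src)
	dst[0] = "x"
	fmt.Println(src[0]) // defensive copy: source is untouched
}
```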
Verify each finding against the current code and only fix it if needed. In `@engine.go` around lines 307 - 318, WithParams/WithWorkerSelector/WithTags currently use cloneMap which returns nil for empty maps while WithParamsList/WithTags create non-nil empty slices via append, causing inconsistent empty-vs-nil behavior and redundant cloning in internalRunOptions; add a cloneSlice helper that mirrors cloneMap semantics (returns nil for empty input) and update WithParamsList (and any slice-using WithTags overloads) to use cloneSlice, and then remove the second cloning step inside internalRunOptions (since applyRunOptions already makes defensive copies) so the code normalizes empties consistently and avoids double work; reference functions: WithParams, WithParamsList, WithWorkerSelector, WithTags, cloneMap, cloneSlice (new), applyRunOptions, internalRunOptions.

internal/engine/run.go (2)
260-267: `applyRunOverrides` doesn't dedupe tags and only handles two fields.

`dag.Tags = append(dag.Tags, core.NewTags(opts.Tags)...)` will accumulate duplicates if the embedder calls `WithTags` with values already present in the YAML. Also, given that other run-level overrides like `Name`/`DefaultWorkingDir`/params live in `loadOptions`, it's worth a comment here noting the split so future maintainers don't re-add overlapping logic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/engine/run.go` around lines 260 - 267, applyRunOverrides currently appends opts.Tags to dag.Tags without deduping and only handles WorkerSelector/Tags; change applyRunOverrides to merge tags idempotently by building a set of existing dag.Tags (from dag.Tags and core.NewTags(opts.Tags)) and only append new tags that aren't present (use the tag string/identifier comparisons consistent with core.NewTags), and add a short comment in applyRunOverrides explaining that other run-level overrides (Name, DefaultWorkingDir, params) are handled in loadOptions so logic is intentionally limited here; keep the existing WorkerSelector override behavior.
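An idempotent tag merge might be sketched as follows. This works over plain strings for illustration; the real code deals in `core.Tag` values, so the comparison would use that type:

```go
package main

import "fmt"

// mergeTags appends only tags not already present, so repeated
// WithTags calls (or tags already in the YAML) stay idempotent.
func mergeTags(existing, extra []string) []string {
	seen := make(map[string]struct{}, len(existing))
	for _, t := range existing {
		seen[t] = struct{}{}
	}
	for _, t := range extra {
		if _, ok := seen[t]; ok {
			continue // already present: skip the duplicate
		}
		seen[t] = struct{}{}
		existing = append(existing, t)
	}
	return existing
}

func main() {
	tags := mergeTags([]string{"etl", "nightly"}, []string{"nightly", "prod"})
	fmt.Println(tags) // [etl nightly prod]
}
```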
151-216: `waitLocal`/`waitDistributed` ignore caller context for the final status fetch.

In both branches the post-completion (or post-cancellation) `Status` lookup uses `context.Background()`, even when the caller's `ctx` is the one carrying deadlines, tracing, or cancellation reasons. For the timeout case (lines 167, 190) this is arguably required so we can still return the latest snapshot, but it also means the call can hang indefinitely on a slow file/coordinator backend after the user's context already expired. Consider using a short bounded context derived from `context.Background()` (e.g. `context.WithTimeout(context.Background(), 5*time.Second)`) so the embedder isn't stuck.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/engine/run.go` around lines 151 - 216, Both waitLocal and waitDistributed call r.Status(context.Background()) after the caller ctx is done, which can hang indefinitely; replace those background calls with a short bounded background context (e.g. ctxStatus, cancel := context.WithTimeout(context.Background(), 5*time.Second); defer cancel()) and pass ctxStatus into r.Status. Update all occurrences inside waitLocal (the status/statusErr fetch after <-r.done) and both places in waitLocal/waitDistributed where you currently call r.Status(context.Background()) on ctx.Done() paths so the status fetch times out quickly; ensure you call cancel() to avoid leaks.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine_test.go`:
- Around line 23-28: The DAG uses a POSIX-only command ("test \"$FOO\" =
\"bar\"") which breaks on Windows; in the TestEngineRunFile where writeDAG(...)
is called, detect runtime.GOOS and write a platform-appropriate command string:
on non-windows keep the existing POSIX command, on windows emit a CMD
conditional like `cmd /C if "%FOO%"=="bar" exit 0 else exit 1`; update the
writeDAG call to use that conditional command string so the DAG step (named
"check-param") runs correctly on both platforms.
In `@engine.go`:
- Around line 96-104: Add clear doc comments to the public WorkerOptions struct
to document precedence: state that WorkerOptions.Coordinators and
WorkerOptions.TLS, if non-empty, override the engine-level DistributedOptions,
and if they are empty the engine will fall back to DistributedOptions; also
mention that Coordinators are validated to be non-empty when provided. Reference
the symbols WorkerOptions.Coordinators, WorkerOptions.TLS and DistributedOptions
in the comments so callers know the exact override/fallback behavior and
validation contract.
In `@examples/embedded/distributed/main.go`:
- Around line 66-82: The worker may not be registered before submitting the DAG
(engine.RunFile), causing a race; after launching the goroutine that calls
worker.Start(workerCtx) add a readiness wait before calling engine.RunFile:
either expose a readiness signal from Start (e.g., have Start return a ready
channel or provide a Worker.Ready() method) and block until it signals, or
implement a short poll loop that queries the coordinator (via the
coordinator/client API) for the worker's registration and only then call
engine.RunFile; update the code around worker.Start/workerCtx and the
engine.RunFile call to wait for that readiness signal or successful poll before
dispatching the workflow.
In `@examples/embedded/local/workflow.yaml`:
- Line 5: The command in workflow.yaml uses POSIX shell variable syntax
(${TARGET}) which won't expand in Windows cmd.exe; update the example so it is
shell-agnostic by either declaring the shell to use (e.g., ensure the step uses
a POSIX shell such as bash) or replace the environment expansion with Dagu's
parameter substitution mechanism (instead of ${TARGET}) so the value is injected
by the workflow engine; update the command entry and add a short note in the
example explaining the chosen approach.
In `@executor.go`:
- Around line 27-39: Trim and validate the provided name in RegisterExecutor
(use strings.TrimSpace) and panic (or return an error if your API prefers) when
the trimmed name is empty to avoid inserting an empty key into runtimeexec's
registry; perform this check before calling runtimeexec.RegisterExecutor or
spec.RegisterExecutorTypeName. Keep the rest of the logic (applying
ExecutorOption to executorRegistration and passing
registration.validator/registration.caps) unchanged, and add a short doc comment
on RegisterExecutor that registrations are not concurrency-safe and must occur
before engine/runtime use (or alternatively move synchronization into
runtimeexec.RegisterExecutor if you control that package).
In `@internal/engine/engine.go`:
- Around line 207-228: The coordinatorClient currently silently sets
cfg.Insecure = true when no TLS material is present; change this to fail fast:
in Engine.coordinatorClient, detect the case where cfg.Insecure is false (use
!cfg.Insecure), cfg.CAFile/CertFile/KeyFile are empty and !cfg.SkipTLSVerify,
and return a clear error asking the caller to either provide TLS files or
explicitly set TLS.Insecure=true (or introduce an explicit AllowInsecure flag),
instead of flipping Insecure; also update any messaging to use e.logger where
appropriate and clean up bool comparisons to use !cfg.Insecure and
!cfg.SkipTLSVerify for revive style compliance.
In `@internal/engine/run.go`:
- Around line 388-437: runDistributed creates a coordinator client and stores it
on the returned Run but never guarantees client.Cleanup if the Run is dropped
without calling Wait/Stop; modify runDistributed/Run so the coordinator client
is always cleaned up when the Run's done channel is closed (mirror runLocal's
cleanup of logFile): ensure the goroutine that closes Run.done calls
client.Cleanup (or install a runtime.SetFinalizer on *Run to call Cleanup) and
make the coordinator client's Cleanup implementation idempotent so explicit
Stop/Wait paths remain safe; refer to runDistributed, Run (ref/done/coordinator
fields), coordinatorClient, client.Cleanup, waitDistributed, and Run.Stop to
locate where to add the cleanup and idempotency changes.
- Around line 538-575: The agentStoresResult.ContextResolver is left nil in
Engine.agentStores; fix by initializing it like the cmd version: call
buildRemoteContextResolver(...) from within Engine.agentStores and assign its
return to result.ContextResolver, handling any error by logging with logger.Warn
(similar to other stores) so the resolver is not silently left uninitialized;
alternatively, if remote context support isn’t desired yet, remove the
ContextResolver field from agentStoresResult and any wiring that expects it.
In `@internal/intg/embed/embed_test.go`:
- Around line 64-98: TestEmbeddedCustomExecutorRunYAML registers a custom
executor via dagu.RegisterExecutor which mutates global state; add a t.Cleanup
that restores registry state after the test (e.g., call
dagu.UnregisterExecutor(executorType) if available, or capture any previous
registration returned/visible and re-register it in t.Cleanup) so that the
echoExecutor/ executorType registration is removed/restored when the test
finishes to avoid cross-test contamination.
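The fail-fast TLS validation suggested for `Engine.coordinatorClient` above might look like this sketch; `tlsConfig` is a simplified stand-in for the real coordinator client config:

```go
package main

import (
	"errors"
	"fmt"
)

// tlsConfig is a hypothetical stand-in for the fields named in the review.
type tlsConfig struct {
	Insecure      bool
	SkipTLSVerify bool
	CAFile        string
	CertFile      string
	KeyFile       string
}

// validateTLS returns an error instead of silently flipping Insecure
// when no TLS material is configured.
func validateTLS(cfg tlsConfig) error {
	noMaterial := cfg.CAFile == "" && cfg.CertFile == "" && cfg.KeyFile == ""
	if !cfg.Insecure && noMaterial && !cfg.SkipTLSVerify {
		return errors.New("coordinator TLS: provide CA/cert/key files or set TLS.Insecure=true explicitly")
	}
	return nil
}

func main() {
	fmt.Println(validateTLS(tlsConfig{}))               // error: nothing configured
	fmt.Println(validateTLS(tlsConfig{Insecure: true})) // explicit opt-in passes
}
```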
---
Nitpick comments:
In `@engine_test.go`:
- Around line 30-56: Replace the manual if/t.Fatalf style checks in
engine_test.go with testify/require assertions: import
"github.com/stretchr/testify/require" and use require.NoError(t, err) after New,
RunFile and Wait calls, use require.Equal(t, "succeeded", status.Status) and
require.Equal(t, "embedded-file", status.Name) for the expected values, and use
require.NotEmpty(t, status.RunID) for the run ID; also replace the deferred
Close() error check with require.NoError(t, engine.Close(context.Background()))
inside the deferred function to keep failure reporting consistent (update the
other similar test blocks at lines referenced in the comment as well).
In `@engine.go`:
- Around line 236-257: The error branch in Run.Wait duplicates the success
return; simplify both Wait (and mirror for Status) by removing the redundant if
err != nil block and directly return publicStatus(status), err from the call to
r.inner.Wait(ctx) (and similarly return publicStatus(status), err for
r.inner.Status(ctx)), keeping the initial nil-check for r and r.inner and using
publicStatus to convert the inner status.
- Around line 307-318: WithParams/WithWorkerSelector/WithTags currently use
cloneMap which returns nil for empty maps while WithParamsList/WithTags create
non-nil empty slices via append, causing inconsistent empty-vs-nil behavior and
redundant cloning in internalRunOptions; add a cloneSlice helper that mirrors
cloneMap semantics (returns nil for empty input) and update WithParamsList (and
any slice-using WithTags overloads) to use cloneSlice, and then remove the
second cloning step inside internalRunOptions (since applyRunOptions already
makes defensive copies) so the code normalizes empties consistently and avoids
double work; reference functions: WithParams, WithParamsList,
WithWorkerSelector, WithTags, cloneMap, cloneSlice (new), applyRunOptions,
internalRunOptions.
In `@executor.go`:
- Around line 14-51: Add concise GoDoc one-line comments for all exported
identifiers in this file to improve public API documentation: document type
aliases Step, Executor, ExecutorFactory, StepValidator, ExecutorCapabilities,
ExecutorOption, the executorRegistration struct, and exported functions
RegisterExecutor, WithStepValidator, and WithExecutorCapabilities by adding a
brief comment above each declaration describing its purpose and usage (e.g.,
what RegisterExecutor does and what arguments it expects, and what the option
helpers set). Ensure comments follow GoDoc style (start with the identifier
name) and are placed immediately above the corresponding declarations.
In `@internal/core/spec/step_types.go`:
- Around line 86-98: RegisterExecutorTypeName currently accepts any non-empty
string and inserts it into builtinStepTypeNames; change it to validate the
trimmed name against customStepTypeNameRegexp and refuse to register invalid
names (i.e., return early without inserting) to avoid poisoning the registry
with garbage. Update the body of RegisterExecutorTypeName to: trim the input, if
empty return, if !customStepTypeNameRegexp.MatchString(name) return (or
optionally log/debug), otherwise acquire stepTypeNamesMu and insert into
builtinStepTypeNames; apply the same validation logic to the other registration
site that manipulates builtinStepTypeNames (the one around the other
RegisterExecutorTypeName-like block).
In `@internal/engine/logger.go`:
- Around line 41-58: The current slogAdapter.With and slogAdapter.log unpack
slog.Attr into key/value pairs by calling attr.Value.Any(), which is unnecessary
and less efficient; instead pass the slog.Attr values directly to the underlying
logger (use l.logger.With(attrs...) in With and l.logger.Log(..., tags...) in
log) so you stop constructing args slices and preserve Attrs unchanged; update
the With method to forward attrs as-is and update log to forward tags as-is
(remove attr.Value.Any() usage and the key/value unpacking).
In `@internal/engine/run.go`:
- Around line 260-267: applyRunOverrides currently appends opts.Tags to dag.Tags
without deduping and only handles WorkerSelector/Tags; change applyRunOverrides
to merge tags idempotently by building a set of existing dag.Tags (from dag.Tags
and core.NewTags(opts.Tags)) and only append new tags that aren't present (use
the tag string/identifier comparisons consistent with core.NewTags), and add a
short comment in applyRunOverrides explaining that other run-level overrides
(Name, DefaultWorkingDir, params) are handled in loadOptions so logic is
intentionally limited here; keep the existing WorkerSelector override behavior.
- Around line 151-216: Both waitLocal and waitDistributed call
r.Status(context.Background()) after the caller ctx is done, which can hang
indefinitely; replace those background calls with a short bounded background
context (e.g. ctxStatus, cancel := context.WithTimeout(context.Background(),
5*time.Second); defer cancel()) and pass ctxStatus into r.Status. Update all
occurrences inside waitLocal (the status/statusErr fetch after <-r.done) and
both places in waitLocal/waitDistributed where you currently call
r.Status(context.Background()) on ctx.Done() paths so the status fetch times out
quickly; ensure you call cancel() to avoid leaks.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4a340579-3fb3-42dc-b3b3-79ca9a561f84
📒 Files selected for processing (20)
doc.go, engine.go, engine_test.go, example_test.go, examples/embedded/README.md, examples/embedded/custom-executor/main.go, examples/embedded/custom-executor/workflow.yaml, examples/embedded/distributed/main.go, examples/embedded/distributed/workflow.yaml, examples/embedded/local/main.go, examples/embedded/local/workflow.yaml, executor.go, internal/core/spec/step_types.go, internal/engine/engine.go, internal/engine/logger.go, internal/engine/run.go, internal/engine/types.go, internal/engine/worker.go, internal/intg/embed/embed_test.go, tools.go
```go
	writeDAG(t, dagFile, `
name: embedded-file
steps:
  - name: check-param
    command: test "$FOO" = "bar"
`)
```
Make this DAG command portable for Windows coverage.
Line 27 uses POSIX `test`, so `TestEngineRunFile` can fail on Windows despite this PR requesting Windows runtime validation.
🐛 Proposed portable command selection

```diff
 import (
 	"context"
+	"fmt"
 	"os"
 	"path/filepath"
+	"runtime"
 	"strings"
 	"testing"
 	"time"
@@
 	home := t.TempDir()
 	dagFile := filepath.Join(home, "embedded-file.yaml")
-	writeDAG(t, dagFile, `
+	checkParamCommand := `test "$FOO" = "bar"`
+	if runtime.GOOS == "windows" {
+		checkParamCommand = `if not "%FOO%"=="bar" exit /b 1`
+	}
+	writeDAG(t, dagFile, fmt.Sprintf(`
 name: embedded-file
 steps:
   - name: check-param
-    command: test "$FOO" = "bar"
-`)
+    command: %q
+`, checkParamCommand))
```
+`, checkParamCommand))📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	checkParamCommand := `test "$FOO" = "bar"`
	if runtime.GOOS == "windows" {
		checkParamCommand = `if not "%FOO%"=="bar" exit /b 1`
	}
	writeDAG(t, dagFile, fmt.Sprintf(`
name: embedded-file
steps:
  - name: check-param
    command: %q
`, checkParamCommand))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine_test.go` around lines 23 - 28, The DAG uses a POSIX-only command
("test \"$FOO\" = \"bar\"") which breaks on Windows; in the TestEngineRunFile
where writeDAG(...) is called, detect runtime.GOOS and write a
platform-appropriate command string: on non-windows keep the existing POSIX
command, on windows emit a CMD conditional like `cmd /C if "%FOO%"=="bar" exit 0
else exit 1`; update the writeDAG call to use that conditional command string so
the DAG step (named "check-param") runs correctly on both platforms.
```go
// WorkerOptions configures an embedded shared-nothing worker.
type WorkerOptions struct {
	ID            string
	MaxActiveRuns int
	Labels        map[string]string
	Coordinators  []string
	TLS           TLSOptions
	HealthPort    int
}
```
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# See how internal NewWorker reconciles WorkerOptions.Coordinators with engine.distributed.Coordinators.
fd -t f 'worker.go$' -p 'internal/engine' -x rg -n -C10 'func .*NewWorker' {} \;
```

Repository: dagucloud/dagu
Length of output: 600

🏁 Script executed:

```bash
cd /repo && cat -n engine.go | sed -n '85,115p'
```

Length of output: 113

🏁 Script executed:

```bash
cat -n engine.go | sed -n '85,115p'
```

Length of output: 911

🏁 Script executed:

```bash
rg -n 'type TLSOptions' engine.go -A 5
```

Length of output: 206
Document WorkerOptions.Coordinators and TLS field precedence in public API.
The public WorkerOptions struct lacks doc comments explaining how its Coordinators and TLS fields interact with engine-level DistributedOptions. Users cannot tell from the API signature alone whether these fields override or augment the engine configuration. The implementation correctly handles this (falls back to DistributedOptions if empty, with validation that coordinators are non-empty), but this contract should be documented on the struct fields themselves.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine.go` around lines 96 - 104, Add clear doc comments to the public
WorkerOptions struct to document precedence: state that
WorkerOptions.Coordinators and WorkerOptions.TLS, if non-empty, override the
engine-level DistributedOptions, and if they are empty the engine will fall back
to DistributedOptions; also mention that Coordinators are validated to be
non-empty when provided. Reference the symbols WorkerOptions.Coordinators,
WorkerOptions.TLS and DistributedOptions in the comments so callers know the
exact override/fallback behavior and validation contract.
```go
	workerCtx, stopWorker := context.WithCancel(ctx)
	defer stopWorker()
	go func() {
		if err := worker.Start(workerCtx); err != nil && !errors.Is(err, context.Canceled) {
			log.Printf("worker stopped: %v", err)
		}
	}()

	run, err := engine.RunFile(ctx, examplePath("workflow.yaml"))
	if err != nil {
		log.Fatal(err)
	}
	status, err := run.Wait(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s %s finished with %s on worker %s\n", status.Name, status.RunID, status.Status, status.WorkerID)
```
There was a problem hiding this comment.
Wait for the worker to be ready before dispatching.
RunFile can reach the coordinator before the goroutine has registered the worker, making the distributed example timing-dependent. Add a readiness wait/poll against the coordinator or expose a worker start signal before submitting the DAG.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/embedded/distributed/main.go` around lines 66 - 82, The worker may
not be registered before submitting the DAG (engine.RunFile), causing a race;
after launching the goroutine that calls worker.Start(workerCtx) add a readiness
wait before calling engine.RunFile: either expose a readiness signal from Start
(e.g., have Start return a ready channel or provide a Worker.Ready() method) and
block until it signals, or implement a short poll loop that queries the
coordinator (via the coordinator/client API) for the worker's registration and
only then call engine.RunFile; update the code around worker.Start/workerCtx and
the engine.RunFile call to wait for that readiness signal or successful poll
before dispatching the workflow.
```yaml
type: graph
steps:
  - name: greet
    command: echo "hello ${TARGET}"
```
${TARGET} expansion is shell-dependent.
The PR requests CI validation on Windows. echo "hello ${TARGET}" is expanded by POSIX shells but not by Windows cmd.exe (which uses %TARGET%). If this example is exercised on Windows runners, it will print the literal string. Consider documenting the required shell, or using a shell-agnostic mechanism (e.g., relying on Dagu's parameter substitution rather than environment-variable expansion).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/embedded/local/workflow.yaml` at line 5, The command in
workflow.yaml uses POSIX shell variable syntax (${TARGET}) which won't expand in
Windows cmd.exe; update the example so it is shell-agnostic by either declaring
the shell to use (e.g., ensure the step uses a POSIX shell such as bash) or
replace the environment expansion with Dagu's parameter substitution mechanism
(instead of ${TARGET}) so the value is injected by the workflow engine; update
the command entry and add a short note in the example explaining the chosen
approach.
```go
func RegisterExecutor(name string, factory ExecutorFactory, opts ...ExecutorOption) {
	var registration executorRegistration
	for _, opt := range opts {
		opt(&registration)
	}
	runtimeexec.RegisterExecutor(
		name,
		runtimeexec.ExecutorFactory(factory),
		registration.validator,
		registration.caps,
	)
	spec.RegisterExecutorTypeName(name)
}
```
Validate the executor name and consider concurrency expectations.
Two concerns:
1. `RegisterExecutor` does not reject an empty/whitespace `name`. `spec.RegisterExecutorTypeName` silently ignores it, but `runtimeexec.RegisterExecutor` will still insert an entry under the empty key into `executorRegistry` (per `internal/runtime/executor/executor.go:58`), which is a footgun. Trim and reject empty names early (or panic, matching typical `Register*` semantics).
2. `runtimeexec.RegisterExecutor` writes to a package-level `executorRegistry` without synchronization. In the embedded use case users may call `dagu.RegisterExecutor` concurrently with engine setup or even at runtime; document that registration must happen before any engine use, or add a mutex on the runtime side.
🛡️ Suggested guard

```diff
 func RegisterExecutor(name string, factory ExecutorFactory, opts ...ExecutorOption) {
+	name = strings.TrimSpace(name)
+	if name == "" {
+		panic("dagu: RegisterExecutor called with empty name")
+	}
+	if factory == nil {
+		panic("dagu: RegisterExecutor called with nil factory")
+	}
 	var registration executorRegistration
 	for _, opt := range opts {
 		opt(&registration)
 	}
```

(Add `"strings"` to imports.)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@executor.go` around lines 27 - 39, Trim and validate the provided name in
RegisterExecutor (use strings.TrimSpace) and panic (or return an error if your
API prefers) when the trimmed name is empty to avoid inserting an empty key into
runtimeexec's registry; perform this check before calling
runtimeexec.RegisterExecutor or spec.RegisterExecutorTypeName. Keep the rest of
the logic (applying ExecutorOption to executorRegistration and passing
registration.validator/registration.caps) unchanged, and add a short doc comment
on RegisterExecutor that registrations are not concurrency-safe and must occur
before engine/runtime use (or alternatively move synchronization into
runtimeexec.RegisterExecutor if you control that package).
```go
func (e *Engine) coordinatorClient(opts DistributedOptions) (coordinator.Client, error) {
	if len(opts.Coordinators) == 0 {
		return nil, fmt.Errorf("distributed execution requires at least one coordinator address")
	}
	cfg := coordinator.DefaultConfig()
	cfg.CAFile = opts.TLS.ClientCAFile
	cfg.CertFile = opts.TLS.CertFile
	cfg.KeyFile = opts.TLS.KeyFile
	cfg.SkipTLSVerify = opts.TLS.SkipTLSVerify
	cfg.Insecure = opts.TLS.Insecure
	if cfg.Insecure == false && cfg.CAFile == "" && cfg.CertFile == "" && cfg.KeyFile == "" && !cfg.SkipTLSVerify {
		cfg.Insecure = true
	}
	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	registry, err := coordinator.NewStaticRegistry(opts.Coordinators)
	if err != nil {
		return nil, err
	}
	return coordinator.New(registry, cfg), nil
}
```
Silent downgrade to insecure transport is a security footgun.
When the caller passes a non-empty Coordinators list but no TLS material and doesn't explicitly set Insecure or SkipTLSVerify, the code silently flips cfg.Insecure = true. From a library-API perspective this means "I forgot to configure TLS" and "I want plaintext" become indistinguishable, and the embedded test in internal/intg/embed/embed_test.go (which leaves TLS zero-valued) is what makes this convenient — at the cost of making it the default for every embedder.
Recommend one of:
- Require an explicit `TLS.Insecure = true` (or a dedicated `AllowInsecure` flag) and return an error otherwise.
- At minimum, log a warning via `e.logger` when this fallback kicks in so embedders see it in their logs.
Also worth aligning style with the rest of the repo — cfg.Insecure == false is flagged by revive (bool-comparison) and the mixed ==false / !cfg.SkipTLSVerify is hard to read:
♻️ Stylistic cleanup (orthogonal to the security concern)

```diff
-	if cfg.Insecure == false && cfg.CAFile == "" && cfg.CertFile == "" && cfg.KeyFile == "" && !cfg.SkipTLSVerify {
-		cfg.Insecure = true
-	}
+	noTLSMaterial := cfg.CAFile == "" && cfg.CertFile == "" && cfg.KeyFile == ""
+	if !cfg.Insecure && !cfg.SkipTLSVerify && noTLSMaterial {
+		// TODO(security): require explicit opt-in instead of silent downgrade.
+		cfg.Insecure = true
+	}
```

As per coding guidelines: "Use golangci-lint v2 for linting Go code (errcheck, govet, staticcheck, gosec, revive, etc.)" — revive will flag `cfg.Insecure == false`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/engine/engine.go` around lines 207 - 228, The coordinatorClient
currently silently sets cfg.Insecure = true when no TLS material is present;
change this to fail fast: in Engine.coordinatorClient, detect the case where
cfg.Insecure is false (use !cfg.Insecure), cfg.CAFile/CertFile/KeyFile are empty
and !cfg.SkipTLSVerify, and return a clear error asking the caller to either
provide TLS files or explicitly set TLS.Insecure=true (or introduce an explicit
AllowInsecure flag), instead of flipping Insecure; also update any messaging to
use e.logger where appropriate and clean up bool comparisons to use
!cfg.Insecure and !cfg.SkipTLSVerify for revive style compliance.
```go
func (l *slogAdapter) Info(msg string, tags ...slog.Attr) { l.log(slog.LevelInfo, msg, tags...) }
func (l *slogAdapter) Warn(msg string, tags ...slog.Attr) { l.log(slog.LevelWarn, msg, tags...) }
func (l *slogAdapter) Error(msg string, tags ...slog.Attr) { l.log(slog.LevelError, msg, tags...) }
func (l *slogAdapter) Fatal(msg string, tags ...slog.Attr) { l.log(slog.LevelError, msg, tags...) }
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Find other logger.Logger implementations and inspect their Fatal/Fatalf behavior.
rg -nP --type=go '\bfunc \([^)]+\) Fatal(f)?\(' -C3

sed -n '218,230p' internal/cmn/logger/logger.go
sed -n '264,280p' internal/cmn/logger/logger.go

# Check the logger.Logger interface definition
rg -nP 'type Logger interface' -A 20 --type=go

# Find where slogAdapter is instantiated and used
rg -n 'slogAdapter|NewSlogAdapter' --type=go -B 2 -A 2

# Check internal/engine/logger.go for context and comments about slogAdapter
head -50 internal/engine/logger.go

# Find all calls to newSlogAdapter
rg -n 'newSlogAdapter' --type=go

# Check the broader context of internal/engine/logger.go
head -100 internal/engine/logger.go | tail -60

# Check engine.go around line 48 to understand the context
sed -n '40,60p' internal/engine/engine.go

# Check for documentation about why slogAdapter might be embedded
grep -n 'embedded\|embed' internal/engine/logger.go internal/engine/engine.go

# Find the Engine type definition and its documentation
rg -n 'type Engine struct' -A 10 --type=go internal/engine/engine.go

# Look for examples or documentation about embedding the engine
find . -name '*.md' -o -name '*.go' | xargs grep -l 'Engine.*embed\|embed.*Engine' 2>/dev/null | head -5

# Get more context from engine.go to understand the embedded pattern
sed -n '1,100p' internal/engine/engine.go | head -50
```
Fatal/Fatalf do not terminate the process.
The appLogger implementation (the default logger) explicitly calls os.Exit(1) after logging Fatal errors, consistent with standard Go logging behavior. However, slogAdapter (used when a custom *slog.Logger is provided) logs at LevelError and returns without exiting. This creates a behavioral inconsistency: callers relying on Fatal to abort execution will see different behavior depending on which logger implementation is active.
This appears intentional for the embedded library use case (avoiding process termination when Engine is used as a library), but it is not documented. Either add os.Exit(1) calls in slogAdapter or add a clear comment explaining why this logger variant intentionally degrades Fatal to prevent surprising callers.
Also applies to: 35-35
```go
func (e *Engine) runDistributed(ctx context.Context, dag *core.DAG, runID string, opts RunOptions) (*Run, error) {
	dist := e.distributed
	if len(opts.WorkerSelector) > 0 {
		dist.WorkerSelector = cloneStringMap(opts.WorkerSelector)
	}
	client, err := e.coordinatorClient(dist)
	if err != nil {
		return nil, err
	}
	taskOpts := []runtimeexec.TaskOption{
		runtimeexec.WithBaseConfig(runtimeexec.ResolveBaseConfig(dag.BaseConfigData, e.cfg.Paths.BaseConfig)),
	}
	if len(dist.WorkerSelector) > 0 {
		taskOpts = append(taskOpts, runtimeexec.WithWorkerSelector(dist.WorkerSelector))
	}
	if len(dag.Tags) > 0 {
		taskOpts = append(taskOpts, runtimeexec.WithTags(strings.Join(dag.Tags.Strings(), ",")))
	}
	if dag.SourceFile != "" {
		taskOpts = append(taskOpts, runtimeexec.WithSourceFile(dag.SourceFile))
	}
	if snapshot, snapErr := agentsnapshot.BuildFromPaths(ctx, dag, e.cfg.Paths, e.dagStore); snapErr != nil {
		_ = client.Cleanup(ctx)
		return nil, fmt.Errorf("build agent snapshot: %w", snapErr)
	} else if len(snapshot) > 0 {
		taskOpts = append(taskOpts, runtimeexec.WithAgentSnapshot(snapshot))
	}
	task := runtimeexec.CreateTask(
		dag.Name,
		string(dag.YamlData),
		coordinatorv1.Operation_OPERATION_START,
		runID,
		taskOpts...,
	)
	if len(dag.Params) > 0 {
		task.Params = strings.Join(dag.Params, " ")
	}
	if err := client.Dispatch(ctx, task); err != nil {
		_ = client.Cleanup(ctx)
		return nil, fmt.Errorf("dispatch DAG run: %w", err)
	}
	return &Run{
		engine:      e,
		ref:         RunRef{Name: dag.Name, ID: runID},
		mode:        ExecutionModeDistributed,
		done:        make(chan struct{}),
		dag:         dag,
		coordinator: client,
	}, nil
}
```
Coordinator client leaks when a distributed Run is dropped without Wait/Stop.
The coordinator client is created here and stashed into the returned Run. The only places that call client.Cleanup are waitDistributed's defer and (*Run).Stop. If an embedder calls RunYAML in distributed mode and then discards the handle (or the parent context is canceled before they reach Wait), the client and its underlying gRPC connections are never released. Given this is a public API meant to be embedded by other Go programs, the lifecycle should be self-healing — e.g., attach cleanup to a runtime.SetFinalizer, or document loudly that callers must always call Wait or Stop.
A minimal hardening: track cleanup in the goroutine that closes done (mirroring how runLocal closes its logFile), and make Cleanup idempotent so the explicit Stop/Wait paths remain safe.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/engine/run.go` around lines 388 - 437, runDistributed creates a
coordinator client and stores it on the returned Run but never guarantees
client.Cleanup if the Run is dropped without calling Wait/Stop; modify
runDistributed/Run so the coordinator client is always cleaned up when the Run's
done channel is closed (mirror runLocal's cleanup of logFile): ensure the
goroutine that closes Run.done calls client.Cleanup (or install a
runtime.SetFinalizer on *Run to call Cleanup) and make the coordinator client's
Cleanup implementation idempotent so explicit Stop/Wait paths remain safe; refer
to runDistributed, Run (ref/done/coordinator fields), coordinatorClient,
client.Cleanup, waitDistributed, and Run.Stop to locate where to add the cleanup
and idempotency changes.
```go
type agentStoresResult struct {
	ConfigStore     agentstores.ConfigStore
	ModelStore      agentstores.ModelStore
	MemoryStore     agentstores.MemoryStore
	SoulStore       agentstores.SoulStore
	OAuthManager    *agentoauth.Manager
	ContextResolver agentstores.RemoteContextResolver
}

func (e *Engine) agentStores(ctx context.Context) agentStoresResult {
	var result agentStoresResult
	if store, err := fileagentconfig.New(e.cfg.Paths.DataDir); err == nil {
		result.ConfigStore = store
	} else {
		logger.Warn(ctx, "Failed to create agent config store", tag.Error(err))
	}
	if store, err := fileagentmodel.New(filepath.Join(e.cfg.Paths.DataDir, "agent", "models")); err == nil {
		result.ModelStore = store
	} else {
		logger.Warn(ctx, "Failed to create agent model store", tag.Error(err))
	}
	if store, err := filememory.New(e.cfg.Paths.DAGsDir); err == nil {
		result.MemoryStore = store
	} else {
		logger.Warn(ctx, "Failed to create agent memory store", tag.Error(err))
	}
	if store, err := fileagentsoul.New(ctx, filepath.Join(e.cfg.Paths.DAGsDir, "souls")); err == nil {
		result.SoulStore = store
	} else {
		logger.Warn(ctx, "Failed to create agent soul store", tag.Error(err))
	}
	if manager, err := fileagentoauth.NewManager(e.cfg.Paths.DataDir); err == nil {
		result.OAuthManager = manager
	} else {
		logger.Warn(ctx, "Failed to create agent OAuth manager", tag.Error(err))
	}
	return result
}
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Verify what consumers expect from RemoteContextResolver and whether nil is handled.
ast-grep --pattern 'type RemoteContextResolver $_'
rg -nP '\bRemoteContextResolver\b' -C3
rg -nP '\bAgentRemoteContextResolver\b' -C3

# Check if buildRemoteContextResolver is called anywhere in engine or related code
rg -n 'buildRemoteContextResolver' internal/engine/ internal/runtime/
# Check the full agentStores function in cmd/context.go to see if it initializes ContextResolver
sed -n '754,810p' internal/cmd/context.go
# Verify if there's any initialization of ContextResolver in the engine version
rg -n 'ContextResolver' internal/engine/run.go -A2 -B2
```
agentStoresResult.ContextResolver is declared but never initialized in the engine version.
The agentStores method in internal/engine/run.go initializes ConfigStore, ModelStore, MemoryStore, SoulStore, and OAuthManager, but leaves ContextResolver as nil. In contrast, the similar function in internal/cmd/context.go properly initializes it via buildRemoteContextResolver().
While tool factories gracefully handle a nil resolver (returning nil for the tool if resolver is nil), propagating an uninitialized field that's wired into AgentRemoteContextResolver is misleading to embedders who may expect remote context support. Either:
- Wire up the resolver by calling `buildRemoteContextResolver()` (matching the `cmd` version), or
- Remove the field and its wiring until remote context resolution is actually supported in the engine.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/engine/run.go` around lines 538 - 575, The
agentStoresResult.ContextResolver is left nil in Engine.agentStores; fix by
initializing it like the cmd version: call buildRemoteContextResolver(...) from
within Engine.agentStores and assign its return to result.ContextResolver,
handling any error by logging with logger.Warn (similar to other stores) so the
resolver is not silently left uninitialized; alternatively, if remote context
support isn’t desired yet, remove the ContextResolver field from
agentStoresResult and any wiring that expects it.
```go
func TestEmbeddedCustomExecutorRunYAML(t *testing.T) {
	const executorType = "embedded_intg_echo"

	dagu.RegisterExecutor(
		executorType,
		func(_ context.Context, step dagu.Step) (dagu.Executor, error) {
			return &echoExecutor{step: step}, nil
		},
		dagu.WithExecutorCapabilities(dagu.ExecutorCapabilities{Command: true}),
	)

	ctx, cancel := context.WithTimeout(context.Background(), embeddedTimeout(20*time.Second))
	defer cancel()

	engine, err := dagu.New(ctx, dagu.Options{HomeDir: t.TempDir()})
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, engine.Close(context.Background()))
	})

	run, err := engine.RunYAML(ctx, []byte(`
name: embedded-intg-custom-executor
type: graph
steps:
  - name: go-step
    type: embedded_intg_echo
    command: called from YAML
`))
	require.NoError(t, err)

	status, err := run.Wait(ctx)
	require.NoError(t, err)
	require.Equal(t, "embedded-intg-custom-executor", status.Name)
	require.Equal(t, "succeeded", status.Status)
}
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Inspect RegisterExecutor implementation to see how duplicate names are handled.
fd -t f 'executor.go$' -x rg -n -C5 'func RegisterExecutor' {} \;
ast-grep --pattern 'func RegisterExecutor($$$) {
  $$$
}'
```
Custom executor registration uses global state with incomplete test cleanup.
dagu.RegisterExecutor mutates a global registry. While the implementation silently overwrites on duplicate registration (rather than panicking), this pattern still violates test isolation. Re-running this test with go test -count=N will mutate the global executor registry multiple times. If other tests are added that depend on a clean registry state, or if future changes to the executor registration implementation introduce stricter checks, this will cause cross-test contamination. Consider adding t.Cleanup to unregister the custom executor after the test completes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/intg/embed/embed_test.go` around lines 64 - 98,
TestEmbeddedCustomExecutorRunYAML registers a custom executor via
dagu.RegisterExecutor which mutates global state; add a t.Cleanup that restores
registry state after the test (e.g., call dagu.UnregisterExecutor(executorType)
if available, or capture any previous registration returned/visible and
re-register it in t.Cleanup) so that the echoExecutor/ executorType registration
is removed/restored when the test finishes to avoid cross-test contamination.
Merged features from upstream:
- feat: add embedded Dagu API (dagucloud#2011)
- feat: add edit-and-retry DAG runs (dagucloud#2010)
- feat: add bulk DAG-run deletion in web UI (dagucloud#2009)
- feat: add kubernetes secret provider (dagucloud#2006)
- feat: make workspace selection global (dagucloud#2015)
- feat: show cockpit run artifacts (dagucloud#2017)
- feat: allow disabling DAG retry policy (dagucloud#2018)
- feat: make DAG labels canonical, deprecate tags (dagucloud#2013)
- feat: add workflow design workspace (dagucloud#2012)
- feat: add step with/config alias (dagucloud#2021)
- fix: allow runtime custom step inputs (dagucloud#2005)
- fix: align embedded engine run labels (dagucloud#2014)
- Various CI/test/docs improvements

Conflict resolution:
- Kept fork i18n files (ui/src/i18n/, LanguageSwitcher)
- Merged i18n hooks with upstream structural changes in App.tsx, Layout.tsx, menu.tsx, DAGStatus.tsx, etc.
- Adopted upstream labels-over-tags rename (TagCombobox -> LabelCombobox)
Summary
- `github.com/dagucloud/dagu` embedded engine API for local and shared-nothing distributed DAG execution
- `go test ./internal/intg/embed -count=1`
- `go test ./examples/embedded/... ./internal/core/spec ./internal/engine . -count=1`
- `GOOS=linux GOARCH=amd64 go test -c ./internal/intg/embed -o /tmp/dagu-embed-linux-amd64.test`
- `GOOS=windows GOARCH=amd64 go test -c ./internal/intg/embed -o /tmp/dagu-embed-windows-amd64.test.exe`
- `go test ./... -run '^$'`

Notes
Summary by CodeRabbit
Release Notes
New Features
Documentation