Skip to content

feat: add embedded Dagu API#2011

Merged
yottahmd merged 12 commits intomainfrom
feat/embedded-dagu-api
Apr 19, 2026
Merged

feat: add embedded Dagu API#2011
yottahmd merged 12 commits intomainfrom
feat/embedded-dagu-api

Conversation

@yottahmd
Copy link
Copy Markdown
Collaborator

@yottahmd yottahmd commented Apr 19, 2026

Summary

  • add an experimental root github.com/dagucloud/dagu embedded engine API for local and shared-nothing distributed DAG execution
  • expose async run handles, status/stop operations, embedded worker startup, and custom executor registration
  • add runnable embedded examples and integration coverage for local, custom executor, and distributed embedded execution

Testing

  • go test ./internal/intg/embed -count=1
  • go test ./examples/embedded/... ./internal/core/spec ./internal/engine . -count=1
  • GOOS=linux GOARCH=amd64 go test -c ./internal/intg/embed -o /tmp/dagu-embed-linux-amd64.test
  • GOOS=windows GOARCH=amd64 go test -c ./internal/intg/embed -o /tmp/dagu-embed-windows-amd64.test.exe
  • go test ./... -run '^$'

Notes

  • Linux and Windows runtime coverage should be validated by CI runners.

Summary by CodeRabbit

Release Notes

  • New Features

    • Embedded engine API enabling DAG workflow execution from Go applications
    • Local execution mode for single-machine workflow runs
    • Distributed execution mode with coordinator support for multi-worker deployments
    • Custom executor registration capability for extended functionality
    • Parameter injection and workflow configuration management
    • Worker lifecycle management for distributed architectures
  • Documentation

    • Comprehensive usage guide with local, distributed, and custom executor examples

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 19, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e0c9d293-c0c3-4dea-a81e-a77f890edf1c

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR introduces a new public embedded API for the dagu package, enabling DAG execution from Go applications. It exposes core types (Engine, Run, Worker, Status, Options), executor registration capabilities, and methods for local and distributed DAG execution with comprehensive examples and integration tests.

Changes

Cohort / File(s) Summary
Package Documentation & Public API
doc.go, engine.go, executor.go, tools.go
Introduces package-level documentation and core public API including ExecutionMode, Options, DistributedOptions, RunOptions, Status, Engine, Run, Worker types with factory/lifecycle methods. Adds executor registration API with type aliases and option helpers. Updates tools.go package namespace.
Example Programs
examples/embedded/README.md, examples/embedded/local/main.go, examples/embedded/distributed/main.go, examples/embedded/custom-executor/main.go
Provides three example implementations: local execution with parameter passing, shared-nothing distributed execution with coordinator/worker setup, and custom executor registration demonstrating in-process executor integration. Documentation explains API concepts and run instructions.
Example Workflows
examples/embedded/local/workflow.yaml, examples/embedded/distributed/workflow.yaml, examples/embedded/custom-executor/workflow.yaml
Workflow YAML definitions for example programs, demonstrating graph-type DAGs with sequential steps and parameter substitution.
Internal Engine Implementation
internal/engine/engine.go, internal/engine/run.go, internal/engine/worker.go, internal/engine/types.go, internal/engine/logger.go
Core runtime orchestration: Engine initialization with config/persistence/logging setup, RunFile/RunYAML orchestration for local/distributed execution with attempt lifecycle management, Worker instantiation and startup for distributed mode, type definitions for Options/RunOptions/Status/Run/Worker, and slog adapter for logging integration.
Public Tests
engine_test.go, example_test.go
Integration tests validating local execution with parameters, YAML-based runs, error handling for distributed without coordinator, and example-style demonstrations of all three execution patterns.
Internal Integration Tests
internal/intg/embed/embed_test.go
Comprehensive integration tests for local YAML execution, custom executor registration/execution, and distributed shared-nothing execution with worker coordination and assignment verification.
Step Type Registration
internal/core/spec/step_types.go
Adds thread-safe executor type name registration via new RegisterExecutorTypeName function, protecting builtin step type set with RWMutex for concurrent read/write safety.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Engine
    participant DAGStore
    participant Agent
    participant Steps

    Client->>Engine: RunFile/RunYAML (with RunOptions)
    activate Engine
    Engine->>DAGStore: Load DAG definition
    DAGStore-->>Engine: DAG metadata
    Engine->>Engine: Apply run overrides (params, tags, selector)
    Engine->>Agent: Create & start local execution agent
    deactivate Engine
    activate Agent
    Client->>Engine: Wait (on Run object)
    Agent->>Steps: Execute each step in DAG
    activate Steps
    Steps->>Steps: Run with params
    Steps-->>Agent: Result & logs
    deactivate Steps
    Agent->>Agent: Record attempt status
    Agent-->>Engine: Signal completion
    deactivate Agent
    Engine-->>Client: Return final Status
Loading
sequenceDiagram
    participant Client
    participant Engine
    participant Coordinator
    participant Worker
    participant Agent
    participant Steps

    Client->>Engine: RunFile/RunYAML (ExecutionModeDistributed)
    activate Engine
    Engine->>Coordinator: Submit task for scheduling
    deactivate Engine
    Coordinator->>Coordinator: Match worker by selector/tags
    Coordinator->>Worker: Dispatch to matching worker
    activate Worker
    Worker->>Agent: Create execution agent
    activate Agent
    Agent->>Steps: Execute each step in DAG
    activate Steps
    Steps->>Steps: Run with params
    Steps-->>Agent: Result & logs
    deactivate Steps
    Agent->>Coordinator: Report status updates
    deactivate Agent
    Worker-->>Coordinator: Execution complete
    deactivate Worker
    Client->>Engine: Poll Status(RunRef)
    Engine->>Coordinator: Query run status
    Coordinator-->>Engine: Return Status snapshot
    Engine-->>Client: Return final Status
Loading
sequenceDiagram
    participant User
    participant User Code
    participant Engine
    participant Runtime
    participant ExecutorRegistry
    participant Custom Executor

    User->>User Code: Call RegisterExecutor("custom_type", factory, opts)
    activate User Code
    User Code->>ExecutorRegistry: RegisterExecutor with factory & options
    ExecutorRegistry->>ExecutorRegistry: Add factory mapping
    ExecutorRegistry->>ExecutorRegistry: Register type name
    deactivate User Code
    
    User->>Engine: RunYAML (DAG step uses custom_type)
    activate Engine
    Engine->>Runtime: Prepare & execute step
    Runtime->>ExecutorRegistry: Get executor for custom_type
    ExecutorRegistry-->>Runtime: Return custom executor instance
    activate Custom Executor
    Runtime->>Custom Executor: Run(Step)
    Custom Executor->>Custom Executor: Execute custom logic
    Custom Executor-->>Runtime: Return result
    deactivate Custom Executor
    deactivate Engine
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 18.03% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: add embedded Dagu API' accurately and concisely summarizes the main change—introducing a new embedded engine API for running Dagu DAGs from Go applications, as evidenced by the introduction of core API types, methods, examples, and integration tests.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/embedded-dagu-api

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

🧹 Nitpick comments (8)
internal/core/spec/step_types.go (1)

86-98: Thread-safe registration looks correct.

Read/write locking around builtinStepTypeNames is properly paired across RegisterExecutorTypeName and isBuiltinStepTypeName. One optional consideration: RegisterExecutorTypeName does not validate the name against customStepTypeNameRegexp (the same validator applied to custom step types). Registering an arbitrary string here means any garbage value will silently be accepted by DAG loading. Consider rejecting names that don't match customStepTypeNameRegexp (or returning an error) to prevent malformed executor names from poisoning the registry.

Also applies to: 287-292

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/spec/step_types.go` around lines 86 - 98,
RegisterExecutorTypeName currently accepts any non-empty string and inserts it
into builtinStepTypeNames; change it to validate the trimmed name against
customStepTypeNameRegexp and refuse to register invalid names (i.e., return
early without inserting) to avoid poisoning the registry with garbage. Update
the body of RegisterExecutorTypeName to: trim the input, if empty return, if
!customStepTypeNameRegexp.MatchString(name) return (or optionally log/debug),
otherwise acquire stepTypeNamesMu and insert into builtinStepTypeNames; apply
the same validation logic to the other registration site that manipulates
builtinStepTypeNames (the one around the other RegisterExecutorTypeName-like
block).
executor.go (1)

14-51: GoDoc comments are a best practice for public APIs, but not currently enforced by your linter configuration.

This file defines the public API surface (Step, Executor, ExecutorFactory, StepValidator, ExecutorCapabilities, ExecutorOption, RegisterExecutor, WithStepValidator, WithExecutorCapabilities). While adding one-line doc comments to these exported identifiers would improve documentation, the revive linter's exported rule is not enabled in .golangci.yml (only specific rules are listed: unused-parameter, import-alias-naming, if-return, error-return, dot-imports, struct-tag, function-result-limit, atomic, use-any, redundant-import-alias, enforce-map-style, enforce-slice-style). Adding GoDoc is optional unless your team adopts a style guide that requires it.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executor.go` around lines 14 - 51, Add concise GoDoc one-line comments for
all exported identifiers in this file to improve public API documentation:
document type aliases Step, Executor, ExecutorFactory, StepValidator,
ExecutorCapabilities, ExecutorOption, the executorRegistration struct, and
exported functions RegisterExecutor, WithStepValidator, and
WithExecutorCapabilities by adding a brief comment above each declaration
describing its purpose and usage (e.g., what RegisterExecutor does and what
arguments it expects, and what the option helpers set). Ensure comments follow
GoDoc style (start with the identifier name) and are placed immediately above
the corresponding declarations.
internal/engine/logger.go (1)

41-58: Pass slog.Attr values directly to With() for efficiency, but the LogValuer preservation claim is inaccurate.

slog.Logger.Log (which With uses) explicitly handles slog.Attr arguments: "If an argument is an Attr, it is used as is." The suggested change is valid and more efficient—it avoids unpacking and reconstructing the Attr.

However, the reasoning about preserving "lazy evaluation semantics" is incorrect. Both approaches preserve the LogValuer instance: calling attr.Value.Any() on a LogValuer-type Value returns the LogValuer instance itself, not a resolved value. Neither approach "flattens" the structure; both maintain the typed LogValuer for handler processing.

♻️ Suggested change
 func (l *slogAdapter) With(attrs ...slog.Attr) logger.Logger {
-	args := make([]any, 0, len(attrs)*2)
-	for _, attr := range attrs {
-		args = append(args, attr.Key, attr.Value.Any())
-	}
-	return &slogAdapter{logger: l.logger.With(args...)}
+	args := make([]any, 0, len(attrs))
+	for _, attr := range attrs {
+		args = append(args, attr)
+	}
+	return &slogAdapter{logger: l.logger.With(args...)}
 }

The same applies to log() on line 53.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/engine/logger.go` around lines 41 - 58, The current slogAdapter.With
and slogAdapter.log unpack slog.Attr into key/value pairs by calling
attr.Value.Any(), which is unnecessary and less efficient; instead pass the
slog.Attr values directly to the underlying logger (use l.logger.With(attrs...)
in With and l.logger.Log(..., tags...) in log) so you stop constructing args
slices and preserve Attrs unchanged; update the With method to forward attrs
as-is and update log to forward tags as-is (remove attr.Value.Any() usage and
the key/value unpacking).
engine_test.go (1)

30-56: Use testify assertions in the new tests.

These tests use manual if/t.Fatalf checks instead of require/assert. Per the coding guidelines for **/*_test.go: Use stretchr/testify assertions for testing in Go. Testify (v1.11.1) is already available as a dependency.

♻️ Example conversion pattern
 import (
 	"context"
 	"os"
 	"path/filepath"
 	"strings"
 	"testing"
 	"time"
 
 	"github.com/dagucloud/dagu"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
@@
 	engine, err := dagu.New(ctx, dagu.Options{HomeDir: home})
-	if err != nil {
-		t.Fatalf("New() error = %v", err)
-	}
+	require.NoError(t, err, "New()")
@@
 	run, err := engine.RunFile(ctx, dagFile, dagu.WithParams(map[string]string{"FOO": "bar"}))
-	if err != nil {
-		t.Fatalf("RunFile() error = %v", err)
-	}
+	require.NoError(t, err, "RunFile()")
 	status, err := run.Wait(ctx)
-	if err != nil {
-		t.Fatalf("Wait() error = %v", err)
-	}
-	if status.Status != "succeeded" {
-		t.Fatalf("status = %q, want succeeded", status.Status)
-	}
-	if status.Name != "embedded-file" {
-		t.Fatalf("name = %q, want embedded-file", status.Name)
-	}
-	if status.RunID == "" {
-		t.Fatal("run ID is empty")
-	}
+	require.NoError(t, err, "Wait()")
+	assert.Equal(t, "succeeded", status.Status)
+	assert.Equal(t, "embedded-file", status.Name)
+	assert.NotEmpty(t, status.RunID)

Also applies to: 63-90, 98-118, 122-126

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine_test.go` around lines 30 - 56, Replace the manual if/t.Fatalf style
checks in engine_test.go with testify/require assertions: import
"github.com/stretchr/testify/require" and use require.NoError(t, err) after New,
RunFile and Wait calls, use require.Equal(t, "succeeded", status.Status) and
require.Equal(t, "embedded-file", status.Name) for the expected values, and use
require.NotEmpty(t, status.RunID) for the run ID; also replace the deferred
Close() error check with require.NoError(t, engine.Close(context.Background()))
inside the deferred function to keep failure reporting consistent (update the
other similar test blocks at lines referenced in the comment as well).
engine.go (2)

236-257: Run.Wait (and Run.Status partly) can be simplified.

The if err != nil branch returns the same tuple as the success branch in Wait. Collapsing avoids the visual implication that the error case is special-cased.

♻️ Suggested simplification
-	status, err := r.inner.Wait(ctx)
-	if err != nil {
-		return publicStatus(status), err
-	}
-	return publicStatus(status), nil
+	status, err := r.inner.Wait(ctx)
+	return publicStatus(status), err
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine.go` around lines 236 - 257, The error branch in Run.Wait duplicates
the success return; simplify both Wait (and mirror for Status) by removing the
redundant if err != nil block and directly return publicStatus(status), err from
the call to r.inner.Wait(ctx) (and similarly return publicStatus(status), err
for r.inner.Status(ctx)), keeping the initial nil-check for r and r.inner and
using publicStatus to convert the inner status.

307-318: Inconsistent empty-vs-nil normalization between maps and slices.

WithParams / WithWorkerSelector / WithTags go through cloneMap (which returns nil for an empty input), while WithParamsList / WithTags use append([]string{}, …) which returns a non-nil empty slice. Then internalRunOptions re-clones again, doubling the work and re-introducing the same nil/empty mismatch. Consider:

  • Normalizing to one helper (e.g., a cloneSlice that returns nil for empty input) for symmetry, and
  • Skipping the second clone in internalRunOptions since applyRunOptions already produced a defensive copy.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine.go` around lines 307 - 318, WithParams/WithWorkerSelector/WithTags
currently use cloneMap which returns nil for empty maps while
WithParamsList/WithTags create non-nil empty slices via append, causing
inconsistent empty-vs-nil behavior and redundant cloning in internalRunOptions;
add a cloneSlice helper that mirrors cloneMap semantics (returns nil for empty
input) and update WithParamsList (and any slice-using WithTags overloads) to use
cloneSlice, and then remove the second cloning step inside internalRunOptions
(since applyRunOptions already makes defensive copies) so the code normalizes
empties consistently and avoids double work; reference functions: WithParams,
WithParamsList, WithWorkerSelector, WithTags, cloneMap, cloneSlice (new),
applyRunOptions, internalRunOptions.
internal/engine/run.go (2)

260-267: applyRunOverrides doesn't dedupe tags and only handles two fields.

dag.Tags = append(dag.Tags, core.NewTags(opts.Tags)...) will accumulate duplicates if the embedder calls WithTags with values already present in the YAML. Also, given that other run-level overrides like Name/DefaultWorkingDir/params live in loadOptions, it's worth a comment here noting the split so future maintainers don't re-add overlapping logic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/engine/run.go` around lines 260 - 267, applyRunOverrides currently
appends opts.Tags to dag.Tags without deduping and only handles
WorkerSelector/Tags; change applyRunOverrides to merge tags idempotently by
building a set of existing dag.Tags (from dag.Tags and core.NewTags(opts.Tags))
and only append new tags that aren't present (use the tag string/identifier
comparisons consistent with core.NewTags), and add a short comment in
applyRunOverrides explaining that other run-level overrides (Name,
DefaultWorkingDir, params) are handled in loadOptions so logic is intentionally
limited here; keep the existing WorkerSelector override behavior.

151-216: waitLocal/waitDistributed ignore caller context for the final status fetch.

In both branches the post-completion (or post-cancellation) Status lookup uses context.Background(), even when the caller's ctx is the one carrying deadlines, tracing, or cancellation reasons. For the timeout case (line 167, 190) this is arguably required so we can still return the latest snapshot, but it also means the call can hang indefinitely on a slow file/coord backend after the user's context already expired. Consider using a short bounded context derived from context.Background() (e.g. context.WithTimeout(context.Background(), 5*time.Second)) so the embedder isn't stuck.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/engine/run.go` around lines 151 - 216, Both waitLocal and
waitDistributed call r.Status(context.Background()) after the caller ctx is
done, which can hang indefinitely; replace those background calls with a short
bounded background context (e.g. ctxStatus, cancel :=
context.WithTimeout(context.Background(), 5*time.Second); defer cancel()) and
pass ctxStatus into r.Status. Update all occurrences inside waitLocal (the
status/statusErr fetch after <-r.done) and both places in
waitLocal/waitDistributed where you currently call
r.Status(context.Background()) on ctx.Done() paths so the status fetch times out
quickly; ensure you call cancel() to avoid leaks.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@engine_test.go`:
- Around line 23-28: The DAG uses a POSIX-only command ("test \"$FOO\" =
\"bar\"") which breaks on Windows; in the TestEngineRunFile where writeDAG(...)
is called, detect runtime.GOOS and write a platform-appropriate command string:
on non-windows keep the existing POSIX command, on windows emit a CMD
conditional like `cmd /C if "%FOO%"=="bar" exit 0 else exit 1`; update the
writeDAG call to use that conditional command string so the DAG step (named
"check-param") runs correctly on both platforms.

In `@engine.go`:
- Around line 96-104: Add clear doc comments to the public WorkerOptions struct
to document precedence: state that WorkerOptions.Coordinators and
WorkerOptions.TLS, if non-empty, override the engine-level DistributedOptions,
and if they are empty the engine will fall back to DistributedOptions; also
mention that Coordinators are validated to be non-empty when provided. Reference
the symbols WorkerOptions.Coordinators, WorkerOptions.TLS and DistributedOptions
in the comments so callers know the exact override/fallback behavior and
validation contract.

In `@examples/embedded/distributed/main.go`:
- Around line 66-82: The worker may not be registered before submitting the DAG
(engine.RunFile), causing a race; after launching the goroutine that calls
worker.Start(workerCtx) add a readiness wait before calling engine.RunFile:
either expose a readiness signal from Start (e.g., have Start return a ready
channel or provide a Worker.Ready() method) and block until it signals, or
implement a short poll loop that queries the coordinator (via the
coordinator/client API) for the worker's registration and only then call
engine.RunFile; update the code around worker.Start/workerCtx and the
engine.RunFile call to wait for that readiness signal or successful poll before
dispatching the workflow.

In `@examples/embedded/local/workflow.yaml`:
- Line 5: The command in workflow.yaml uses POSIX shell variable syntax
(${TARGET}) which won't expand in Windows cmd.exe; update the example so it is
shell-agnostic by either declaring the shell to use (e.g., ensure the step uses
a POSIX shell such as bash) or replace the environment expansion with Dagu's
parameter substitution mechanism (instead of ${TARGET}) so the value is injected
by the workflow engine; update the command entry and add a short note in the
example explaining the chosen approach.

In `@executor.go`:
- Around line 27-39: Trim and validate the provided name in RegisterExecutor
(use strings.TrimSpace) and panic (or return an error if your API prefers) when
the trimmed name is empty to avoid inserting an empty key into runtimeexec's
registry; perform this check before calling runtimeexec.RegisterExecutor or
spec.RegisterExecutorTypeName. Keep the rest of the logic (applying
ExecutorOption to executorRegistration and passing
registration.validator/registration.caps) unchanged, and add a short doc comment
on RegisterExecutor that registrations are not concurrency-safe and must occur
before engine/runtime use (or alternatively move synchronization into
runtimeexec.RegisterExecutor if you control that package).

In `@internal/engine/engine.go`:
- Around line 207-228: The coordinatorClient currently silently sets
cfg.Insecure = true when no TLS material is present; change this to fail fast:
in Engine.coordinatorClient, detect the case where cfg.Insecure is false (use
!cfg.Insecure), cfg.CAFile/CertFile/KeyFile are empty and !cfg.SkipTLSVerify,
and return a clear error asking the caller to either provide TLS files or
explicitly set TLS.Insecure=true (or introduce an explicit AllowInsecure flag),
instead of flipping Insecure; also update any messaging to use e.logger where
appropriate and clean up bool comparisons to use !cfg.Insecure and
!cfg.SkipTLSVerify for revive style compliance.

In `@internal/engine/run.go`:
- Around line 388-437: runDistributed creates a coordinator client and stores it
on the returned Run but never guarantees client.Cleanup if the Run is dropped
without calling Wait/Stop; modify runDistributed/Run so the coordinator client
is always cleaned up when the Run's done channel is closed (mirror runLocal's
cleanup of logFile): ensure the goroutine that closes Run.done calls
client.Cleanup (or install a runtime.SetFinalizer on *Run to call Cleanup) and
make the coordinator client's Cleanup implementation idempotent so explicit
Stop/Wait paths remain safe; refer to runDistributed, Run (ref/done/coordinator
fields), coordinatorClient, client.Cleanup, waitDistributed, and Run.Stop to
locate where to add the cleanup and idempotency changes.
- Around line 538-575: The agentStoresResult.ContextResolver is left nil in
Engine.agentStores; fix by initializing it like the cmd version: call
buildRemoteContextResolver(...) from within Engine.agentStores and assign its
return to result.ContextResolver, handling any error by logging with logger.Warn
(similar to other stores) so the resolver is not silently left uninitialized;
alternatively, if remote context support isn’t desired yet, remove the
ContextResolver field from agentStoresResult and any wiring that expects it.

In `@internal/intg/embed/embed_test.go`:
- Around line 64-98: TestEmbeddedCustomExecutorRunYAML registers a custom
executor via dagu.RegisterExecutor which mutates global state; add a t.Cleanup
that restores registry state after the test (e.g., call
dagu.UnregisterExecutor(executorType) if available, or capture any previous
registration returned/visible and re-register it in t.Cleanup) so that the
echoExecutor/ executorType registration is removed/restored when the test
finishes to avoid cross-test contamination.

---

Nitpick comments:
In `@engine_test.go`:
- Around line 30-56: Replace the manual if/t.Fatalf style checks in
engine_test.go with testify/require assertions: import
"github.com/stretchr/testify/require" and use require.NoError(t, err) after New,
RunFile and Wait calls, use require.Equal(t, "succeeded", status.Status) and
require.Equal(t, "embedded-file", status.Name) for the expected values, and use
require.NotEmpty(t, status.RunID) for the run ID; also replace the deferred
Close() error check with require.NoError(t, engine.Close(context.Background()))
inside the deferred function to keep failure reporting consistent (update the
other similar test blocks at lines referenced in the comment as well).

In `@engine.go`:
- Around line 236-257: The error branch in Run.Wait duplicates the success
return; simplify both Wait (and mirror for Status) by removing the redundant if
err != nil block and directly return publicStatus(status), err from the call to
r.inner.Wait(ctx) (and similarly return publicStatus(status), err for
r.inner.Status(ctx)), keeping the initial nil-check for r and r.inner and using
publicStatus to convert the inner status.
- Around line 307-318: WithParams/WithWorkerSelector/WithTags currently use
cloneMap which returns nil for empty maps while WithParamsList/WithTags create
non-nil empty slices via append, causing inconsistent empty-vs-nil behavior and
redundant cloning in internalRunOptions; add a cloneSlice helper that mirrors
cloneMap semantics (returns nil for empty input) and update WithParamsList (and
any slice-using WithTags overloads) to use cloneSlice, and then remove the
second cloning step inside internalRunOptions (since applyRunOptions already
makes defensive copies) so the code normalizes empties consistently and avoids
double work; reference functions: WithParams, WithParamsList,
WithWorkerSelector, WithTags, cloneMap, cloneSlice (new), applyRunOptions,
internalRunOptions.

In `@executor.go`:
- Around line 14-51: Add concise GoDoc one-line comments for all exported
identifiers in this file to improve public API documentation: document type
aliases Step, Executor, ExecutorFactory, StepValidator, ExecutorCapabilities,
ExecutorOption, the executorRegistration struct, and exported functions
RegisterExecutor, WithStepValidator, and WithExecutorCapabilities by adding a
brief comment above each declaration describing its purpose and usage (e.g.,
what RegisterExecutor does and what arguments it expects, and what the option
helpers set). Ensure comments follow GoDoc style (start with the identifier
name) and are placed immediately above the corresponding declarations.

In `@internal/core/spec/step_types.go`:
- Around line 86-98: RegisterExecutorTypeName currently accepts any non-empty
string and inserts it into builtinStepTypeNames; change it to validate the
trimmed name against customStepTypeNameRegexp and refuse to register invalid
names (i.e., return early without inserting) to avoid poisoning the registry
with garbage. Update the body of RegisterExecutorTypeName to: trim the input, if
empty return, if !customStepTypeNameRegexp.MatchString(name) return (or
optionally log/debug), otherwise acquire stepTypeNamesMu and insert into
builtinStepTypeNames; apply the same validation logic to the other registration
site that manipulates builtinStepTypeNames (the one around the other
RegisterExecutorTypeName-like block).

In `@internal/engine/logger.go`:
- Around line 41-58: The current slogAdapter.With and slogAdapter.log unpack
slog.Attr into key/value pairs by calling attr.Value.Any(), which is unnecessary
and less efficient; instead pass the slog.Attr values directly to the underlying
logger (use l.logger.With(attrs...) in With and l.logger.Log(..., tags...) in
log) so you stop constructing args slices and preserve Attrs unchanged; update
the With method to forward attrs as-is and update log to forward tags as-is
(remove attr.Value.Any() usage and the key/value unpacking).

In `@internal/engine/run.go`:
- Around line 260-267: applyRunOverrides currently appends opts.Tags to dag.Tags
without deduping and only handles WorkerSelector/Tags; change applyRunOverrides
to merge tags idempotently by building a set of existing dag.Tags (from dag.Tags
and core.NewTags(opts.Tags)) and only append new tags that aren't present (use
the tag string/identifier comparisons consistent with core.NewTags), and add a
short comment in applyRunOverrides explaining that other run-level overrides
(Name, DefaultWorkingDir, params) are handled in loadOptions so logic is
intentionally limited here; keep the existing WorkerSelector override behavior.
- Around line 151-216: Both waitLocal and waitDistributed call
r.Status(context.Background()) after the caller ctx is done, which can hang
indefinitely; replace those background calls with a short bounded background
context (e.g. ctxStatus, cancel := context.WithTimeout(context.Background(),
5*time.Second); defer cancel()) and pass ctxStatus into r.Status. Update all
occurrences inside waitLocal (the status/statusErr fetch after <-r.done) and
both places in waitLocal/waitDistributed where you currently call
r.Status(context.Background()) on ctx.Done() paths so the status fetch times out
quickly; ensure you call cancel() to avoid leaks.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4a340579-3fb3-42dc-b3b3-79ca9a561f84

📥 Commits

Reviewing files that changed from the base of the PR and between b22c3e6 and 6d4ca1e.

📒 Files selected for processing (20)
  • doc.go
  • engine.go
  • engine_test.go
  • example_test.go
  • examples/embedded/README.md
  • examples/embedded/custom-executor/main.go
  • examples/embedded/custom-executor/workflow.yaml
  • examples/embedded/distributed/main.go
  • examples/embedded/distributed/workflow.yaml
  • examples/embedded/local/main.go
  • examples/embedded/local/workflow.yaml
  • executor.go
  • internal/core/spec/step_types.go
  • internal/engine/engine.go
  • internal/engine/logger.go
  • internal/engine/run.go
  • internal/engine/types.go
  • internal/engine/worker.go
  • internal/intg/embed/embed_test.go
  • tools.go

Comment thread engine_test.go Outdated
Comment on lines +23 to +28
writeDAG(t, dagFile, `
name: embedded-file
steps:
- name: check-param
command: test "$FOO" = "bar"
`)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make this DAG command portable for Windows coverage.

Line 27 uses POSIX test, so TestEngineRunFile can fail on Windows despite this PR requesting Windows runtime validation.

🐛 Proposed portable command selection
 import (
 	"context"
+	"fmt"
 	"os"
 	"path/filepath"
+	"runtime"
 	"strings"
 	"testing"
 	"time"
@@
 	home := t.TempDir()
 	dagFile := filepath.Join(home, "embedded-file.yaml")
-	writeDAG(t, dagFile, `
+	checkParamCommand := `test "$FOO" = "bar"`
+	if runtime.GOOS == "windows" {
+		checkParamCommand = `if not "%FOO%"=="bar" exit /b 1`
+	}
+	writeDAG(t, dagFile, fmt.Sprintf(`
 name: embedded-file
 steps:
   - name: check-param
-    command: test "$FOO" = "bar"
-`)
+    command: %q
+`, checkParamCommand))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
writeDAG(t, dagFile, `
name: embedded-file
steps:
- name: check-param
command: test "$FOO" = "bar"
`)
checkParamCommand := `test "$FOO" = "bar"`
if runtime.GOOS == "windows" {
checkParamCommand = `if not "%FOO%"=="bar" exit /b 1`
}
writeDAG(t, dagFile, fmt.Sprintf(`
name: embedded-file
steps:
- name: check-param
command: %q
`, checkParamCommand))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine_test.go` around lines 23 - 28, The DAG uses a POSIX-only command
("test \"$FOO\" = \"bar\"") which breaks on Windows; in the TestEngineRunFile
where writeDAG(...) is called, detect runtime.GOOS and write a
platform-appropriate command string: on non-windows keep the existing POSIX
command, on windows emit a CMD conditional like `cmd /C if "%FOO%"=="bar" exit 0
else exit 1`; update the writeDAG call to use that conditional command string so
the DAG step (named "check-param") runs correctly on both platforms.

Comment thread engine.go
Comment on lines +96 to +104
// WorkerOptions configures an embedded shared-nothing worker.
type WorkerOptions struct {
ID string
MaxActiveRuns int
Labels map[string]string
Coordinators []string
TLS TLSOptions
HealthPort int
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# See how internal NewWorker reconciles WorkerOptions.Coordinators with engine.distributed.Coordinators.
fd -t f 'worker.go$' -p 'internal/engine' -x rg -n -C10 'func .*NewWorker' {} \;

Repository: dagucloud/dagu

Length of output: 600


🏁 Script executed:

cd /repo && cat -n engine.go | sed -n '85,115p'

Repository: dagucloud/dagu

Length of output: 113


🏁 Script executed:

cat -n engine.go | sed -n '85,115p'

Repository: dagucloud/dagu

Length of output: 911


🏁 Script executed:

rg -n 'type TLSOptions' engine.go -A 5

Repository: dagucloud/dagu

Length of output: 206


Document WorkerOptions.Coordinators and TLS field precedence in public API.

The public WorkerOptions struct lacks doc comments explaining how its Coordinators and TLS fields interact with engine-level DistributedOptions. Users cannot tell from the API signature alone whether these fields override or augment the engine configuration. The implementation correctly handles this (falls back to DistributedOptions if empty, with validation that coordinators are non-empty), but this contract should be documented on the struct fields themselves.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine.go` around lines 96 - 104, Add clear doc comments to the public
WorkerOptions struct to document precedence: state that
WorkerOptions.Coordinators and WorkerOptions.TLS, if non-empty, override the
engine-level DistributedOptions, and if they are empty the engine will fall back
to DistributedOptions; also mention that Coordinators are validated to be
non-empty when provided. Reference the symbols WorkerOptions.Coordinators,
WorkerOptions.TLS and DistributedOptions in the comments so callers know the
exact override/fallback behavior and validation contract.

Comment on lines +66 to +82
workerCtx, stopWorker := context.WithCancel(ctx)
defer stopWorker()
go func() {
if err := worker.Start(workerCtx); err != nil && !errors.Is(err, context.Canceled) {
log.Printf("worker stopped: %v", err)
}
}()

run, err := engine.RunFile(ctx, examplePath("workflow.yaml"))
if err != nil {
log.Fatal(err)
}
status, err := run.Wait(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s %s finished with %s on worker %s\n", status.Name, status.RunID, status.Status, status.WorkerID)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Wait for the worker to be ready before dispatching.

RunFile can reach the coordinator before the goroutine has registered the worker, making the distributed example timing-dependent. Add a readiness wait/poll against the coordinator or expose a worker start signal before submitting the DAG.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/embedded/distributed/main.go` around lines 66 - 82, The worker may
not be registered before submitting the DAG (engine.RunFile), causing a race;
after launching the goroutine that calls worker.Start(workerCtx) add a readiness
wait before calling engine.RunFile: either expose a readiness signal from Start
(e.g., have Start return a ready channel or provide a Worker.Ready() method) and
block until it signals, or implement a short poll loop that queries the
coordinator (via the coordinator/client API) for the worker's registration and
only then call engine.RunFile; update the code around worker.Start/workerCtx and
the engine.RunFile call to wait for that readiness signal or successful poll
before dispatching the workflow.

type: graph
steps:
- name: greet
command: echo "hello ${TARGET}"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

${TARGET} expansion is shell-dependent.

The PR requests CI validation on Windows. echo "hello ${TARGET}" is expanded by POSIX shells but not by Windows cmd.exe (which uses %TARGET%). If this example is exercised on Windows runners, it will print the literal string. Consider documenting the required shell, or using a shell-agnostic mechanism (e.g., relying on Dagu's parameter substitution rather than environment-variable expansion).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/embedded/local/workflow.yaml` at line 5, The command in
workflow.yaml uses POSIX shell variable syntax (${TARGET}) which won't expand in
Windows cmd.exe; update the example so it is shell-agnostic by either declaring
the shell to use (e.g., ensure the step uses a POSIX shell such as bash) or
replace the environment expansion with Dagu's parameter substitution mechanism
(instead of ${TARGET}) so the value is injected by the workflow engine; update
the command entry and add a short note in the example explaining the chosen
approach.

Comment thread executor.go
Comment on lines +27 to +39
func RegisterExecutor(name string, factory ExecutorFactory, opts ...ExecutorOption) {
var registration executorRegistration
for _, opt := range opts {
opt(&registration)
}
runtimeexec.RegisterExecutor(
name,
runtimeexec.ExecutorFactory(factory),
registration.validator,
registration.caps,
)
spec.RegisterExecutorTypeName(name)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Validate the executor name and consider concurrency expectations.

Two concerns:

  1. RegisterExecutor does not reject empty/whitespace name. spec.RegisterExecutorTypeName silently ignores it, but runtimeexec.RegisterExecutor will still insert an entry under the empty key into executorRegistry (per internal/runtime/executor/executor.go:58), which is a footgun. Trim and reject empty names early (or panic, matching typical Register* semantics).
  2. runtimeexec.RegisterExecutor writes to a package-level executorRegistry without synchronization. In the embedded use case users may call dagu.RegisterExecutor concurrently with engine setup or even at runtime; document that registration must happen before any engine use, or add a mutex on the runtime side.
🛡️ Suggested guard
 func RegisterExecutor(name string, factory ExecutorFactory, opts ...ExecutorOption) {
+	name = strings.TrimSpace(name)
+	if name == "" {
+		panic("dagu: RegisterExecutor called with empty name")
+	}
+	if factory == nil {
+		panic("dagu: RegisterExecutor called with nil factory")
+	}
 	var registration executorRegistration
 	for _, opt := range opts {
 		opt(&registration)
 	}

(Add "strings" to imports.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executor.go` around lines 27 - 39, Trim and validate the provided name in
RegisterExecutor (use strings.TrimSpace) and panic (or return an error if your
API prefers) when the trimmed name is empty to avoid inserting an empty key into
runtimeexec's registry; perform this check before calling
runtimeexec.RegisterExecutor or spec.RegisterExecutorTypeName. Keep the rest of
the logic (applying ExecutorOption to executorRegistration and passing
registration.validator/registration.caps) unchanged, and add a short doc comment
on RegisterExecutor that registrations are not concurrency-safe and must occur
before engine/runtime use (or alternatively move synchronization into
runtimeexec.RegisterExecutor if you control that package).

Comment thread internal/engine/engine.go
Comment on lines +207 to +228
func (e *Engine) coordinatorClient(opts DistributedOptions) (coordinator.Client, error) {
if len(opts.Coordinators) == 0 {
return nil, fmt.Errorf("distributed execution requires at least one coordinator address")
}
cfg := coordinator.DefaultConfig()
cfg.CAFile = opts.TLS.ClientCAFile
cfg.CertFile = opts.TLS.CertFile
cfg.KeyFile = opts.TLS.KeyFile
cfg.SkipTLSVerify = opts.TLS.SkipTLSVerify
cfg.Insecure = opts.TLS.Insecure
if cfg.Insecure == false && cfg.CAFile == "" && cfg.CertFile == "" && cfg.KeyFile == "" && !cfg.SkipTLSVerify {
cfg.Insecure = true
}
if err := cfg.Validate(); err != nil {
return nil, err
}
registry, err := coordinator.NewStaticRegistry(opts.Coordinators)
if err != nil {
return nil, err
}
return coordinator.New(registry, cfg), nil
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Silent downgrade to insecure transport is a security footgun.

When the caller passes a non-empty Coordinators list but no TLS material and doesn't explicitly set Insecure or SkipTLSVerify, the code silently flips cfg.Insecure = true. From a library-API perspective this means "I forgot to configure TLS" and "I want plaintext" become indistinguishable, and the embedded test in internal/intg/embed/embed_test.go (which leaves TLS zero-valued) is what makes this convenient — at the cost of making it the default for every embedder.

Recommend one of:

  • Require an explicit TLS.Insecure = true (or a dedicated AllowInsecure flag) and return an error otherwise.
  • At minimum, log a warning via e.logger when this fallback kicks in so embedders see it in their logs.

Also worth aligning style with the rest of the repo — cfg.Insecure == false is flagged by revive (bool-comparison) and the mixed ==false / !cfg.SkipTLSVerify is hard to read:

♻️ Stylistic cleanup (orthogonal to the security concern)
-	if cfg.Insecure == false && cfg.CAFile == "" && cfg.CertFile == "" && cfg.KeyFile == "" && !cfg.SkipTLSVerify {
-		cfg.Insecure = true
-	}
+	noTLSMaterial := cfg.CAFile == "" && cfg.CertFile == "" && cfg.KeyFile == ""
+	if !cfg.Insecure && !cfg.SkipTLSVerify && noTLSMaterial {
+		// TODO(security): require explicit opt-in instead of silent downgrade.
+		cfg.Insecure = true
+	}

As per coding guidelines: "Use golangci-lint v2 for linting Go code (errcheck, govet, staticcheck, gosec, revive, etc.)" — revive will flag cfg.Insecure == false.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/engine/engine.go` around lines 207 - 228, The coordinatorClient
currently silently sets cfg.Insecure = true when no TLS material is present;
change this to fail fast: in Engine.coordinatorClient, detect the case where
cfg.Insecure is false (use !cfg.Insecure), cfg.CAFile/CertFile/KeyFile are empty
and !cfg.SkipTLSVerify, and return a clear error asking the caller to either
provide TLS files or explicitly set TLS.Insecure=true (or introduce an explicit
AllowInsecure flag), instead of flipping Insecure; also update any messaging to
use e.logger where appropriate and clean up bool comparisons to use
!cfg.Insecure and !cfg.SkipTLSVerify for revive style compliance.

Comment thread internal/engine/logger.go
func (l *slogAdapter) Info(msg string, tags ...slog.Attr) { l.log(slog.LevelInfo, msg, tags...) }
func (l *slogAdapter) Warn(msg string, tags ...slog.Attr) { l.log(slog.LevelWarn, msg, tags...) }
func (l *slogAdapter) Error(msg string, tags ...slog.Attr) { l.log(slog.LevelError, msg, tags...) }
func (l *slogAdapter) Fatal(msg string, tags ...slog.Attr) { l.log(slog.LevelError, msg, tags...) }
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find other logger.Logger implementations and inspect their Fatal/Fatalf behavior.
rg -nP --type=go '\bfunc \([^)]+\) Fatal(f)?\(' -C3

Repository: dagucloud/dagu

Length of output: 3402


🏁 Script executed:

sed -n '218,230p' internal/cmn/logger/logger.go

Repository: dagucloud/dagu

Length of output: 414


🏁 Script executed:

sed -n '264,280p' internal/cmn/logger/logger.go

Repository: dagucloud/dagu

Length of output: 513


🏁 Script executed:

# Check the logger.Logger interface definition
rg -nP 'type Logger interface' -A 20 --type=go

Repository: dagucloud/dagu

Length of output: 1298


🏁 Script executed:

# Find where slogAdapter is instantiated and used
rg -n 'slogAdapter|NewSlogAdapter' --type=go -B 2 -A 2

Repository: dagucloud/dagu

Length of output: 3054


🏁 Script executed:

# Check internal/engine/logger.go for context and comments about slogAdapter
head -50 internal/engine/logger.go

Repository: dagucloud/dagu

Length of output: 1817


🏁 Script executed:

# Find all calls to newSlogAdapter
rg -n 'newSlogAdapter' --type=go

Repository: dagucloud/dagu

Length of output: 207


🏁 Script executed:

# Check the broader context of internal/engine/logger.go - what is this engine module for?
head -100 internal/engine/logger.go | tail -60

Repository: dagucloud/dagu

Length of output: 2075


🏁 Script executed:

# Check engine.go around line 48 to understand the context
sed -n '40,60p' internal/engine/engine.go

Repository: dagucloud/dagu

Length of output: 701


🏁 Script executed:

# Check if there's any documentation about why slogAdapter might be embedded
grep -n 'embedded\|embed' internal/engine/logger.go internal/engine/engine.go

Repository: dagucloud/dagu

Length of output: 40


🏁 Script executed:

# Find the Engine type definition and its documentation
rg -n 'type Engine struct' -A 10 --type=go internal/engine/engine.go

Repository: dagucloud/dagu

Length of output: 427


🏁 Script executed:

# Check if there are any examples or documentation about embedding the engine
find . -name '*.md' -o -name '*.go' | xargs grep -l 'Engine.*embed\|embed.*Engine' 2>/dev/null | head -5

Repository: dagucloud/dagu

Length of output: 69


🏁 Script executed:

# Get more context from engine.go to understand the embedded pattern
sed -n '1,100p' internal/engine/engine.go | head -50

Repository: dagucloud/dagu

Length of output: 1485


Fatal/Fatalf do not terminate the process.

The appLogger implementation (the default logger) explicitly calls os.Exit(1) after logging Fatal errors, consistent with standard Go logging behavior. However, slogAdapter (used when a custom *slog.Logger is provided) logs at LevelError and returns without exiting. This creates a behavioral inconsistency: callers relying on Fatal to abort execution will see different behavior depending on which logger implementation is active.

This appears intentional for the embedded library use case (avoiding process termination when Engine is used as a library), but it is not documented. Either add os.Exit(1) calls in slogAdapter or add a clear comment explaining why this logger variant intentionally degrades Fatal to prevent surprising callers.

Also applies to: 35-35

Comment thread internal/engine/run.go
Comment on lines +388 to +437
func (e *Engine) runDistributed(ctx context.Context, dag *core.DAG, runID string, opts RunOptions) (*Run, error) {
dist := e.distributed
if len(opts.WorkerSelector) > 0 {
dist.WorkerSelector = cloneStringMap(opts.WorkerSelector)
}
client, err := e.coordinatorClient(dist)
if err != nil {
return nil, err
}
taskOpts := []runtimeexec.TaskOption{
runtimeexec.WithBaseConfig(runtimeexec.ResolveBaseConfig(dag.BaseConfigData, e.cfg.Paths.BaseConfig)),
}
if len(dist.WorkerSelector) > 0 {
taskOpts = append(taskOpts, runtimeexec.WithWorkerSelector(dist.WorkerSelector))
}
if len(dag.Tags) > 0 {
taskOpts = append(taskOpts, runtimeexec.WithTags(strings.Join(dag.Tags.Strings(), ",")))
}
if dag.SourceFile != "" {
taskOpts = append(taskOpts, runtimeexec.WithSourceFile(dag.SourceFile))
}
if snapshot, snapErr := agentsnapshot.BuildFromPaths(ctx, dag, e.cfg.Paths, e.dagStore); snapErr != nil {
_ = client.Cleanup(ctx)
return nil, fmt.Errorf("build agent snapshot: %w", snapErr)
} else if len(snapshot) > 0 {
taskOpts = append(taskOpts, runtimeexec.WithAgentSnapshot(snapshot))
}
task := runtimeexec.CreateTask(
dag.Name,
string(dag.YamlData),
coordinatorv1.Operation_OPERATION_START,
runID,
taskOpts...,
)
if len(dag.Params) > 0 {
task.Params = strings.Join(dag.Params, " ")
}
if err := client.Dispatch(ctx, task); err != nil {
_ = client.Cleanup(ctx)
return nil, fmt.Errorf("dispatch DAG run: %w", err)
}
return &Run{
engine: e,
ref: RunRef{Name: dag.Name, ID: runID},
mode: ExecutionModeDistributed,
done: make(chan struct{}),
dag: dag,
coordinator: client,
}, nil
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Coordinator client leaks when a distributed Run is dropped without Wait/Stop.

The coordinator client is created here and stashed into the returned Run. The only places that call client.Cleanup are waitDistributed's defer and (*Run).Stop. If an embedder calls RunYAML in distributed mode and then discards the handle (or the parent context is canceled before they reach Wait), the client and its underlying gRPC connections are never released. Given this is a public API meant to be embedded by other Go programs, the lifecycle should be self-healing — e.g., attach cleanup to a runtime.SetFinalizer, or document loudly that callers must always call Wait or Stop.

A minimal hardening: track cleanup in the goroutine that closes done (mirroring how runLocal closes its logFile), and make Cleanup idempotent so the explicit Stop/Wait paths remain safe.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/engine/run.go` around lines 388 - 437, runDistributed creates a
coordinator client and stores it on the returned Run but never guarantees
client.Cleanup if the Run is dropped without calling Wait/Stop; modify
runDistributed/Run so the coordinator client is always cleaned up when the Run's
done channel is closed (mirror runLocal's cleanup of logFile): ensure the
goroutine that closes Run.done calls client.Cleanup (or install a
runtime.SetFinalizer on *Run to call Cleanup) and make the coordinator client's
Cleanup implementation idempotent so explicit Stop/Wait paths remain safe; refer
to runDistributed, Run (ref/done/coordinator fields), coordinatorClient,
client.Cleanup, waitDistributed, and Run.Stop to locate where to add the cleanup
and idempotency changes.

Comment thread internal/engine/run.go
Comment on lines +538 to +575
type agentStoresResult struct {
ConfigStore agentstores.ConfigStore
ModelStore agentstores.ModelStore
MemoryStore agentstores.MemoryStore
SoulStore agentstores.SoulStore
OAuthManager *agentoauth.Manager
ContextResolver agentstores.RemoteContextResolver
}

func (e *Engine) agentStores(ctx context.Context) agentStoresResult {
var result agentStoresResult
if store, err := fileagentconfig.New(e.cfg.Paths.DataDir); err == nil {
result.ConfigStore = store
} else {
logger.Warn(ctx, "Failed to create agent config store", tag.Error(err))
}
if store, err := fileagentmodel.New(filepath.Join(e.cfg.Paths.DataDir, "agent", "models")); err == nil {
result.ModelStore = store
} else {
logger.Warn(ctx, "Failed to create agent model store", tag.Error(err))
}
if store, err := filememory.New(e.cfg.Paths.DAGsDir); err == nil {
result.MemoryStore = store
} else {
logger.Warn(ctx, "Failed to create agent memory store", tag.Error(err))
}
if store, err := fileagentsoul.New(ctx, filepath.Join(e.cfg.Paths.DAGsDir, "souls")); err == nil {
result.SoulStore = store
} else {
logger.Warn(ctx, "Failed to create agent soul store", tag.Error(err))
}
if manager, err := fileagentoauth.NewManager(e.cfg.Paths.DataDir); err == nil {
result.OAuthManager = manager
} else {
logger.Warn(ctx, "Failed to create agent OAuth manager", tag.Error(err))
}
return result
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify what consumers expect from RemoteContextResolver and whether nil is handled.
ast-grep --pattern 'type RemoteContextResolver $_'
rg -nP '\bRemoteContextResolver\b' -C3
rg -nP '\bAgentRemoteContextResolver\b' -C3

Repository: dagucloud/dagu

Length of output: 20515


🏁 Script executed:

#!/bin/bash
# Check if buildRemoteContextResolver is called anywhere in engine or related code
rg -n 'buildRemoteContextResolver' internal/engine/ internal/runtime/

# Check the full agentStores function in cmd/context.go to see if it initializes ContextResolver
sed -n '754,810p' internal/cmd/context.go

# Verify if there's any initialization of ContextResolver in the engine version
rg -n 'ContextResolver' internal/engine/run.go -A2 -B2

Repository: dagucloud/dagu

Length of output: 2196


agentStoresResult.ContextResolver is declared but never initialized in the engine version.

The agentStores method in internal/engine/run.go initializes ConfigStore, ModelStore, MemoryStore, SoulStore, and OAuthManager, but leaves ContextResolver as nil. In contrast, the similar function in internal/cmd/context.go properly initializes it via buildRemoteContextResolver().

While tool factories gracefully handle a nil resolver (returning nil for the tool if resolver is nil), propagating an uninitialized field that's wired into AgentRemoteContextResolver is misleading to embedders who may expect remote context support. Either:

  • Wire up the resolver by calling buildRemoteContextResolver() (matching the cmd version), or
  • Remove the field and its wiring until remote context resolution is actually supported in the engine.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/engine/run.go` around lines 538 - 575, The
agentStoresResult.ContextResolver is left nil in Engine.agentStores; fix by
initializing it like the cmd version: call buildRemoteContextResolver(...) from
within Engine.agentStores and assign its return to result.ContextResolver,
handling any error by logging with logger.Warn (similar to other stores) so the
resolver is not silently left uninitialized; alternatively, if remote context
support isn’t desired yet, remove the ContextResolver field from
agentStoresResult and any wiring that expects it.

Comment on lines +64 to +98
func TestEmbeddedCustomExecutorRunYAML(t *testing.T) {
const executorType = "embedded_intg_echo"

dagu.RegisterExecutor(
executorType,
func(_ context.Context, step dagu.Step) (dagu.Executor, error) {
return &echoExecutor{step: step}, nil
},
dagu.WithExecutorCapabilities(dagu.ExecutorCapabilities{Command: true}),
)

ctx, cancel := context.WithTimeout(context.Background(), embeddedTimeout(20*time.Second))
defer cancel()

engine, err := dagu.New(ctx, dagu.Options{HomeDir: t.TempDir()})
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, engine.Close(context.Background()))
})

run, err := engine.RunYAML(ctx, []byte(`
name: embedded-intg-custom-executor
type: graph
steps:
- name: go-step
type: embedded_intg_echo
command: called from YAML
`))
require.NoError(t, err)

status, err := run.Wait(ctx)
require.NoError(t, err)
require.Equal(t, "embedded-intg-custom-executor", status.Name)
require.Equal(t, "succeeded", status.Status)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Inspect RegisterExecutor implementation to see how duplicate names are handled.
fd -t f 'executor.go$' -x rg -n -C5 'func RegisterExecutor' {} \;
ast-grep --pattern 'func RegisterExecutor($$$) {
  $$$
}'

Repository: dagucloud/dagu

Length of output: 2064


Custom executor registration uses global state with incomplete test cleanup.

dagu.RegisterExecutor mutates a global registry. While the implementation silently overwrites on duplicate registration (rather than panicking), this pattern still violates test isolation. Re-running this test with go test -count=N will mutate the global executor registry multiple times. If other tests are added that depend on a clean registry state, or if future changes to the executor registration implementation introduce stricter checks, this will cause cross-test contamination. Consider adding t.Cleanup to unregister the custom executor after the test completes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/intg/embed/embed_test.go` around lines 64 - 98,
TestEmbeddedCustomExecutorRunYAML registers a custom executor via
dagu.RegisterExecutor which mutates global state; add a t.Cleanup that restores
registry state after the test (e.g., call dagu.UnregisterExecutor(executorType)
if available, or capture any previous registration returned/visible and
re-register it in t.Cleanup) so that the echoExecutor/ executorType registration
is removed/restored when the test finishes to avoid cross-test contamination.

@yottahmd yottahmd merged commit d502813 into main Apr 19, 2026
10 checks passed
@yottahmd yottahmd deleted the feat/embedded-dagu-api branch April 19, 2026 14:36
zhnq pushed a commit to zhnq/dagu that referenced this pull request Apr 23, 2026
Merged features from upstream:
- feat: add embedded Dagu API (dagucloud#2011)
- feat: add edit-and-retry DAG runs (dagucloud#2010)
- feat: add bulk DAG-run deletion in web UI (dagucloud#2009)
- feat: add kubernetes secret provider (dagucloud#2006)
- feat: make workspace selection global (dagucloud#2015)
- feat: show cockpit run artifacts (dagucloud#2017)
- feat: allow disabling DAG retry policy (dagucloud#2018)
- feat: make DAG labels canonical, deprecate tags (dagucloud#2013)
- feat: add workflow design workspace (dagucloud#2012)
- feat: add step with/config alias (dagucloud#2021)
- fix: allow runtime custom step inputs (dagucloud#2005)
- fix: align embedded engine run labels (dagucloud#2014)
- Various CI/test/docs improvements

Conflict resolution:
- Kept fork i18n files (ui/src/i18n/, LanguageSwitcher)
- Merged i18n hooks with upstream structural changes in
  App.tsx, Layout.tsx, menu.tsx, DAGStatus.tsx, etc.
- Adopted upstream labels-over-tags rename (TagCombobox -> LabelCombobox)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant