feat: add kubernetes step executor and DAG defaults #1886
Conversation
Add a new "kubernetes" (alias "k8s") step type that runs workflow steps as Kubernetes Jobs in a local or remote cluster. Supports namespace, resources (CPU/memory requests/limits), env vars, volumes, volume mounts, service accounts, node selectors, tolerations, image pull secrets, and configurable cleanup policy.
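For orientation, here is a hedged sketch of the kind of per-step config the executor decodes, written as a Go map since the review discusses LoadConfigFromMap; the snake_case keys follow fields named in this PR and its review comments, while the concrete values and any keys not mentioned there are illustrative assumptions, not the authoritative schema.

package main

import "fmt"

func main() {
    // Illustrative only: a step executor config of the shape the review
    // describes LoadConfigFromMap decoding. Key names mirror the review
    // (image, namespace, cleanup_policy, resources, env); values are made up.
    stepConfig := map[string]any{
        "image":          "alpine:3.20",
        "namespace":      "batch-jobs",
        "cleanup_policy": "keep",
        "resources": map[string]any{
            "requests": map[string]string{"cpu": "100m", "memory": "128Mi"},
            "limits":   map[string]string{"cpu": "500m", "memory": "256Mi"},
        },
        "env": []map[string]string{
            {"name": "APP_MODE", "value": "batch"},
        },
    }
    fmt.Println(stepConfig["image"])
}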
Important: Review skipped. Auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration.
📝 Walkthrough
This PR introduces Kubernetes as a new executor type for the Dagu workflow system, adding Kubernetes Job execution support.
Changes
Sequence Diagram(s)
sequenceDiagram
participant DAGSpec as DAG Spec<br/>(YAML)
participant Transformer as Spec Transformer<br/>(buildKubernetes)
participant Merger as Step Merger<br/>(mergeKubernetesExecutorConfig)
participant Step as Step Executor<br/>(Kubernetes)
participant KubeClient as Kubernetes Client
participant KubeAPI as Kubernetes API<br/>(etcd)
DAGSpec->>Transformer: kubernetes: {image: "...", namespace: "..."}
Transformer->>Transformer: Validate config<br/>(schema: kubernetes_defaults)
Transformer->>Transformer: Clone & store in DAG.Kubernetes
DAGSpec->>Step: type: "kubernetes"<br/>executor_config: {...}
Step->>Merger: Merge DAG.Kubernetes + step config
Merger->>Merger: Deep merge with step overrides
Merger->>Step: Merged config
Step->>KubeClient: CreateJob(ctx, stepName, command)
KubeClient->>KubeClient: Build Job manifest<br/>(image, env, volumes, resources)
KubeClient->>KubeAPI: Create Job
KubeAPI-->>KubeClient: Job created
Step->>KubeClient: WaitForPod(ctx)
KubeClient->>KubeAPI: Poll pod status<br/>(label selector: job-name)
KubeAPI-->>KubeClient: Pod Running
KubeClient-->>Step: Pod name
Step->>KubeClient: StreamLogs(ctx, podName)
KubeClient->>KubeAPI: Stream logs<br/>(step container)
KubeAPI-->>KubeClient: Log lines
KubeClient-->>Step: Logs written to stdout
Step->>KubeClient: WaitForCompletion(ctx)
KubeClient->>KubeAPI: Poll Job conditions
KubeAPI-->>KubeClient: Job.Completed = True
KubeClient-->>Step: Success
Step->>KubeClient: DeleteJob(ctx)<br/>(cleanup_policy)
KubeClient->>KubeAPI: Delete Job<br/>(background propagation)
KubeAPI-->>KubeClient: Job deleted
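Condensed into Go, the happy path above might look like the sketch below; the client method names come from the diagram, while the interface, signatures, and cleanup handling are simplified assumptions rather than the actual executor code.

package kubessketch

import (
    "context"
    "fmt"
)

// jobClient is an assumed interface mirroring the calls in the diagram; the
// real client's signatures may differ.
type jobClient interface {
    CreateJob(ctx context.Context, stepName string, command []string) error
    WaitForPod(ctx context.Context) (string, error)
    StreamLogs(ctx context.Context, podName string) error
    WaitForCompletion(ctx context.Context) error
    DeleteJob(ctx context.Context) error
}

// run sketches the happy-path ordering from the sequence diagram.
func run(ctx context.Context, c jobClient, stepName string, command []string, keepJob bool) error {
    if err := c.CreateJob(ctx, stepName, command); err != nil {
        return fmt.Errorf("create job: %w", err)
    }
    podName, err := c.WaitForPod(ctx) // polls pod status via the job-name label selector
    if err != nil {
        return fmt.Errorf("wait for pod: %w", err)
    }
    if err := c.StreamLogs(ctx, podName); err != nil { // step container logs to stdout
        return fmt.Errorf("stream logs: %w", err)
    }
    if err := c.WaitForCompletion(ctx); err != nil { // polls Job conditions
        return fmt.Errorf("wait for completion: %w", err)
    }
    if keepJob { // cleanup_policy: keep
        return nil
    }
    return c.DeleteJob(ctx) // deletion with background propagation
}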
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/runtime/builtin/docker/client.go (2)
151-162: ⚠️ Potential issue | 🟠 Major
Don't clear ctID for stopped named containers unless you also handle the existing container.
If inspectContainer finds the named container but it is stopped, this branch drops the ID and later falls into ContainerCreate with the same name. Docker still reserves that name, so the "will be created" path turns into a runtime conflict instead of a predictable restart/remove/fail-fast path.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/builtin/docker/client.go` around lines 151 - 162, The code clears ctID when a named container is found but not running, which later causes ContainerCreate to fail due to Docker name reservation; instead, keep ctID = info.ID for stopped containers and implement explicit handling (e.g., call a restart or remove flow) when isContainerRunning(...) returns false before attempting ContainerCreate. Update the logic around inspectContainer, isContainerRunning, and ctID so stopped containers are either restarted (dockerClient.ContainerStart/Restart) or removed (ContainerRemove) based on the desired behavior, and only call ContainerCreate when no existing container ID is present.
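A minimal sketch of the restart-or-remove handling described in the prompt above, assuming a Docker SDK client cli, the container info returned by inspectContainer, and a caller-chosen removeExisting flag; option struct names follow recent Docker SDK versions and may need adjusting, and this is not the repository's actual client code.

    // Keep the ID of a stopped named container and handle it explicitly,
    // instead of clearing ctID and colliding on ContainerCreate later.
    if !isContainerRunning(info) {
        if removeExisting {
            if err := cli.ContainerRemove(ctx, info.ID, container.RemoveOptions{Force: true}); err != nil {
                return fmt.Errorf("remove stopped container %q: %w", name, err)
            }
            // ctID stays empty; ContainerCreate can now reuse the name.
        } else {
            if err := cli.ContainerStart(ctx, info.ID, container.StartOptions{}); err != nil {
                return fmt.Errorf("restart stopped container %q: %w", name, err)
            }
            ctID = info.ID // reuse the existing container
        }
    }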
843-867: ⚠️ Potential issue | 🟠 Major
Fix TTY exec stream handling: avoid stdcopy for raw TTY streams.
When TTY is enabled in ExecOptions (line 47), the ExecAttach stream is raw, not multiplexed. The current code always uses stdcopy.StdCopy (line 879), which only works for multiplexed frames. This will cause output loss or framing errors when TTY is enabled.
Branch the copy logic based on the TTY setting: use io.Copy for TTY streams and stdcopy.StdCopy for multiplexed streams.
Suggested fix
 go func() {
-    if _, err := stdcopy.StdCopy(stdout, stderr, resp.Reader); err != nil {
-        logger.Error(ctx, "Docker executor: stdcopy", tag.Error(err))
+    var copyErr error
+    if c.cfg.ExecOptions.TTY {
+        _, copyErr = io.Copy(stdout, resp.Reader)
+    } else {
+        _, copyErr = stdcopy.StdCopy(stdout, stderr, resp.Reader)
+    }
+    if copyErr != nil {
+        logger.Error(ctx, "Docker executor: copy exec output", tag.Error(copyErr))
     }
     wg.Done()
 }()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/builtin/docker/client.go` around lines 843 - 867, The ExecAttach stream handling must branch on the TTY flag: after calling cli.ExecAttach (using execCreateResp.ID) check the effective TTY (execOpts.TTY or c.cfg.ExecOptions.TTY) and if TTY is true treat the attached stream as a raw stream and use io.Copy to copy resp.Reader to the destination writer(s); if TTY is false use stdcopy.StdCopy to demultiplex stdout/stderr from resp.Reader into separate writers. Update the code around ExecAttach/resp handling to close the response when done and choose io.Copy for raw TTY streams and stdcopy.StdCopy for multiplexed streams to avoid frame loss.
🧹 Nitpick comments (1)
internal/core/dag.go (1)
227-230: Clone should deep-copy the new Kubernetes map field.
Line 230 introduces a mutable map on DAG; (*DAG).Clone() currently performs a shallow copy for this field, which can cause shared-state side effects across clones.
♻️ Suggested update in (*DAG).Clone():
 func (d *DAG) Clone() *DAG {
     //nolint:govet // intentional copy; sync.Once is immediately reset below
     clone := *d
     // Reset sync.Once so LoadDotEnv can be called on the clone
     clone.dotenvOnce = sync.Once{}
     if d.PresolvedBuildEnv != nil {
         clone.PresolvedBuildEnv = maps.Clone(d.PresolvedBuildEnv)
     }
+    if d.Kubernetes != nil {
+        clone.Kubernetes = maps.Clone(d.Kubernetes)
+    }
     return &clone
 }
Verify each finding against the current code and only fix it if needed. In `@internal/core/dag.go` around lines 227 - 230, The Clone method on DAG must deep-copy the new mutable Kubernetes map to avoid shared-state: update (*DAG).Clone() to produce an independent copy of the KubernetesConfig held in the DAG.Kubernetes field (either by calling a KubernetesConfig.Clone()/DeepCopy() helper or by explicitly allocating a new map and copying each key/value), so the cloned DAG gets its own Kubernetes map instance rather than a shallow reference to the original; modify DAG.Clone() to perform this deep-copy for Kubernetes before returning the cloned DAG.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cmn/schema/dag.schema.json`:
- Around line 3682-3692: The schema currently enforces "image" as required in
the kubernetesExecutorConfig fragment (the allOf that references
kubernetesCommonConfig and required:["image"]), which rejects step-level config
fragments that rely on DAG-level defaults; remove "image" from the immediate
required list in kubernetesExecutorConfig and instead enforce that the final
merged config contains image after DAG defaults are applied. Update the
validation path that uses kubernetesExecutorConfig so it validates step config
only after merging DAG defaults (or add a separate post-merge check) to assert
presence of image in the merged object rather than on the raw step fragment.
- Around line 3267-3335: The schema currently allows invalid Kubernetes env
entries; update the definitions to enforce valid combinations: in
kubernetesEnvVar add "required": ["name"] and a oneOf ensuring exactly one of
"value" or "value_from" is present (use oneOf with presence checks for "value"
vs "value_from"); in kubernetesEnvVarSource add a oneOf requiring exactly one of
"secret_key_ref", "config_map_key_ref", or "field_ref"; in
kubernetesEnvFromSource add a oneOf requiring at least one of "config_map_ref"
or "secret_ref" (and keep kubernetesEnvFromRef required:["name"]) so env_from
entries with only "prefix" are rejected. Use the existing symbols
kubernetesEnvVar, kubernetesEnvVarSource, kubernetesEnvFromRef, and
kubernetesEnvFromSource to locate where to add these required/oneOf constraints.
In `@internal/runtime/builtin/kubernetes/client.go`:
- Around line 283-285: WaitForPod currently treats corev1.PodFailed as a
terminal success and podPriority ranks Failed above Pending, causing a failed
retry to be selected while a newer retry is still pending; update the WaitForPod
switch (function WaitForPod) to only return when pod.Status.Phase is
corev1.PodRunning or corev1.PodSucceeded (do not return on PodFailed), and
adjust the podPriority comparator (the podPriority function/logic referenced
around the other block at 338-345) to rank Pending higher than Failed (or
exclude Failed from top selection) so pending restart attempts are preferred
over older failed pods when choosing which pod to collect logs/exit code from.
Ensure both places (the switch and podPriority logic) are updated consistently.
- Around line 71-82: The current logic in the kubeConfig.ClientConfig() error
branch falls back to rest.InClusterConfig() even when an explicit KUBECONFIG was
provided; update the error handling so that if explicitKubeconfig is true (or
explicitContext is true when the user explicitly set a context) you return the
original kubeconfig error immediately (e.g., return nil, fmt.Errorf("kubeconfig
error: %w", err)) and only attempt rest.InClusterConfig() when neither
explicitKubeconfig nor explicitContext is set and no explicit kubeconfig files
exist (use hasAnyKubeconfigFile(loadingRules.GetLoadingPrecedence()) to decide);
adjust the branch around kubeConfig.ClientConfig(), rest.InClusterConfig(),
explicitKubeconfig, and explicitContext accordingly.
In `@internal/runtime/builtin/kubernetes/config.go`:
- Around line 520-528: The schema entries in config.go currently register env,
env_from, resources, tolerations, volumes, and volume_mounts as opaque
object/array nodes, which prevents CLI/UI clients from validating or
autocompleting nested Kubernetes fields; update the jsonschema map (the block
that defines "env", "env_from", "resources", "node_selector", "tolerations",
"labels", "annotations", "volumes", "volume_mounts") to replace those generic
entries with full nested jsonschema.Schema definitions: define "env" as array of
objects with properties like name, value, valueFrom (and nested
secretKeyRef/configMapKeyRef), "env_from" as array of objects with
configMapRef/secretRef, "resources" as object with properties limits and
requests (each an object of string quantities), "tolerations" as array of
objects with key, operator, value, effect, tolerationSeconds, "volumes" as array
of objects supporting secret, configMap, emptyDir, persistentVolumeClaim,
hostPath shapes, and "volume_mounts" as array of objects with name, mountPath,
subPath, readOnly; keep original descriptions and types but provide these nested
properties so schema consumers can validate and autocomplete the
runtime-supported fields.
- Around line 493-498: The parseQuantity function currently only validates
syntax; after calling resource.ParseQuantity(value) add a check for negative
values (if qty.Sign() < 0) and return an error immediately so negatives fail
during config decoding. Use the same error-pattern as other negative checks in
this file: either return fmt.Errorf("%s: negative quantity %q: %w", field,
value, ErrNegativeQuantity) and add a new sentinel ErrNegativeQuantity (matching
the ErrNegativeActiveDeadline/ErrNegativeBackoffLimit style) or reuse an
appropriate existing ErrNegative* sentinel if present; update parseQuantity to
return resource.Quantity{}, that error when qty is negative.
In `@internal/runtime/builtin/kubernetes/executor.go`:
- Around line 280-285: validateStep currently allows multiple entries in
step.Commands while buildCommand only uses step.Commands[0], causing extra
commands to be silently dropped; update validateStep (and any related validation
for the kubernetes executor) to detect when len(step.Commands) > 1 and return a
clear error (e.g., "kubernetes executor does not support multiple command
entries; provide a single combined command or use a script"), or alternatively
modify buildCommand to combine all entries into a single shell invocation (e.g.,
join with " && " or wrap in "sh -c") so all commands run; reference validateStep
and buildCommand when making the change.
- Around line 135-139: The failure path after client.WaitForPod currently always
calls e.cleanup(ctx, true) which deletes unschedulable pods even when the job's
cleanup_policy is set to keep; change this to respect the job's cleanup policy
by not forcing deletion on scheduling errors: check the job/config cleanup
policy (e.g. cleanup_policy or e.cleanupPolicy) and either skip calling
e.cleanup or call e.cleanup(ctx, false) only when the policy allows deletion,
reserving force=true for explicit cancel/kill paths; update the code around the
WaitForPod error handling and the e.cleanup(ctx, true) call accordingly.
---
Outside diff comments:
In `@internal/runtime/builtin/docker/client.go`:
- Around line 151-162: The code clears ctID when a named container is found but
not running, which later causes ContainerCreate to fail due to Docker name
reservation; instead, keep ctID = info.ID for stopped containers and implement
explicit handling (e.g., call a restart or remove flow) when
isContainerRunning(...) returns false before attempting ContainerCreate. Update
the logic around inspectContainer, isContainerRunning, and ctID so stopped
containers are either restarted (dockerClient.ContainerStart/Restart) or removed
(ContainerRemove) based on the desired behavior, and only call ContainerCreate
when no existing container ID is present.
- Around line 843-867: The ExecAttach stream handling must branch on the TTY
flag: after calling cli.ExecAttach (using execCreateResp.ID) check the effective
TTY (execOpts.TTY or c.cfg.ExecOptions.TTY) and if TTY is true treat the
attached stream as a raw stream and use io.Copy to copy resp.Reader to the
destination writer(s); if TTY is false use stdcopy.StdCopy to demultiplex
stdout/stderr from resp.Reader into separate writers. Update the code around
ExecAttach/resp handling to close the response when done and choose io.Copy for
raw TTY streams and stdcopy.StdCopy for multiplexed streams to avoid frame loss.
---
Nitpick comments:
In `@internal/core/dag.go`:
- Around line 227-230: The Clone method on DAG must deep-copy the new mutable
Kubernetes map to avoid shared-state: update (*DAG).Clone() to produce an
independent copy of the KubernetesConfig held in the DAG.Kubernetes field
(either by calling a KubernetesConfig.Clone()/DeepCopy() helper or by explicitly
allocating a new map and copying each key/value), so the cloned DAG gets its own
Kubernetes map instance rather than a shallow reference to the original; modify
DAG.Clone() to perform this deep-copy for Kubernetes before returning the cloned
DAG.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2e3537be-7095-4c2a-b26d-2bfdf19bc061
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (27)
go.mod
internal/agent/schema/output_test.go
internal/cmn/schema/dag.schema.json
internal/cmn/schema/dag_schema_test.go
internal/core/dag.go
internal/core/spec/dag.go
internal/core/spec/kubernetes.go
internal/core/spec/kubernetes_test.go
internal/core/spec/loader.go
internal/core/spec/step.go
internal/core/spec/step_test.go
internal/intg/ct_test.go
internal/intg/sftp_test.go
internal/intg/ssh_test.go
internal/runtime/builtin/builtin.go
internal/runtime/builtin/docker/client.go
internal/runtime/builtin/docker/client_test.go
internal/runtime/builtin/docker/config.go
internal/runtime/builtin/docker/parser.go
internal/runtime/builtin/docker/registry_auth.go
internal/runtime/builtin/docker/registry_auth_test.go
internal/runtime/builtin/kubernetes/client.go
internal/runtime/builtin/kubernetes/client_test.go
internal/runtime/builtin/kubernetes/config.go
internal/runtime/builtin/kubernetes/config_test.go
internal/runtime/builtin/kubernetes/executor.go
internal/runtime/builtin/kubernetes/executor_test.go
| "kubernetesEnvVarSource": { | ||
| "type": "object", | ||
| "additionalProperties": false, | ||
| "properties": { | ||
| "secret_key_ref": { | ||
| "$ref": "#/definitions/kubernetesKeySelector", | ||
| "description": "Read the value from a specific key in a Secret." | ||
| }, | ||
| "config_map_key_ref": { | ||
| "$ref": "#/definitions/kubernetesKeySelector", | ||
| "description": "Read the value from a specific key in a ConfigMap." | ||
| }, | ||
| "field_ref": { | ||
| "$ref": "#/definitions/kubernetesFieldRef", | ||
| "description": "Read the value from a Pod field." | ||
| } | ||
| }, | ||
| "description": "Source configuration for an environment variable value." | ||
| }, | ||
| "kubernetesEnvVar": { | ||
| "type": "object", | ||
| "additionalProperties": false, | ||
| "properties": { | ||
| "name": { | ||
| "type": "string", | ||
| "description": "Environment variable name." | ||
| }, | ||
| "value": { | ||
| "type": "string", | ||
| "description": "Literal environment variable value." | ||
| }, | ||
| "value_from": { | ||
| "$ref": "#/definitions/kubernetesEnvVarSource", | ||
| "description": "Dynamic source for the environment variable value." | ||
| } | ||
| }, | ||
| "description": "A Kubernetes environment variable entry." | ||
| }, | ||
| "kubernetesEnvFromRef": { | ||
| "type": "object", | ||
| "additionalProperties": false, | ||
| "required": ["name"], | ||
| "properties": { | ||
| "name": { | ||
| "type": "string", | ||
| "description": "Name of the Secret or ConfigMap to import." | ||
| } | ||
| }, | ||
| "description": "Reference to a Secret or ConfigMap for env_from." | ||
| }, | ||
| "kubernetesEnvFromSource": { | ||
| "type": "object", | ||
| "additionalProperties": false, | ||
| "properties": { | ||
| "config_map_ref": { | ||
| "$ref": "#/definitions/kubernetesEnvFromRef", | ||
| "description": "Import all keys from a ConfigMap." | ||
| }, | ||
| "secret_ref": { | ||
| "$ref": "#/definitions/kubernetesEnvFromRef", | ||
| "description": "Import all keys from a Secret." | ||
| }, | ||
| "prefix": { | ||
| "type": "string", | ||
| "description": "Optional prefix applied to imported environment variable names." | ||
| } | ||
| }, | ||
| "description": "Import a Secret or ConfigMap as environment variables." | ||
| }, |
Reject impossible Kubernetes env specs at schema time.
Examples like env: [{value: "x"}], env: [{name: "X", value: "a", value_from: {...}}], or env_from: [{prefix: "APP_"}] currently pass this schema even though they do not describe a valid Kubernetes env definition. That pushes basic validation out to Job creation time. Add required and oneOf constraints here so schema consumers fail these combinations early.
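The same rule can also be stated as a decode-time guard, which makes the intended combinations explicit; this is an illustrative alternative to the schema-level oneOf the comment asks for, assuming env entries arrive as raw maps, and it is not code from the PR.

package kubessketch

import (
    "errors"
    "fmt"
)

// validateEnvEntry enforces what the comment asks the schema to enforce:
// "name" is required and exactly one of "value" / "value_from" is present.
func validateEnvEntry(entry map[string]any) error {
    name, _ := entry["name"].(string)
    if name == "" {
        return errors.New("env entry requires a non-empty name")
    }
    _, hasValue := entry["value"]
    _, hasValueFrom := entry["value_from"]
    if hasValue == hasValueFrom { // both present or both missing
        return fmt.Errorf("env %q: exactly one of value or value_from must be set", name)
    }
    return nil
}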
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/cmn/schema/dag.schema.json` around lines 3267 - 3335, The schema
currently allows invalid Kubernetes env entries; update the definitions to
enforce valid combinations: in kubernetesEnvVar add "required": ["name"] and a
oneOf ensuring exactly one of "value" or "value_from" is present (use oneOf with
presence checks for "value" vs "value_from"); in kubernetesEnvVarSource add a
oneOf requiring exactly one of "secret_key_ref", "config_map_key_ref", or
"field_ref"; in kubernetesEnvFromSource add a oneOf requiring at least one of
"config_map_ref" or "secret_ref" (and keep kubernetesEnvFromRef
required:["name"]) so env_from entries with only "prefix" are rejected. Use the
existing symbols kubernetesEnvVar, kubernetesEnvVarSource, kubernetesEnvFromRef,
and kubernetesEnvFromSource to locate where to add these required/oneOf
constraints.
| "kubernetesExecutorConfig": { | ||
| "allOf": [ | ||
| { | ||
| "$ref": "#/definitions/kubernetesCommonConfig" | ||
| }, | ||
| { | ||
| "type": "object", | ||
| "required": ["image"] | ||
| } | ||
| ], | ||
| "description": "Kubernetes executor configuration for running a workflow step as a Kubernetes Job." |
Allow step overrides when image comes from DAG defaults.
kubernetesExecutorConfig makes image mandatory whenever a step has a config block. That means a DAG with root kubernetes.image works only until a step overrides some other Kubernetes field; type: k8s plus config: { cleanup_policy: keep } is rejected before merge even though the final merged config is valid. This required-image check needs to happen after DAG defaults are merged, not on the raw step fragment.
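The ordering issue is easier to see as code: merge DAG defaults with the step fragment first, then require image on the result. A hedged sketch of that shape, assuming both configs are plain map[string]any and using a shallow merge for brevity (the real mergeKubernetesExecutorConfig is described as a deep merge):

package kubessketch

import "errors"

// mergeThenValidate applies DAG-level kubernetes defaults, lets step-level
// keys override them, and only then checks for image on the merged result.
func mergeThenValidate(dagDefaults, stepCfg map[string]any) (map[string]any, error) {
    merged := make(map[string]any, len(dagDefaults)+len(stepCfg))
    for k, v := range dagDefaults {
        merged[k] = v
    }
    for k, v := range stepCfg { // step values win
        merged[k] = v
    }
    if img, _ := merged["image"].(string); img == "" {
        return nil, errors.New("kubernetes executor: image must be set on the step or in DAG defaults")
    }
    return merged, nil
}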
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/cmn/schema/dag.schema.json` around lines 3682 - 3692, The schema
currently enforces "image" as required in the kubernetesExecutorConfig fragment
(the allOf that references kubernetesCommonConfig and required:["image"]), which
rejects step-level config fragments that rely on DAG-level defaults; remove
"image" from the immediate required list in kubernetesExecutorConfig and instead
enforce that the final merged config contains image after DAG defaults are
applied. Update the validation path that uses kubernetesExecutorConfig so it
validates step config only after merging DAG defaults (or add a separate
post-merge check) to assert presence of image in the merged object rather than
on the raw step fragment.
    kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
    restCfg, err := kubeConfig.ClientConfig()
    if err != nil {
        if explicitKubeconfig || explicitContext || hasAnyKubeconfigFile(loadingRules.GetLoadingPrecedence()) {
            return nil, fmt.Errorf("kubeconfig error: %w", err)
        }

        restCfg, inClusterErr := rest.InClusterConfig()
        if inClusterErr != nil {
            return nil, fmt.Errorf("kubeconfig error: %w; in-cluster error: %w", err, inClusterErr)
        }
        return restCfg, nil
Don't fail open to in-cluster config when KUBECONFIG is explicitly set but broken.
If KUBECONFIG points to a missing/invalid file and this code is running inside Kubernetes, this branch silently ignores that explicit user choice and targets the in-cluster cluster instead.
Suggested fix
func buildRESTConfig(cfg *Config) (*rest.Config, error) {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
+ envKubeconfigSet := strings.TrimSpace(os.Getenv(clientcmd.RecommendedConfigPathEnvVar)) != ""
explicitKubeconfig := cfg.Kubeconfig != ""
if explicitKubeconfig {
loadingRules.ExplicitPath = cfg.Kubeconfig
}
@@
restCfg, err := kubeConfig.ClientConfig()
if err != nil {
- if explicitKubeconfig || explicitContext || hasAnyKubeconfigFile(loadingRules.GetLoadingPrecedence()) {
+ if explicitKubeconfig || explicitContext || envKubeconfigSet || hasAnyKubeconfigFile(loadingRules.GetLoadingPrecedence()) {
return nil, fmt.Errorf("kubeconfig error: %w", err)
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/runtime/builtin/kubernetes/client.go` around lines 71 - 82, The
current logic in the kubeConfig.ClientConfig() error branch falls back to
rest.InClusterConfig() even when an explicit KUBECONFIG was provided; update the
error handling so that if explicitKubeconfig is true (or explicitContext is true
when the user explicitly set a context) you return the original kubeconfig error
immediately (e.g., return nil, fmt.Errorf("kubeconfig error: %w", err)) and only
attempt rest.InClusterConfig() when neither explicitKubeconfig nor
explicitContext is set and no explicit kubeconfig files exist (use
hasAnyKubeconfigFile(loadingRules.GetLoadingPrecedence()) to decide); adjust the
branch around kubeConfig.ClientConfig(), rest.InClusterConfig(),
explicitKubeconfig, and explicitContext accordingly.
    switch pod.Status.Phase {
    case corev1.PodRunning, corev1.PodSucceeded, corev1.PodFailed:
        return pod.Name, nil
Retrying Jobs can select the wrong pod here.
WaitForPod returns PodFailed, and podPriority ranks Failed above Pending. With backoff_limit > 0, the first failed attempt can win selection while the retry pod is still pending, so downstream log/exit-code collection can report failure even if a later retry succeeds.
Also applies to: 338-345
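One way to encode the suggested ordering is an explicit phase ranking in which Pending outranks Failed, so an in-flight retry is preferred over an older failed attempt. A sketch of such a comparator (not the repository's podPriority), using the corev1 pod phases:

package kubessketch

import corev1 "k8s.io/api/core/v1"

// phaseRank orders pod phases for selection: running or already-succeeded
// pods first, then pending pods (e.g. a retry not yet scheduled), and failed
// pods only as a last resort. Higher is better.
func phaseRank(phase corev1.PodPhase) int {
    switch phase {
    case corev1.PodRunning:
        return 4
    case corev1.PodSucceeded:
        return 3
    case corev1.PodPending:
        return 2
    case corev1.PodFailed:
        return 1
    default: // PodUnknown or future phases
        return 0
    }
}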
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/runtime/builtin/kubernetes/client.go` around lines 283 - 285,
WaitForPod currently treats corev1.PodFailed as a terminal success and
podPriority ranks Failed above Pending, causing a failed retry to be selected
while a newer retry is still pending; update the WaitForPod switch (function
WaitForPod) to only return when pod.Status.Phase is corev1.PodRunning or
corev1.PodSucceeded (do not return on PodFailed), and adjust the podPriority
comparator (the podPriority function/logic referenced around the other block at
338-345) to rank Pending higher than Failed (or exclude Failed from top
selection) so pending restart attempts are preferred over older failed pods when
choosing which pod to collect logs/exit code from. Ensure both places (the
switch and podPriority logic) are updated consistently.
func parseQuantity(field, value string) (resource.Quantity, error) {
    qty, err := resource.ParseQuantity(value)
    if err != nil {
        return resource.Quantity{}, fmt.Errorf("%s: invalid quantity %q: %w", field, value, err)
    }
    return qty, nil
🧩 Analysis chain
🌐 Web query:
Does k8s.io/apimachinery/pkg/api/resource.ParseQuantity accept negative values such as -100m or -1Gi? What is the idiomatic way to reject negative resource.Quantity values in Go?
💡 Result:
Yes, k8s.io/apimachinery/pkg/api/resource.ParseQuantity accepts negative values such as -100m or -1Gi. The parsing regex ^([+-]?[0-9.]+)([eEinumkKMGTP][-+]?[0-9])$ explicitly allows a leading + or - sign, and the source code handles the sign by setting positive = false for - and applying result = -result or amount.Neg(amount). The Quantity.Sign method returns -1 for negative values, and Neg supports negation. Official docs confirm signed numbers are supported in the serialization format. The idiomatic way to reject negative resource.Quantity values in Go is to check if q.Sign < 0 after parsing or use k8s.io/kubernetes/pkg/apis/core/v1/validation.ValidateNonnegativeQuantity(q, fldPath), which is the standard validation function used throughout Kubernetes for ensuring quantities are non-negative (e.g., in pod resource validation).
Citations:
- 1: https://pkg.go.dev/k8s.io/apimachinery/pkg/api/resource
- 2: https://github.com/kubernetes/apimachinery/blob/master/pkg/api/resource/quantity.go
- 3: https://go.googlesource.com/build/+/e7402a054/kubernetes/api/quantity.go
- 4: https://pkg.go.dev/k8s.io/kubernetes/pkg/apis/core/v1/validation
- 5: https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity
- 6: https://pkg.go.dev/k8s.io/kubernetes/pkg/api/validation
Reject negative quantities after parsing.
resource.ParseQuantity accepts negative values like -100m or -1Gi, so parseQuantity currently validates syntax only. Negative CPU/memory in resources.requests/resources.limits and negative empty_dir.size_limit pass validation and fail later at Job creation instead of during config decoding. This is inconsistent with the existing pattern of early negative-value checks in the file (see ErrNegativeActiveDeadline, ErrNegativeBackoffLimit, ErrNegativeTTLAfterFinished).
Suggested fix
func parseQuantity(field, value string) (resource.Quantity, error) {
qty, err := resource.ParseQuantity(value)
if err != nil {
return resource.Quantity{}, fmt.Errorf("%s: invalid quantity %q: %w", field, value, err)
}
+ if qty.Sign() < 0 {
+ return resource.Quantity{}, fmt.Errorf("%s: quantity must be >= 0", field)
+ }
return qty, nil
 }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
func parseQuantity(field, value string) (resource.Quantity, error) {
    qty, err := resource.ParseQuantity(value)
    if err != nil {
        return resource.Quantity{}, fmt.Errorf("%s: invalid quantity %q: %w", field, value, err)
    }
    if qty.Sign() < 0 {
        return resource.Quantity{}, fmt.Errorf("%s: quantity must be >= 0", field)
    }
    return qty, nil
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/runtime/builtin/kubernetes/config.go` around lines 493 - 498, The
parseQuantity function currently only validates syntax; after calling
resource.ParseQuantity(value) add a check for negative values (if qty.Sign() <
0) and return an error immediately so negatives fail during config decoding. Use
the same error-pattern as other negative checks in this file: either return
fmt.Errorf("%s: negative quantity %q: %w", field, value, ErrNegativeQuantity)
and add a new sentinel ErrNegativeQuantity (matching the
ErrNegativeActiveDeadline/ErrNegativeBackoffLimit style) or reuse an appropriate
existing ErrNegative* sentinel if present; update parseQuantity to return
resource.Quantity{}, that error when qty is negative.
| "env": {Type: "array", Items: &jsonschema.Schema{Type: "object"}, Description: "Environment variables"}, | ||
| "env_from": {Type: "array", Items: &jsonschema.Schema{Type: "object"}, Description: "Environment variable sources"}, | ||
| "resources": {Type: "object", Description: "CPU/memory requests and limits"}, | ||
| "node_selector": {Type: "object", Description: "Node selector constraints"}, | ||
| "tolerations": {Type: "array", Items: &jsonschema.Schema{Type: "object"}, Description: "Pod tolerations"}, | ||
| "labels": {Type: "object", Description: "Labels for Job and Pod"}, | ||
| "annotations": {Type: "object", Description: "Annotations for Job and Pod"}, | ||
| "volumes": {Type: "array", Items: &jsonschema.Schema{Type: "object"}, Description: "Pod volumes"}, | ||
| "volume_mounts": {Type: "array", Items: &jsonschema.Schema{Type: "object"}, Description: "Container volume mounts"}, |
The registered schema still hides the nested Kubernetes surface.
env, env_from, resources, tolerations, volumes, and volume_mounts are all exposed as opaque object / array{object} nodes here. CLI/UI schema consumers still cannot validate or autocomplete the nested fields that the runtime actually supports.
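As an illustration of what exposing the nested surface could look like in this registration style, here is a hedged sketch of richer entries for resources and volume_mounts; it assumes the repo's jsonschema.Schema has a Properties field accepting map[string]*jsonschema.Schema and that the config uses snake_case keys, both of which may differ from the actual package and schema.

    // Sketch only: nested definitions in the same literal style as above.
    "resources": {
        Type:        "object",
        Description: "CPU/memory requests and limits",
        Properties: map[string]*jsonschema.Schema{
            "requests": {Type: "object", Description: "Minimum resources, e.g. cpu: \"100m\", memory: \"128Mi\""},
            "limits":   {Type: "object", Description: "Maximum resources, e.g. cpu: \"500m\", memory: \"256Mi\""},
        },
    },
    "volume_mounts": {
        Type:        "array",
        Description: "Container volume mounts",
        Items: &jsonschema.Schema{
            Type: "object",
            Properties: map[string]*jsonschema.Schema{
                "name":       {Type: "string", Description: "Volume name to mount"},
                "mount_path": {Type: "string", Description: "Path inside the container"},
                "sub_path":   {Type: "string", Description: "Optional sub-path within the volume"},
                "read_only":  {Type: "boolean", Description: "Mount read-only"},
            },
        },
    },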
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/runtime/builtin/kubernetes/config.go` around lines 520 - 528, The
schema entries in config.go currently register env, env_from, resources,
tolerations, volumes, and volume_mounts as opaque object/array nodes, which
prevents CLI/UI clients from validating or autocompleting nested Kubernetes
fields; update the jsonschema map (the block that defines "env", "env_from",
"resources", "node_selector", "tolerations", "labels", "annotations", "volumes",
"volume_mounts") to replace those generic entries with full nested
jsonschema.Schema definitions: define "env" as array of objects with properties
like name, value, valueFrom (and nested secretKeyRef/configMapKeyRef),
"env_from" as array of objects with configMapRef/secretRef, "resources" as
object with properties limits and requests (each an object of string
quantities), "tolerations" as array of objects with key, operator, value,
effect, tolerationSeconds, "volumes" as array of objects supporting secret,
configMap, emptyDir, persistentVolumeClaim, hostPath shapes, and "volume_mounts"
as array of objects with name, mountPath, subPath, readOnly; keep original
descriptions and types but provide these nested properties so schema consumers
can validate and autocomplete the runtime-supported fields.
    podName, err := client.WaitForPod(ctx)
    if err != nil {
        logger.Error(ctx, "Kubernetes executor: pod scheduling failed", slog.Any("error", err))
        e.cleanup(ctx, true)
        return fmt.Errorf("pod scheduling failed: %w", err)
Respect cleanup_policy: keep when pod scheduling fails.
This path always calls e.cleanup(ctx, true), so unschedulable Jobs are deleted even when the user explicitly asked to keep them for debugging. That makes one of the hardest failure modes disappear immediately. Reserve force=true for cancellation/kill paths and let normal WaitForPod failures follow the configured cleanup policy.
Suggested change
podName, err := client.WaitForPod(ctx)
if err != nil {
logger.Error(ctx, "Kubernetes executor: pod scheduling failed", slog.Any("error", err))
- e.cleanup(ctx, true)
- return fmt.Errorf("pod scheduling failed: %w", err)
+ if ctx.Err() != nil {
+ e.cleanup(ctx, true)
+ return ctx.Err()
+ }
+ e.cleanup(ctx, false)
+ return fmt.Errorf("pod scheduling failed: %w", err)
 }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
    podName, err := client.WaitForPod(ctx)
    if err != nil {
        logger.Error(ctx, "Kubernetes executor: pod scheduling failed", slog.Any("error", err))
        if ctx.Err() != nil {
            e.cleanup(ctx, true)
            return ctx.Err()
        }
        e.cleanup(ctx, false)
        return fmt.Errorf("pod scheduling failed: %w", err)
    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/runtime/builtin/kubernetes/executor.go` around lines 135 - 139, The
failure path after client.WaitForPod currently always calls e.cleanup(ctx, true)
which deletes unschedulable pods even when the job's cleanup_policy is set to
keep; change this to respect the job's cleanup policy by not forcing deletion on
scheduling errors: check the job/config cleanup policy (e.g. cleanup_policy or
e.cleanupPolicy) and either skip calling e.cleanup or call e.cleanup(ctx, false)
only when the policy allows deletion, reserving force=true for explicit
cancel/kill paths; update the code around the WaitForPod error handling and the
e.cleanup(ctx, true) call accordingly.
func validateStep(step core.Step) error {
    if len(step.ExecutorConfig.Config) == 0 {
        return fmt.Errorf("kubernetes executor requires config with at least 'image' field")
    }
    _, err := LoadConfigFromMap(step.ExecutorConfig.Config)
    return err
Don't silently drop additional command entries.
buildCommand only forwards step.Commands[0], and validateStep does not reject multiple entries. A Kubernetes step using command: ["echo one", "echo two"] will execute only the first command and still report success. Either reject multi-entry steps for this executor or wrap all entries into a single shell command before creating the Job.
Suggested guardrail
func validateStep(step core.Step) error {
if len(step.ExecutorConfig.Config) == 0 {
return fmt.Errorf("kubernetes executor requires config with at least 'image' field")
}
+ if len(step.Commands) > 1 {
+ return fmt.Errorf("kubernetes executor supports only a single command entry; wrap multiple commands in a shell command or script")
+ }
_, err := LoadConfigFromMap(step.ExecutorConfig.Config)
return err
 }
Also applies to: 288-299
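If the alternative mentioned in the prompt is preferred, combining entries into one shell invocation instead of rejecting them, here is a hedged sketch of that joining logic (independent of the repository's actual buildCommand, and assuming the step image ships a POSIX sh):

package kubessketch

import "strings"

// combineCommands joins multiple step command entries into a single
// fail-fast shell invocation so later entries are not silently dropped.
func combineCommands(commands []string) []string {
    if len(commands) == 0 {
        return nil
    }
    if len(commands) == 1 {
        return []string{commands[0]}
    }
    return []string{"sh", "-c", strings.Join(commands, " && ")}
}

With this shape, a step declaring command: ["echo one", "echo two"] runs both entries and fails the Job if either command fails.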
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/runtime/builtin/kubernetes/executor.go` around lines 280 - 285,
validateStep currently allows multiple entries in step.Commands while
buildCommand only uses step.Commands[0], causing extra commands to be silently
dropped; update validateStep (and any related validation for the kubernetes
executor) to detect when len(step.Commands) > 1 and return a clear error (e.g.,
"kubernetes executor does not support multiple command entries; provide a single
combined command or use a script"), or alternatively modify buildCommand to
combine all entries into a single shell invocation (e.g., join with " && " or
wrap in "sh -c") so all commands run; reference validateStep and buildCommand
when making the change.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1886      +/-   ##
==========================================
- Coverage   68.92%   68.73%   -0.19%
==========================================
  Files         452      456       +4
  Lines       56092    57581    +1489
==========================================
+ Hits        38661    39581     +920
- Misses      13934    14390     +456
- Partials     3497     3610     +113

... and 17 files with indirect coverage changes
Continue to review full report in Codecov by Sentry.
Summary
- Add a Kubernetes step executor for type: k8s / type: kubernetes, including Job creation, pod log streaming, exit-code handling, config decoding, and direct runtime tests
- Add DAG-level kubernetes: defaults with explicit step opt-in and predictable merge behavior, and harden the executor around kubeconfig fallback, cleanup, invalid quantities, and completion-state handling
Why
Testing
- go test ./internal/runtime/builtin/kubernetes
- go test ./internal/runtime/...
- go test ./internal/core/...
- go test ./internal/agent/schema ./internal/cmn/schema -count=1
- go test ./internal/core/spec -count=1
Summary by CodeRabbit
New Features
Dependencies