feat: add DAG-level retry policy #1774

Merged
yottahmd merged 22 commits into main from daglevel-retry
Mar 15, 2026

Conversation

@yottahmd
Collaborator

@yottahmd yottahmd commented Mar 14, 2026

Summary

  • add DAG-level retry policy support, enqueue semantics, and scheduler retry scanning
  • simplify failure handling so terminal failures run inline and background retry only enqueues eligible retries
  • add regression coverage for retry scanning and enable a default base DAG retry policy for new installs

Testing

  • go test ./internal/service/scheduler/... ./internal/intg/queue/... ./internal/runtime/agent ./internal/core/spec ./internal/persis/filebaseconfig

Summary by CodeRabbit

  • New Features

    • DAG-level retry policies (limit, interval, backoff, max interval)
    • Automatic retry scanner that re-enqueues eligible failed runs
    • Configurable retry failure window (default 24h)
  • Improvements

    • Exposed autoRetryCount in DAG run summaries/details and UI schema
    • Safer concurrent retry updates via atomic compare-and-swap and enqueue robustness
    • Index format bumped to include attempt ID and retry count

@coderabbitai

coderabbitai Bot commented Mar 14, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d71be57f-3a7d-4354-95cf-1c58a8fa3828

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

Walkthrough

Adds DAG-level retry support and scheduler-side retry scanning; introduces an AutoRetryCount field propagated through status, persistence, API, and UI; refactors retry enqueue to use an atomic Compare-And-Swap on latest attempt status; and wires a RetryFailureWindow configuration and scheduler RetryScanner into lifecycle.

Changes

Cohort / File(s) | Summary
API Schema & Generated Types
api/v1/api.yaml, api/v1/api.gen.go, ui/src/api/v1/schema.ts, proto/index/v1/index.proto
Added autoRetryCount to DAGRunSummary/Details and corresponding protobuf/schema entries; updated generated API types and UI schema.
API Frontend Transformer
internal/service/frontend/api/v1/transformer.go, internal/service/frontend/api/v1/transformer_test.go, internal/service/frontend/api/v1/dagruns.go
Populate AutoRetryCount from internal status into API outputs; updated enqueueRetry callsite to new signature and handle stale-latest error.
Core models & retry primitives
internal/core/dag.go, internal/core/retry.go, internal/core/exec/runstatus.go, internal/core/exec/dagrun.go
Added DAG-level RetryPolicy, CalculateBackoffInterval, added AutoRetryCount on DAGRunStatus (initialized to 0), and added List option Unlimited plus DAGRunStore CAS API declaration.
Enqueue retry refactor
internal/core/exec/enqueue_retry.go, internal/core/exec/enqueue_retry_test.go, internal/cmd/retry.go
Refactored EnqueueRetry to accept dagRunStore and use CompareAndSwapLatestAttemptStatus CAS to persist queued state (with rollback on enqueue failure); introduced ErrRetryStaleLatest and EnqueueRetryOptions; updated callers and tests.
Persistence: index & store
internal/persis/filedagrun/dagrunindex/dagrunindex.go, internal/persis/filedagrun/dataroot.go, internal/persis/filedagrun/dagrun.go, internal/persis/filedagrun/store.go
Bumped index version, added AttemptID and AutoRetryCount to index entries and summaries, implemented CompareAndSwapLatestAttemptStatus in store, propagated new fields when summarizing, and added unlimited listing mode handling.
Scheduler retry scanner & integration
internal/service/scheduler/retry_scanner.go, internal/service/scheduler/retry_scanner_test.go, internal/service/scheduler/scheduler.go, internal/service/scheduler/suspend_name.go, internal/service/scheduler/tick_planner.go
New RetryScanner that scans failed runs within a retryWindow and enqueues eligible retries using policy/backoff; integrated into Scheduler lifecycle and startup/shutdown flow; added dag suspend-name helper.
Configuration
internal/cmn/config/definition.go, internal/cmn/config/loader.go, internal/cmn/config/config.go, internal/cmn/config/loader_test.go
Added scheduler.retry_failure_window config (RetryFailureWindow time.Duration), defaulting to 24h and validated as non-negative; wired env binding and tests.
Schema & DAG spec
internal/cmn/schema/dag.schema.json, internal/core/spec/dag.go, internal/core/spec/loader_test.go
Introduced reusable retry policy schema definitions; added DAG-level retry_policy parsing/validation and builders to produce core.DAGRetryPolicy; added tests for parsing/normalization and schema validation.
Integration & test fixtures
internal/intg/queue/fixture_test.go, internal/intg/queue/queue_test.go, internal/intg/distr/fixtures_test.go, internal/intg/sched_test.go, internal/service/scheduler/*_test.go
Extended fixtures and integration tests to support retryWindow, persisted AutoRetryCount, marker-based checks, and async scheduler startup; added many tests for retry scanner and scheduler lifecycle.
Agent / runtime / runner
internal/runtime/agent/agent.go, internal/runtime/agent/agent_test.go, internal/runtime/agent/retry_behavior_test.go, internal/runtime/runner.go, internal/runtime/transform/status.go
Propagated persisted timing/retry metadata into status construction, added helpers to derive current AutoRetryCount, tests for retry behavior, and a StatusOption WithAutoRetryCount; Runner gains ForcedStatus override and uses core backoff helper.
Worker & remote tests
internal/service/worker/remote_handler.go, internal/service/worker/remote_handler_test.go
Minor nil-safety and removed old retry config usage; test mocks updated with CompareAndSwapLatestAttemptStatus stub.
Health server & tests
internal/service/scheduler/health.go, internal/service/scheduler/health_test.go
HealthServer now manages listener, exposes URL(), and has safer Start/Stop with bound address tracking; tests updated to use dynamic address.
Base configs & persistence tests
internal/persis/filebaseconfig/default_base_config.go, internal/persis/filebaseconfig/store_test.go
Added default DAG retry_policy in base config and assertion in tests.
Mocks updated across packages
internal/cmn/telemetry/collector_test.go, internal/runtime/agent/dbclient_test.go, internal/service/coordinator/handler_test.go, internal/service/scheduler/zombie_detector_test.go, internal/service/worker/remote_handler_test.go
Added CompareAndSwapLatestAttemptStatus implementations to various mock DAGRunStore types used in tests to satisfy CAS usage.
UI schema
ui/src/api/v1/schema.ts
Added autoRetryCount field to DAGRunSummary schema.
Protobuf / index proto
proto/index/v1/index.proto
Added attempt_id and auto_retry_count fields to DAGRunIndexEntry proto.
Misc tests & helpers
internal/test/helper.go, internal/testdata/cli/config_test.yaml, many *_test.go updates across packages
Test harness and fixtures adjusted (scheduler port 0 default, new helpers, and widespread test updates to support retry behavior).

Sequence Diagram(s)

sequenceDiagram
    participant Scanner as RetryScanner
    participant Store as DAGRunStore
    participant Enqueuer as EnqueueRetry
    participant Queue as QueueStore

    Scanner->>Store: ListStatuses(failed runs in window)
    Store-->>Scanner: []DAGRunStatus
    Scanner->>Scanner: evaluateRetryDecision(status, policy)
    alt eligible
        Scanner->>Enqueuer: EnqueueRetry(ctx, dagRunStore, queueStore, dag, status, opts)
        Enqueuer->>Store: CompareAndSwapLatestAttemptStatus(expectedAttemptID, expectedStatus, mutate)
        Note over Store: CAS sets Status->Queued, QueuedAt, TriggerType, optionally increments AutoRetryCount
        Store-->>Enqueuer: (newStatus, swapped=true)
        alt swapped
            Enqueuer->>Queue: Enqueue(dagRun)
            Queue-->>Enqueuer: success
            Enqueuer-->>Scanner: nil
        else enqueue failed
            Enqueuer->>Store: CompareAndSwapLatestAttemptStatus(rollback mutate)
            Store-->>Enqueuer: (rolledBackStatus, swapped=true/false)
            Enqueuer-->>Scanner: error
        end
    else not eligible
        Scanner-->>Scanner: log reason (suspended/exhausted/backoff)
    end
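The CAS-then-enqueue flow in the diagram above, including the rollback branch, can be sketched with in-memory stand-ins (the real `DAGRunStore`/`QueueStore` interfaces and status types are richer than these placeholders):

```go
package main

import (
	"errors"
	"fmt"
)

// store is a toy stand-in for DAGRunStore, tracking only the latest
// attempt's status string.
type store struct{ latestStatus string }

// compareAndSwap applies mutate only when the latest status matches the
// expectation, mirroring CompareAndSwapLatestAttemptStatus.
func (s *store) compareAndSwap(expected string, mutate func(string) string) (string, bool) {
	if s.latestStatus != expected {
		return s.latestStatus, false // stale latest attempt: no swap
	}
	s.latestStatus = mutate(s.latestStatus)
	return s.latestStatus, true
}

// queue is a toy stand-in for QueueStore; fail simulates an enqueue error.
type queue struct{ fail bool }

func (q *queue) enqueue() error {
	if q.fail {
		return errors.New("enqueue failed")
	}
	return nil
}

// enqueueRetry follows the diagram: CAS the status to Queued first,
// then enqueue, rolling the status back if the queue write fails.
func enqueueRetry(s *store, q *queue) error {
	if _, swapped := s.compareAndSwap("failed", func(string) string { return "queued" }); !swapped {
		return errors.New("retry stale latest")
	}
	if err := q.enqueue(); err != nil {
		s.compareAndSwap("queued", func(string) string { return "failed" }) // rollback
		return err
	}
	return nil
}

func main() {
	s := &store{latestStatus: "failed"}
	fmt.Println(enqueueRetry(s, &queue{}), s.latestStatus)
}
```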

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~70 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 23.47% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: add DAG-level retry policy' directly and clearly summarizes the main change: adding support for DAG-level retry policies. It is concise, specific, and accurately reflects the core objective of the PR.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/cmn/config/loader.go (1)

795-819: ⚠️ Potential issue | 🟠 Major

Keep invalid retry_failure_window distinct from an explicit disable.

A malformed scheduler.retry_failure_window is parsed as 0, and this defaulting path only restores 24h when the key is absent. So a typo like "24hours" silently turns retry scanning off instead of leaving the default in place. Please preserve parse success separately here so only an explicit 0s disables the scanner.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/config/loader.go` around lines 795 - 819, The code currently
treats a malformed scheduler.retry_failure_window as 0 and loses the difference
between "parse failed" and "explicit 0", so update the parsing and defaulting:
change l.parseDuration("scheduler.retry_failure_window", ...) to return a
(time.Duration, bool) or provide a new helper (e.g., l.parseDurationOK or
l.parseDurationWithStatus) that indicates parse success, set
cfg.Scheduler.RetryFailureWindow only when parse succeeded, and adjust
setSchedulerDefaults (the setSchedulerDefaults method and usage of
cfg.Scheduler.RetryFailureWindow) to only treat 0 as a deliberate disable when
the parse succeeded for that key; otherwise apply the 24*time.Hour default when
the key was missing or malformed. Ensure references to l.parseDuration and
setSchedulerDefaults are updated accordingly.
🧹 Nitpick comments (3)
internal/cmd/retry.go (1)

118-127: Consider removing redundant parameters.

The _ exec.DAGRunAttempt parameter is unused, and dagRunID is only used for logging but could be derived from status.DAGRunID. Consider simplifying the signature:

♻️ Suggested simplification
-func enqueueRetry(ctx *Context, _ exec.DAGRunAttempt, dag *core.DAG, status *exec.DAGRunStatus, dagRunID string) error {
+func enqueueRetry(ctx *Context, dag *core.DAG, status *exec.DAGRunStatus) error {
 	if err := exec.EnqueueRetry(ctx.Context, ctx.DAGRunStore, ctx.QueueStore, dag, status); err != nil {
 		return err
 	}
 	logger.Info(ctx, "Enqueued retry; will run when queue capacity is available",
 		tag.DAG(dag.Name),
-		tag.RunID(dagRunID),
+		tag.RunID(status.DAGRunID),
 	)
 	return nil
 }

Update the call site at line 89 accordingly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmd/retry.go` around lines 118 - 127, The enqueueRetry function
currently accepts an unused _ exec.DAGRunAttempt and a dagRunID that is only
used for logging; change the signature of enqueueRetry(ctx *Context, dag
*core.DAG, status *exec.DAGRunStatus) error to remove the unused DAGRunAttempt
and derive the run ID from status.DAGRunID for logging, keep the call to
exec.EnqueueRetry(ctx.Context, ctx.DAGRunStore, ctx.QueueStore, dag, status)
as-is, update the logger.Info tag.RunID argument to use status.DAGRunID, and
then update all call sites (the place that passed the removed exec.DAGRunAttempt
and dagRunID) to call the new signature accordingly.
api/v1/api.yaml (1)

7451-7453: Constrain retryCount to non-negative values.

retryCount should not accept negative integers in the API contract.

Suggested diff
         retryCount:
           type: integer
+          minimum: 0
           description: "Number of completed retry attempts for this DAG-run"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/v1/api.yaml` around lines 7451 - 7453, The OpenAPI schema for the
property retryCount currently allows negative integers; update the
api/v1/api.yaml schema for the retryCount property to constrain it to
non-negative values by adding a minimum: 0 (and optionally exclusiveMinimum:
false if your linter requires it) so that retryCount only accepts integers >= 0;
locate the retryCount definition in the file (the property named retryCount
under the relevant schema) and add the minimum constraint.
api/v1/api.gen.go (1)

1208-1208: Keep RetryCount documented consistently across the public DAG-run models.

RetryCount is documented on the summary type below, but not on this details type. Since this file is generated, please add the same description in the source schema and regenerate so the API docs stay aligned.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/v1/api.gen.go` at line 1208, The generated details model is missing the
RetryCount description – add the same documentation that exists on the summary
model to the source schema for the details model (the field named RetryCount)
and then regenerate the API bindings so the generated struct field for
RetryCount (RetryCount int `json:"retryCount"`) includes the description in
api.gen.go; update the OpenAPI/Swagger/source schema entry for the details
model's RetryCount property to match the summary type, then run the generator.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/core/dag.go`:
- Around line 375-390: The retry-policy validation currently uses a switch in
the Validate() logic (the d.RetryPolicy block) which short-circuits after the
first failing field; change it to perform independent checks for each field
(d.RetryPolicy.Limit, d.RetryPolicy.Interval, d.RetryPolicy.Backoff,
d.RetryPolicy.MaxInterval) and append a NewValidationError for each failing
condition so all retry-policy errors are collected and returned by Validate().

In `@internal/core/exec/enqueue_retry.go`:
- Around line 32-71: The current CAS in enqueue_retry.go flips the latest
attempt to Queued then calls queueStore.Enqueue, which can leave a run stranded
if the process dies between those steps; implement a reconciliation for Queued
runs without a corresponding queue item (or make queue enqueue atomic with
status) by adding a scanner or background job that queries dagRunStore for
DAGRunStatus records with Status==core.Queued and verifies presence in
queueStore (use QueueStore methods) and for missing queue entries either
re-enqueues the DAGRun via queueStore.Enqueue or roll back the DAGRunStatus to
the previous status using dagRunStore.CompareAndSwapLatestAttemptStatus (the
same CAS helper used in enqueue_retry.go) to ensure eventual consistency. Ensure
the reconciliation is idempotent and uses the same AttemptID/DAGRun identifiers
as in enqueue_retry.go so retries are not duplicated.

In `@internal/core/spec/dag.go`:
- Around line 869-895: The parseDAGRetryInterval function returns a non-empty
intervalStr ("60") only for the nil/default branch but returns "" for all
successfully parsed numeric branches; make intervalStr consistent by returning
the original numeric value as a string on all successful parses (e.g., for int,
int64, uint64 branches return fmt.Sprintf("%d", value) along with the computed
time.Duration), and keep the error branches returning "", so callers (e.g.,
consumers expecting IntervalSecStr) always get a string representation when
parsing succeeds.

In `@internal/service/coordinator/handler_test.go`:
- Around line 100-108: The mock CompareAndSwapLatestAttemptStatus currently
no-ops and always returns false; change it to implement real CAS semantics: add
a stored current *exec.DAGRunStatus on mockDAGRunStore (protected by a mutex),
then in CompareAndSwapLatestAttemptStatus lock, copy the current status, call
the provided mutate func on the copy, and if mutate returns nil replace the
stored status with the mutated copy and return the updated *exec.DAGRunStatus,
true, nil; if mutate returns an error return nil, false, err; if mutate returns
nil but you decide not to swap return the original status, false, nil. Use the
method name CompareAndSwapLatestAttemptStatus and fields (e.g.,
latestAttemptStatus, mu) so tests exercising CAS behavior will observe real
retries/updates.

In `@internal/service/frontend/api/v1/dagruns.go`:
- Around line 1668-1673: In enqueueRetry, add a nil-status guard after calling
attempt.ReadStatus to avoid passing a nil status to exec.EnqueueRetry;
specifically, after attempt.ReadStatus(ctx) in function enqueueRetry, check if
status == nil and return a descriptive error (e.g., fmt.Errorf("nil status from
attempt.ReadStatus")) instead of calling exec.EnqueueRetry with a nil value,
preserving use of a.dagRunStore, a.queueStore and dag when invoking
exec.EnqueueRetry.

In `@internal/service/scheduler/health.go`:
- Around line 77-90: The log currently prints tag.Port(h.port) which can be
0/unset when using newHealthServerWithAddr or ephemeral ports; update the
startup log in the goroutine to log the actual bound address h.boundAddr (e.g.,
replace tag.Port(h.port) with a tag/value using h.boundAddr) and ensure the
package import list includes "log/slog" (or the appropriate slog/tag helper used
by logger) so the bound address can be logged for accurate diagnostics; modify
the logger.Info call that references h.port and confirm h.boundAddr is set
before logging.

In `@internal/service/scheduler/retry_scanner.go`:
- Around line 123-129: The current call to s.dagRunStore.ListStatuses uses
exec.WithFrom(from), which filters by run creation time and causes runs that
started before the window but failed inside it to be skipped; remove
exec.WithFrom(from) from the ListStatuses call (call ListStatuses with
exec.WithExactName(dag.Name), exec.WithStatuses([...Failed...]) and
exec.WithoutLimit()) and then post-filter the returned failedRuns by each
run.FinishedAt (e.g., include only runs whose FinishedAt falls within the
retry_failure_window "from"..now range), or if the store supports it add/replace
the query option with one that filters by FinishedAt instead of creation time;
update any related logic that assumes the pre-filtering.

---

Outside diff comments:
In `@internal/cmn/config/loader.go`:
- Around line 795-819: The code currently treats a malformed
scheduler.retry_failure_window as 0 and loses the difference between "parse
failed" and "explicit 0", so update the parsing and defaulting: change
l.parseDuration("scheduler.retry_failure_window", ...) to return a
(time.Duration, bool) or provide a new helper (e.g., l.parseDurationOK or
l.parseDurationWithStatus) that indicates parse success, set
cfg.Scheduler.RetryFailureWindow only when parse succeeded, and adjust
setSchedulerDefaults (the setSchedulerDefaults method and usage of
cfg.Scheduler.RetryFailureWindow) to only treat 0 as a deliberate disable when
the parse succeeded for that key; otherwise apply the 24*time.Hour default when
the key was missing or malformed. Ensure references to l.parseDuration and
setSchedulerDefaults are updated accordingly.

---

Nitpick comments:
In `@api/v1/api.gen.go`:
- Line 1208: The generated details model is missing the RetryCount description –
add the same documentation that exists on the summary model to the source schema
for the details model (the field named RetryCount) and then regenerate the API
bindings so the generated struct field for RetryCount (RetryCount int
`json:"retryCount"`) includes the description in api.gen.go; update the
OpenAPI/Swagger/source schema entry for the details model's RetryCount property
to match the summary type, then run the generator.

In `@api/v1/api.yaml`:
- Around line 7451-7453: The OpenAPI schema for the property retryCount
currently allows negative integers; update the api/v1/api.yaml schema for the
retryCount property to constrain it to non-negative values by adding a minimum:
0 (and optionally exclusiveMinimum: false if your linter requires it) so that
retryCount only accepts integers >= 0; locate the retryCount definition in the
file (the property named retryCount under the relevant schema) and add the
minimum constraint.

In `@internal/cmd/retry.go`:
- Around line 118-127: The enqueueRetry function currently accepts an unused _
exec.DAGRunAttempt and a dagRunID that is only used for logging; change the
signature of enqueueRetry(ctx *Context, dag *core.DAG, status
*exec.DAGRunStatus) error to remove the unused DAGRunAttempt and derive the run
ID from status.DAGRunID for logging, keep the call to
exec.EnqueueRetry(ctx.Context, ctx.DAGRunStore, ctx.QueueStore, dag, status)
as-is, update the logger.Info tag.RunID argument to use status.DAGRunID, and
then update all call sites (the place that passed the removed exec.DAGRunAttempt
and dagRunID) to call the new signature accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 525aaab2-994a-4ca0-ba50-db6acfb8b43f

📥 Commits

Reviewing files that changed from the base of the PR and between 716cde3 and 6d729b5.

⛔ Files ignored due to path filters (1)
  • proto/index/v1/index.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (45)
  • api/v1/api.gen.go
  • api/v1/api.yaml
  • internal/cmd/retry.go
  • internal/cmn/config/config.go
  • internal/cmn/config/definition.go
  • internal/cmn/config/loader.go
  • internal/cmn/config/loader_test.go
  • internal/cmn/telemetry/collector_test.go
  • internal/core/dag.go
  • internal/core/exec/dagrun.go
  • internal/core/exec/enqueue_retry.go
  • internal/core/exec/enqueue_retry_test.go
  • internal/core/exec/runstatus.go
  • internal/core/spec/dag.go
  • internal/core/spec/loader_test.go
  • internal/intg/queue/fixture_test.go
  • internal/intg/queue/queue_test.go
  • internal/persis/filebaseconfig/default_base_config.go
  • internal/persis/filebaseconfig/store_test.go
  • internal/persis/filedagrun/dagrun.go
  • internal/persis/filedagrun/dagrunindex/dagrunindex.go
  • internal/persis/filedagrun/dataroot.go
  • internal/persis/filedagrun/store.go
  • internal/runtime/agent/agent.go
  • internal/runtime/agent/agent_test.go
  • internal/runtime/agent/dbclient_test.go
  • internal/runtime/agent/retry_behavior_test.go
  • internal/runtime/runner.go
  • internal/runtime/transform/status.go
  • internal/service/coordinator/handler_test.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/transformer.go
  • internal/service/frontend/api/v1/transformer_test.go
  • internal/service/scheduler/health.go
  • internal/service/scheduler/health_test.go
  • internal/service/scheduler/retry_scanner.go
  • internal/service/scheduler/retry_scanner_test.go
  • internal/service/scheduler/scheduler.go
  • internal/service/scheduler/suspend_name.go
  • internal/service/scheduler/tick_planner.go
  • internal/service/scheduler/zombie_detector_test.go
  • internal/service/worker/remote_handler.go
  • internal/service/worker/remote_handler_test.go
  • proto/index/v1/index.proto
  • ui/src/api/v1/schema.ts

Comment thread internal/core/dag.go Outdated
Comment thread internal/core/exec/enqueue_retry.go
Comment thread internal/core/spec/dag.go
Comment thread internal/service/coordinator/handler_test.go
Comment thread internal/service/frontend/api/v1/dagruns.go Outdated
Comment thread internal/service/scheduler/health.go Outdated
Comment thread internal/service/scheduler/retry_scanner.go
@yottahmd
Collaborator Author

@coderabbitai full review

@coderabbitai

coderabbitai Bot commented Mar 15, 2026

✅ Actions performed

Full review triggered.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 8

♻️ Duplicate comments (5)
internal/service/scheduler/health.go (1)

104-106: ⚠️ Potential issue | 🟡 Minor

Log the bound address instead of the configured port.

Line 105 still logs tag.Port(h.port), which is misleading for dynamic/addr-based binding. Use the actual bound address captured at startup.

🔧 Suggested fix
 import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"log/slog"
 	"net"
 	"net/http"
 	"sync"
 	"time"
@@
-	go func(server *http.Server, listener net.Listener) {
-		logger.Info(ctx, "Starting scheduler health check server", tag.Port(h.port))
+	go func(server *http.Server, listener net.Listener, boundAddr string) {
+		logger.Info(ctx, "Starting scheduler health check server", slog.String("addr", boundAddr))
 		if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
 			logger.Error(ctx, "Health check server error", tag.Error(err))
 		}
-	}(server, listener)
+	}(server, listener, boundAddr)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/scheduler/health.go` around lines 104 - 106, The log
currently uses the configured h.port (logger.Info(..., tag.Port(h.port))) which
is misleading for dynamic or address-based binding; update the goroutine that
starts the server (the anonymous func that takes server *http.Server, listener
net.Listener) to log the actual bound address by using listener.Addr().String()
(or the appropriate tag helper, e.g., tag.Addr(listener.Addr().String()))
instead of tag.Port(h.port) so the message reflects the real bind address.
internal/service/frontend/api/v1/dagruns.go (1)

1669-1673: ⚠️ Potential issue | 🟡 Minor

Add a nil-status guard before calling exec.EnqueueRetry.

attempt.ReadStatus(ctx) can still yield a nil status; passing that through risks a panic or unclear failure downstream.

✅ Suggested defensive fix
 func (a *API) enqueueRetry(ctx context.Context, attempt exec.DAGRunAttempt, dag *core.DAG) error {
 	status, err := attempt.ReadStatus(ctx)
 	if err != nil {
 		return fmt.Errorf("error reading status: %w", err)
 	}
+	if status == nil {
+		return fmt.Errorf("error reading status: status data is nil")
+	}
 	if err := exec.EnqueueRetry(ctx, a.dagRunStore, a.queueStore, dag, status, exec.EnqueueRetryOptions{}); err != nil {
 		if errors.Is(err, exec.ErrRetryStaleLatest) {
 			return &Error{
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/dagruns.go` around lines 1669 - 1673, The
call to attempt.ReadStatus(ctx) can return a nil status which is then passed
into exec.EnqueueRetry causing potential panics; add a nil-status guard after
the ReadStatus call: check if status == nil and return a clear error (e.g.,
fmt.Errorf("nil status from attempt.ReadStatus")) before calling
exec.EnqueueRetry, so exec.EnqueueRetry(a.dagRunStore, a.queueStore, dag,
status, exec.EnqueueRetryOptions{}) always receives a non-nil status.
internal/service/coordinator/handler_test.go (1)

100-108: ⚠️ Potential issue | 🟠 Major

Implement real CAS behavior in this mock.

This stub never calls mutate and always reports swapped=false, so any coordinator path that starts depending on CAS semantics will be untestable here or fail for the wrong reason. Please mirror the lightweight in-memory CAS behavior used by the other DAGRunStore test doubles instead of returning (nil, false, nil) unconditionally.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/coordinator/handler_test.go` around lines 100 - 108, The
mock CompareAndSwapLatestAttemptStatus currently always returns (nil,false,nil)
and never calls the provided mutate callback; update
mockDAGRunStore.CompareAndSwapLatestAttemptStatus to implement an in-memory CAS:
load the current *exec.DAGRunStatus from the mock's internal storage, clone it,
call the provided mutate func on the clone, compare the pre-mutation and
post-mutation values (or compare expected status string argument against current
status), and if different replace the stored status and return the new status
with swapped=true, otherwise return the current status with swapped=false;
preserve and propagate any error from the mutate callback and match the behavior
used by the other DAGRunStore test doubles so tests depending on CAS semantics
exercise real mutation logic.
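The in-memory CAS behavior the comment above asks for can be sketched like this; the `mockStatus` shape and method signature are simplified stand-ins for the real `*exec.DAGRunStatus` and store interface:

```go
package main

import (
	"fmt"
	"sync"
)

// mockStatus stands in for *exec.DAGRunStatus in this sketch.
type mockStatus struct {
	AttemptID string
	Status    string
}

// mockDAGRunStore holds a single latest status guarded by a mutex.
type mockDAGRunStore struct {
	mu     sync.Mutex
	latest mockStatus
}

// CompareAndSwapLatestAttemptStatus mutates a copy and commits it only
// when the caller's expectation matches the stored latest attempt.
func (m *mockDAGRunStore) CompareAndSwapLatestAttemptStatus(
	expectedAttemptID, expectedStatus string,
	mutate func(*mockStatus) error,
) (*mockStatus, bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.latest.AttemptID != expectedAttemptID || m.latest.Status != expectedStatus {
		return &m.latest, false, nil // stale expectation: no swap
	}
	updated := m.latest // copy, mutate, then commit
	if err := mutate(&updated); err != nil {
		return nil, false, err
	}
	m.latest = updated
	return &m.latest, true, nil
}

func main() {
	store := &mockDAGRunStore{latest: mockStatus{AttemptID: "a1", Status: "failed"}}
	_, swapped, _ := store.CompareAndSwapLatestAttemptStatus("a1", "failed", func(s *mockStatus) error {
		s.Status = "queued"
		return nil
	})
	fmt.Println(swapped, store.latest.Status)
}
```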
internal/core/exec/enqueue_retry.go (1)

44-87: ⚠️ Potential issue | 🟠 Major

Persisting Queued before enqueue still leaves a crash gap.

If the process dies after the CAS succeeds but before queueStore.Enqueue durably writes the item, the run is left Queued but absent from the queue. The retry scanner only revisits failed runs, so that retry is stranded indefinitely. This still needs a queued-run reconciliation path or an atomic/outbox-style enqueue strategy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/exec/enqueue_retry.go` around lines 44 - 87, The current flow
sets status to Queued via dagRunStore.CompareAndSwapLatestAttemptStatus and only
then calls queueStore.Enqueue, which can leave a run marked Queued but not
present in the durable queue if the process crashes; change to an atomic/outbox
or reconciliation approach: either (A) persist the queue entry and the DAGRun
status in the same transactional boundary (or via a single
CompareAndSwapLatestAttemptStatus callback that also writes a durable outbox
record) so enqueue is durable before returning, or (B) if transactions aren’t
possible, write a durable outbox record (e.g., a QueueEntry row) inside the
CompareAndSwapLatestAttemptStatus update and have a separate dispatcher move
outbox->queue, or (C) add a reconciliation pass in the retry scanner to find
DAGRunStatus entries with Status==core.Queued but missing queue entries and
re-enqueue them; update logic around queueStore.Enqueue,
dagRunStore.CompareAndSwapLatestAttemptStatus, DAGRunStatus fields and the
rollback branch to reflect the chosen approach so no run can remain Queued
without a corresponding durable queue entry.
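Option (C) above, a reconciliation pass, can be sketched with simplified stand-ins; the `run` struct, queue-membership map, and `reconcileQueuedRuns` helper are illustrative assumptions rather than the real store APIs:

```go
package main

import "fmt"

// run is a toy stand-in for a DAG-run status record.
type run struct {
	ID     string
	Status string
}

// reconcileQueuedRuns re-enqueues runs marked Queued that have no
// corresponding queue entry, returning the IDs it repaired. Keyed on the
// run ID, a repeated pass after a crash is a no-op for already-fixed runs.
func reconcileQueuedRuns(runs []run, queued map[string]bool, enqueue func(string) error) []string {
	var repaired []string
	for _, r := range runs {
		if r.Status != "queued" || queued[r.ID] {
			continue // only runs stuck in Queued without a queue item
		}
		if err := enqueue(r.ID); err == nil {
			repaired = append(repaired, r.ID)
		}
	}
	return repaired
}

func main() {
	runs := []run{{"r1", "queued"}, {"r2", "queued"}, {"r3", "failed"}}
	queueItems := map[string]bool{"r1": true} // r2 is stranded
	fmt.Println(reconcileQueuedRuns(runs, queueItems, func(string) error { return nil }))
}
```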
internal/service/scheduler/retry_scanner.go (1)

87-91: ⚠️ Potential issue | 🟠 Major

Retry window filtering is still tied to pre-query time filter, which can miss valid recent failures.

At Line 90, exec.WithFrom(from) is applied before retryReferenceTime(...) is evaluated. If WithFrom filters by creation time, long-running DAGs that failed inside the retry window are skipped entirely.

Proposed fix
 func (s *RetryScanner) scan(ctx context.Context) error {
 	now := s.clock().UTC()
-	from := exec.NewUTC(now.Add(-s.retryWindow))
+	windowStart := now.Add(-s.retryWindow)

 	failedRuns, err := s.dagRunStore.ListStatuses(
 		ctx,
 		exec.WithStatuses([]core.Status{core.Failed}),
-		exec.WithFrom(from),
 		exec.WithoutLimit(),
 	)
 	if err != nil {
 		return err
 	}

 	for _, listed := range failedRuns {
 		if listed == nil {
 			continue
 		}
+		refTime, ok := retryReferenceTime(listed)
+		if !ok || refTime.Before(windowStart) || refTime.After(now) {
+			continue
+		}
 		if err := s.processFailedRun(ctx, listed, now); err != nil {
 			logger.Error(ctx, "Retry scanner failed to process DAG run",
 				tag.DAG(listed.Name),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/scheduler/retry_scanner.go` around lines 87 - 91, The query
is applying exec.WithFrom(from) when calling s.dagRunStore.ListStatuses (with
exec.WithStatuses([]core.Status{core.Failed})), which uses the pre-query "from"
time and can skip DAG runs that failed inside the retry window; update the call
so the time window used by ListStatuses is computed from retryReferenceTime(...)
(or remove exec.WithFrom and instead pass a computed from based on
retryReferenceTime) so the DB query includes runs that failed within the retry
window, or alternatively keep exec.WithFrom but expand it to min(from,
retryReferenceTime(...)); modify the invocation of s.dagRunStore.ListStatuses to
use that adjusted time boundary to ensure long-running DAGs that failed recently
are not excluded.
🧹 Nitpick comments (7)
internal/intg/queue/fixture_test.go (1)

72-74: WithRetryWindow(0) currently cannot override config to zero.

Line 72 only applies the value when f.retryWindow > 0, so callers cannot explicitly set scheduler.retry_failure_window to 0 even though WithRetryWindow is documented as an override.

💡 Suggested refactor
 type fixture struct {
@@
-	retryWindow  time.Duration
+	retryWindow  *time.Duration
@@
 		test.WithConfigMutator(func(c *config.Config) {
@@
-			if f.retryWindow > 0 {
-				c.Scheduler.RetryFailureWindow = f.retryWindow
+			if f.retryWindow != nil {
+				c.Scheduler.RetryFailureWindow = *f.retryWindow
 			}
@@
 func WithRetryWindow(window time.Duration) func(*fixture) {
-	return func(f *fixture) { f.retryWindow = window }
+	return func(f *fixture) { f.retryWindow = &window }
 }

Also applies to: 119-122

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/intg/queue/fixture_test.go` around lines 72 - 74, The fixture
currently only applies f.retryWindow when f.retryWindow > 0 so callers cannot
set scheduler RetryFailureWindow to 0; update the API by adding a boolean flag
(e.g., retryWindowSet) to the fixture and set it inside WithRetryWindow, then
replace the conditional "if f.retryWindow > 0" with "if f.retryWindowSet {
c.Scheduler.RetryFailureWindow = f.retryWindow }" so zero is treated as a valid
override; apply the same change to the similar block around the 119-122 area.
internal/cmn/config/loader_test.go (1)

240-241: Consider adding explicit override/invalid parsing tests for RetryFailureWindow.

These assertions validate defaults well, but adding one env/YAML override case (and one invalid duration warning case) would better lock in loader behavior for the new field.

Also applies to: 526-527

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/config/loader_test.go` around lines 240 - 241, Add two tests for
the new RetryFailureWindow field in internal/cmn/config/loader_test.go: extend
the existing default-assertion test(s) to include an env/YAML override case that
sets RetryFailureWindow to a known duration (e.g., "30m") and asserts the loaded
config.RetryFailureWindow equals 30*time.Minute, and add an invalid-duration
case that supplies a bad string (e.g., "not-a-duration") and asserts the loader
logs a parse warning and falls back to the default value; place these assertions
alongside the existing default checks so they exercise the same loader code path
that reads RetryFailureWindow.
internal/persis/filebaseconfig/default_base_config.go (1)

49-54: Consider backoff/jitter defaults for fleet-wide retries.

Using a fixed 5s interval at base scope can synchronize retries after shared outages. Consider showing backoff (and optionally max_interval_sec) in the default policy to reduce retry spikes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/filebaseconfig/default_base_config.go` around lines 49 - 54,
The default retry_policy uses a fixed interval_sec (5) which can synchronize
retries; update the base default in default_base_config.go so retry_policy
includes a backoff field (e.g., "backoff: true" or an exponential backoff
config) and an optional max_interval_sec to cap growth, and replace or augment
interval_sec with a base_interval_sec plus jitter/backoff semantics;
specifically edit the retry_policy block (the retry_policy object and its keys
limit, interval_sec) to add backoff and max_interval_sec fields and ensure
consumers of retry_policy (e.g., any code reading retry_policy) honor
base_interval_sec with backoff and jitter.
api/v1/api.yaml (1)

7451-7453: Constrain autoRetryCount to non-negative values.

autoRetryCount represents a consumed retry count, so allowing negatives weakens validation. Add minimum: 0 (and optionally default: 0).

♻️ Proposed schema tightening
         autoRetryCount:
           type: integer
+          minimum: 0
+          default: 0
           description: "Number of scheduler-issued DAG auto-retries already consumed for this DAG-run"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/v1/api.yaml` around lines 7451 - 7453, The property autoRetryCount in the
OpenAPI schema should be constrained to non-negative values: update the
autoRetryCount definition (the integer property named autoRetryCount) to include
minimum: 0 and optionally default: 0 so the schema enforces that consumed retry
counts cannot be negative.
internal/runtime/agent/agent.go (1)

1091-1104: Remove duplicate comment.

Line 1093 contains a duplicate of the comment that should only appear at line 1102. This appears to be a copy-paste artifact.

🧹 Remove duplicate comment
 	// If the current execution is based on a persisted target, copy timing data
 	// from that target. Otherwise, use the schedule time provided directly.
-	// Otherwise, use the schedule time provided directly via CLI flag.
 	if source != nil {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/runtime/agent/agent.go` around lines 1091 - 1104, Remove the
duplicated comment above the timing-copy logic: keep the explanatory comment
that describes copying timing data from a persisted target and using the
schedule time only once (the version currently at the later block), and delete
the repeated sentence immediately before the "if source != nil" block that
duplicates it; ensure the logic around variables source, opts,
transform.WithQueuedAt, transform.WithCreatedAt, and transform.WithScheduleTime
remains unchanged.
internal/cmd/retry.go (1)

123-129: Consider preserving the original error with %w for better debugging.

The error wrapping at line 126 loses the original exec.ErrRetryStaleLatest error context. While the message is descriptive, preserving the wrapped error allows callers to use errors.Is() for programmatic error handling.

🔧 Proposed fix
 	if err := exec.EnqueueRetry(ctx.Context, ctx.DAGRunStore, ctx.QueueStore, dag, status, exec.EnqueueRetryOptions{}); err != nil {
 		if errors.Is(err, exec.ErrRetryStaleLatest) {
-			return fmt.Errorf("dag-run state changed before retry could be queued")
+			return fmt.Errorf("dag-run state changed before retry could be queued: %w", err)
 		}
 		return err
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmd/retry.go` around lines 123 - 129, In enqueueRetry, preserve the
original err when returning the wrapped error so callers can inspect it; change
the fmt.Errorf call that returns "dag-run state changed before retry could be
queued" to wrap the original err using %w (referencing exec.ErrRetryStaleLatest
and the err variable from exec.EnqueueRetry) so errors.Is/As can detect the
underlying exec.ErrRetryStaleLatest.
internal/service/scheduler/scheduler_test.go (1)

66-71: Prefer require in the new timeout/error branches.

The new helpers and subtests mostly use require, but these added branches fall back to t.Fatal. Using require.FailNow/require.NoError here would keep failures consistent with the rest of the file.

As per coding guidelines: "**/*_test.go: Use stretchr/testify assertions for testing in Go."

Also applies to: 150-155, 163-168, 253-267, 302-313, 381-395

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/scheduler/scheduler_test.go` around lines 66 - 71, Replace
t.Fatal calls in the select default branches with the appropriate
testify/require calls to keep tests consistent: for the select reading
restartScheduleTimeCh replace t.Fatal("restart schedule time was not recorded")
with require.FailNow(t, "restart schedule time was not recorded") (and similarly
replace other t.Fatal uses at the referenced blocks with require.FailNow or
require.NoError as appropriate). Locate the select blocks that read from
restartScheduleTimeCh and the other listed branches (around the checks at
150-155, 163-168, 253-267, 302-313, 381-395) and swap t.Fatal into
require.FailNow (or require.NoError/require.True/False matching the existing
assertions) so all test failures use stretchr/testify.require.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/cmn/config/config.go`:
- Around line 475-492: The validateScheduler function currently omits validation
for scheduler.failure_threshold; update the function (validateScheduler) to
reject negative values by checking c.Scheduler.FailureThreshold (or the exact
struct field name for failure_threshold) and returning an error like
"scheduler.failure_threshold must be >= 0" when it's < 0, consistent with the
other scheduler bounds checks.

In `@internal/cmn/schema/dag.schema.json`:
- Around line 640-740: The schemas allow unknown properties because they lack
additionalProperties: false; add additionalProperties: false to dagRetryPolicy
so unknown keys are rejected, and replace stepRetryPolicy's allOf + $ref to
retryPolicyCommon by inlining the properties from retryPolicyCommon (limit,
interval_sec, backoff, max_interval_sec with their existing oneOf/type
constraints) into the stepRetryPolicy object and then add additionalProperties:
false to that single combined stepRetryPolicy definition; keep retryPolicyCommon
if still used elsewhere or remove if not.

In `@internal/core/retry.go`:
- Around line 13-22: Clamp negative attemptCount to zero before computing the
backoff in CalculateBackoffInterval; inside the function ensure attemptCount is
set to max(0, attemptCount) (or equivalent) prior to using it in math.Pow so the
exponent is non-negative, then proceed with the existing sleepTime calculation
and maxInterval comparison.

In `@internal/core/spec/dag.go`:
- Around line 889-914: The parseDAGRetryMaxInterval function rejects numeric
strings while its siblings use parseConcreteRetryInt; modify
parseDAGRetryMaxInterval to delegate parsing to parseConcreteRetryInt
(preserving the "retry_policy.max_interval_sec" field name in any returned
validation errors), handle a nil value by returning time.Hour, and convert the
parsed int64/uint result seconds into a time.Duration (time.Second *
parsedValue) before returning so string-typed YAML like "60" is accepted
consistently with parseConcreteRetryInt.

In `@internal/intg/distr/fixtures_test.go`:
- Around line 264-288: The test hardcodes a 5-second wait when checking
scheduler startup, ignoring the caller-provided timeout; update the
require.Eventually call in startScheduler so its total wait duration uses the
passed-in timeout (the same timeout used to create schedulerCtx) instead of
5*time.Second, ensuring f.schedulerCtx/f.schedulerCancel and the scheduler start
check (f.scheduler.IsRunning() and pollSchedulerErr()) honor the caller's
timeout.

In `@internal/intg/sched_test.go`:
- Around line 81-87: The final receive from errCh in the shutdown path should
accept context cancellation errors instead of strictly requiring no error;
update the block that reads from errCh (around schedulerStopped, errCh, Stop and
the goroutine started by Start(ctx)) to treat errors.Is(err, context.Canceled)
or errors.Is(err, context.DeadlineExceeded) as acceptable outcomes and only fail
the test for other unexpected errors—i.e., replace the unconditional
require.NoError(t, err) with a conditional that passes for nil OR
context.Canceled OR context.DeadlineExceeded and fails on any other error.

In `@internal/runtime/runner.go`:
- Around line 813-815: The Status() method currently returns r.forcedStatus
immediately, causing runEventHandler()/setupEnvironEventHandler() to export a
terminal DAG_RUN_STATUS prematurely; change Status() so it only applies/returns
r.forcedStatus once the run is actually terminal (e.g., after the plan/state
indicates completion) — leave normal NotStarted/Running/Waiting checks to run
until the run reaches a terminal state, then override with forcedStatus; update
references to forcedStatus and Status() accordingly so onInit and other
pre-terminal handlers see the real in-progress status until terminal.

In `@internal/service/scheduler/scheduler_test.go`:
- Around line 143-155: The test uses a fixed time.Sleep to assume sc2 has
reached the lock-acquisition wait, which is flaky; add an explicit
synchronization point so the test waits for the scheduler to actually begin
waiting for the lock. Modify the scheduler Start path to invoke an optional
test-only callback (e.g., an onLockWait func or channel send) right before or
when it blocks trying to acquire the lock, then in the test replace the Sleep:
create a waitStarted channel, pass it into sc2 (or set the test hook), start sc2
in a goroutine, block on <-waitStarted to guarantee sc2 is blocked on the lock,
then assert behavior and cancel; apply the same replacement for the second
occurrence around lines 288-305 to remove timing-based sleeps.

---

Duplicate comments:

In `@internal/service/coordinator/handler_test.go`:
- Around line 100-108: The mock CompareAndSwapLatestAttemptStatus currently
always returns (nil,false,nil) and never calls the provided mutate callback;
update mockDAGRunStore.CompareAndSwapLatestAttemptStatus to implement an
in-memory CAS: load the current *exec.DAGRunStatus from the mock's internal
storage, clone it, call the provided mutate func on the clone, compare the
pre-mutation and post-mutation values (or compare expected status string
argument against current status), and if different replace the stored status and
return the new status with swapped=true, otherwise return the current status
with swapped=false; preserve and propagate any error from the mutate callback
and match the behavior used by the other DAGRunStore test doubles so tests
depending on CAS semantics exercise real mutation logic.

In `@internal/service/frontend/api/v1/dagruns.go`:
- Around line 1669-1673: The call to attempt.ReadStatus(ctx) can return a nil
status which is then passed into exec.EnqueueRetry causing potential panics; add
a nil-status guard after the ReadStatus call: check if status == nil and return
a clear error (e.g., fmt.Errorf("nil status from attempt.ReadStatus")) before
calling exec.EnqueueRetry, so exec.EnqueueRetry(a.dagRunStore, a.queueStore,
dag, status, exec.EnqueueRetryOptions{}) always receives a non-nil status.

In `@internal/service/scheduler/health.go`:
- Around line 104-106: The log currently uses the configured h.port
(logger.Info(..., tag.Port(h.port))) which is misleading for dynamic or
address-based binding; update the goroutine that starts the server (the
anonymous func that takes server *http.Server, listener net.Listener) to log the
actual bound address by using listener.Addr().String() (or the appropriate tag
helper, e.g., tag.Addr(listener.Addr().String())) instead of tag.Port(h.port) so
the message reflects the real bind address.



ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: aa1da421-bf4b-4e5e-ac9a-3eafcb1d65d2

📥 Commits

Reviewing files that changed from the base of the PR and between f998dc1 and afc6c40.

⛔ Files ignored due to path filters (1)
  • proto/index/v1/index.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (53)
  • api/v1/api.gen.go
  • api/v1/api.yaml
  • internal/cmd/retry.go
  • internal/cmn/config/config.go
  • internal/cmn/config/definition.go
  • internal/cmn/config/loader.go
  • internal/cmn/config/loader_test.go
  • internal/cmn/schema/dag.schema.json
  • internal/cmn/schema/dag_schema_test.go
  • internal/cmn/telemetry/collector_test.go
  • internal/core/dag.go
  • internal/core/exec/dagrun.go
  • internal/core/exec/enqueue_retry.go
  • internal/core/exec/enqueue_retry_test.go
  • internal/core/exec/runstatus.go
  • internal/core/retry.go
  • internal/core/spec/dag.go
  • internal/core/spec/loader_test.go
  • internal/intg/distr/fixtures_test.go
  • internal/intg/queue/fixture_test.go
  • internal/intg/queue/queue_test.go
  • internal/intg/sched_test.go
  • internal/persis/filebaseconfig/default_base_config.go
  • internal/persis/filebaseconfig/store_test.go
  • internal/persis/filedagrun/dagrun.go
  • internal/persis/filedagrun/dagrunindex/dagrunindex.go
  • internal/persis/filedagrun/dataroot.go
  • internal/persis/filedagrun/store.go
  • internal/runtime/agent/agent.go
  • internal/runtime/agent/agent_test.go
  • internal/runtime/agent/dbclient_test.go
  • internal/runtime/agent/retry_behavior_test.go
  • internal/runtime/runner.go
  • internal/runtime/transform/status.go
  • internal/service/coordinator/handler_test.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/transformer.go
  • internal/service/frontend/api/v1/transformer_test.go
  • internal/service/scheduler/health.go
  • internal/service/scheduler/health_test.go
  • internal/service/scheduler/retry_scanner.go
  • internal/service/scheduler/retry_scanner_test.go
  • internal/service/scheduler/scheduler.go
  • internal/service/scheduler/scheduler_test.go
  • internal/service/scheduler/suspend_name.go
  • internal/service/scheduler/tick_planner.go
  • internal/service/scheduler/zombie_detector_test.go
  • internal/service/worker/remote_handler.go
  • internal/service/worker/remote_handler_test.go
  • internal/test/helper.go
  • internal/testdata/cli/config_test.yaml
  • proto/index/v1/index.proto
  • ui/src/api/v1/schema.ts

Comment thread internal/cmn/config/config.go
Comment thread internal/cmn/schema/dag.schema.json Outdated
Comment thread internal/core/retry.go
Comment thread internal/core/spec/dag.go
Comment thread internal/intg/distr/fixtures_test.go
Comment thread internal/intg/sched_test.go
Comment thread internal/runtime/runner.go Outdated
Comment thread internal/service/scheduler/scheduler_test.go Outdated
@yottahmd yottahmd merged commit 189cc01 into main Mar 15, 2026
6 checks passed
@yottahmd yottahmd deleted the daglevel-retry branch March 15, 2026 13:44
@codecov

codecov Bot commented Mar 15, 2026

Codecov Report

❌ Patch coverage is 72.49191% with 170 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.64%. Comparing base (a885c7c) to head (2c91c8c).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
internal/service/scheduler/retry_scanner.go 65.90% 28 Missing and 17 partials ⚠️
internal/service/scheduler/scheduler.go 76.43% 24 Missing and 13 partials ⚠️
internal/core/spec/dag.go 65.82% 20 Missing and 7 partials ⚠️
internal/persis/filedagrun/store.go 62.90% 11 Missing and 12 partials ⚠️
internal/service/scheduler/health.go 74.46% 6 Missing and 6 partials ⚠️
internal/cmn/config/config.go 37.50% 5 Missing and 5 partials ⚠️
internal/runtime/agent/agent.go 72.72% 6 Missing ⚠️
internal/cmd/retry.go 0.00% 4 Missing ⚠️
internal/cmn/config/loader.go 50.00% 1 Missing and 1 partial ⚠️
internal/cmn/dirlock/dirlock.go 50.00% 1 Missing and 1 partial ⚠️
... and 1 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1774      +/-   ##
==========================================
+ Coverage   69.63%   69.64%   +0.01%     
==========================================
  Files         411      414       +3     
  Lines       47267    47761     +494     
==========================================
+ Hits        32914    33265     +351     
- Misses      11627    11722      +95     
- Partials     2726     2774      +48     
Files with missing lines Coverage Δ
internal/core/dag.go 93.95% <ø> (+1.64%) ⬆️
internal/core/exec/dagrun.go 85.45% <100.00%> (+0.83%) ⬆️
internal/core/exec/enqueue_retry.go 100.00% <100.00%> (ø)
internal/core/exec/runstatus.go 92.85% <100.00%> (+0.08%) ⬆️
internal/core/retry.go 100.00% <100.00%> (ø)
internal/core/spec/step.go 79.43% <100.00%> (ø)
internal/persis/filedagrun/dagrun.go 63.35% <ø> (ø)
...ernal/persis/filedagrun/dagrunindex/dagrunindex.go 92.78% <100.00%> (+0.23%) ⬆️
internal/persis/filedagrun/dataroot.go 69.01% <100.00%> (-0.61%) ⬇️
internal/runtime/runner.go 85.82% <100.00%> (-0.08%) ⬇️
... and 14 more

... and 8 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a885c7c...2c91c8c. Read the comment docs.
