Conversation
📝 Walkthrough

Adds DAG-level retry support and scheduler-side retry scanning; introduces an `AutoRetryCount` field propagated through status, persistence, API, and UI; refactors retry enqueue to use an atomic Compare-And-Swap on the latest attempt status; and wires a `RetryFailureWindow` configuration and scheduler `RetryScanner` into the lifecycle.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Scanner as RetryScanner
    participant Store as DAGRunStore
    participant Enqueuer as EnqueueRetry
    participant Queue as QueueStore
    Scanner->>Store: ListStatuses(failed runs in window)
    Store-->>Scanner: []DAGRunStatus
    Scanner->>Scanner: evaluateRetryDecision(status, policy)
    alt eligible
        Scanner->>Enqueuer: EnqueueRetry(ctx, dagRunStore, queueStore, dag, status, opts)
        Enqueuer->>Store: CompareAndSwapLatestAttemptStatus(expectedAttemptID, expectedStatus, mutate)
        Note over Store: CAS sets Status->Queued, QueuedAt, TriggerType, optionally increments AutoRetryCount
        Store-->>Enqueuer: (newStatus, swapped=true)
        alt swapped
            Enqueuer->>Queue: Enqueue(dagRun)
            Queue-->>Enqueuer: success
            Enqueuer-->>Scanner: nil
        else enqueue failed
            Enqueuer->>Store: CompareAndSwapLatestAttemptStatus(rollback mutate)
            Store-->>Enqueuer: (rolledBackStatus, swapped=true/false)
            Enqueuer-->>Scanner: error
        end
    else not eligible
        Scanner-->>Scanner: log reason (suspended/exhausted/backoff)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes

Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/cmn/config/loader.go (1)
795-819: ⚠️ Potential issue | 🟠 Major

Keep invalid `retry_failure_window` distinct from an explicit disable.

A malformed `scheduler.retry_failure_window` is parsed as `0`, and this defaulting path only restores `24h` when the key is absent. So a typo like `"24hours"` silently turns retry scanning off instead of leaving the default in place. Please preserve parse success separately here so only an explicit `0s` disables the scanner.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/config/loader.go` around lines 795 - 819, The code currently treats a malformed scheduler.retry_failure_window as 0 and loses the difference between "parse failed" and "explicit 0", so update the parsing and defaulting: change l.parseDuration("scheduler.retry_failure_window", ...) to return a (time.Duration, bool) or provide a new helper (e.g., l.parseDurationOK or l.parseDurationWithStatus) that indicates parse success, set cfg.Scheduler.RetryFailureWindow only when parse succeeded, and adjust setSchedulerDefaults (the setSchedulerDefaults method and usage of cfg.Scheduler.RetryFailureWindow) to only treat 0 as a deliberate disable when the parse succeeded for that key; otherwise apply the 24*time.Hour default when the key was missing or malformed. Ensure references to l.parseDuration and setSchedulerDefaults are updated accordingly.
🧹 Nitpick comments (3)
internal/cmd/retry.go (1)
118-127: Consider removing redundant parameters.

The `_ exec.DAGRunAttempt` parameter is unused, and `dagRunID` is only used for logging but could be derived from `status.DAGRunID`. Consider simplifying the signature:

♻️ Suggested simplification

```diff
-func enqueueRetry(ctx *Context, _ exec.DAGRunAttempt, dag *core.DAG, status *exec.DAGRunStatus, dagRunID string) error {
+func enqueueRetry(ctx *Context, dag *core.DAG, status *exec.DAGRunStatus) error {
 	if err := exec.EnqueueRetry(ctx.Context, ctx.DAGRunStore, ctx.QueueStore, dag, status); err != nil {
 		return err
 	}
 	logger.Info(ctx, "Enqueued retry; will run when queue capacity is available",
 		tag.DAG(dag.Name),
-		tag.RunID(dagRunID),
+		tag.RunID(status.DAGRunID),
 	)
 	return nil
 }
```

Update the call site at line 89 accordingly.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmd/retry.go` around lines 118 - 127, The enqueueRetry function currently accepts an unused _ exec.DAGRunAttempt and a dagRunID that is only used for logging; change the signature of enqueueRetry(ctx *Context, dag *core.DAG, status *exec.DAGRunStatus) error to remove the unused DAGRunAttempt and derive the run ID from status.DAGRunID for logging, keep the call to exec.EnqueueRetry(ctx.Context, ctx.DAGRunStore, ctx.QueueStore, dag, status) as-is, update the logger.Info tag.RunID argument to use status.DAGRunID, and then update all call sites (the place that passed the removed exec.DAGRunAttempt and dagRunID) to call the new signature accordingly.

api/v1/api.yaml (1)
7451-7453: Constrain `retryCount` to non-negative values.

`retryCount` should not accept negative integers in the API contract.

Suggested diff

```diff
   retryCount:
     type: integer
+    minimum: 0
     description: "Number of completed retry attempts for this DAG-run"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/v1/api.yaml` around lines 7451 - 7453, The OpenAPI schema for the property retryCount currently allows negative integers; update the api/v1/api.yaml schema for the retryCount property to constrain it to non-negative values by adding a minimum: 0 (and optionally exclusiveMinimum: false if your linter requires it) so that retryCount only accepts integers >= 0; locate the retryCount definition in the file (the property named retryCount under the relevant schema) and add the minimum constraint.

api/v1/api.gen.go (1)
1208-1208: Keep `RetryCount` documented consistently across the public DAG-run models.

`RetryCount` is documented on the summary type below, but not on this details type. Since this file is generated, please add the same description in the source schema and regenerate so the API docs stay aligned.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/v1/api.gen.go` at line 1208, The generated details model is missing the RetryCount description – add the same documentation that exists on the summary model to the source schema for the details model (the field named RetryCount) and then regenerate the API bindings so the generated struct field for RetryCount (RetryCount int `json:"retryCount"`) includes the description in api.gen.go; update the OpenAPI/Swagger/source schema entry for the details model's RetryCount property to match the summary type, then run the generator.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/core/dag.go`:
- Around line 375-390: The retry-policy validation currently uses a switch in
the Validate() logic (the d.RetryPolicy block) which short-circuits after the
first failing field; change it to perform independent checks for each field
(d.RetryPolicy.Limit, d.RetryPolicy.Interval, d.RetryPolicy.Backoff,
d.RetryPolicy.MaxInterval) and append a NewValidationError for each failing
condition so all retry-policy errors are collected and returned by Validate().
In `@internal/core/exec/enqueue_retry.go`:
- Around line 32-71: The current CAS in enqueue_retry.go flips the latest
attempt to Queued then calls queueStore.Enqueue, which can leave a run stranded
if the process dies between those steps; implement a reconciliation for Queued
runs without a corresponding queue item (or make queue enqueue atomic with
status) by adding a scanner or background job that queries dagRunStore for
DAGRunStatus records with Status==core.Queued and verifies presence in
queueStore (use QueueStore methods) and for missing queue entries either
re-enqueues the DAGRun via queueStore.Enqueue or roll back the DAGRunStatus to
the previous status using dagRunStore.CompareAndSwapLatestAttemptStatus (the
same CAS helper used in enqueue_retry.go) to ensure eventual consistency. Ensure
the reconciliation is idempotent and uses the same AttemptID/DAGRun identifiers
as in enqueue_retry.go so retries are not duplicated.
In `@internal/core/spec/dag.go`:
- Around line 869-895: The parseDAGRetryInterval function returns a non-empty
intervalStr ("60") only for the nil/default branch but returns "" for all
successfully parsed numeric branches; make intervalStr consistent by returning
the original numeric value as a string on all successful parses (e.g., for int,
int64, uint64 branches return fmt.Sprintf("%d", value) along with the computed
time.Duration), and keep the error branches returning "", so callers (e.g.,
consumers expecting IntervalSecStr) always get a string representation when
parsing succeeds.
In `@internal/service/coordinator/handler_test.go`:
- Around line 100-108: The mock CompareAndSwapLatestAttemptStatus currently
no-ops and always returns false; change it to implement real CAS semantics: add
a stored current *exec.DAGRunStatus on mockDAGRunStore (protected by a mutex),
then in CompareAndSwapLatestAttemptStatus lock, copy the current status, call
the provided mutate func on the copy, and if mutate returns nil replace the
stored status with the mutated copy and return the updated *exec.DAGRunStatus,
true, nil; if mutate returns an error return nil, false, err; if mutate returns
nil but you decide not to swap return the original status, false, nil. Use the
method name CompareAndSwapLatestAttemptStatus and fields (e.g.,
latestAttemptStatus, mu) so tests exercising CAS behavior will observe real
retries/updates.
In `@internal/service/frontend/api/v1/dagruns.go`:
- Around line 1668-1673: In enqueueRetry, add a nil-status guard after calling
attempt.ReadStatus to avoid passing a nil status to exec.EnqueueRetry;
specifically, after attempt.ReadStatus(ctx) in function enqueueRetry, check if
status == nil and return a descriptive error (e.g., fmt.Errorf("nil status from
attempt.ReadStatus")) instead of calling exec.EnqueueRetry with a nil value,
preserving use of a.dagRunStore, a.queueStore and dag when invoking
exec.EnqueueRetry.
In `@internal/service/scheduler/health.go`:
- Around line 77-90: The log currently prints tag.Port(h.port) which can be
0/unset when using newHealthServerWithAddr or ephemeral ports; update the
startup log in the goroutine to log the actual bound address h.boundAddr (e.g.,
replace tag.Port(h.port) with a tag/value using h.boundAddr) and ensure the
package import list includes "log/slog" (or the appropriate slog/tag helper used
by logger) so the bound address can be logged for accurate diagnostics; modify
the logger.Info call that references h.port and confirm h.boundAddr is set
before logging.
In `@internal/service/scheduler/retry_scanner.go`:
- Around line 123-129: The current call to s.dagRunStore.ListStatuses uses
exec.WithFrom(from), which filters by run creation time and causes runs that
started before the window but failed inside it to be skipped; remove
exec.WithFrom(from) from the ListStatuses call (call ListStatuses with
exec.WithExactName(dag.Name), exec.WithStatuses([...Failed...]) and
exec.WithoutLimit()) and then post-filter the returned failedRuns by each
run.FinishedAt (e.g., include only runs whose FinishedAt falls within the
retry_failure_window "from"..now range), or if the store supports it add/replace
the query option with one that filters by FinishedAt instead of creation time;
update any related logic that assumes the pre-filtering.
---
Outside diff comments:
In `@internal/cmn/config/loader.go`:
- Around line 795-819: The code currently treats a malformed
scheduler.retry_failure_window as 0 and loses the difference between "parse
failed" and "explicit 0", so update the parsing and defaulting: change
l.parseDuration("scheduler.retry_failure_window", ...) to return a
(time.Duration, bool) or provide a new helper (e.g., l.parseDurationOK or
l.parseDurationWithStatus) that indicates parse success, set
cfg.Scheduler.RetryFailureWindow only when parse succeeded, and adjust
setSchedulerDefaults (the setSchedulerDefaults method and usage of
cfg.Scheduler.RetryFailureWindow) to only treat 0 as a deliberate disable when
the parse succeeded for that key; otherwise apply the 24*time.Hour default when
the key was missing or malformed. Ensure references to l.parseDuration and
setSchedulerDefaults are updated accordingly.
---
Nitpick comments:
In `@api/v1/api.gen.go`:
- Line 1208: The generated details model is missing the RetryCount description –
add the same documentation that exists on the summary model to the source schema
for the details model (the field named RetryCount) and then regenerate the API
bindings so the generated struct field for RetryCount (RetryCount int
`json:"retryCount"`) includes the description in api.gen.go; update the
OpenAPI/Swagger/source schema entry for the details model's RetryCount property
to match the summary type, then run the generator.
In `@api/v1/api.yaml`:
- Around line 7451-7453: The OpenAPI schema for the property retryCount
currently allows negative integers; update the api/v1/api.yaml schema for the
retryCount property to constrain it to non-negative values by adding a minimum:
0 (and optionally exclusiveMinimum: false if your linter requires it) so that
retryCount only accepts integers >= 0; locate the retryCount definition in the
file (the property named retryCount under the relevant schema) and add the
minimum constraint.
In `@internal/cmd/retry.go`:
- Around line 118-127: The enqueueRetry function currently accepts an unused _
exec.DAGRunAttempt and a dagRunID that is only used for logging; change the
signature of enqueueRetry(ctx *Context, dag *core.DAG, status
*exec.DAGRunStatus) error to remove the unused DAGRunAttempt and derive the run
ID from status.DAGRunID for logging, keep the call to
exec.EnqueueRetry(ctx.Context, ctx.DAGRunStore, ctx.QueueStore, dag, status)
as-is, update the logger.Info tag.RunID argument to use status.DAGRunID, and
then update all call sites (the place that passed the removed exec.DAGRunAttempt
and dagRunID) to call the new signature accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 525aaab2-994a-4ca0-ba50-db6acfb8b43f
⛔ Files ignored due to path filters (1)
proto/index/v1/index.pb.go is excluded by `!**/*.pb.go`
📒 Files selected for processing (45)
api/v1/api.gen.go
api/v1/api.yaml
internal/cmd/retry.go
internal/cmn/config/config.go
internal/cmn/config/definition.go
internal/cmn/config/loader.go
internal/cmn/config/loader_test.go
internal/cmn/telemetry/collector_test.go
internal/core/dag.go
internal/core/exec/dagrun.go
internal/core/exec/enqueue_retry.go
internal/core/exec/enqueue_retry_test.go
internal/core/exec/runstatus.go
internal/core/spec/dag.go
internal/core/spec/loader_test.go
internal/intg/queue/fixture_test.go
internal/intg/queue/queue_test.go
internal/persis/filebaseconfig/default_base_config.go
internal/persis/filebaseconfig/store_test.go
internal/persis/filedagrun/dagrun.go
internal/persis/filedagrun/dagrunindex/dagrunindex.go
internal/persis/filedagrun/dataroot.go
internal/persis/filedagrun/store.go
internal/runtime/agent/agent.go
internal/runtime/agent/agent_test.go
internal/runtime/agent/dbclient_test.go
internal/runtime/agent/retry_behavior_test.go
internal/runtime/runner.go
internal/runtime/transform/status.go
internal/service/coordinator/handler_test.go
internal/service/frontend/api/v1/dagruns.go
internal/service/frontend/api/v1/transformer.go
internal/service/frontend/api/v1/transformer_test.go
internal/service/scheduler/health.go
internal/service/scheduler/health_test.go
internal/service/scheduler/retry_scanner.go
internal/service/scheduler/retry_scanner_test.go
internal/service/scheduler/scheduler.go
internal/service/scheduler/suspend_name.go
internal/service/scheduler/tick_planner.go
internal/service/scheduler/zombie_detector_test.go
internal/service/worker/remote_handler.go
internal/service/worker/remote_handler_test.go
proto/index/v1/index.proto
ui/src/api/v1/schema.ts
# Conflicts:
#	internal/intg/queue/fixture_test.go
#	internal/intg/queue/queue_test.go
#	internal/service/worker/remote_handler.go
# Conflicts:
#	internal/cmn/config/config.go
#	internal/cmn/config/definition.go
#	internal/cmn/config/loader.go
#	internal/cmn/config/loader_test.go
#	internal/intg/queue/fixture_test.go
@coderabbitai full review
✅ Actions performed

Full review triggered.
Actionable comments posted: 8
♻️ Duplicate comments (5)
internal/service/scheduler/health.go (1)
104-106: ⚠️ Potential issue | 🟡 Minor

Log the bound address instead of the configured port.

Line 105 still logs `tag.Port(h.port)`, which is misleading for dynamic/addr-based binding. Use the actual bound address captured at startup.

🔧 Suggested fix

```diff
 import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"log/slog"
 	"net"
 	"net/http"
 	"sync"
 	"time"
@@
-	go func(server *http.Server, listener net.Listener) {
-		logger.Info(ctx, "Starting scheduler health check server", tag.Port(h.port))
+	go func(server *http.Server, listener net.Listener, boundAddr string) {
+		logger.Info(ctx, "Starting scheduler health check server", slog.String("addr", boundAddr))
 		if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
 			logger.Error(ctx, "Health check server error", tag.Error(err))
 		}
-	}(server, listener)
+	}(server, listener, boundAddr)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/scheduler/health.go` around lines 104 - 106, The log currently uses the configured h.port (logger.Info(..., tag.Port(h.port))) which is misleading for dynamic or address-based binding; update the goroutine that starts the server (the anonymous func that takes server *http.Server, listener net.Listener) to log the actual bound address by using listener.Addr().String() (or the appropriate tag helper, e.g., tag.Addr(listener.Addr().String())) instead of tag.Port(h.port) so the message reflects the real bind address.

internal/service/frontend/api/v1/dagruns.go (1)
1669-1673: ⚠️ Potential issue | 🟡 Minor

Add a nil-status guard before calling `exec.EnqueueRetry`.

`attempt.ReadStatus(ctx)` can still yield a nil status; passing that through risks a panic or unclear failure downstream.

✅ Suggested defensive fix

```diff
 func (a *API) enqueueRetry(ctx context.Context, attempt exec.DAGRunAttempt, dag *core.DAG) error {
 	status, err := attempt.ReadStatus(ctx)
 	if err != nil {
 		return fmt.Errorf("error reading status: %w", err)
 	}
+	if status == nil {
+		return fmt.Errorf("error reading status: status data is nil")
+	}
 	if err := exec.EnqueueRetry(ctx, a.dagRunStore, a.queueStore, dag, status, exec.EnqueueRetryOptions{}); err != nil {
 		if errors.Is(err, exec.ErrRetryStaleLatest) {
 			return &Error{
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/frontend/api/v1/dagruns.go` around lines 1669 - 1673, The call to attempt.ReadStatus(ctx) can return a nil status which is then passed into exec.EnqueueRetry causing potential panics; add a nil-status guard after the ReadStatus call: check if status == nil and return a clear error (e.g., fmt.Errorf("nil status from attempt.ReadStatus")) before calling exec.EnqueueRetry, so exec.EnqueueRetry(a.dagRunStore, a.queueStore, dag, status, exec.EnqueueRetryOptions{}) always receives a non-nil status.

internal/service/coordinator/handler_test.go (1)
100-108: ⚠️ Potential issue | 🟠 Major

Implement real CAS behavior in this mock.

This stub never calls `mutate` and always reports `swapped=false`, so any coordinator path that starts depending on CAS semantics will be untestable here or fail for the wrong reason. Please mirror the lightweight in-memory CAS behavior used by the other `DAGRunStore` test doubles instead of returning `(nil, false, nil)` unconditionally.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/coordinator/handler_test.go` around lines 100 - 108, The mock CompareAndSwapLatestAttemptStatus currently always returns (nil,false,nil) and never calls the provided mutate callback; update mockDAGRunStore.CompareAndSwapLatestAttemptStatus to implement an in-memory CAS: load the current *exec.DAGRunStatus from the mock's internal storage, clone it, call the provided mutate func on the clone, compare the pre-mutation and post-mutation values (or compare expected status string argument against current status), and if different replace the stored status and return the new status with swapped=true, otherwise return the current status with swapped=false; preserve and propagate any error from the mutate callback and match the behavior used by the other DAGRunStore test doubles so tests depending on CAS semantics exercise real mutation logic.

internal/core/exec/enqueue_retry.go (1)
44-87: ⚠️ Potential issue | 🟠 Major

Persisting `Queued` before enqueue still leaves a crash gap.

If the process dies after the CAS succeeds but before `queueStore.Enqueue` durably writes the item, the run is left `Queued` but absent from the queue. The retry scanner only revisits failed runs, so that retry is stranded indefinitely. This still needs a queued-run reconciliation path or an atomic/outbox-style enqueue strategy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/core/exec/enqueue_retry.go` around lines 44 - 87, The current flow sets status to Queued via dagRunStore.CompareAndSwapLatestAttemptStatus and only then calls queueStore.Enqueue, which can leave a run marked Queued but not present in the durable queue if the process crashes; change to an atomic/outbox or reconciliation approach: either (A) persist the queue entry and the DAGRun status in the same transactional boundary (or via a single CompareAndSwapLatestAttemptStatus callback that also writes a durable outbox record) so enqueue is durable before returning, or (B) if transactions aren’t possible, write a durable outbox record (e.g., a QueueEntry row) inside the CompareAndSwapLatestAttemptStatus update and have a separate dispatcher move outbox->queue, or (C) add a reconciliation pass in the retry scanner to find DAGRunStatus entries with Status==core.Queued but missing queue entries and re-enqueue them; update logic around queueStore.Enqueue, dagRunStore.CompareAndSwapLatestAttemptStatus, DAGRunStatus fields and the rollback branch to reflect the chosen approach so no run can remain Queued without a corresponding durable queue entry.

internal/service/scheduler/retry_scanner.go (1)
87-91: ⚠️ Potential issue | 🟠 Major

Retry window filtering is still tied to the pre-query time filter, which can miss valid recent failures.

At Line 90, `exec.WithFrom(from)` is applied before `retryReferenceTime(...)` is evaluated. If `WithFrom` filters by creation time, long-running DAGs that failed inside the retry window are skipped entirely.

Proposed fix

```diff
 func (s *RetryScanner) scan(ctx context.Context) error {
 	now := s.clock().UTC()
-	from := exec.NewUTC(now.Add(-s.retryWindow))
+	windowStart := now.Add(-s.retryWindow)
 	failedRuns, err := s.dagRunStore.ListStatuses(
 		ctx,
 		exec.WithStatuses([]core.Status{core.Failed}),
-		exec.WithFrom(from),
 		exec.WithoutLimit(),
 	)
 	if err != nil {
 		return err
 	}
 	for _, listed := range failedRuns {
 		if listed == nil {
 			continue
 		}
+		refTime, ok := retryReferenceTime(listed)
+		if !ok || refTime.Before(windowStart) || refTime.After(now) {
+			continue
+		}
 		if err := s.processFailedRun(ctx, listed, now); err != nil {
 			logger.Error(ctx, "Retry scanner failed to process DAG run",
 				tag.DAG(listed.Name),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/scheduler/retry_scanner.go` around lines 87 - 91, The query is applying exec.WithFrom(from) when calling s.dagRunStore.ListStatuses (with exec.WithStatuses([]core.Status{core.Failed})), which uses the pre-query "from" time and can skip DAG runs that failed inside the retry window; update the call so the time window used by ListStatuses is computed from retryReferenceTime(...) (or remove exec.WithFrom and instead pass a computed from based on retryReferenceTime) so the DB query includes runs that failed within the retry window, or alternatively keep exec.WithFrom but expand it to min(from, retryReferenceTime(...)); modify the invocation of s.dagRunStore.ListStatuses to use that adjusted time boundary to ensure long-running DAGs that failed recently are not excluded.
🧹 Nitpick comments (7)
internal/intg/queue/fixture_test.go (1)
72-74: `WithRetryWindow(0)` currently cannot override config to zero.

Line 72 only applies the value when `f.retryWindow > 0`, so callers cannot explicitly set `scheduler.retry_failure_window` to `0` even though `WithRetryWindow` is documented as an override.

💡 Suggested refactor

```diff
 type fixture struct {
 @@
-	retryWindow time.Duration
+	retryWindow *time.Duration
 @@
 	test.WithConfigMutator(func(c *config.Config) {
 @@
-		if f.retryWindow > 0 {
-			c.Scheduler.RetryFailureWindow = f.retryWindow
+		if f.retryWindow != nil {
+			c.Scheduler.RetryFailureWindow = *f.retryWindow
 		}
 @@
 func WithRetryWindow(window time.Duration) func(*fixture) {
-	return func(f *fixture) { f.retryWindow = window }
+	return func(f *fixture) { f.retryWindow = &window }
 }
```

Also applies to: 119-122
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/intg/queue/fixture_test.go` around lines 72 - 74, The fixture currently only applies f.retryWindow when f.retryWindow > 0 so callers cannot set scheduler RetryFailureWindow to 0; update the API by adding a boolean flag (e.g., retryWindowSet) to the fixture and set it inside WithRetryWindow, then replace the conditional "if f.retryWindow > 0" with "if f.retryWindowSet { c.Scheduler.RetryFailureWindow = f.retryWindow }" so zero is treated as a valid override; apply the same change to the similar block around the 119-122 area.

internal/cmn/config/loader_test.go (1)
240-241: Consider adding explicit override/invalid parsing tests for `RetryFailureWindow`.

These assertions validate defaults well, but adding one env/YAML override case (and one invalid duration warning case) would better lock in loader behavior for the new field.
Also applies to: 526-527
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/config/loader_test.go` around lines 240 - 241, Add two tests for the new RetryFailureWindow field in internal/cmn/config/loader_test.go: extend the existing default-assertion test(s) to include an env/YAML override case that sets RetryFailureWindow to a known duration (e.g., "30m") and asserts the loaded config.RetryFailureWindow equals 30*time.Minute, and add an invalid-duration case that supplies a bad string (e.g., "not-a-duration") and asserts the loader logs a parse warning and falls back to the default value; place these assertions alongside the existing default checks so they exercise the same loader code path that reads RetryFailureWindow.

internal/persis/filebaseconfig/default_base_config.go (1)
49-54: Consider backoff/jitter defaults for fleet-wide retries.

Using a fixed 5s interval at base scope can synchronize retries after shared outages. Consider showing `backoff` (and optionally `max_interval_sec`) in the default policy to reduce retry spikes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/persis/filebaseconfig/default_base_config.go` around lines 49 - 54, The default retry_policy uses a fixed interval_sec (5) which can synchronize retries; update the base default in default_base_config.go so retry_policy includes a backoff field (e.g., "backoff: true" or an exponential backoff config) and an optional max_interval_sec to cap growth, and replace or augment interval_sec with a base_interval_sec plus jitter/backoff semantics; specifically edit the retry_policy block (the retry_policy object and its keys limit, interval_sec) to add backoff and max_interval_sec fields and ensure consumers of retry_policy (e.g., any code reading retry_policy) honor base_interval_sec with backoff and jitter.

api/v1/api.yaml (1)
7451-7453: Constrain `autoRetryCount` to non-negative values.

`autoRetryCount` represents a consumed retry count, so allowing negatives weakens validation. Add `minimum: 0` (and optionally `default: 0`).

♻️ Proposed schema tightening

```diff
   autoRetryCount:
     type: integer
+    minimum: 0
+    default: 0
     description: "Number of scheduler-issued DAG auto-retries already consumed for this DAG-run"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/v1/api.yaml` around lines 7451 - 7453, The property autoRetryCount in the OpenAPI schema should be constrained to non-negative values: update the autoRetryCount definition (the integer property named autoRetryCount) to include minimum: 0 and optionally default: 0 so the schema enforces that consumed retry counts cannot be negative.

internal/runtime/agent/agent.go (1)
1091-1104: Remove duplicate comment.

Line 1093 contains a duplicate of the comment that should only appear at line 1102. This appears to be a copy-paste artifact.

🧹 Remove duplicate comment

```diff
 	// If the current execution is based on a persisted target, copy timing data
 	// from that target. Otherwise, use the schedule time provided directly.
-	// Otherwise, use the schedule time provided directly via CLI flag.
 	if source != nil {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/agent/agent.go` around lines 1091 - 1104, Remove the duplicated comment above the timing-copy logic: keep the explanatory comment that describes copying timing data from a persisted target and using the schedule time only once (the version currently at the later block), and delete the repeated sentence immediately before the "if source != nil" block that duplicates it; ensure the logic around variables source, opts, transform.WithQueuedAt, transform.WithCreatedAt, and transform.WithScheduleTime remains unchanged.

internal/cmd/retry.go (1)
123-129: Consider preserving the original error with `%w` for better debugging.

The error wrapping at line 126 loses the original `exec.ErrRetryStaleLatest` error context. While the message is descriptive, preserving the wrapped error allows callers to use `errors.Is()` for programmatic error handling.

🔧 Proposed fix

```diff
 if err := exec.EnqueueRetry(ctx.Context, ctx.DAGRunStore, ctx.QueueStore, dag, status, exec.EnqueueRetryOptions{}); err != nil {
 	if errors.Is(err, exec.ErrRetryStaleLatest) {
-		return fmt.Errorf("dag-run state changed before retry could be queued")
+		return fmt.Errorf("dag-run state changed before retry could be queued: %w", err)
 	}
 	return err
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmd/retry.go` around lines 123 - 129, In enqueueRetry, preserve the original err when returning the wrapped error so callers can inspect it; change the fmt.Errorf call that returns "dag-run state changed before retry could be queued" to wrap the original err using %w (referencing exec.ErrRetryStaleLatest and the err variable from exec.EnqueueRetry) so errors.Is/As can detect the underlying exec.ErrRetryStaleLatest.

internal/service/scheduler/scheduler_test.go (1)
66-71: Prefer `require` in the new timeout/error branches.

The new helpers and subtests mostly use `require`, but these added branches fall back to `t.Fatal`. Using `require.FailNow`/`require.NoError` here would keep failures consistent with the rest of the file.

As per coding guidelines: "`**/*_test.go`: Use stretchr/testify assertions for testing in Go."

Also applies to: 150-155, 163-168, 253-267, 302-313, 381-395
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/scheduler/scheduler_test.go` around lines 66 - 71, Replace t.Fatal calls in the select default branches with the appropriate testify/require calls to keep tests consistent: for the select reading restartScheduleTimeCh replace t.Fatal("restart schedule time was not recorded") with require.FailNow(t, "restart schedule time was not recorded") (and similarly replace other t.Fatal uses at the referenced blocks with require.FailNow or require.NoError as appropriate). Locate the select blocks that read from restartScheduleTimeCh and the other listed branches (around the checks at 150-155, 163-168, 253-267, 302-313, 381-395) and swap t.Fatal into require.FailNow (or require.NoError/require.True/False matching the existing assertions) so all test failures use stretchr/testify.require.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cmn/config/config.go`:
- Around line 475-492: The validateScheduler function currently omits validation
for scheduler.failure_threshold; update the function (validateScheduler) to
reject negative values by checking c.Scheduler.FailureThreshold (or the exact
struct field name for failure_threshold) and returning an error like
"scheduler.failure_threshold must be >= 0" when it's < 0, consistent with the
other scheduler bounds checks.
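The suggested bounds check can be sketched as follows; `SchedulerConfig` here is a pared-down stand-in for the real config struct, and the exact field name is an assumption based on the comment:

```go
package main

import (
	"errors"
	"fmt"
)

// SchedulerConfig is a minimal stand-in for the real scheduler config
// struct; only the field relevant to the review comment is shown.
type SchedulerConfig struct {
	FailureThreshold int
}

// validateScheduler mirrors the proposed check: a negative
// failure_threshold is rejected with a descriptive error, consistent
// with the other scheduler bounds checks.
func validateScheduler(c SchedulerConfig) error {
	if c.FailureThreshold < 0 {
		return errors.New("scheduler.failure_threshold must be >= 0")
	}
	return nil
}

func main() {
	fmt.Println(validateScheduler(SchedulerConfig{FailureThreshold: -1})) // error
	fmt.Println(validateScheduler(SchedulerConfig{FailureThreshold: 3}))  // <nil>
}
```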
In `@internal/cmn/schema/dag.schema.json`:
- Around line 640-740: The schemas allow unknown properties because they lack
additionalProperties: false; add additionalProperties: false to dagRetryPolicy
so unknown keys are rejected, and replace stepRetryPolicy's allOf + $ref to
retryPolicyCommon by inlining the properties from retryPolicyCommon (limit,
interval_sec, backoff, max_interval_sec with their existing oneOf/type
constraints) into the stepRetryPolicy object and then add additionalProperties:
false to that single combined stepRetryPolicy definition; keep retryPolicyCommon
if still used elsewhere or remove if not.
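A hedged sketch of what the tightened `dagRetryPolicy` fragment might look like; the property names come from the comment, while the specific type constraints shown here are assumptions:

```json
{
  "dagRetryPolicy": {
    "type": "object",
    "additionalProperties": false,
    "properties": {
      "limit": { "type": "integer", "minimum": 0 },
      "interval_sec": { "type": "integer", "minimum": 0 },
      "backoff": { "oneOf": [{ "type": "boolean" }, { "type": "number" }] },
      "max_interval_sec": { "type": "integer", "minimum": 0 }
    }
  }
}
```

With `additionalProperties: false`, a typo such as `intervalSec` would be rejected at validation time instead of being silently ignored.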
In `@internal/core/retry.go`:
- Around line 13-22: Clamp negative attemptCount to zero before computing the
backoff in CalculateBackoffInterval; inside the function ensure attemptCount is
set to max(0, attemptCount) (or equivalent) prior to using it in math.Pow so the
exponent is non-negative, then proceed with the existing sleepTime calculation
and maxInterval comparison.
In `@internal/core/spec/dag.go`:
- Around line 889-914: The parseDAGRetryMaxInterval function rejects numeric
strings while its siblings use parseConcreteRetryInt; modify
parseDAGRetryMaxInterval to delegate parsing to parseConcreteRetryInt
(preserving the "retry_policy.max_interval_sec" field name in any returned
validation errors), handle a nil value by returning time.Hour, and convert the
parsed int64/uint result seconds into a time.Duration (time.Second *
parsedValue) before returning so string-typed YAML like "60" is accepted
consistently with parseConcreteRetryInt.
In `@internal/intg/distr/fixtures_test.go`:
- Around line 264-288: The test hardcodes a 5-second wait when checking
scheduler startup, ignoring the caller-provided timeout; update the
require.Eventually call in startScheduler so its total wait duration uses the
passed-in timeout (the same timeout used to create schedulerCtx) instead of
5*time.Second, ensuring f.schedulerCtx/f.schedulerCancel and the scheduler start
check (f.scheduler.IsRunning() and pollSchedulerErr()) honor the caller's
timeout.
In `@internal/intg/sched_test.go`:
- Around line 81-87: The final receive from errCh in the shutdown path should
accept context cancellation errors instead of strictly requiring no error;
update the block that reads from errCh (around schedulerStopped, errCh, Stop and
the goroutine started by Start(ctx)) to treat errors.Is(err, context.Canceled)
or errors.Is(err, context.DeadlineExceeded) as acceptable outcomes and only fail
the test for other unexpected errors—i.e., replace the unconditional
require.NoError(t, err) with a conditional that passes for nil OR
context.Canceled OR context.DeadlineExceeded and fails on any other error.
In `@internal/runtime/runner.go`:
- Around line 813-815: The Status() method currently returns r.forcedStatus
immediately, causing runEventHandler()/setupEnvironEventHandler() to export a
terminal DAG_RUN_STATUS prematurely; change Status() so it only applies/returns
r.forcedStatus once the run is actually terminal (e.g., after the plan/state
indicates completion) — leave normal NotStarted/Running/Waiting checks to run
until the run reaches a terminal state, then override with forcedStatus; update
references to forcedStatus and Status() accordingly so onInit and other
pre-terminal handlers see the real in-progress status until terminal.
In `@internal/service/scheduler/scheduler_test.go`:
- Around line 143-155: The test uses a fixed time.Sleep to assume sc2 has
reached the lock-acquisition wait, which is flaky; add an explicit
synchronization point so the test waits for the scheduler to actually begin
waiting for the lock. Modify the scheduler Start path to invoke an optional
test-only callback (e.g., an onLockWait func or channel send) right before or
when it blocks trying to acquire the lock, then in the test replace the Sleep:
create a waitStarted channel, pass it into sc2 (or set the test hook), start sc2
in a goroutine, block on <-waitStarted to guarantee sc2 is blocked on the lock,
then assert behavior and cancel; apply the same replacement for the second
occurrence around lines 288-305 to remove timing-based sleeps.
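The channel-based hook can be sketched like this; `lockWaiter` and `onLockWait` are hypothetical names standing in for the real scheduler type and its proposed test-only callback:

```go
package main

import (
	"fmt"
	"sync"
)

// lockWaiter models a scheduler that blocks on lock acquisition. The
// optional onLockWait hook (nil in production) fires right before the
// blocking call, giving tests a deterministic synchronization point.
type lockWaiter struct {
	mu         sync.Mutex
	onLockWait func()
}

func (w *lockWaiter) run() {
	if w.onLockWait != nil {
		w.onLockWait() // signal: about to wait for the lock
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	// ... critical section ...
}

func main() {
	waitStarted := make(chan struct{})
	w := &lockWaiter{onLockWait: func() { close(waitStarted) }}

	w.mu.Lock() // hold the lock so run() must wait, like sc1 holding it
	go w.run()
	<-waitStarted // no time.Sleep: we know run() reached the lock-wait point
	fmt.Println("second scheduler is waiting for the lock")
	w.mu.Unlock()
}
```

Replacing the `time.Sleep` with a receive on `waitStarted` removes the timing assumption entirely: the test proceeds exactly when the second scheduler reaches the wait point, however slow the machine is.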
---
Duplicate comments:
In `@internal/core/exec/enqueue_retry.go`:
- Around line 44-87: The current flow sets status to Queued via
dagRunStore.CompareAndSwapLatestAttemptStatus and only then calls
queueStore.Enqueue, which can leave a run marked Queued but not present in the
durable queue if the process crashes; change to an atomic/outbox or
reconciliation approach: either (A) persist the queue entry and the DAGRun
status in the same transactional boundary (or via a single
CompareAndSwapLatestAttemptStatus callback that also writes a durable outbox
record) so enqueue is durable before returning, or (B) if transactions aren’t
possible, write a durable outbox record (e.g., a QueueEntry row) inside the
CompareAndSwapLatestAttemptStatus update and have a separate dispatcher move
outbox->queue, or (C) add a reconciliation pass in the retry scanner to find
DAGRunStatus entries with Status==core.Queued but missing queue entries and
re-enqueue them; update logic around queueStore.Enqueue,
dagRunStore.CompareAndSwapLatestAttemptStatus, DAGRunStatus fields and the
rollback branch to reflect the chosen approach so no run can remain Queued
without a corresponding durable queue entry.
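Option (C) above, a reconciliation pass, can be sketched with in-memory stand-ins; the types and the string statuses here are simplifications, not the real `exec`/queue APIs:

```go
package main

import "fmt"

// runStatus is a minimal stand-in for a persisted DAGRunStatus.
type runStatus struct {
	RunID  string
	Status string // "queued", "failed", ...
}

// reconcileQueued finds runs whose persisted status says Queued but which
// are missing from the durable queue, and re-enqueues them — healing a
// crash that happened between the CAS status update and the Enqueue call.
func reconcileQueued(statuses []runStatus, queued map[string]bool, enqueue func(string)) int {
	fixed := 0
	for _, s := range statuses {
		if s.Status == "queued" && !queued[s.RunID] {
			enqueue(s.RunID)
			fixed++
		}
	}
	return fixed
}

func main() {
	statuses := []runStatus{{"run-1", "queued"}, {"run-2", "queued"}, {"run-3", "failed"}}
	queue := map[string]bool{"run-1": true} // run-2's queue entry was lost in a crash
	n := reconcileQueued(statuses, queue, func(id string) {
		queue[id] = true
		fmt.Println("re-enqueued", id)
	})
	fmt.Println("repaired:", n) // repaired: 1
}
```

Run periodically from the retry scanner, such a pass bounds the inconsistency window to one scan interval without requiring transactional coupling between the status store and the queue.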
In `@internal/service/coordinator/handler_test.go`:
- Around line 100-108: The mock CompareAndSwapLatestAttemptStatus currently
always returns (nil,false,nil) and never calls the provided mutate callback;
update mockDAGRunStore.CompareAndSwapLatestAttemptStatus to implement an
in-memory CAS: load the current *exec.DAGRunStatus from the mock's internal
storage, clone it, call the provided mutate func on the clone, compare the
pre-mutation and post-mutation values (or compare expected status string
argument against current status), and if different replace the stored status and
return the new status with swapped=true, otherwise return the current status
with swapped=false; preserve and propagate any error from the mutate callback
and match the behavior used by the other DAGRunStore test doubles so tests
depending on CAS semantics exercise real mutation logic.
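The clone-mutate-swap semantics the comment asks for can be sketched as follows; the struct shapes and the simplified expectation check are assumptions, since the real mock compares against the store's actual `exec.DAGRunStatus`:

```go
package main

import "fmt"

// dagRunStatus is a cut-down stand-in for exec.DAGRunStatus.
type dagRunStatus struct {
	AttemptID string
	Status    string
}

// mockStore implements an in-memory CAS: clone the current status, apply
// mutate to the clone, and swap only if the expectations still hold.
type mockStore struct {
	current *dagRunStatus
}

func (m *mockStore) CompareAndSwapLatestAttemptStatus(expectedAttemptID, expectedStatus string, mutate func(*dagRunStatus) error) (*dagRunStatus, bool, error) {
	cur := m.current
	if cur == nil || cur.AttemptID != expectedAttemptID || cur.Status != expectedStatus {
		return cur, false, nil // stale expectation: no swap
	}
	clone := *cur
	if err := mutate(&clone); err != nil {
		return cur, false, err // propagate mutate errors, leave state untouched
	}
	m.current = &clone
	return &clone, true, nil
}

func main() {
	store := &mockStore{current: &dagRunStatus{AttemptID: "a1", Status: "failed"}}
	st, swapped, _ := store.CompareAndSwapLatestAttemptStatus("a1", "failed", func(s *dagRunStatus) error {
		s.Status = "queued"
		return nil
	})
	fmt.Println(swapped, st.Status) // true queued

	// Second attempt with the old expectation must observe the stale state.
	_, swapped, _ = store.CompareAndSwapLatestAttemptStatus("a1", "failed", func(s *dagRunStatus) error { return nil })
	fmt.Println(swapped) // false
}
```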
In `@internal/service/frontend/api/v1/dagruns.go`:
- Around line 1669-1673: The call to attempt.ReadStatus(ctx) can return a nil
status which is then passed into exec.EnqueueRetry causing potential panics; add
a nil-status guard after the ReadStatus call: check if status == nil and return
a clear error (e.g., fmt.Errorf("nil status from attempt.ReadStatus")) before
calling exec.EnqueueRetry, so exec.EnqueueRetry(a.dagRunStore, a.queueStore,
dag, status, exec.EnqueueRetryOptions{}) always receives a non-nil status.
In `@internal/service/scheduler/health.go`:
- Around line 104-106: The log currently uses the configured h.port
(logger.Info(..., tag.Port(h.port))) which is misleading for dynamic or
address-based binding; update the goroutine that starts the server (the
anonymous func that takes server *http.Server, listener net.Listener) to log the
actual bound address by using listener.Addr().String() (or the appropriate tag
helper, e.g., tag.Addr(listener.Addr().String())) instead of tag.Port(h.port) so
the message reflects the real bind address.
In `@internal/service/scheduler/retry_scanner.go`:
- Around line 87-91: The query is applying exec.WithFrom(from) when calling
s.dagRunStore.ListStatuses (with exec.WithStatuses([]core.Status{core.Failed})),
which uses the pre-query "from" time and can skip DAG runs that failed inside
the retry window; update the call so the time window used by ListStatuses is
computed from retryReferenceTime(...) (or remove exec.WithFrom and instead pass
a computed from based on retryReferenceTime) so the DB query includes runs that
failed within the retry window, or alternatively keep exec.WithFrom but expand
it to min(from, retryReferenceTime(...)); modify the invocation of
s.dagRunStore.ListStatuses to use that adjusted time boundary to ensure
long-running DAGs that failed recently are not excluded.
---
Nitpick comments:
In `@api/v1/api.yaml`:
- Around line 7451-7453: The property autoRetryCount in the OpenAPI schema
should be constrained to non-negative values: update the autoRetryCount
definition (the integer property named autoRetryCount) to include minimum: 0 and
optionally default: 0 so the schema enforces that consumed retry counts cannot
be negative.
In `@internal/cmd/retry.go`:
- Around line 123-129: In enqueueRetry, preserve the original err when returning
the wrapped error so callers can inspect it; change the fmt.Errorf call that
returns "dag-run state changed before retry could be queued" to wrap the
original err using %w (referencing exec.ErrRetryStaleLatest and the err variable
from exec.EnqueueRetry) so errors.Is/As can detect the underlying
exec.ErrRetryStaleLatest.
In `@internal/cmn/config/loader_test.go`:
- Around line 240-241: Add two tests for the new RetryFailureWindow field in
internal/cmn/config/loader_test.go: extend the existing default-assertion
test(s) to include an env/YAML override case that sets RetryFailureWindow to a
known duration (e.g., "30m") and asserts the loaded config.RetryFailureWindow
equals 30*time.Minute, and add an invalid-duration case that supplies a bad
string (e.g., "not-a-duration") and asserts the loader logs a parse warning and
falls back to the default value; place these assertions alongside the existing
default checks so they exercise the same loader code path that reads
RetryFailureWindow.
In `@internal/intg/queue/fixture_test.go`:
- Around line 72-74: The fixture currently only applies f.retryWindow when
f.retryWindow > 0 so callers cannot set scheduler RetryFailureWindow to 0;
update the API by adding a boolean flag (e.g., retryWindowSet) to the fixture
and set it inside WithRetryWindow, then replace the conditional "if
f.retryWindow > 0" with "if f.retryWindowSet { c.Scheduler.RetryFailureWindow =
f.retryWindow }" so zero is treated as a valid override; apply the same change
to the similar block around the 119-122 area.
In `@internal/persis/filebaseconfig/default_base_config.go`:
- Around line 49-54: The default retry_policy uses a fixed interval_sec (5)
which can synchronize retries; update the base default in default_base_config.go
so retry_policy includes a backoff field (e.g., "backoff: true" or an
exponential backoff config) and an optional max_interval_sec to cap growth, and
replace or augment interval_sec with a base_interval_sec plus jitter/backoff
semantics; specifically edit the retry_policy block (the retry_policy object and
its keys limit, interval_sec) to add backoff and max_interval_sec fields and
ensure consumers of retry_policy (e.g., any code reading retry_policy) honor
base_interval_sec with backoff and jitter.
In `@internal/runtime/agent/agent.go`:
- Around line 1091-1104: Remove the duplicated comment above the timing-copy
logic: keep the explanatory comment that describes copying timing data from a
persisted target and using the schedule time only once (the version currently at
the later block), and delete the repeated sentence immediately before the "if
source != nil" block that duplicates it; ensure the logic around variables
source, opts, transform.WithQueuedAt, transform.WithCreatedAt, and
transform.WithScheduleTime remains unchanged.
In `@internal/service/scheduler/scheduler_test.go`:
- Around line 66-71: Replace t.Fatal calls in the select default branches with
the appropriate testify/require calls to keep tests consistent: for the select
reading restartScheduleTimeCh replace t.Fatal("restart schedule time was not
recorded") with require.FailNow(t, "restart schedule time was not recorded")
(and similarly replace other t.Fatal uses at the referenced blocks with
require.FailNow or require.NoError as appropriate). Locate the select blocks
that read from restartScheduleTimeCh and the other listed branches (around the
checks at 150-155, 163-168, 253-267, 302-313, 381-395) and swap t.Fatal into
require.FailNow (or require.NoError/require.True/False matching the existing
assertions) so all test failures use stretchr/testify.require.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: aa1da421-bf4b-4e5e-ac9a-3eafcb1d65d2
⛔ Files ignored due to path filters (1)
`proto/index/v1/index.pb.go` is excluded by `!**/*.pb.go`
📒 Files selected for processing (53)
api/v1/api.gen.go
api/v1/api.yaml
internal/cmd/retry.go
internal/cmn/config/config.go
internal/cmn/config/definition.go
internal/cmn/config/loader.go
internal/cmn/config/loader_test.go
internal/cmn/schema/dag.schema.json
internal/cmn/schema/dag_schema_test.go
internal/cmn/telemetry/collector_test.go
internal/core/dag.go
internal/core/exec/dagrun.go
internal/core/exec/enqueue_retry.go
internal/core/exec/enqueue_retry_test.go
internal/core/exec/runstatus.go
internal/core/retry.go
internal/core/spec/dag.go
internal/core/spec/loader_test.go
internal/intg/distr/fixtures_test.go
internal/intg/queue/fixture_test.go
internal/intg/queue/queue_test.go
internal/intg/sched_test.go
internal/persis/filebaseconfig/default_base_config.go
internal/persis/filebaseconfig/store_test.go
internal/persis/filedagrun/dagrun.go
internal/persis/filedagrun/dagrunindex/dagrunindex.go
internal/persis/filedagrun/dataroot.go
internal/persis/filedagrun/store.go
internal/runtime/agent/agent.go
internal/runtime/agent/agent_test.go
internal/runtime/agent/dbclient_test.go
internal/runtime/agent/retry_behavior_test.go
internal/runtime/runner.go
internal/runtime/transform/status.go
internal/service/coordinator/handler_test.go
internal/service/frontend/api/v1/dagruns.go
internal/service/frontend/api/v1/transformer.go
internal/service/frontend/api/v1/transformer_test.go
internal/service/scheduler/health.go
internal/service/scheduler/health_test.go
internal/service/scheduler/retry_scanner.go
internal/service/scheduler/retry_scanner_test.go
internal/service/scheduler/scheduler.go
internal/service/scheduler/scheduler_test.go
internal/service/scheduler/suspend_name.go
internal/service/scheduler/tick_planner.go
internal/service/scheduler/zombie_detector_test.go
internal/service/worker/remote_handler.go
internal/service/worker/remote_handler_test.go
internal/test/helper.go
internal/testdata/cli/config_test.yaml
proto/index/v1/index.proto
ui/src/api/v1/schema.ts
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@ Coverage Diff @@
##             main    #1774     +/-   ##
=========================================
+ Coverage   69.63%   69.64%   +0.01%
=========================================
  Files         411      414       +3
  Lines       47267    47761     +494
=========================================
+ Hits        32914    33265     +351
- Misses      11627    11722      +95
- Partials     2726     2774      +48
```

... and 8 files with indirect coverage changes

Continue to review full report in Codecov by Sentry.