Skip to content

Commit c10df50

Browse files
authored
feat: surface DAG auto-retry metadata in run lists (#1779)
1 parent dee19a3 commit c10df50

35 files changed

Lines changed: 1647 additions & 770 deletions

api/v1/api.gen.go

Lines changed: 501 additions & 494 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v1/api.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7451,6 +7451,10 @@ components:
74517451
autoRetryCount:
74527452
type: integer
74537453
description: "Number of scheduler-issued DAG auto-retries already consumed for this DAG-run"
7454+
autoRetryLimit:
7455+
type: integer
7456+
nullable: true
7457+
description: "Configured DAG-level automatic retry limit captured for this DAG-run; null when DAG-level automatic retry is not configured"
74547458
scheduleTime:
74557459
type: string
74567460
description: "RFC 3339 timestamp of when the DAG-run was scheduled to run"

internal/core/dag.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,19 @@ func (d *DAG) ProcGroup() string {
529529
return d.Name
530530
}
531531

532+
// SuspendFlagName returns the filename stem used by the file-based suspend
533+
// flag system. This intentionally follows DAG file naming, not dag.Name.
534+
func (d *DAG) SuspendFlagName() string {
535+
if d == nil {
536+
return ""
537+
}
538+
base := strings.TrimSuffix(filepath.Base(d.Location), filepath.Ext(d.Location))
539+
if base != "" && base != "." {
540+
return base
541+
}
542+
return d.Name
543+
}
544+
532545
// FileName returns the filename of the DAG without the extension.
533546
func (d *DAG) FileName() string {
534547
if d.Location == "" {

internal/core/exec/enqueue_retry.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,24 @@ func EnqueueRetry(
7070

7171
// Enqueue after status is persisted. If this fails, roll back the status.
7272
dagRun := status.DAGRun()
73-
if err := queueStore.Enqueue(ctx, dag.ProcGroup(), QueuePriorityLow, dagRun); err != nil {
73+
procGroup := retryProcGroup(dag, updatedStatus)
74+
if procGroup == "" {
75+
_, _, _ = dagRunStore.CompareAndSwapLatestAttemptStatus(
76+
ctx,
77+
dagRun,
78+
updatedStatus.AttemptID,
79+
core.Queued,
80+
func(latest *DAGRunStatus) error {
81+
latest.Status = status.Status
82+
latest.QueuedAt = status.QueuedAt
83+
latest.TriggerType = status.TriggerType
84+
latest.AutoRetryCount = status.AutoRetryCount
85+
return nil
86+
},
87+
)
88+
return errors.New("enqueue retry: proc group is empty")
89+
}
90+
if err := queueStore.Enqueue(ctx, procGroup, QueuePriorityLow, dagRun); err != nil {
7491
_, _, _ = dagRunStore.CompareAndSwapLatestAttemptStatus(
7592
ctx,
7693
dagRun,
@@ -89,3 +106,16 @@ func EnqueueRetry(
89106

90107
return nil
91108
}
109+
110+
func retryProcGroup(dag *core.DAG, status *DAGRunStatus) string {
111+
if status != nil && status.ProcGroup != "" {
112+
return status.ProcGroup
113+
}
114+
if dag != nil {
115+
return dag.ProcGroup()
116+
}
117+
if status != nil {
118+
return status.Name
119+
}
120+
return ""
121+
}

internal/core/exec/enqueue_retry_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,36 @@ func TestEnqueueRetry(t *testing.T) {
103103
assert.Equal(t, 3, store.status.AutoRetryCount)
104104
},
105105
},
106+
{
107+
name: "UsesPersistedProcGroupWhenDAGIsNil",
108+
status: &exec.DAGRunStatus{
109+
Name: "test-dag",
110+
DAGRunID: "run-fast-path",
111+
AttemptID: "att-fast-path",
112+
Status: core.Failed,
113+
AutoRetryCount: 1,
114+
ProcGroup: "input-queue",
115+
},
116+
store: &stubDAGRunStore{
117+
status: &exec.DAGRunStatus{
118+
Name: "test-dag",
119+
DAGRunID: "run-fast-path",
120+
AttemptID: "att-fast-path",
121+
Status: core.Failed,
122+
AutoRetryCount: 1,
123+
ProcGroup: "custom-queue",
124+
},
125+
},
126+
setupQueue: func(qs *exec.MockQueueStore) {
127+
qs.On("Enqueue", mock.Anything, "custom-queue", exec.QueuePriorityLow, exec.NewDAGRunRef("test-dag", "run-fast-path")).
128+
Return(nil)
129+
},
130+
assertStore: func(t *testing.T, store *stubDAGRunStore) {
131+
require.NotNil(t, store.status)
132+
assert.Equal(t, core.Queued, store.status.Status)
133+
assert.Equal(t, "custom-queue", store.status.ProcGroup)
134+
},
135+
},
106136
{
107137
name: "PersistQueuedStatusFails",
108138
dag: &core.DAG{Name: "test-dag"},
@@ -190,6 +220,33 @@ func TestEnqueueRetry(t *testing.T) {
190220
},
191221
wantErr: "enqueue retry",
192222
},
223+
{
224+
name: "EmptyProcGroupRollsBackQueuedStatus",
225+
status: &exec.DAGRunStatus{
226+
DAGRunID: "run-empty-group",
227+
AttemptID: "att-empty-group",
228+
Status: core.Failed,
229+
AutoRetryCount: 1,
230+
},
231+
store: &stubDAGRunStore{
232+
status: &exec.DAGRunStatus{
233+
DAGRunID: "run-empty-group",
234+
AttemptID: "att-empty-group",
235+
Status: core.Failed,
236+
AutoRetryCount: 1,
237+
},
238+
secondSwapped: true,
239+
},
240+
assertStore: func(t *testing.T, store *stubDAGRunStore) {
241+
require.NotNil(t, store.status)
242+
assert.Equal(t, core.Failed, store.status.Status)
243+
assert.Empty(t, store.status.QueuedAt)
244+
assert.Equal(t, core.TriggerTypeUnknown, store.status.TriggerType)
245+
assert.Equal(t, 1, store.status.AutoRetryCount)
246+
assert.Equal(t, 2, store.casCalls)
247+
},
248+
wantErr: "proc group is empty",
249+
},
193250
}
194251

195252
for _, tt := range tests {

internal/core/exec/runstatus.go

Lines changed: 80 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,59 +21,92 @@ const (
2121

2222
// InitialStatus creates an initial Status object for the given DAG
2323
func InitialStatus(dag *core.DAG) DAGRunStatus {
24+
var (
25+
autoRetryLimit int
26+
autoRetryInterval time.Duration
27+
autoRetryBackoff float64
28+
autoRetryMaxInterval time.Duration
29+
procGroup string
30+
suspendFlagName string
31+
)
32+
if dag != nil {
33+
procGroup = dag.ProcGroup()
34+
suspendFlagName = dag.SuspendFlagName()
35+
if dag.RetryPolicy != nil {
36+
autoRetryLimit = dag.RetryPolicy.Limit
37+
autoRetryInterval = dag.RetryPolicy.Interval
38+
autoRetryBackoff = dag.RetryPolicy.Backoff
39+
autoRetryMaxInterval = dag.RetryPolicy.MaxInterval
40+
}
41+
}
42+
2443
return DAGRunStatus{
25-
Name: dag.Name,
26-
Status: core.NotStarted,
27-
PID: PID(0),
28-
Nodes: NewNodesFromSteps(dag.Steps),
29-
OnInit: NewNodeOrNil(dag.HandlerOn.Init),
30-
OnExit: NewNodeOrNil(dag.HandlerOn.Exit),
31-
OnSuccess: NewNodeOrNil(dag.HandlerOn.Success),
32-
OnFailure: NewNodeOrNil(dag.HandlerOn.Failure),
33-
OnAbort: NewNodeOrNil(dag.HandlerOn.Abort),
34-
OnWait: NewNodeOrNil(dag.HandlerOn.Wait),
35-
Params: strings.Join(dag.Params, " "),
36-
ParamsList: dag.Params,
37-
AutoRetryCount: 0,
38-
CreatedAt: time.Now().UnixMilli(),
39-
StartedAt: stringutil.FormatTime(time.Time{}),
40-
FinishedAt: stringutil.FormatTime(time.Time{}),
41-
Preconditions: dag.Preconditions,
42-
Tags: dag.Tags.Strings(),
44+
Name: dag.Name,
45+
Status: core.NotStarted,
46+
PID: PID(0),
47+
Nodes: NewNodesFromSteps(dag.Steps),
48+
OnInit: NewNodeOrNil(dag.HandlerOn.Init),
49+
OnExit: NewNodeOrNil(dag.HandlerOn.Exit),
50+
OnSuccess: NewNodeOrNil(dag.HandlerOn.Success),
51+
OnFailure: NewNodeOrNil(dag.HandlerOn.Failure),
52+
OnAbort: NewNodeOrNil(dag.HandlerOn.Abort),
53+
OnWait: NewNodeOrNil(dag.HandlerOn.Wait),
54+
Params: strings.Join(dag.Params, " "),
55+
ParamsList: dag.Params,
56+
AutoRetryCount: 0,
57+
AutoRetryLimit: autoRetryLimit,
58+
AutoRetryInterval: autoRetryInterval,
59+
AutoRetryBackoff: autoRetryBackoff,
60+
AutoRetryMaxInterval: autoRetryMaxInterval,
61+
ProcGroup: procGroup,
62+
SuspendFlagName: suspendFlagName,
63+
CreatedAt: time.Now().UnixMilli(),
64+
StartedAt: stringutil.FormatTime(time.Time{}),
65+
FinishedAt: stringutil.FormatTime(time.Time{}),
66+
Preconditions: dag.Preconditions,
67+
Tags: dag.Tags.Strings(),
4368
}
4469
}
4570

4671
// DAGRunStatus represents the complete execution state of a dag-run.
4772
type DAGRunStatus struct {
48-
Root DAGRunRef `json:"root,omitzero"`
49-
Parent DAGRunRef `json:"parent,omitzero"`
50-
Name string `json:"name"`
51-
DAGRunID string `json:"dagRunId"`
52-
AttemptID string `json:"attemptId"`
53-
AttemptKey string `json:"attemptKey,omitempty"` // Globally unique attempt identifier
54-
Status core.Status `json:"status"`
55-
TriggerType core.TriggerType `json:"triggerType,omitempty"`
56-
WorkerID string `json:"workerId,omitempty"`
57-
PID PID `json:"pid,omitempty"`
58-
Nodes []*Node `json:"nodes,omitempty"`
59-
OnInit *Node `json:"onInit,omitempty"`
60-
OnExit *Node `json:"onExit,omitempty"`
61-
OnSuccess *Node `json:"onSuccess,omitempty"`
62-
OnFailure *Node `json:"onFailure,omitempty"`
63-
OnAbort *Node `json:"onAbort,omitempty"`
64-
OnWait *Node `json:"onWait,omitempty"`
65-
CreatedAt int64 `json:"createdAt,omitempty"`
66-
QueuedAt string `json:"queuedAt,omitempty"`
67-
ScheduleTime string `json:"scheduleTime,omitempty"`
68-
StartedAt string `json:"startedAt,omitempty"`
69-
FinishedAt string `json:"finishedAt,omitempty"`
70-
AutoRetryCount int `json:"autoRetryCount,omitempty"`
71-
Log string `json:"log,omitempty"`
72-
Error string `json:"error,omitempty"`
73-
Params string `json:"params,omitempty"`
74-
ParamsList []string `json:"paramsList,omitempty"`
75-
Preconditions []*core.Condition `json:"preconditions,omitempty"`
76-
Tags []string `json:"tags,omitempty"`
73+
Root DAGRunRef `json:"root,omitzero"`
74+
Parent DAGRunRef `json:"parent,omitzero"`
75+
Name string `json:"name"`
76+
DAGRunID string `json:"dagRunId"`
77+
AttemptID string `json:"attemptId"`
78+
AttemptKey string `json:"attemptKey,omitempty"` // Globally unique attempt identifier
79+
Status core.Status `json:"status"`
80+
TriggerType core.TriggerType `json:"triggerType,omitempty"`
81+
WorkerID string `json:"workerId,omitempty"`
82+
PID PID `json:"pid,omitempty"`
83+
Nodes []*Node `json:"nodes,omitempty"`
84+
OnInit *Node `json:"onInit,omitempty"`
85+
OnExit *Node `json:"onExit,omitempty"`
86+
OnSuccess *Node `json:"onSuccess,omitempty"`
87+
OnFailure *Node `json:"onFailure,omitempty"`
88+
OnAbort *Node `json:"onAbort,omitempty"`
89+
OnWait *Node `json:"onWait,omitempty"`
90+
CreatedAt int64 `json:"createdAt,omitempty"`
91+
QueuedAt string `json:"queuedAt,omitempty"`
92+
ScheduleTime string `json:"scheduleTime,omitempty"`
93+
StartedAt string `json:"startedAt,omitempty"`
94+
FinishedAt string `json:"finishedAt,omitempty"`
95+
AutoRetryCount int `json:"autoRetryCount,omitempty"`
96+
AutoRetryLimit int `json:"autoRetryLimit,omitempty"`
97+
// AutoRetryInterval is stored as a duration snapshot for retry scanner decisions.
98+
AutoRetryInterval time.Duration `json:"autoRetryInterval,omitempty"`
99+
AutoRetryBackoff float64 `json:"autoRetryBackoff,omitempty"`
100+
// AutoRetryMaxInterval is stored as a duration snapshot for retry scanner decisions.
101+
AutoRetryMaxInterval time.Duration `json:"autoRetryMaxInterval,omitempty"`
102+
ProcGroup string `json:"procGroup,omitempty"`
103+
SuspendFlagName string `json:"suspendFlagName,omitempty"`
104+
Log string `json:"log,omitempty"`
105+
Error string `json:"error,omitempty"`
106+
Params string `json:"params,omitempty"`
107+
ParamsList []string `json:"paramsList,omitempty"`
108+
Preconditions []*core.Condition `json:"preconditions,omitempty"`
109+
Tags []string `json:"tags,omitempty"`
77110
}
78111

79112
// DAGRun returns a reference to the dag-run associated with this status
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright (C) 2026 Yota Hamada
2+
// SPDX-License-Identifier: GPL-3.0-or-later
3+
4+
package exec_test
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/dagu-org/dagu/internal/core"
11+
"github.com/dagu-org/dagu/internal/core/exec"
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func TestInitialStatusSnapshotsDAGRetryMetadata(t *testing.T) {
16+
t.Parallel()
17+
18+
dag := &core.DAG{
19+
Name: "retry-dag",
20+
Queue: "shared-queue",
21+
Location: "/tmp/retry-dag.yaml",
22+
RetryPolicy: &core.DAGRetryPolicy{
23+
Limit: 3,
24+
Interval: 2 * time.Minute,
25+
Backoff: 2.0,
26+
MaxInterval: 10 * time.Minute,
27+
},
28+
}
29+
30+
status := exec.InitialStatus(dag)
31+
32+
assert.Equal(t, 3, status.AutoRetryLimit)
33+
assert.Equal(t, 2*time.Minute, status.AutoRetryInterval)
34+
assert.Equal(t, 2.0, status.AutoRetryBackoff)
35+
assert.Equal(t, 10*time.Minute, status.AutoRetryMaxInterval)
36+
assert.Equal(t, "shared-queue", status.ProcGroup)
37+
assert.Equal(t, "retry-dag", status.SuspendFlagName)
38+
}

internal/persis/filedagrun/dagrun.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,29 @@ const JSONLStatusFile = "status.jsonl"
5252
// When non-nil, it allows filtering and constructing list responses
5353
// without reading status.jsonl.
5454
type DAGRunSummary struct {
55-
LatestAttemptDir string
56-
Status core.Status
57-
StartedAtUnix int64
58-
FinishedAtUnix int64
59-
Tags []string
60-
Name string
61-
DagRunID string
62-
WorkerID string
63-
Params string
64-
QueuedAt string
65-
ScheduleTime string
66-
TriggerType core.TriggerType
67-
CreatedAt int64
68-
AttemptID string
69-
AutoRetryCount int
55+
LatestAttemptDir string
56+
Status core.Status
57+
StartedAtUnix int64
58+
FinishedAtUnix int64
59+
Tags []string
60+
Name string
61+
DagRunID string
62+
WorkerID string
63+
Params string
64+
QueuedAt string
65+
ScheduleTime string
66+
TriggerType core.TriggerType
67+
CreatedAt int64
68+
AttemptID string
69+
AutoRetryCount int
70+
ParentName string
71+
ParentID string
72+
AutoRetryLimit int
73+
AutoRetryInterval time.Duration
74+
AutoRetryBackoff float64
75+
AutoRetryMaxInterval time.Duration
76+
ProcGroup string
77+
SuspendFlagName string
7078
}
7179

7280
// DAGRun represents a dag-run with its associated timestamp and run ID.

0 commit comments

Comments
 (0)