Skip to content

Commit 2a6e436

Browse files
committed
Merge remote-tracking branch 'origin/main' into 1856-fix-bug
# Conflicts:
#	internal/runtime/subcmd.go
#	internal/service/frontend/api/v1/dags_test.go
2 parents 0188acb + e15fc47 commit 2a6e436

44 files changed

Lines changed: 4011 additions & 424 deletions

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

internal/cmd/context.go

Lines changed: 44 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -66,14 +66,15 @@ type Context struct {
6666
Config *config.Config
6767
Quiet bool
6868

69-
DAGRunStore exec.DAGRunStore
70-
DAGRunMgr runtime.Manager
71-
ProcStore exec.ProcStore
72-
QueueStore exec.QueueStore
73-
ServiceRegistry exec.ServiceRegistry
74-
DispatchTaskStore exec.DispatchTaskStore
75-
WorkerHeartbeatStore exec.WorkerHeartbeatStore
76-
DAGRunLeaseStore exec.DAGRunLeaseStore
69+
DAGRunStore exec.DAGRunStore
70+
DAGRunMgr runtime.Manager
71+
ProcStore exec.ProcStore
72+
QueueStore exec.QueueStore
73+
ServiceRegistry exec.ServiceRegistry
74+
DispatchTaskStore exec.DispatchTaskStore
75+
WorkerHeartbeatStore exec.WorkerHeartbeatStore
76+
DAGRunLeaseStore exec.DAGRunLeaseStore
77+
ActiveDistributedRunStore exec.ActiveDistributedRunStore
7778

7879
Proc exec.ProcHandle
7980
LicenseManager *license.Manager
@@ -83,21 +84,22 @@ type Context struct {
8384
// This is useful for creating a signal-aware context for service operations.
8485
func (c *Context) WithContext(ctx context.Context) *Context {
8586
return &Context{
86-
Context: ctx,
87-
Command: c.Command,
88-
Flags: c.Flags,
89-
Config: c.Config,
90-
Quiet: c.Quiet,
91-
DAGRunStore: c.DAGRunStore,
92-
DAGRunMgr: c.DAGRunMgr,
93-
ProcStore: c.ProcStore,
94-
QueueStore: c.QueueStore,
95-
ServiceRegistry: c.ServiceRegistry,
96-
DispatchTaskStore: c.DispatchTaskStore,
97-
WorkerHeartbeatStore: c.WorkerHeartbeatStore,
98-
DAGRunLeaseStore: c.DAGRunLeaseStore,
99-
Proc: c.Proc,
100-
LicenseManager: c.LicenseManager,
87+
Context: ctx,
88+
Command: c.Command,
89+
Flags: c.Flags,
90+
Config: c.Config,
91+
Quiet: c.Quiet,
92+
DAGRunStore: c.DAGRunStore,
93+
DAGRunMgr: c.DAGRunMgr,
94+
ProcStore: c.ProcStore,
95+
QueueStore: c.QueueStore,
96+
ServiceRegistry: c.ServiceRegistry,
97+
DispatchTaskStore: c.DispatchTaskStore,
98+
WorkerHeartbeatStore: c.WorkerHeartbeatStore,
99+
DAGRunLeaseStore: c.DAGRunLeaseStore,
100+
ActiveDistributedRunStore: c.ActiveDistributedRunStore,
101+
Proc: c.Proc,
102+
LicenseManager: c.LicenseManager,
101103
}
102104
}
103105

@@ -233,13 +235,14 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) {
233235
return nil, fmt.Errorf("failed to validate proc directory %s: %w", cfg.Paths.ProcDir, err)
234236
}
235237
drs := filedagrun.New(cfg.Paths.DAGRunsDir, hrOpts...)
238+
distributedDir := filepath.Join(cfg.Paths.DataDir, "distributed")
239+
dagRunLeaseStore := filedistributed.NewDAGRunLeaseStore(distributedDir)
240+
activeDistributedRunStore := filedistributed.NewActiveDistributedRunStore(distributedDir)
236241
drm := runtime.NewManager(drs, ps, cfg)
237242
qs := filequeue.New(cfg.Paths.QueueDir)
238243
sm := fileserviceregistry.New(cfg.Paths.ServiceRegistryDir)
239-
distributedDir := filepath.Join(cfg.Paths.DataDir, "distributed")
240244
dispatchTaskStore := filedistributed.NewDispatchTaskStore(distributedDir)
241245
workerHeartbeatStore := filedistributed.NewWorkerHeartbeatStore(distributedDir)
242-
dagRunLeaseStore := filedistributed.NewDAGRunLeaseStore(distributedDir)
243246

244247
// Initialize license manager for server commands
245248
var licMgr *license.Manager
@@ -288,20 +291,21 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) {
288291
}
289292

290293
return &Context{
291-
Context: ctx,
292-
Command: cmd,
293-
Config: cfg,
294-
Quiet: quiet,
295-
DAGRunStore: drs,
296-
DAGRunMgr: drm,
297-
Flags: flags,
298-
ProcStore: ps,
299-
QueueStore: qs,
300-
ServiceRegistry: sm,
301-
DispatchTaskStore: dispatchTaskStore,
302-
WorkerHeartbeatStore: workerHeartbeatStore,
303-
DAGRunLeaseStore: dagRunLeaseStore,
304-
LicenseManager: licMgr,
294+
Context: ctx,
295+
Command: cmd,
296+
Config: cfg,
297+
Quiet: quiet,
298+
DAGRunStore: drs,
299+
DAGRunMgr: drm,
300+
Flags: flags,
301+
ProcStore: ps,
302+
QueueStore: qs,
303+
ServiceRegistry: sm,
304+
DispatchTaskStore: dispatchTaskStore,
305+
WorkerHeartbeatStore: workerHeartbeatStore,
306+
DAGRunLeaseStore: dagRunLeaseStore,
307+
ActiveDistributedRunStore: activeDistributedRunStore,
308+
LicenseManager: licMgr,
305309
}, nil
306310
}
307311

@@ -444,6 +448,7 @@ func (c *Context) NewScheduler() (*scheduler.Scheduler, error) {
444448
return nil, err
445449
}
446450
sched.SetDAGRunLeaseStore(c.DAGRunLeaseStore)
451+
sched.SetDispatchTaskStore(c.DispatchTaskStore)
447452
return sched, nil
448453
}
449454

internal/cmd/coord.go

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,7 @@ func runCoordinator(ctx *Context, _ []string) error {
101101
ctx.DispatchTaskStore,
102102
ctx.WorkerHeartbeatStore,
103103
ctx.DAGRunLeaseStore,
104+
ctx.ActiveDistributedRunStore,
104105
)
105106
if err != nil {
106107
return fmt.Errorf("failed to initialize coordinator: %w", err)
@@ -138,6 +139,7 @@ func newCoordinator(
138139
dispatchTaskStore exec.DispatchTaskStore,
139140
workerHeartbeatStore exec.WorkerHeartbeatStore,
140141
dagRunLeaseStore exec.DAGRunLeaseStore,
142+
activeDistributedRunStore exec.ActiveDistributedRunStore,
141143
) (*coordinator.Service, *coordinator.Handler, error) {
142144
// Generate instance ID
143145
hostname, err := os.Hostname()
@@ -217,12 +219,13 @@ func newCoordinator(
217219

218220
// Create handler with DAGRunStore for status persistence and LogDir for log streaming
219221
handler := coordinator.NewHandler(coordinator.HandlerConfig{
220-
DAGRunStore: dagRunStore,
221-
LogDir: cfg.Paths.LogDir,
222-
Owner: exec.CoordinatorEndpoint{ID: instanceID, Host: advertiseAddr, Port: cfg.Coordinator.Port},
223-
DispatchTaskStore: dispatchTaskStore,
224-
WorkerHeartbeatStore: workerHeartbeatStore,
225-
DAGRunLeaseStore: dagRunLeaseStore,
222+
DAGRunStore: dagRunStore,
223+
LogDir: cfg.Paths.LogDir,
224+
Owner: exec.CoordinatorEndpoint{ID: instanceID, Host: advertiseAddr, Port: cfg.Coordinator.Port},
225+
DispatchTaskStore: dispatchTaskStore,
226+
WorkerHeartbeatStore: workerHeartbeatStore,
227+
DAGRunLeaseStore: dagRunLeaseStore,
228+
ActiveDistributedRunStore: activeDistributedRunStore,
226229
})
227230

228231
// Create and return service with advertise address for service registry

internal/cmd/exec.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -151,7 +151,7 @@ func runExec(ctx *Context, args []string) error {
151151
)
152152
logger.Debug(ctx, "Command details", tag.Command(strings.Join(args, " ")))
153153

154-
return tryExecuteDAG(ctx, dag, runID, dagRunRef, "local", core.TriggerTypeManual, "")
154+
return tryExecuteDAG(ctx, dag, runID, dagRunRef, "local", "", core.TriggerTypeManual, "")
155155
}
156156

157157
// resolveRunID returns a validated run ID from the flag or generates a new one.

internal/cmd/retry.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ var retryFlags = []commandLineFlag{
4545
rootDAGRunFlag,
4646
defaultWorkingDirFlag,
4747
retryWorkerIDFlag,
48+
attemptIDFlag,
4849
}
4950

5051
var retryWorkerIDFlag = commandLineFlag{
@@ -57,6 +58,10 @@ func runRetry(ctx *Context, args []string) error {
5758
stepName, _ := ctx.StringParam("step")
5859
rootRefStr, _ := ctx.StringParam("root")
5960
workerID := getWorkerID(ctx)
61+
attemptID, err := requireWorkerAttemptID(ctx, workerID)
62+
if err != nil {
63+
return err
64+
}
6065

6166
var rootRun exec.DAGRunRef
6267
if rootRefStr != "" {
@@ -147,12 +152,34 @@ func runRetry(ctx *Context, args []string) error {
147152
return ctx.DAGRunStore.CreateAttempt(execCtx, dag, time.Now(), dagRunID, opts)
148153
},
149154
func(preparedAttempt exec.DAGRunAttempt) error {
150-
return executeRetry(ctx, dag, status, rootRun, stepName, workerID, preparedAttempt)
155+
return executeRetry(ctx, dag, status, rootRun, stepName, workerID, attemptID, preparedAttempt)
151156
},
152157
)
153158
}
154159

155-
return executeRetry(ctx, dag, status, rootRun, stepName, workerID, nil)
160+
if ctx.DAGRunStore == nil {
161+
return executeRetry(ctx, dag, status, rootRun, stepName, workerID, attemptID, nil)
162+
}
163+
164+
if err := validateWorkerAttemptBinding(dagRunID, attemptID, attempt, status); err != nil {
165+
return err
166+
}
167+
168+
return withPreparedLocalExecution(
169+
ctx,
170+
dag,
171+
dagRunID,
172+
rootRun,
173+
status.Parent,
174+
exec.PreservedQueueTriggerType(status),
175+
status.ScheduleTime,
176+
func(context.Context) (exec.DAGRunAttempt, error) {
177+
return attempt, nil
178+
},
179+
func(preparedAttempt exec.DAGRunAttempt) error {
180+
return executeRetry(ctx, dag, status, rootRun, stepName, workerID, attemptID, preparedAttempt)
181+
},
182+
)
156183
}
157184

158185
// enqueueRetry enqueues the retry and persists Queued status via exec.EnqueueRetry.
@@ -205,7 +232,7 @@ func prepareQueuedCatchupRetry(ctx *Context, attempt exec.DAGRunAttempt, dag *co
205232

206233
// executeRetry runs a retry of a DAG run using the original run's log file.
207234
// Queued catchup runs reuse this path but preserve their catchup trigger type.
208-
func executeRetry(ctx *Context, dag *core.DAG, status *exec.DAGRunStatus, rootRun exec.DAGRunRef, stepName, workerID string, preparedAttempt exec.DAGRunAttempt) error {
235+
func executeRetry(ctx *Context, dag *core.DAG, status *exec.DAGRunStatus, rootRun exec.DAGRunRef, stepName, workerID, attemptID string, preparedAttempt exec.DAGRunAttempt) error {
209236
if stepName != "" {
210237
ctx.Context = logger.WithValues(ctx.Context, tag.Step(stepName))
211238
}
@@ -248,6 +275,7 @@ func executeRetry(ctx *Context, dag *core.DAG, status *exec.DAGRunStatus, rootRu
248275
ProgressDisplay: shouldEnableProgress(ctx),
249276
StepRetry: stepName,
250277
WorkerID: workerID,
278+
AttemptID: attemptID,
251279
PreparedAttempt: preparedAttempt,
252280
DAGRunStore: ctx.DAGRunStore,
253281
ServiceRegistry: ctx.ServiceRegistry,

internal/cmd/start.go

Lines changed: 47 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -61,7 +61,7 @@ This command parses the DAG definition, resolves parameters, and initiates the D
6161
}
6262

6363
// Command line flags for the start command
64-
var startFlags = []commandLineFlag{paramsFlag, nameFlag, dagRunIDFlag, fromRunIDFlag, parentDAGRunFlag, rootDAGRunFlag, tagsFlag, defaultWorkingDirFlag, startWorkerIDFlag, triggerTypeFlag, scheduleTimeFlag}
64+
var startFlags = []commandLineFlag{paramsFlag, nameFlag, dagRunIDFlag, fromRunIDFlag, parentDAGRunFlag, rootDAGRunFlag, tagsFlag, defaultWorkingDirFlag, startWorkerIDFlag, attemptIDFlag, triggerTypeFlag, scheduleTimeFlag}
6565

6666
var fromRunIDFlag = commandLineFlag{
6767
name: "from-run-id",
@@ -94,6 +94,10 @@ func runStart(ctx *Context, args []string) error {
9494
}
9595

9696
workerID := getWorkerID(ctx)
97+
attemptID, err := requireWorkerAttemptID(ctx, workerID)
98+
if err != nil {
99+
return err
100+
}
97101

98102
triggerType, err := parseTriggerTypeParam(ctx)
99103
if err != nil {
@@ -195,7 +199,7 @@ func runStart(ctx *Context, args []string) error {
195199
if err != nil {
196200
return fmt.Errorf("failed to parse parent dag-run reference: %w", err)
197201
}
198-
return handleSubDAGRun(ctx, dag, dagRunID, params, root, parent, workerID, triggerType, scheduleTime)
202+
return handleSubDAGRun(ctx, dag, dagRunID, params, root, parent, workerID, attemptID, triggerType, scheduleTime)
199203
}
200204

201205
if fromRunID != "" {
@@ -207,13 +211,13 @@ func runStart(ctx *Context, args []string) error {
207211
logger.Info(ctx, "Executing root dag-run", slog.String("params", params))
208212
}
209213

210-
return tryExecuteDAG(ctx, dag, dagRunID, root, workerID, triggerType, scheduleTime)
214+
return tryExecuteDAG(ctx, dag, dagRunID, root, workerID, attemptID, triggerType, scheduleTime)
211215
}
212216

213217
var errProcAcquisitionFailed = errors.New("failed to acquire process handle")
214218

215219
// tryExecuteDAG acquires a process handle and executes the DAG.
216-
func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root exec.DAGRunRef, workerID string, triggerType core.TriggerType, scheduleTime string) error {
220+
func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root exec.DAGRunRef, workerID, attemptID string, triggerType core.TriggerType, scheduleTime string) error {
217221
// Check for dispatch to coordinator for distributed execution.
218222
// Skip if already running on a worker (workerID != "local").
219223
if workerID == "local" {
@@ -223,6 +227,10 @@ func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root exec.DAGRu
223227
}
224228
}
225229

230+
if workerID != "local" && ctx.DAGRunStore == nil {
231+
return executeDAGRun(ctx, dag, exec.DAGRunRef{}, dagRunID, root, workerID, attemptID, triggerType, scheduleTime, nil)
232+
}
233+
226234
return withPreparedLocalExecution(
227235
ctx,
228236
dag,
@@ -232,10 +240,17 @@ func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root exec.DAGRu
232240
triggerType,
233241
scheduleTime,
234242
func(execCtx context.Context) (exec.DAGRunAttempt, error) {
243+
if workerID != "local" {
244+
attempt, _, err := resolveWorkerPreparedAttempt(execCtx, ctx.DAGRunStore, dag.Name, dagRunID, root, attemptID)
245+
if err != nil {
246+
return nil, err
247+
}
248+
return attempt, nil
249+
}
235250
return ctx.DAGRunStore.CreateAttempt(execCtx, dag, time.Now(), dagRunID, exec.NewDAGRunAttemptOptions{})
236251
},
237252
func(preparedAttempt exec.DAGRunAttempt) error {
238-
return executeDAGRun(ctx, dag, exec.DAGRunRef{}, dagRunID, root, workerID, triggerType, scheduleTime, preparedAttempt)
253+
return executeDAGRun(ctx, dag, exec.DAGRunRef{}, dagRunID, root, workerID, attemptID, triggerType, scheduleTime, preparedAttempt)
239254
},
240255
)
241256
}
@@ -351,7 +366,7 @@ func determineRootDAGRun(isSubDAGRun bool, rootDAGRun string, dag *core.DAG, dag
351366
}
352367

353368
// handleSubDAGRun processes a sub dag-run, checking for previous runs.
354-
func handleSubDAGRun(ctx *Context, dag *core.DAG, dagRunID string, params string, root exec.DAGRunRef, parent exec.DAGRunRef, workerID string, triggerType core.TriggerType, scheduleTime string) error {
369+
func handleSubDAGRun(ctx *Context, dag *core.DAG, dagRunID string, params string, root exec.DAGRunRef, parent exec.DAGRunRef, workerID, attemptID string, triggerType core.TriggerType, scheduleTime string) error {
355370
logger.Info(ctx, "Executing sub dag-run",
356371
slog.String("params", params),
357372
slog.Any("root", root),
@@ -365,7 +380,28 @@ func handleSubDAGRun(ctx *Context, dag *core.DAG, dagRunID string, params string
365380

366381
// For distributed execution, the coordinator already created the sub-attempt record.
367382
if workerID != "local" {
368-
return executeDAGRun(ctx, dag, parent, dagRunID, root, workerID, triggerType, scheduleTime, nil)
383+
if ctx.DAGRunStore == nil {
384+
return executeDAGRun(ctx, dag, parent, dagRunID, root, workerID, attemptID, triggerType, scheduleTime, nil)
385+
}
386+
return withPreparedLocalExecution(
387+
ctx,
388+
dag,
389+
dagRunID,
390+
root,
391+
parent,
392+
triggerType,
393+
scheduleTime,
394+
func(execCtx context.Context) (exec.DAGRunAttempt, error) {
395+
attempt, _, err := resolveWorkerPreparedAttempt(execCtx, ctx.DAGRunStore, dag.Name, dagRunID, root, attemptID)
396+
if err != nil {
397+
return nil, err
398+
}
399+
return attempt, nil
400+
},
401+
func(preparedAttempt exec.DAGRunAttempt) error {
402+
return executeDAGRun(ctx, dag, parent, dagRunID, root, workerID, attemptID, triggerType, scheduleTime, preparedAttempt)
403+
},
404+
)
369405
}
370406

371407
logger.Debug(ctx, "Checking for previous sub dag-run with the dag-run ID")
@@ -386,7 +422,7 @@ func handleSubDAGRun(ctx *Context, dag *core.DAG, dagRunID string, params string
386422
})
387423
},
388424
func(preparedAttempt exec.DAGRunAttempt) error {
389-
return executeDAGRun(ctx, dag, parent, dagRunID, root, workerID, triggerType, scheduleTime, preparedAttempt)
425+
return executeDAGRun(ctx, dag, parent, dagRunID, root, workerID, attemptID, triggerType, scheduleTime, preparedAttempt)
390426
},
391427
)
392428
}
@@ -418,13 +454,13 @@ func handleSubDAGRun(ctx *Context, dag *core.DAG, dagRunID string, params string
418454
})
419455
},
420456
func(preparedAttempt exec.DAGRunAttempt) error {
421-
return executeRetry(ctx, dag, status, root, "", workerID, preparedAttempt)
457+
return executeRetry(ctx, dag, status, root, "", workerID, attemptID, preparedAttempt)
422458
},
423459
)
424460
}
425461

426462
// executeDAGRun initializes execution state for a DAG run and invokes the shared agent executor.
427-
func executeDAGRun(ctx *Context, d *core.DAG, parent exec.DAGRunRef, dagRunID string, root exec.DAGRunRef, workerID string, triggerType core.TriggerType, scheduleTime string, preparedAttempt exec.DAGRunAttempt) error {
463+
func executeDAGRun(ctx *Context, d *core.DAG, parent exec.DAGRunRef, dagRunID string, root exec.DAGRunRef, workerID, attemptID string, triggerType core.TriggerType, scheduleTime string, preparedAttempt exec.DAGRunAttempt) error {
428464
logFile, err := ctx.OpenLogFile(d, dagRunID)
429465
if err != nil {
430466
return fmt.Errorf("failed to initialize log file for DAG %s: %w", d.Name, err)
@@ -459,6 +495,7 @@ func executeDAGRun(ctx *Context, d *core.DAG, parent exec.DAGRunRef, dagRunID st
459495
ParentDAGRun: parent,
460496
ProgressDisplay: shouldEnableProgress(ctx),
461497
WorkerID: workerID,
498+
AttemptID: attemptID,
462499
QueuedRun: queuedRun,
463500
PreparedAttempt: preparedAttempt,
464501
DAGRunStore: ctx.DAGRunStore,

internal/cmd/startall.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -149,6 +149,7 @@ func runStartAll(ctx *Context, _ []string) error {
149149
ctx.DispatchTaskStore,
150150
ctx.WorkerHeartbeatStore,
151151
ctx.DAGRunLeaseStore,
152+
ctx.ActiveDistributedRunStore,
152153
)
153154
if err != nil {
154155
return fmt.Errorf("failed to initialize coordinator: %w", err)

0 commit comments

Comments
 (0)