@@ -61,7 +61,7 @@ This command parses the DAG definition, resolves parameters, and initiates the D
6161}
6262
6363// Command line flags for the start command
64- var startFlags = []commandLineFlag {paramsFlag , nameFlag , dagRunIDFlag , fromRunIDFlag , parentDAGRunFlag , rootDAGRunFlag , tagsFlag , defaultWorkingDirFlag , startWorkerIDFlag , triggerTypeFlag , scheduleTimeFlag }
64+ var startFlags = []commandLineFlag {paramsFlag , nameFlag , dagRunIDFlag , fromRunIDFlag , parentDAGRunFlag , rootDAGRunFlag , tagsFlag , defaultWorkingDirFlag , startWorkerIDFlag , attemptIDFlag , triggerTypeFlag , scheduleTimeFlag }
6565
6666var fromRunIDFlag = commandLineFlag {
6767 name : "from-run-id" ,
@@ -94,6 +94,10 @@ func runStart(ctx *Context, args []string) error {
9494 }
9595
9696 workerID := getWorkerID (ctx )
97+ attemptID , err := requireWorkerAttemptID (ctx , workerID )
98+ if err != nil {
99+ return err
100+ }
97101
98102 triggerType , err := parseTriggerTypeParam (ctx )
99103 if err != nil {
@@ -195,7 +199,7 @@ func runStart(ctx *Context, args []string) error {
195199 if err != nil {
196200 return fmt .Errorf ("failed to parse parent dag-run reference: %w" , err )
197201 }
198- return handleSubDAGRun (ctx , dag , dagRunID , params , root , parent , workerID , triggerType , scheduleTime )
202+ return handleSubDAGRun (ctx , dag , dagRunID , params , root , parent , workerID , attemptID , triggerType , scheduleTime )
199203 }
200204
201205 if fromRunID != "" {
@@ -207,13 +211,13 @@ func runStart(ctx *Context, args []string) error {
207211 logger .Info (ctx , "Executing root dag-run" , slog .String ("params" , params ))
208212 }
209213
210- return tryExecuteDAG (ctx , dag , dagRunID , root , workerID , triggerType , scheduleTime )
214+ return tryExecuteDAG (ctx , dag , dagRunID , root , workerID , attemptID , triggerType , scheduleTime )
211215}
212216
213217var errProcAcquisitionFailed = errors .New ("failed to acquire process handle" )
214218
215219// tryExecuteDAG acquires a process handle and executes the DAG.
216- func tryExecuteDAG (ctx * Context , dag * core.DAG , dagRunID string , root exec.DAGRunRef , workerID string , triggerType core.TriggerType , scheduleTime string ) error {
220+ func tryExecuteDAG (ctx * Context , dag * core.DAG , dagRunID string , root exec.DAGRunRef , workerID , attemptID string , triggerType core.TriggerType , scheduleTime string ) error {
217221 // Check for dispatch to coordinator for distributed execution.
218222 // Skip if already running on a worker (workerID != "local").
219223 if workerID == "local" {
@@ -223,6 +227,10 @@ func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root exec.DAGRu
223227 }
224228 }
225229
230+ if workerID != "local" && ctx .DAGRunStore == nil {
231+ return executeDAGRun (ctx , dag , exec.DAGRunRef {}, dagRunID , root , workerID , attemptID , triggerType , scheduleTime , nil )
232+ }
233+
226234 return withPreparedLocalExecution (
227235 ctx ,
228236 dag ,
@@ -232,10 +240,17 @@ func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root exec.DAGRu
232240 triggerType ,
233241 scheduleTime ,
234242 func (execCtx context.Context ) (exec.DAGRunAttempt , error ) {
243+ if workerID != "local" {
244+ attempt , _ , err := resolveWorkerPreparedAttempt (execCtx , ctx .DAGRunStore , dag .Name , dagRunID , root , attemptID )
245+ if err != nil {
246+ return nil , err
247+ }
248+ return attempt , nil
249+ }
235250 return ctx .DAGRunStore .CreateAttempt (execCtx , dag , time .Now (), dagRunID , exec.NewDAGRunAttemptOptions {})
236251 },
237252 func (preparedAttempt exec.DAGRunAttempt ) error {
238- return executeDAGRun (ctx , dag , exec.DAGRunRef {}, dagRunID , root , workerID , triggerType , scheduleTime , preparedAttempt )
253+ return executeDAGRun (ctx , dag , exec.DAGRunRef {}, dagRunID , root , workerID , attemptID , triggerType , scheduleTime , preparedAttempt )
239254 },
240255 )
241256}
@@ -351,7 +366,7 @@ func determineRootDAGRun(isSubDAGRun bool, rootDAGRun string, dag *core.DAG, dag
351366}
352367
353368// handleSubDAGRun processes a sub dag-run, checking for previous runs.
354- func handleSubDAGRun (ctx * Context , dag * core.DAG , dagRunID string , params string , root exec.DAGRunRef , parent exec.DAGRunRef , workerID string , triggerType core.TriggerType , scheduleTime string ) error {
369+ func handleSubDAGRun (ctx * Context , dag * core.DAG , dagRunID string , params string , root exec.DAGRunRef , parent exec.DAGRunRef , workerID , attemptID string , triggerType core.TriggerType , scheduleTime string ) error {
355370 logger .Info (ctx , "Executing sub dag-run" ,
356371 slog .String ("params" , params ),
357372 slog .Any ("root" , root ),
@@ -365,7 +380,28 @@ func handleSubDAGRun(ctx *Context, dag *core.DAG, dagRunID string, params string
365380
366381 // For distributed execution, the coordinator already created the sub-attempt record.
367382 if workerID != "local" {
368- return executeDAGRun (ctx , dag , parent , dagRunID , root , workerID , triggerType , scheduleTime , nil )
383+ if ctx .DAGRunStore == nil {
384+ return executeDAGRun (ctx , dag , parent , dagRunID , root , workerID , attemptID , triggerType , scheduleTime , nil )
385+ }
386+ return withPreparedLocalExecution (
387+ ctx ,
388+ dag ,
389+ dagRunID ,
390+ root ,
391+ parent ,
392+ triggerType ,
393+ scheduleTime ,
394+ func (execCtx context.Context ) (exec.DAGRunAttempt , error ) {
395+ attempt , _ , err := resolveWorkerPreparedAttempt (execCtx , ctx .DAGRunStore , dag .Name , dagRunID , root , attemptID )
396+ if err != nil {
397+ return nil , err
398+ }
399+ return attempt , nil
400+ },
401+ func (preparedAttempt exec.DAGRunAttempt ) error {
402+ return executeDAGRun (ctx , dag , parent , dagRunID , root , workerID , attemptID , triggerType , scheduleTime , preparedAttempt )
403+ },
404+ )
369405 }
370406
371407 logger .Debug (ctx , "Checking for previous sub dag-run with the dag-run ID" )
@@ -386,7 +422,7 @@ func handleSubDAGRun(ctx *Context, dag *core.DAG, dagRunID string, params string
386422 })
387423 },
388424 func (preparedAttempt exec.DAGRunAttempt ) error {
389- return executeDAGRun (ctx , dag , parent , dagRunID , root , workerID , triggerType , scheduleTime , preparedAttempt )
425+ return executeDAGRun (ctx , dag , parent , dagRunID , root , workerID , attemptID , triggerType , scheduleTime , preparedAttempt )
390426 },
391427 )
392428 }
@@ -418,13 +454,13 @@ func handleSubDAGRun(ctx *Context, dag *core.DAG, dagRunID string, params string
418454 })
419455 },
420456 func (preparedAttempt exec.DAGRunAttempt ) error {
421- return executeRetry (ctx , dag , status , root , "" , workerID , preparedAttempt )
457+ return executeRetry (ctx , dag , status , root , "" , workerID , attemptID , preparedAttempt )
422458 },
423459 )
424460}
425461
426462// executeDAGRun initializes execution state for a DAG run and invokes the shared agent executor.
427- func executeDAGRun (ctx * Context , d * core.DAG , parent exec.DAGRunRef , dagRunID string , root exec.DAGRunRef , workerID string , triggerType core.TriggerType , scheduleTime string , preparedAttempt exec.DAGRunAttempt ) error {
463+ func executeDAGRun (ctx * Context , d * core.DAG , parent exec.DAGRunRef , dagRunID string , root exec.DAGRunRef , workerID , attemptID string , triggerType core.TriggerType , scheduleTime string , preparedAttempt exec.DAGRunAttempt ) error {
428464 logFile , err := ctx .OpenLogFile (d , dagRunID )
429465 if err != nil {
430466 return fmt .Errorf ("failed to initialize log file for DAG %s: %w" , d .Name , err )
@@ -459,6 +495,7 @@ func executeDAGRun(ctx *Context, d *core.DAG, parent exec.DAGRunRef, dagRunID st
459495 ParentDAGRun : parent ,
460496 ProgressDisplay : shouldEnableProgress (ctx ),
461497 WorkerID : workerID ,
498+ AttemptID : attemptID ,
462499 QueuedRun : queuedRun ,
463500 PreparedAttempt : preparedAttempt ,
464501 DAGRunStore : ctx .DAGRunStore ,
0 commit comments