@@ -277,6 +277,20 @@ func (a *API) ListDAGRuns(ctx context.Context, request api.ListDAGRunsRequestObj
277277 opts = append (opts , exec .WithDAGRunID (* request .Params .DagRunId ))
278278 }
279279
280+ // Parse comma-separated tags
281+ var tags []string
282+ if request .Params .Tags != nil && * request .Params .Tags != "" {
283+ for _ , tag := range strings .Split (* request .Params .Tags , "," ) {
284+ trimmed := strings .TrimSpace (tag )
285+ if trimmed != "" {
286+ tags = append (tags , trimmed )
287+ }
288+ }
289+ }
290+ if len (tags ) > 0 {
291+ opts = append (opts , exec .WithTags (tags ))
292+ }
293+
280294 dagRuns , err := a .listDAGRuns (ctx , opts )
281295 if err != nil {
282296 return nil , fmt .Errorf ("error listing dag-runs: %w" , err )
@@ -320,17 +334,75 @@ func (a *API) ListDAGRunsByName(ctx context.Context, request api.ListDAGRunsByNa
320334}
321335
322336func (a * API ) listDAGRuns (ctx context.Context , opts []exec.ListDAGRunStatusesOption ) ([]api.DAGRunSummary , error ) {
323- statuses , err := a .dagRunStore .ListStatuses (ctx , opts ... )
337+ // Extract tags from options and build filtered options for the store
338+ var tagsFilter []string
339+ var filteredOpts []exec.ListDAGRunStatusesOption
340+ for _ , opt := range opts {
341+ // Apply option to temp struct to check if it sets tags
342+ tempOpts := exec.ListDAGRunStatusesOptions {}
343+ opt (& tempOpts )
344+ if len (tempOpts .Tags ) > 0 {
345+ tagsFilter = tempOpts .Tags
346+ } else {
347+ filteredOpts = append (filteredOpts , opt )
348+ }
349+ }
350+
351+ // If tags filter is specified, get matching DAG names (AND logic)
352+ var allowedDAGNames map [string ]struct {}
353+ if len (tagsFilter ) > 0 {
354+ // Get all DAGs to filter by tags
355+ result , _ , err := a .dagStore .List (ctx , exec.ListDAGsOptions {})
356+ if err != nil {
357+ return nil , fmt .Errorf ("error getting DAGs for tag filter: %w" , err )
358+ }
359+
360+ allowedDAGNames = make (map [string ]struct {})
361+ for _ , dag := range result .Items {
362+ // Check if DAG has ALL requested tags (AND logic)
363+ if hasAllTags (dag .Tags , tagsFilter ) {
364+ allowedDAGNames [dag .Name ] = struct {}{}
365+ }
366+ }
367+
368+ // If no DAGs match all tags, return empty result
369+ if len (allowedDAGNames ) == 0 {
370+ return []api.DAGRunSummary {}, nil
371+ }
372+ }
373+
374+ statuses , err := a .dagRunStore .ListStatuses (ctx , filteredOpts ... )
324375 if err != nil {
325376 return nil , fmt .Errorf ("error listing dag-runs: %w" , err )
326377 }
378+
327379 var dagRuns []api.DAGRunSummary
328380 for _ , status := range statuses {
381+ // Filter by tags if specified
382+ if allowedDAGNames != nil {
383+ if _ , ok := allowedDAGNames [status .Name ]; ! ok {
384+ continue
385+ }
386+ }
329387 dagRuns = append (dagRuns , toDAGRunSummary (* status ))
330388 }
331389 return dagRuns , nil
332390}
333391
392+ // hasAllTags checks if dagTags contains all requiredTags (case-insensitive)
393+ func hasAllTags (dagTags []string , requiredTags []string ) bool {
394+ tagSet := make (map [string ]struct {})
395+ for _ , t := range dagTags {
396+ tagSet [strings .ToLower (t )] = struct {}{}
397+ }
398+ for _ , req := range requiredTags {
399+ if _ , ok := tagSet [strings .ToLower (req )]; ! ok {
400+ return false
401+ }
402+ }
403+ return true
404+ }
405+
334406func (a * API ) GetDAGRunLog (ctx context.Context , request api.GetDAGRunLogRequestObject ) (api.GetDAGRunLogResponseObject , error ) {
335407 dagName := request .Name
336408 dagRunId := request .DagRunId
0 commit comments