Skip to content

Commit 1655c3d

Browse files
committed
feat: tag search for dag-runs
1 parent 1428cbd commit 1655c3d

6 files changed

Lines changed: 509 additions & 266 deletions

File tree

api/v2/api.gen.go

Lines changed: 275 additions & 263 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2/api.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,6 +1473,12 @@ paths:
14731473
schema:
14741474
type: "string"
14751475
description: "Filter DAG-runs by name"
1476+
- name: "tags"
1477+
in: "query"
1478+
required: false
1479+
schema:
1480+
type: "string"
1481+
description: "Filter DAG-runs by DAG tags (comma-separated). Returns runs from DAGs that have ALL specified tags."
14761482
responses:
14771483
"200":
14781484
description: "A successful response"

internal/core/exec/dagrun.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type ListDAGRunStatusesOptions struct {
6060
To TimeInUTC
6161
Statuses []core.Status
6262
Limit int
63+
Tags []string // Filter by DAG tags (AND logic - all tags must match)
6364
}
6465

6566
// ListRunsOption is a functional option for configuring ListRunsOptions
@@ -107,6 +108,13 @@ func WithDAGRunID(dagRunID string) ListDAGRunStatusesOption {
107108
}
108109
}
109110

111+
// WithTags sets the tags filter for listing dag-runs (AND logic - all tags must match)
112+
func WithTags(tags []string) ListDAGRunStatusesOption {
113+
return func(o *ListDAGRunStatusesOptions) {
114+
o.Tags = tags
115+
}
116+
}
117+
110118
// RemoveOldDAGRunsOptions contains options for removing old dag-runs
111119
type RemoveOldDAGRunsOptions struct {
112120
// DryRun if true, only returns the paths that would be removed without actually deleting

internal/service/frontend/api/v2/dagruns.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,20 @@ func (a *API) ListDAGRuns(ctx context.Context, request api.ListDAGRunsRequestObj
277277
opts = append(opts, exec.WithDAGRunID(*request.Params.DagRunId))
278278
}
279279

280+
// Parse comma-separated tags
281+
var tags []string
282+
if request.Params.Tags != nil && *request.Params.Tags != "" {
283+
for _, tag := range strings.Split(*request.Params.Tags, ",") {
284+
trimmed := strings.TrimSpace(tag)
285+
if trimmed != "" {
286+
tags = append(tags, trimmed)
287+
}
288+
}
289+
}
290+
if len(tags) > 0 {
291+
opts = append(opts, exec.WithTags(tags))
292+
}
293+
280294
dagRuns, err := a.listDAGRuns(ctx, opts)
281295
if err != nil {
282296
return nil, fmt.Errorf("error listing dag-runs: %w", err)
@@ -320,17 +334,75 @@ func (a *API) ListDAGRunsByName(ctx context.Context, request api.ListDAGRunsByNa
320334
}
321335

322336
func (a *API) listDAGRuns(ctx context.Context, opts []exec.ListDAGRunStatusesOption) ([]api.DAGRunSummary, error) {
323-
statuses, err := a.dagRunStore.ListStatuses(ctx, opts...)
337+
// Extract tags from options and build filtered options for the store
338+
var tagsFilter []string
339+
var filteredOpts []exec.ListDAGRunStatusesOption
340+
for _, opt := range opts {
341+
// Apply option to temp struct to check if it sets tags
342+
tempOpts := exec.ListDAGRunStatusesOptions{}
343+
opt(&tempOpts)
344+
if len(tempOpts.Tags) > 0 {
345+
tagsFilter = tempOpts.Tags
346+
} else {
347+
filteredOpts = append(filteredOpts, opt)
348+
}
349+
}
350+
351+
// If tags filter is specified, get matching DAG names (AND logic)
352+
var allowedDAGNames map[string]struct{}
353+
if len(tagsFilter) > 0 {
354+
// Get all DAGs to filter by tags
355+
result, _, err := a.dagStore.List(ctx, exec.ListDAGsOptions{})
356+
if err != nil {
357+
return nil, fmt.Errorf("error getting DAGs for tag filter: %w", err)
358+
}
359+
360+
allowedDAGNames = make(map[string]struct{})
361+
for _, dag := range result.Items {
362+
// Check if DAG has ALL requested tags (AND logic)
363+
if hasAllTags(dag.Tags, tagsFilter) {
364+
allowedDAGNames[dag.Name] = struct{}{}
365+
}
366+
}
367+
368+
// If no DAGs match all tags, return empty result
369+
if len(allowedDAGNames) == 0 {
370+
return []api.DAGRunSummary{}, nil
371+
}
372+
}
373+
374+
statuses, err := a.dagRunStore.ListStatuses(ctx, filteredOpts...)
324375
if err != nil {
325376
return nil, fmt.Errorf("error listing dag-runs: %w", err)
326377
}
378+
327379
var dagRuns []api.DAGRunSummary
328380
for _, status := range statuses {
381+
// Filter by tags if specified
382+
if allowedDAGNames != nil {
383+
if _, ok := allowedDAGNames[status.Name]; !ok {
384+
continue
385+
}
386+
}
329387
dagRuns = append(dagRuns, toDAGRunSummary(*status))
330388
}
331389
return dagRuns, nil
332390
}
333391

392+
// hasAllTags checks if dagTags contains all requiredTags (case-insensitive)
393+
func hasAllTags(dagTags []string, requiredTags []string) bool {
394+
tagSet := make(map[string]struct{})
395+
for _, t := range dagTags {
396+
tagSet[strings.ToLower(t)] = struct{}{}
397+
}
398+
for _, req := range requiredTags {
399+
if _, ok := tagSet[strings.ToLower(req)]; !ok {
400+
return false
401+
}
402+
}
403+
return true
404+
}
405+
334406
func (a *API) GetDAGRunLog(ctx context.Context, request api.GetDAGRunLogRequestObject) (api.GetDAGRunLogResponseObject, error) {
335407
dagName := request.Name
336408
dagRunId := request.DagRunId

ui/src/api/v2/schema.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3873,6 +3873,8 @@ export interface operations {
38733873
remoteNode?: components["parameters"]["RemoteNode"];
38743874
/** @description Filter DAG-runs by name */
38753875
name?: string;
3876+
/** @description Filter DAG-runs by DAG tags (comma-separated). Returns runs from DAGs that have ALL specified tags. */
3877+
tags?: string;
38763878
};
38773879
header?: never;
38783880
path?: never;

0 commit comments

Comments
 (0)