feat: improve queue browsing and dequeue reliability#1967

Merged
yottahmd merged 15 commits into main from fix/server-side-queue-clear
Apr 5, 2026

Conversation

Collaborator

@yottahmd yottahmd commented Apr 4, 2026

Summary

  • replace the queue clear-all flow with selected dequeue and progress UI using repeated single dequeue requests from the queue detail page
  • split queues into a lightweight dashboard plus a queue detail page with cursor-based infinite loading backed by a queue read index
  • harden queue dequeue, retry, and distributed queue-dispatch handling for stale or orphaned queued runs
  • simplify the queue dashboard cards and fix related web UI issues in cockpit mobile scrolling, agent new-session handling, and dag-run filter keyboard shortcuts

Testing

  • make fmt
  • go test ./internal/core/exec -count=1
  • go test ./internal/runtime -count=1
  • go test ./internal/service/scheduler -count=1
  • go test ./internal/service/worker -count=1
  • go test ./internal/cmd -count=1
  • go test ./internal/service/frontend/api/v1 -count=1
  • cd ui && pnpm exec vitest run src/features/agent/components/__tests__/AgentChatModal.test.tsx src/features/cockpit/components/__tests__/KanbanCounts.test.tsx src/features/dag-runs/components/dag-run-list/__tests__/DAGRunTable.test.tsx
  • cd ui && pnpm build

coderabbitai Bot commented Apr 4, 2026

📝 Walkthrough


Adds a DELETE /queues/{name}/items API and wiring; implements queued-item abort semantics and cursor-based queued-item listing; introduces persistent file-queue read index and bulk-delete support; adds a queue-dispatch-retry mode and related subprocess env flags; refactors dequeue/retry/scheduler flows; and updates the UI with a queue details page, cursor feed hooks, selection/batch-dequeue UI, and simplified queue list/cards.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| OpenAPI + Generated API: api/v1/api.yaml, api/v1/api.gen.go, ui/src/api/v1/schema.ts | Added DELETE /queues/{name}/items operation and generated types/handlers; changed GET /queues/{name}/items to cursor-based listQueueItems and added GET /queues/{name} getQueue; added limit/cursor params and QueuedDAGRunsPageResponse. |
| Frontend UI: Queue Pages & Components: ui/src/pages/queues/queue/index.tsx, ui/src/features/queues/..., ui/src/App.tsx, ui/src/api/v1/schema.ts | Added QueueDetailsPage with infinite queued feed, selection and batch-dequeue UIs; new hooks useQueuedItemsFeed, useQueueSelection, useQueueBatchDequeue; added QueueRunsTable; simplified QueueCard/QueueList, removed QueueMetrics and SSE hook; registered new route /queues/:name. |
| Core abort & stale-dispatch logic: internal/core/exec/queue_abort.go, internal/core/exec/stale_queue_dispatch.go, internal/core/exec/env.go | Added AbortQueuedDAGRun, DAGRunNotQueuedError, and stale-dispatch typed error + parser. Added EnvKeyQueueDispatchRetry constant. |
| File-backed queue index & store: internal/persis/filequeue/index.go, internal/persis/filequeue/store.go, internal/persis/filequeue/queue.go, internal/persis/filequeue/queuefile.go | Implemented persistent per-queue read index, cursor-based ListCursor, DeleteByItemIDs on store, changed enqueue/push to return filename, updated index rebuild/persist, and added deletions by item IDs. |
| Persistence & store tests: internal/persis/filequeue/store_test.go, internal/persis/filequeue/queue_test.go, .../queuefile_test.go | Added/updated tests for ListCursor, invalid cursor behavior, and DeleteByItemIDs; adjusted tests to new Enqueue/Push signatures. |
| Dequeue / Retry CLI & runtime: internal/cmd/dequeue.go, internal/cmd/dequeue_test.go, internal/cmd/retry.go, internal/cmd/retry_test.go, internal/cmd/retry_internal_test.go | Refactored dequeue to use dequeueQueuedDAGRun + proc-group locking and abort-via-AbortQueuedDAGRun; added queue dispatch retry mode gating in retry flows and tests covering missing/mismatched queued status. |
| Scheduler / Queue processor changes: internal/service/scheduler/queue_processor.go, internal/service/scheduler/dag_executor.go, internal/service/scheduler/queue_processor_startup_test.go | Dispatch retry uses new QueueDispatchRetry spec; processor treats StaleQueueDispatchError as discardable and changes retry logging/handling; updated startup test for stale dispatch. |
| Frontend API server handlers: internal/service/frontend/api/v1/queues.go, internal/service/frontend/api/v1/queue_ops.go, internal/service/frontend/api/v1/dagruns.go, internal/service/frontend/api/v1/queues_internal_test.go | Added GetQueue and cursor-based ListQueueItems; added helper functions to resolve queue name and map abort errors; ListQueueItems now skips running items and returns cursor pages; Dequeue path refactored to abort+dequeue by DAGRunID. |
| Coordinator / Dispatcher: internal/service/coordinator/handler.go, internal/service/coordinator/client.go, internal/service/coordinator/client_test.go, internal/service/coordinator/handler_test.go | Map stale-dispatch failures to FailedPrecondition and propagate typed StaleQueueDispatchError through client/backoff; tests verify permanent typed error and handler rejects stale retries. |
| Worker command spec selection: internal/service/worker/handler.go, internal/runtime/subcmd.go, internal/runtime/subcmd_test.go | When retrying queued tasks, select QueueDispatchTaskRetry/QueueDispatchRetry specs that set DAGU_QUEUE_DISPATCH_RETRY=1 in the environment; added selection helper and tests for env propagation. |
| Remote client / commands: internal/cmd/remote_client.go, internal/cmd/remote_commands.go | Refactored remote list call to cursor-based limit/cursor signature and updated call sites used by remote dequeue. |
| Mocks & telemetry tests: internal/cmn/telemetry/collector_test.go, internal/core/exec/queue.go (MockQueueStore) | Updated mocks to support ListCursor and DeleteByItemIDs; updated QueueStore interface to include ListCursor and DeleteByItemIDs. |
| Minor persistence & other fixes: internal/persis/filedagrun/attempt.go, internal/persis/filequeue/index.go (new), internal/cmn/backoff/retry.go | Normalized attempt dir matching, added index implementation file, and changed PermanentError to wrap with %w. |
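
The last row mentions that PermanentError now wraps with %w. As a rough illustration of why that can matter for the typed stale-dispatch error, here is a minimal sketch. The names staleDispatchError, wrapOpaque, wrapChained, and isStale are hypothetical and are not the project's actual definitions; the only claim is the standard-library behavior of %v versus %w.

```go
package main

import (
	"errors"
	"fmt"
)

// staleDispatchError is a hypothetical typed error used only for this sketch.
type staleDispatchError struct{ RunID string }

func (e *staleDispatchError) Error() string {
	return "stale queue dispatch for run " + e.RunID
}

// wrapOpaque formats the cause with %v, which flattens it to plain text.
func wrapOpaque(err error) error {
	return fmt.Errorf("permanent error: %v", err)
}

// wrapChained formats the cause with %w, which keeps it in the error chain.
func wrapChained(err error) error {
	return fmt.Errorf("permanent error: %w", err)
}

// isStale reports whether a staleDispatchError is reachable through the chain.
func isStale(err error) bool {
	var target *staleDispatchError
	return errors.As(err, &target)
}

func main() {
	cause := &staleDispatchError{RunID: "run-1"}
	fmt.Println(isStale(wrapOpaque(cause)))  // false
	fmt.Println(isStale(wrapChained(cause))) // true
}
```

With %w, a caller further up the stack can still recognize the typed error after it has passed through a retry or backoff wrapper.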

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant API as "API DeleteQueueItems"
    participant ProcStore as "Process Store"
    participant QueueStore as "Queue Store"
    participant DAGRunStore as "DAGRun Store"
    participant Audit as "Audit Logger"

    Client->>API: DELETE /queues/{name}/items (optional remoteNode)
    API->>ProcStore: Lock(ctx, queueName)
    activate ProcStore
    API->>QueueStore: ListCursor(ctx, queueName, cursor="", limit=scanBatch)
    QueueStore-->>API: queued item list (itemIDs)
    loop per item
        API->>DAGRunStore: AbortQueuedDAGRun(ctx, dagRunRef)
        alt Abort succeeded (was Queued)
            DAGRunStore-->>API: success
            API->>QueueStore: mark itemID for deletion
        else Not queued / missing status
            DAGRunStore-->>API: DAGRunNotQueuedError
            alt error skippable
                API->>QueueStore: mark itemID for deletion
            else non-skippable -> break
            end
        end
    end
    API->>QueueStore: DeleteByItemIDs(ctx, queueName, itemIDs)
    QueueStore-->>API: deletedCount
    API->>Audit: Log(queue_clear, counts)
    ProcStore->>ProcStore: Unlock(ctx, queueName)
    deactivate ProcStore
    API-->>Client: 204 No Content / error
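
The loop in the diagram can be sketched in Go roughly as follows. This is a simplified illustration under stated assumptions, not the handler's code: collectDeletable, errNotQueued, and errStoreDown are hypothetical names, the queue is a plain slice of IDs, and locking, audit logging, and the bulk delete itself are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors for this sketch only.
var (
	errNotQueued = errors.New("dag-run is not queued")
	errStoreDown = errors.New("store unavailable")
)

// collectDeletable aborts each queued item and returns the IDs that are safe
// to delete in bulk. It stops at the first non-skippable error and returns
// the IDs collected so far together with that error.
func collectDeletable(itemIDs []string, abort func(id string) error) ([]string, error) {
	deletable := make([]string, 0, len(itemIDs))
	for _, id := range itemIDs {
		err := abort(id)
		switch {
		case err == nil, errors.Is(err, errNotQueued):
			// Aborted now, or already stale: either way the entry can go.
			deletable = append(deletable, id)
		default:
			return deletable, fmt.Errorf("abort %s: %w", id, err)
		}
	}
	return deletable, nil
}

func main() {
	abort := func(id string) error {
		switch id {
		case "stale":
			return errNotQueued
		case "broken":
			return errStoreDown
		}
		return nil
	}
	ids, err := collectDeletable([]string{"a", "stale", "b"}, abort)
	fmt.Println(ids, err) // [a stale b] <nil>
	ids, err = collectDeletable([]string{"a", "broken", "b"}, abort)
	fmt.Println(ids, err) // [a] abort broken: store unavailable
}
```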

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 7.78% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly summarizes the main changes: improving queue browsing UX and making dequeue operations more reliable through server-side enhancements. |



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
ui/src/features/queues/components/QueueCard.tsx (1)

1-7: ⚠️ Potential issue | 🟠 Major

Add the required GPL header to this modified TSX file.

This file is being changed but still has no project license header.

As per coding guidelines, **/*.{go,ts,tsx,js}: Apply GPL v3 license headers on source files, managed via make addlicense.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/queues/components/QueueCard.tsx` around lines 1 - 7, This
file (QueueCard.tsx / component QueueCard) is missing the required GPL v3
license header; run the project's license tooling or add the standard GPL v3
header used across the repo to the top of the file. Either run the provided make
target (make addlicense) to automatically insert the correct header, or manually
add the project's canonical GPL v3 header comment block at the very top of
QueueCard.tsx before any imports so the file complies with the repository's /*
.ts/.tsx */ license policy.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/cmd/dequeue.go`:
- Around line 63-75: The current peek via QueueStore.ListPaginated leaves a bad
head item in place so dequeueFirst can get stuck; change the logic to perform a
destructive pop of the head instead of peeking (or, if you prefer, explicitly
remove or skip bad heads): replace the ListPaginated usage with the QueueStore
pop/remove method (or call QueueStore.Delete on the head item) so that if
result.Items[0].Data() fails or dequeueQueuedDAGRun(...) returns a
skippable/stale error you delete the head and loop to the next item; ensure you
reference the same queueName and use dequeueQueuedDAGRun(ctx, queueName, *data)
only after successful pop and parsing so bad/stale heads are removed rather than
blocking subsequent calls.
- Line 50: The code must resolve the DAG run's actual queue before attempting
targeted dequeues: call queueNameForDAGRun(ctx, dagRun) to obtain
actualQueueName and pass that into dequeueQueuedDAGRun(ctx, actualQueueName,
dagRun) instead of the caller-supplied queueName; also update the error handling
around exec.ErrQueueItemNotFound (the block handling dequeue failures) so that
ErrQueueItemNotFound is not treated as success when the resolved queue differs —
if dequeue on the resolved queue returns ErrQueueItemNotFound, surface an
error/handle it appropriately rather than logging success.
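
Editorial sketch for the stale-head item above: one way to keep a bad head from blocking later dequeues, assuming a hypothetical in-memory queue. dequeueFirstValid, errStale, and errEmpty are illustrative names, not QueueStore methods or project errors.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors for this sketch only.
var (
	errStale = errors.New("stale queue item")
	errEmpty = errors.New("queue is empty")
)

// dequeueFirstValid processes items from the head until one succeeds.
// Stale heads are removed so they cannot block later calls; a hard failure
// leaves the head in place and is returned to the caller.
func dequeueFirstValid(queue []string, process func(item string) error) (string, []string, error) {
	for len(queue) > 0 {
		head := queue[0]
		err := process(head)
		if err != nil && !errors.Is(err, errStale) {
			return "", queue, fmt.Errorf("dequeue %s: %w", head, err)
		}
		queue = queue[1:] // processed or stale: remove the head either way
		if err == nil {
			return head, queue, nil
		}
	}
	return "", queue, errEmpty
}

func main() {
	process := func(item string) error {
		if item == "stale-1" {
			return errStale
		}
		return nil
	}
	got, rest, err := dequeueFirstValid([]string{"stale-1", "valid", "later"}, process)
	fmt.Println(got, rest, err) // valid [later] <nil>
}
```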

In `@internal/cmd/retry.go`:
- Around line 112-115: When queueDispatchRetryRequested() is true, map store
read errors (exec.ErrDAGRunIDNotFound and exec.ErrNoStatusData) from the initial
lookup (e.g., findRetryAttempt and ReadStatus) into a &exec.DAGRunNotQueuedError
instead of letting the raw errors bubble; detect these errors after calling
findRetryAttempt/ReadStatus and return an exec.DAGRunNotQueuedError with
HasStatus=false (or appropriate flag for "no status") so cleared targets are
treated as benign stale-dispatches, and apply the same normalization to the
other similar block around findRetryAttempt/ReadStatus later in the file (the
region mentioned for lines ~217-240).

In `@ui/src/features/queues/components/QueueCard.tsx`:
- Around line 106-107: The onQueueCleared prop is awaited in QueueCard (await
onQueueCleared()) but is currently typed as () => void; update the prop type to
allow async functions by changing the signature to () => void | Promise<void> in
both QueueCardProps and QueueListProps so it can accept callers like the async
handleRefresh(): Promise<void> used by the queues page; ensure any other
usages/consumers of onQueueCleared are compatible with the widened type.

---

Outside diff comments:
In `@ui/src/features/queues/components/QueueCard.tsx`:
- Around line 1-7: This file (QueueCard.tsx / component QueueCard) is missing
the required GPL v3 license header; run the project's license tooling or add the
standard GPL v3 header used across the repo to the top of the file. Either run
the provided make target (make addlicense) to automatically insert the correct
header, or manually add the project's canonical GPL v3 header comment block at
the very top of QueueCard.tsx before any imports so the file complies with the
repository's /* .ts/.tsx */ license policy.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 33839660-f224-46ec-8287-144398090691

📥 Commits

Reviewing files that changed from the base of the PR and between a32042d and 42ea6cb.

📒 Files selected for processing (22)
  • api/v1/api.gen.go
  • api/v1/api.yaml
  • internal/cmd/dequeue.go
  • internal/cmd/retry.go
  • internal/cmn/telemetry/collector_test.go
  • internal/core/exec/env.go
  • internal/core/exec/queue.go
  • internal/core/exec/queue_abort.go
  • internal/core/exec/queue_abort_test.go
  • internal/persis/filedagrun/attempt.go
  • internal/persis/filequeue/queue.go
  • internal/persis/filequeue/store.go
  • internal/persis/filequeue/store_test.go
  • internal/runtime/subcmd.go
  • internal/runtime/subcmd_test.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/queue_ops.go
  • internal/service/frontend/api/v1/queues.go
  • internal/service/frontend/api/v1/queues_internal_test.go
  • internal/service/scheduler/dag_executor.go
  • ui/src/api/v1/schema.ts
  • ui/src/features/queues/components/QueueCard.tsx

Comment thread internal/cmd/dequeue.go Outdated
Comment thread internal/cmd/dequeue.go Outdated
Comment thread internal/cmd/retry.go Outdated
Comment thread ui/src/features/queues/components/QueueCard.tsx Outdated
Collaborator Author

yottahmd commented Apr 5, 2026

@coderabbitai full review


coderabbitai Bot commented Apr 5, 2026

✅ Actions performed

Full review triggered.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
ui/src/features/cockpit/components/MobileKanbanBoard.tsx (1)

1-6: ⚠️ Potential issue | 🟡 Minor

Missing GPL v3 license header.

This source file is missing the required license header. Other files in this PR (like KanbanCounts.test.tsx) include it.

Proposed fix
+// Copyright (C) 2026 Yota Hamada
+// SPDX-License-Identifier: GPL-3.0-or-later
+
 import React, { useState, useCallback } from 'react';
 import { components } from '@/api/v1/schema';

As per coding guidelines: "Apply GPL v3 license headers on source files, managed via make addlicense"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/cockpit/components/MobileKanbanBoard.tsx` around lines 1 - 6,
The file MobileKanbanBoard.tsx is missing the required GPL v3 license header;
run the repository's license tooling (e.g., execute the make target used
elsewhere: "make addlicense") or manually prepend the same GPL v3 header used in
other files (for example match the header in KanbanCounts.test.tsx) to the top
of MobileKanbanBoard.tsx so the file contains the standard license block before
the imports; ensure the header is exact and committed.
ui/src/features/agent/components/AgentChatModal.tsx (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the GPL header at the top of this file.

This touched source file is still missing the repository's required license banner.

As per coding guidelines, "Apply GPL v3 license headers on source files, managed via make addlicense."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/components/AgentChatModal.tsx` at line 1, This file
(AgentChatModal.tsx) is missing the project's GPL v3 license banner; add the
repository's required GPL v3 header at the very top of the file (above the
import statements) consistent with other source files and the project's
formatting rules—either run the repository tooling (make addlicense) to insert
the standard header automatically or paste the canonical GPL v3 header used
elsewhere in the repo so the file (AgentChatModal component) includes the
correct license banner.
ui/src/api/v1/schema.ts (1)

1324-1343: ⚠️ Potential issue | 🟠 Major

Expose the queue-clear endpoint in the generated API contract.

The PR objective adds a server-side queue clear action, but this schema still models /queues/{name}/items as GET-only and only generates listQueueItems. That leaves no typed DELETE /queues/{name}/items entry for the UI Clear action, so it either can't use the generated client or has to bypass it. Please add the DELETE operation to the OpenAPI spec, include remoteNode on it as well, and regenerate this file.

As per coding guidelines, "Frontend API types must be generated from OpenAPI spec via pnpm gen:api to maintain type safety with the backend."

Also applies to: 8700-8747

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/api/v1/schema.ts` around lines 1324 - 1343, Update the OpenAPI spec to
add a DELETE operation for the path "/queues/{name}/items" (so the generated
client exposes a typed delete/clear method, e.g. operationId matching the server
handler such as "clearQueueItems" or "deleteQueueItems"), include the optional
remoteNode parameter on that DELETE operation, then run the generation command
(pnpm gen:api) to regenerate ui/src/api/v1/schema.ts so the schema contains both
the GET (listQueueItems) and the new DELETE operation for
"/queues/{name}/items".
ui/src/pages/queues/index.tsx (1)

1-15: ⚠️ Potential issue | 🟠 Major

Add the GPL header to this modified TSX file.

This file was changed in the PR but still lacks the repository's required license header. Please run make addlicense on it before merging.

As per coding guidelines, **/*.{go,ts,tsx,js}: Apply GPL v3 license headers on source files, managed via make addlicense.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/pages/queues/index.tsx` around lines 1 - 15, The modified TSX file
(the one that imports Layers, Search, QueueList, useQueuesListSSE and
useSSECacheSync) is missing the repository's GPL v3 license header; run the
repository tool to apply it by executing make addlicense (or manually prepend
the project's GPL v3 header template) so the top of the file contains the
correct license block, then stage and commit the change.
ui/src/App.tsx (1)

1-18: ⚠️ Potential issue | 🟠 Major

Add the GPL header to this modified TSX file.

This file was updated in the PR but still does not carry the required GPL header. Please run make addlicense here as well.

As per coding guidelines, **/*.{go,ts,tsx,js}: Apply GPL v3 license headers on source files, managed via make addlicense.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/App.tsx` around lines 1 - 18, This TSX file is missing the required
GPL v3 header; add the standard GPL v3 license header to the top of App.tsx
(above the first import such as "import { Theme } from '@radix-ui/themes'") by
running the repository tooling: execute make addlicense to apply the GPL header
automatically, or if applying manually, insert the project's canonical GPL v3
comment block at file top so the file (which contains imports like Theme, React,
BrowserRouter, SWRConfig) carries the required license header.
🧹 Nitpick comments (8)
ui/src/features/agent/components/__tests__/AgentChatModal.test.tsx (1)

55-57: Exercise the sidebar "new" path too.

The production change also routes onSelectSession('new') through handleClearSession, but this mock returns null, so that branch can regress without breaking the suite. Please expose onSelectSession here and assert the sidebar-triggered "new" action still calls clearSession and suppresses auto-select.

Also applies to: 154-166

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/components/__tests__/AgentChatModal.test.tsx` around
lines 55 - 57, The SessionSidebar mock currently returns null so the "new"
selection path isn't exercised; update the mock in AgentChatModal.test.tsx (and
the similar block at 154-166) to render a minimal component that accepts an
onSelectSession prop and calls it with "new" in a test-controlled way; then add
an assertion that invoking onSelectSession('new') triggers the component's
clearSession/handleClearSession flow and prevents auto-select (assert
clearSession was called and that the auto-select behavior did not run). Use the
exact prop name onSelectSession and the handler/spy names clearSession or
handleClearSession from the test to locate and hook into the right functions.
internal/cmd/dequeue_test.go (1)

164-172: Assert the dequeued run's persisted status too.

This currently proves the queue becomes empty, but it would still pass if the implementation dropped both items and left valid-run in a queued state. Adding a status assertion makes the stale-head behavior much harder to fake.

✅ Suggested assertion
 	th.RunCommand(t, cmd.Dequeue(), test.CmdTest{
 		Name:        "DequeueFirstSkipsStaleHead",
 		Args:        []string{"dequeue", dag.ProcGroup()},
 		ExpectedOut: []string{"Dequeued dag-run"},
 	})

 	length, err := th.QueueStore.Len(th.Context, dag.ProcGroup())
 	require.NoError(t, err)
 	assert.Equal(t, 0, length)
+
+	attempt, err := th.DAGRunStore.FindAttempt(th.Context, exec.NewDAGRunRef(dag.Name, "valid-run"))
+	require.NoError(t, err)
+	status, err := attempt.ReadStatus(th.Context)
+	require.NoError(t, err)
+	require.NotNil(t, status)
+	assert.NotEqual(t, core.Queued, status.Status)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmd/dequeue_test.go` around lines 164 - 172, After calling
th.RunCommand(t, cmd.Dequeue(), ...) and asserting the queue length, also fetch
the dequeued run by its identifier (e.g., "valid-run") from the persisted store
and assert its persisted status is updated to the dequeued/processed state (not
still "queued"); use the test helpers/store methods available in the test
fixture (e.g., th.RunStore.Get or equivalent) to load the run and assert its
Status (or PersistedStatus) value, so the test verifies the run itself was
dequeued and not left in queued state.
ui/src/features/queues/hooks/useQueuedItemsFeed.ts (2)

169-183: Redundant try/catch blocks around loadPage calls.

The loadPage function already handles all errors internally (sets error state, resets loading flags). The outer try/catch in loadMore and reload will only catch synchronous exceptions, which are unlikely given the async nature of the function.

These try/catch blocks could be removed unless you anticipate synchronous throws from loadPage.

♻️ Simplified wrappers without redundant error handling
-  const loadMore = React.useCallback(async () => {
-    try {
-      await loadPage(false);
-    } catch (error) {
-      setError(getQueueFeedError(error, 'Failed to load more queued items.'));
-    }
-  }, [loadPage]);
+  const loadMore = React.useCallback(() => loadPage(false), [loadPage]);

-  const reload = React.useCallback(async () => {
-    try {
-      await loadPage(true);
-    } catch (error) {
-      setError(getQueueFeedError(error, 'Failed to reload queued items.'));
-    }
-  }, [loadPage]);
+  const reload = React.useCallback(() => loadPage(true), [loadPage]);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/queues/hooks/useQueuedItemsFeed.ts` around lines 169 - 183,
The try/catch wrappers in the useQueuedItemsFeed hooks are redundant because
loadPage already handles errors and loading state; update the loadMore and
reload callbacks to simply call await loadPage(false) and await loadPage(true)
respectively (without catching and calling setError/getQueueFeedError) so you
don't duplicate error handling—leave the React.useCallback and dependency on
loadPage intact and remove the inner catch blocks and setError calls.

22-36: getQueueFeedError helper is only used once.

This helper function is defined but only called once (line 147). Consider inlining it or removing it if you simplify the error handling per the previous comment.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/queues/hooks/useQueuedItemsFeed.ts` around lines 22 - 36,
getQueueFeedError is defined but only used once in useQueuedItemsFeed; remove
the standalone getQueueFeedError function and inline its logic at the call site
inside useQueuedItemsFeed (replace the call to getQueueFeedError(error,
fallback) with the equivalent inline extraction: check if error instanceof Error
&& error.message, else if error is object with a string 'message' property and
non-empty, else use fallback). Ensure you remove the getQueueFeedError
declaration to avoid dead code and keep the same return semantics.
ui/src/pages/queues/queue/index.tsx (1)

36-59: Consider stabilizing onLoadMore callback reference.

The useAutoLoadMore hook includes onLoadMore in its dependency array. If the callback passed to it is not memoized (e.g., an inline arrow function), the IntersectionObserver will be disconnected and reconnected on every render, which could cause performance issues or missed intersection events.

At line 147, () => void loadMore() is passed as an inline function, which creates a new reference each render.

♻️ Proposed fix

Wrap the callback with useCallback:

+  const handleLoadMore = React.useCallback(() => {
+    void loadMore();
+  }, [loadMore]);
+
   useAutoLoadMore(
     sentinelRef,
     hasMore && !isLoadingMore && !queuedItemsError,
-    () => void loadMore()
+    handleLoadMore
   );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/pages/queues/queue/index.tsx` around lines 36 - 59, The hook
useAutoLoadMore is re-creating the IntersectionObserver whenever onLoadMore
changes; stabilize the callback passed into it by memoizing the loadMore handler
(the function passed at line 147 where "() => void loadMore()" is used) with
React.useCallback so its reference is stable across renders, then pass that
memoized function into useAutoLoadMore instead of an inline lambda; ensure the
dependency array for the memoized loadMore includes only the values it truly
depends on so the callback doesn't change unnecessarily.
internal/core/exec/queue_abort.go (1)

44-68: Consider logging or handling the case when Hide fails after status update.

After the successful CAS at lines 45-58, the attempt status is already written as Aborted. If Hide fails at line 66, the attempt remains visible but marked as aborted. While this isn't necessarily incorrect (the abort succeeded), it may be worth logging the intermediate state for debugging purposes.

The current error wrapping is appropriate, but ensure callers understand that a Hide failure doesn't mean the abort failed—just that cleanup was incomplete.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/exec/queue_abort.go` around lines 44 - 68, After the CAS that
sets the DAGRun status to Aborted (CompareAndSwapLatestAttemptStatus and the
anonymous func), handle failures from attempt.Hide by emitting a warning that
the abort succeeded but cleanup (Hide) failed: call your logger (or log.Printf)
with context including attempt.ID(), dagRun.ID (or other identifying fields) and
the error before returning the wrapped error (fmt.Errorf("hide aborted attempt:
%w", err)); keep existing return behavior but add the warning log to make it
clear callers/readers that the status update succeeded even though Hide failed.
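
A minimal sketch of the suggested warning, using the standard log/slog package. It assumes a hypothetical hide callback in place of the attempt's Hide method; hideAfterAbort, newDiscardLogger, and errCleanup are illustrative names, and the project's own logger may differ.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"log/slog"
)

// errCleanup is a hypothetical cleanup failure used by the example.
var errCleanup = errors.New("cleanup failed")

// newDiscardLogger returns a logger that drops its output (handy for demos).
func newDiscardLogger() *slog.Logger {
	return slog.New(slog.NewTextHandler(io.Discard, nil))
}

// hideAfterAbort runs the cleanup step that follows a successful status
// update. If hiding fails, it logs that the abort itself succeeded and then
// returns the wrapped error, as the existing code path does.
func hideAfterAbort(logger *slog.Logger, runID string, hide func() error) error {
	if err := hide(); err != nil {
		// The status is already Aborted at this point; only cleanup failed.
		logger.Warn("abort succeeded but hiding the attempt failed",
			slog.String("dagRunID", runID),
			slog.Any("error", err),
		)
		return fmt.Errorf("hide aborted attempt: %w", err)
	}
	return nil
}

func main() {
	err := hideAfterAbort(slog.Default(), "run-1", func() error { return errCleanup })
	fmt.Println(err) // hide aborted attempt: cleanup failed
}
```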
api/v1/api.yaml (1)

3175-3182: Specify the queue ordering behind nextCursor.

forward-only and next page still leave clients guessing whether this feed is returned in queue order or reverse order. Documenting the stable sort order here would make cursor handling unambiguous.

Also applies to: 6923-6930, 8955-8966

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/v1/api.yaml` around lines 3175 - 3182, Update the OpenAPI docs for the
listQueueItems operation (operationId "listQueueItems") to explicitly state the
deterministic sort order used when returning a forward-only page and when
providing "nextCursor" (the QueueListCursor parameter): specify whether items
are returned oldest-first or newest-first, how ties are broken (e.g., by ID or
timestamp), and that the cursor advances in that same stable order; apply the
same clarification to the other affected operations referenced (the other
endpoints using the QueueListCursor parameter) so clients can unambiguously
implement cursor paging.
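
To make the ordering concern concrete, here is a small sketch of forward-only cursor paging over a stable ascending order. It is an illustration under assumptions, not the ListCursor implementation: page and listAfter are hypothetical, item IDs are assumed to be non-empty and already sorted, and the last returned ID doubles as the cursor.

```go
package main

import (
	"fmt"
	"sort"
)

// page is a hypothetical response shape: items plus an opaque cursor.
type page struct {
	Items      []string
	NextCursor string // empty when there are no more items
}

// listAfter returns up to limit IDs that sort strictly after cursor.
// ids must be sorted ascending; an empty cursor starts from the beginning.
func listAfter(ids []string, cursor string, limit int) page {
	if limit < 1 {
		limit = 1
	}
	// First index whose ID is greater than the cursor.
	start := sort.Search(len(ids), func(i int) bool { return ids[i] > cursor })
	end := start + limit
	if end > len(ids) {
		end = len(ids)
	}
	p := page{Items: ids[start:end]}
	if end < len(ids) {
		// More items remain: the last ID on this page becomes the cursor.
		p.NextCursor = p.Items[len(p.Items)-1]
	}
	return p
}

func main() {
	ids := []string{"001", "002", "003"}
	p1 := listAfter(ids, "", 2)
	fmt.Println(p1.Items, p1.NextCursor) // [001 002] 002
	p2 := listAfter(ids, p1.NextCursor, 2)
	fmt.Println(p2.Items, p2.NextCursor == "") // [003] true
}
```

Because the cursor is defined by the sort key, a client only gets consistent pages if the documented order matches the order the server actually uses, which is the point of the comment above.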
internal/service/frontend/api/v1/queues.go (1)

29-46: Return queues in a deterministic order.

Now that collectQueues returns a map, this handler ranges it directly. That makes queue ordering nondeterministic between calls, which can cause avoidable UI/SSE reshuffles.

♻️ Proposed change
 import (
 	"context"
 	"errors"
 	"fmt"
 	"log/slog"
+	"sort"
 	"time"
@@
-	queues := make([]api.Queue, 0, len(queueMap))
+	queueNames := make([]string, 0, len(queueMap))
+	for name := range queueMap {
+		queueNames = append(queueNames, name)
+	}
+	sort.Strings(queueNames)
+
+	queues := make([]api.Queue, 0, len(queueNames))
 	var totalRunning, totalQueued, totalCapacity int
-	for _, q := range queueMap {
+	for _, name := range queueNames {
+		q := queueMap[name]
 		queue, err := a.toQueueResource(ctx, q)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/api/v1/queues.go` around lines 29 - 46,
collectQueues returns a map so iterating it produces nondeterministic order;
change the handler to build a sorted slice of queue names from queueMap, sort it
(e.g., sort.Strings on the keys), then iterate over the sorted keys and use
queueMap[key] when calling toQueueResource and accumulating
totalRunning/totalQueued/totalCapacity before appending to queues; this ensures
deterministic ordering of the queues in the response while keeping the existing
logic in toQueueResource and the totals calculations.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api/v1/api.yaml`:
- Around line 3172-3207: The change repurposes the existing /queues/{name}/items
endpoint and operationId listQueueItems to a queued-only cursor pagination
format, which breaks existing clients; restore backward compatibility by either
creating a new endpoint/operationId (e.g., /queues/{name}/backlog or a new
operationId) for the backlog/queued-only cursor API or revert the updated
contract for /queues/{name}/items so it continues to return the prior
page/cursor schema (QueuedDAGRunsPageResponse) until consumers migrate; apply
the same approach to the other affected occurrences referenced (lines around the
other two locations) so you do not change the existing path or operationId
in-place.

In `@internal/cmd/dequeue.go`:
- Around line 117-135: queueNameForDAGRun currently returns early on
ErrDAGRunIDNotFound or ErrNoStatusData which prevents removing orphaned queue
items; change the error handling so that if queueNameForDAGRun(ctx, dagRun)
returns ErrDAGRunIDNotFound or ErrNoStatusData you set actualQueueName =
requestedQueueName and continue (rather than returning
mapAbortQueuedDAGRunError), otherwise keep the existing return behavior; then
proceed to call withQueueProcLock(ctx, actualQueueName, ...) and do the
DequeueByDAGRunID / AbortQueuedDAGRun logic as before so orphaned items in the
named queue can be cleared (reference symbols: queueNameForDAGRun,
ErrDAGRunIDNotFound, ErrNoStatusData, requestedQueueName, actualQueueName,
withQueueProcLock, ctx.QueueStore.DequeueByDAGRunID, exec.AbortQueuedDAGRun,
mapAbortQueuedDAGRunError, dequeueFirst).

In `@internal/cmd/remote_client.go`:
- Around line 226-228: The helper currently always sets params["limit"] =
fmt.Sprintf("%d", limit), forcing callers who pass the zero value to send
limit=0; change the code in the function that builds params (where params is
declared) to only add the "limit" key when limit > 0 so zero-value callers can
rely on the server default. Use the same formatting (fmt.Sprintf or
strconv.Itoa) when adding the key, but skip adding it for limit <= 0.

In `@internal/cmd/retry.go`:
- Around line 94-119: The code incorrectly always calls
ctx.DAGRunStore.FindSubAttempt when rootRun is non-zero, bypassing the existing
helper findRetryAttempt that treats rootRun.ID == ref.ID as a top-level lookup;
update the branch so both paths reuse findRetryAttempt(ctx, ctx.DAGRunStore, ref, rootRun) for the
initial attempt lookup (instead of directly calling FindSubAttempt) and then
apply queueDispatchRetryRequested()/normalizeQueueDispatchRetryLookupError to
the returned error exactly as currently done, preserving the existing
error-wrapping behavior and the queue-dispatch short-circuit (i.e., replace the
direct FindSubAttempt call with a call to findRetryAttempt and keep the
subsequent error handling logic).

In `@internal/core/exec/queue.go`:
- Around line 128-131: The MockQueueStore.ListCursor method can panic when
args.Get(0) is nil; change its return logic to mirror other mocks (e.g.,
DequeueByName, List) by first retrieving v := args.Get(0), if v == nil return
zero-value CursorResult[QueuedItemData] and args.Error(1), otherwise assert and
return v.(CursorResult[QueuedItemData]) and args.Error(1); this avoids a nil
type-assertion panic while preserving the mocked error return.

In `@internal/runtime/subcmd.go`:
- Around line 279-291: The Retry and TaskRetry builders currently inherit the
process environment and can propagate the DAGU_QUEUE_DISPATCH_RETRY key into
plain retries; update the Retry and TaskRetry implementations to filter their
resulting CmdSpec.Env (and any os.Environ-based env construction) to remove any
entry whose key equals exec1.EnvKeyQueueDispatchRetry before returning the spec,
and leave the QueueDispatchRetry and QueueDispatchTaskRetry wrappers unchanged
so they explicitly append exec1.EnvKeyQueueDispatchRetry+"=1" only for
queue-dispatch retries.

In `@internal/service/scheduler/queue_processor.go`:
- Around line 593-596: The retry loop is returning a raw
*exec.StaleQueueDispatchError which backoff.Retry treats as retryable; detect
that error (the existing errors.As(err, &staleErr) check) and return it wrapped
as a permanent error (e.g., using backoff.PermanentError) so the retry loop exits
immediately for stale dispatches instead of waiting through the retry schedule.

In `@ui/src/features/agent/components/__tests__/AgentChatModal.test.tsx`:
- Around line 1-2: The file AgentChatModal.test.tsx is missing the repository
GPL v3 license header; add the standard GPL v3 banner at the very top of the
file (above the import statements) — either run the repository tooling `make
addlicense` to apply the canonical header or insert the project's GPL v3 license
comment block manually so the test file has the same license banner as other
source files.

In `@ui/src/features/queues/components/QueueCard.tsx`:
- Around line 27-33: The h3 in QueueCard currently uses the "truncate" class
which hides distinguishing parts of queue.name; replace "truncate" with
"whitespace-normal break-words" on the h3 that renders {queue.name} so long
names wrap and don't overflow (update the className on the h3 element inside the
QueueCard component accordingly to follow ui/**/*.{ts,tsx} guidelines).

In `@ui/src/features/queues/hooks/useQueueBatchDequeue.ts`:
- Around line 12-14: The snapshot currently only saves selectedRuns
(ActiveBatch.snapshot) but not the originating remoteNode, allowing requests to
be routed to whatever node is active when the dialog submits; update the
ActiveBatch type to include remoteNode, capture the remoteNode when creating the
snapshot inside useQueueBatchDequeue, and then use that stored remoteNode when
calling the APIs (the dequeue/bulk action functions invoked from
useQueueBatchDequeue) so every request includes the original remoteNode
parameter and routes to the correct node.

---

Outside diff comments:
In `@ui/src/api/v1/schema.ts`:
- Around line 1324-1343: Update the OpenAPI spec to add a DELETE operation for
the path "/queues/{name}/items" (so the generated client exposes a typed
delete/clear method, e.g. operationId matching the server handler such as
"clearQueueItems" or "deleteQueueItems"), include the optional remoteNode
parameter on that DELETE operation, then run the generation command (pnpm
gen:api) to regenerate ui/src/api/v1/schema.ts so the schema contains both the
GET (listQueueItems) and the new DELETE operation for "/queues/{name}/items".

In `@ui/src/App.tsx`:
- Around line 1-18: This TSX file is missing the required GPL v3 header; add the
standard GPL v3 license header to the top of App.tsx (above the first import
such as "import { Theme } from '@radix-ui/themes'") by running the repository
tooling: execute make addlicense to apply the GPL header automatically, or if
applying manually, insert the project's canonical GPL v3 comment block at file
top so the file (which contains imports like Theme, React, BrowserRouter,
SWRConfig) carries the required license header.

In `@ui/src/features/agent/components/AgentChatModal.tsx`:
- Line 1: This file (AgentChatModal.tsx) is missing the project's GPL v3 license
banner; add the repository's required GPL v3 header at the very top of the file
(above the import statements) consistent with other source files and the
project's formatting rules—either run the repository tooling (make addlicense)
to insert the standard header automatically or paste the canonical GPL v3 header
used elsewhere in the repo so the file (AgentChatModal component) includes the
correct license banner.

In `@ui/src/features/cockpit/components/MobileKanbanBoard.tsx`:
- Around line 1-6: The file MobileKanbanBoard.tsx is missing the required GPL v3
license header; run the repository's license tooling (e.g., execute the make
target used elsewhere: "make addlicense") or manually prepend the same GPL v3
header used in other files (for example match the header in
KanbanCounts.test.tsx) to the top of MobileKanbanBoard.tsx so the file contains
the standard license block before the imports; ensure the header is exact and
committed.

In `@ui/src/pages/queues/index.tsx`:
- Around line 1-15: The modified TSX file (the one that imports Layers, Search,
QueueList, useQueuesListSSE and useSSECacheSync) is missing the repository's GPL
v3 license header; run the repository tool to apply it by executing make
addlicense (or manually prepend the project's GPL v3 header template) so the top
of the file contains the correct license block, then stage and commit the
change.

---

Nitpick comments:
In `@api/v1/api.yaml`:
- Around line 3175-3182: Update the OpenAPI docs for the listQueueItems
operation (operationId "listQueueItems") to explicitly state the deterministic
sort order used when returning a forward-only page and when providing
"nextCursor" (the QueueListCursor parameter): specify whether items are returned
oldest-first or newest-first, how ties are broken (e.g., by ID or timestamp),
and that the cursor advances in that same stable order; apply the same
clarification to the other affected operations referenced (the other endpoints
using the QueueListCursor parameter) so clients can unambiguously implement
cursor paging.

In `@internal/cmd/dequeue_test.go`:
- Around line 164-172: After calling th.RunCommand(t, cmd.Dequeue(), ...) and
asserting the queue length, also fetch the dequeued run by its identifier (e.g.,
"valid-run") from the persisted store and assert its persisted status is updated
to the dequeued/processed state (not still "queued"); use the test helpers/store
methods available in the test fixture (e.g., th.RunStore.Get or equivalent) to
load the run and assert its Status (or PersistedStatus) value, so the test
verifies the run itself was dequeued and not left in queued state.

In `@internal/core/exec/queue_abort.go`:
- Around line 44-68: After the CAS that sets the DAGRun status to Aborted
(CompareAndSwapLatestAttemptStatus and the anonymous func), handle failures from
attempt.Hide by emitting a warning that the abort succeeded but cleanup (Hide)
failed: call your logger (or log.Printf) with context including attempt.ID(),
dagRun.ID (or other identifying fields) and the error before returning the
wrapped error (fmt.Errorf("hide aborted attempt: %w", err)); keep existing
return behavior but add the warning log to make it clear to callers/readers that
the status update succeeded even though Hide failed.

In `@internal/service/frontend/api/v1/queues.go`:
- Around line 29-46: collectQueues returns a map so iterating it produces
nondeterministic order; change the handler to build a sorted slice of queue
names from queueMap, sort it (e.g., sort.Strings on the keys), then iterate over
the sorted keys and use queueMap[key] when calling toQueueResource and
accumulating totalRunning/totalQueued/totalCapacity before appending to queues;
this ensures deterministic ordering of the queues in the response while keeping
the existing logic in toQueueResource and the totals calculations.

In `@ui/src/features/agent/components/__tests__/AgentChatModal.test.tsx`:
- Around line 55-57: The SessionSidebar mock currently returns null so the "new"
selection path isn't exercised; update the mock in AgentChatModal.test.tsx (and
the similar block at 154-166) to render a minimal component that accepts an
onSelectSession prop and calls it with "new" in a test-controlled way; then add
an assertion that invoking onSelectSession('new') triggers the component's
clearSession/handleClearSession flow and prevents auto-select (assert
clearSession was called and that the auto-select behavior did not run). Use the
exact prop name onSelectSession and the handler/spy names clearSession or
handleClearSession from the test to locate and hook into the right functions.

In `@ui/src/features/queues/hooks/useQueuedItemsFeed.ts`:
- Around line 169-183: The try/catch wrappers in the useQueuedItemsFeed hooks
are redundant because loadPage already handles errors and loading state; update
the loadMore and reload callbacks to simply call await loadPage(false) and await
loadPage(true) respectively (without catching and calling
setError/getQueueFeedError) so you don't duplicate error handling—leave the
React.useCallback and dependency on loadPage intact and remove the inner catch
blocks and setError calls.
- Around line 22-36: getQueueFeedError is defined but only used once in
useQueuedItemsFeed; remove the standalone getQueueFeedError function and inline
its logic at the call site inside useQueuedItemsFeed (replace the call to
getQueueFeedError(error, fallback) with the equivalent inline extraction: check
if error instanceof Error && error.message, else if error is object with a
string 'message' property and non-empty, else use fallback). Ensure you remove
the getQueueFeedError declaration to avoid dead code and keep the same return
semantics.

In `@ui/src/pages/queues/queue/index.tsx`:
- Around line 36-59: The hook useAutoLoadMore is re-creating the
IntersectionObserver whenever onLoadMore changes; stabilize the callback passed
into it by memoizing the loadMore handler (the function passed at line 147 where
"() => void loadMore()" is used) with React.useCallback so its reference is
stable across renders, then pass that memoized function into useAutoLoadMore
instead of an inline lambda; ensure the dependency array for the memoized
loadMore includes only the values it truly depends on so the callback doesn't
change unnecessarily.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 776dcae2-3a92-4d33-b5c6-b1bfbb34ea1d

📥 Commits

Reviewing files that changed from the base of the PR and between 03269d2 and ae26774.

📒 Files selected for processing (58)
  • api/v1/api.gen.go
  • api/v1/api.yaml
  • internal/cmd/dequeue.go
  • internal/cmd/dequeue_test.go
  • internal/cmd/remote_client.go
  • internal/cmd/remote_commands.go
  • internal/cmd/retry.go
  • internal/cmd/retry_internal_test.go
  • internal/cmd/retry_test.go
  • internal/cmn/backoff/retry.go
  • internal/cmn/telemetry/collector_test.go
  • internal/core/exec/env.go
  • internal/core/exec/queue.go
  • internal/core/exec/queue_abort.go
  • internal/core/exec/queue_abort_test.go
  • internal/core/exec/stale_queue_dispatch.go
  • internal/persis/filedagrun/attempt.go
  • internal/persis/filequeue/index.go
  • internal/persis/filequeue/queue.go
  • internal/persis/filequeue/queue_test.go
  • internal/persis/filequeue/queuefile.go
  • internal/persis/filequeue/queuefile_test.go
  • internal/persis/filequeue/store.go
  • internal/persis/filequeue/store_test.go
  • internal/runtime/subcmd.go
  • internal/runtime/subcmd_test.go
  • internal/service/coordinator/client.go
  • internal/service/coordinator/client_test.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/queue_ops.go
  • internal/service/frontend/api/v1/queues.go
  • internal/service/frontend/api/v1/queues_internal_test.go
  • internal/service/frontend/server.go
  • internal/service/scheduler/dag_executor.go
  • internal/service/scheduler/queue_processor.go
  • internal/service/scheduler/queue_processor_startup_test.go
  • internal/service/worker/handler.go
  • ui/src/App.tsx
  • ui/src/api/v1/schema.ts
  • ui/src/features/agent/components/AgentChatModal.tsx
  • ui/src/features/agent/components/__tests__/AgentChatModal.test.tsx
  • ui/src/features/cockpit/components/KanbanColumn.tsx
  • ui/src/features/cockpit/components/MobileKanbanBoard.tsx
  • ui/src/features/cockpit/components/__tests__/KanbanCounts.test.tsx
  • ui/src/features/dag-runs/components/dag-run-list/DAGRunTable.tsx
  • ui/src/features/dag-runs/components/dag-run-list/__tests__/DAGRunTable.test.tsx
  • ui/src/features/queues/components/QueueCard.tsx
  • ui/src/features/queues/components/QueueList.tsx
  • ui/src/features/queues/components/QueueMetrics.tsx
  • ui/src/features/queues/components/QueueRunsTable.tsx
  • ui/src/features/queues/hooks/useQueueBatchDequeue.ts
  • ui/src/features/queues/hooks/useQueueSelection.ts
  • ui/src/features/queues/hooks/useQueuedItemsFeed.ts
  • ui/src/hooks/useQueueItemsSSE.ts
  • ui/src/pages/queues/index.tsx
  • ui/src/pages/queues/queue/index.tsx
💤 Files with no reviewable changes (3)
  • internal/service/frontend/server.go
  • ui/src/features/queues/components/QueueMetrics.tsx
  • ui/src/hooks/useQueueItemsSSE.ts

Comment thread api/v1/api.yaml
Comment on lines +3172 to +3207
  /queues/{name}/items:
    get:
      summary: "Get queued items for a specific queue"
      description: "Returns one forward-only page of queued DAG-runs for the specified queue."
      operationId: "listQueueItems"
      tags:
        - "queues"
      parameters:
        - $ref: "#/components/parameters/RemoteNode"
        - $ref: "#/components/parameters/QueueListLimit"
        - $ref: "#/components/parameters/QueueListCursor"
        - name: name
          in: path
          description: "Queue name"
          required: true
          schema:
            type: string
      responses:
        "200":
          description: "A successful response"
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/QueuedDAGRunsPageResponse"
        "400":
          description: "Invalid cursor"
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/Error"
        default:
          description: "Generic error response"
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/Error"

⚠️ Potential issue | 🟠 Major

Don't repurpose listQueueItems in place.

Keeping the same path and operationId while switching this endpoint to queued-only cursor pagination is a hard breaking change for generated and hand-written clients. Prefer a new operation/path for the backlog feed, or keep the previous query/response contract until consumers migrate.

Also applies to: 6912-6930, 8955-8968

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/v1/api.yaml` around lines 3172 - 3207, The change repurposes the existing
/queues/{name}/items endpoint and operationId listQueueItems to a queued-only
cursor pagination format, which breaks existing clients; restore backward
compatibility by either creating a new endpoint/operationId (e.g.,
/queues/{name}/backlog or a new operationId) for the backlog/queued-only cursor
API or revert the updated contract for /queues/{name}/items so it continues to
return the prior page/cursor schema (QueuedDAGRunsPageResponse) until consumers
migrate; apply the same approach to the other affected occurrences referenced
(lines around the other two locations) so you do not change the existing path or
operationId in-place.
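
For context on what a forward-only cursor page implies for clients, here is a minimal sketch of one way such a contract could behave. It assumes items are held in a stable ascending order by a unique key and that the cursor is simply the last key returned. The `item` type, `listPage` function, and `defaultLimit` constant are hypothetical and are not taken from the repository or the OpenAPI spec.

```go
package main

import (
	"fmt"
	"sort"
)

// defaultLimit is a hypothetical page size used when the caller passes limit <= 0.
const defaultLimit = 50

// item is a hypothetical queued entry; Key is assumed to be a unique,
// sortable string (for example an enqueue timestamp plus the run ID).
type item struct {
	Key string
}

// listPage returns up to limit items whose Key sorts strictly after cursor,
// plus the cursor for the next page ("" when there are no more items).
// items must already be sorted by Key in ascending order.
func listPage(items []item, cursor string, limit int) ([]item, string) {
	if limit <= 0 {
		limit = defaultLimit
	}
	start := 0
	if cursor != "" {
		// Find the first item that sorts after the cursor.
		start = sort.Search(len(items), func(i int) bool { return items[i].Key > cursor })
	}
	end := start + limit
	if end > len(items) {
		end = len(items)
	}
	page := items[start:end]
	next := ""
	if end < len(items) && len(page) > 0 {
		next = page[len(page)-1].Key
	}
	return page, next
}

func main() {
	items := []item{{Key: "001"}, {Key: "002"}, {Key: "003"}, {Key: "004"}, {Key: "005"}}
	cursor := ""
	for {
		page, next := listPage(items, cursor, 2)
		fmt.Println(page, "next:", next)
		if next == "" {
			break
		}
		cursor = next
	}
}
```

A client written against a page/perPage contract cannot consume this shape without changes, which is the compatibility concern raised in the comment.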

Comment thread internal/cmd/dequeue.go
Comment on lines +117 to +135
 	actualQueueName, err := queueNameForDAGRun(ctx, dagRun)
 	if err != nil {
-		return fmt.Errorf("failed to find the record for dag-run ID %s: %w", dagRun.ID, err)
+		return mapAbortQueuedDAGRunError(dagRun, err)
 	}
 
-	dagStatus, err := attempt.ReadStatus(ctx)
+	err = withQueueProcLock(ctx, actualQueueName, func() error {
+		if err := exec.AbortQueuedDAGRun(ctx.Context, ctx.DAGRunStore, dagRun); err != nil {
+			return err
+		}
+		if _, err := ctx.QueueStore.DequeueByDAGRunID(ctx.Context, actualQueueName, dagRun); err != nil {
+			if errors.Is(err, exec.ErrQueueItemNotFound) && actualQueueName == requestedQueueName {
+				return nil
+			}
+			return fmt.Errorf("failed to dequeue dag-run %s from queue %s: %w", dagRun.ID, actualQueueName, err)
+		}
+		return nil
+	})
 	if err != nil {
-		return fmt.Errorf("failed to read status: %w", err)
+		return mapAbortQueuedDAGRunError(dagRun, err)

⚠️ Potential issue | 🟠 Major

Fall back to requestedQueueName when the run record is already gone.

If queueNameForDAGRun hits ErrDAGRunIDNotFound or ErrNoStatusData, lines 119-120 return before any queue-store delete happens. That makes dagu dequeue <queue> --dag-run=... unable to clear orphaned items from the named queue, even though dequeueFirst already treats the same cases as stale and drops them.

💡 Suggested fix
  actualQueueName, err := queueNameForDAGRun(ctx, dagRun)
  if err != nil {
+   if errors.Is(err, exec.ErrDAGRunIDNotFound) || errors.Is(err, exec.ErrNoStatusData) || errors.Is(err, exec.ErrCorruptedStatusFile) {
+     return withQueueProcLock(ctx, requestedQueueName, func() error {
+       if _, dequeueErr := ctx.QueueStore.DequeueByDAGRunID(ctx.Context, requestedQueueName, dagRun); dequeueErr != nil && !errors.Is(dequeueErr, exec.ErrQueueItemNotFound) {
+         return fmt.Errorf("failed to dequeue dag-run %s from queue %s: %w", dagRun.ID, requestedQueueName, dequeueErr)
+       }
+       return nil
+     })
+   }
    return mapAbortQueuedDAGRunError(dagRun, err)
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmd/dequeue.go` around lines 117 - 135, queueNameForDAGRun currently
returns early on ErrDAGRunIDNotFound or ErrNoStatusData which prevents removing
orphaned queue items; change the error handling so that if
queueNameForDAGRun(ctx, dagRun) returns ErrDAGRunIDNotFound or ErrNoStatusData
you set actualQueueName = requestedQueueName and continue (rather than returning
mapAbortQueuedDAGRunError), otherwise keep the existing return behavior; then
proceed to call withQueueProcLock(ctx, actualQueueName, ...) and do the
DequeueByDAGRunID / AbortQueuedDAGRun logic as before so orphaned items in the
named queue can be cleared (reference symbols: queueNameForDAGRun,
ErrDAGRunIDNotFound, ErrNoStatusData, requestedQueueName, actualQueueName,
withQueueProcLock, ctx.QueueStore.DequeueByDAGRunID, exec.AbortQueuedDAGRun,
mapAbortQueuedDAGRunError, dequeueFirst).

Comment thread internal/cmd/remote_client.go Outdated
Comment on lines +226 to +228
 	params := map[string]string{
-		"type": string(state),
-		"page": fmt.Sprintf("%d", page),
-		"perPage": fmt.Sprintf("%d", perPage),
+		"limit": fmt.Sprintf("%d", limit),
 	}

⚠️ Potential issue | 🟡 Minor

Don't force limit=0 onto zero-value callers.

Line 227 always serializes limit, so callers that pass 0 can no longer fall back to the server's default page size. That changes this helper's zero-value behavior and can produce empty or rejected responses depending on handler validation.

🔧 Proposed fix
 func (c *remoteClient) listQueueItems(ctx context.Context, queueName string, limit int, cursor string) (*api.QueuedDAGRunsPageResponse, error) {
 	var out api.QueuedDAGRunsPageResponse
-	params := map[string]string{
-		"limit": fmt.Sprintf("%d", limit),
-	}
+	params := map[string]string{}
+	if limit > 0 {
+		params["limit"] = fmt.Sprintf("%d", limit)
+	}
 	if cursor != "" {
 		params["cursor"] = cursor
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmd/remote_client.go` around lines 226 - 228, The helper currently
always sets params["limit"] = fmt.Sprintf("%d", limit), forcing callers who pass
the zero value to send limit=0; change the code in the function that builds
params (where params is declared) to only add the "limit" key when limit > 0 so
zero-value callers can rely on the server default. Use the same formatting
(fmt.Sprintf or strconv.Itoa) when adding the key, but skip adding it for
limit <= 0.
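
The zero-value behavior described above can be shown in isolation. This is a minimal sketch using the standard library's `url.Values`; the `buildQueueItemsQuery` helper is a hypothetical name and the real client builds its parameters differently (a `map[string]string` in the excerpt above).

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// buildQueueItemsQuery builds query parameters for a hypothetical queue-items
// request. limit and cursor are omitted when they hold their zero values so
// the server can apply its own defaults.
func buildQueueItemsQuery(limit int, cursor string) url.Values {
	params := url.Values{}
	if limit > 0 {
		params.Set("limit", strconv.Itoa(limit))
	}
	if cursor != "" {
		params.Set("cursor", cursor)
	}
	return params
}

func main() {
	// With zero values, no parameters are sent at all.
	fmt.Printf("%q\n", buildQueueItemsQuery(0, "").Encode())
	// With explicit values, both parameters are present.
	fmt.Printf("%q\n", buildQueueItemsQuery(50, "abc").Encode())
}
```

Using `limit > 0` rather than `limit != 0` also keeps negative values off the wire, at the cost of silently ignoring them.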

Comment thread internal/cmd/retry.go
Comment on lines 94 to 119
	ref := exec.NewDAGRunRef(name, dagRunID)
	queueDispatchRetry := queueDispatchRetryRequested()
	var attempt exec.DAGRunAttempt
	if rootRun.Zero() {
		attempt, err = ctx.DAGRunStore.FindAttempt(ctx, ref)
		if queueDispatchRetry {
			err = normalizeQueueDispatchRetryLookupError(err)
		}
		if err != nil {
			if queueDispatchRetry {
				return err
			}
			return fmt.Errorf("failed to find the record for dag-run ID %s: %w", dagRunID, err)
		}
	} else {
		attempt, err = ctx.DAGRunStore.FindSubAttempt(ctx, rootRun, dagRunID)
		if queueDispatchRetry {
			err = normalizeQueueDispatchRetryLookupError(err)
		}
		if err != nil {
			if queueDispatchRetry {
				return err
			}
			return fmt.Errorf("failed to find the sub DAG record for dag-run ID %s under root %s: %w", dagRunID, rootRun, err)
		}
	}

⚠️ Potential issue | 🟠 Major

Reuse findRetryAttempt for the first attempt lookup.

The new helper already treats rootRun.ID == ref.ID as a top-level retry target, but this earlier branch still routes any non-zero rootRun to FindSubAttempt. That makes retry --root=<same run ref> fail before the queue-dispatch guard runs, even though the later validation path would resolve the same input correctly.

Proposed fix
-	if rootRun.Zero() {
-		attempt, err = ctx.DAGRunStore.FindAttempt(ctx, ref)
-		if queueDispatchRetry {
-			err = normalizeQueueDispatchRetryLookupError(err)
-		}
-		if err != nil {
-			if queueDispatchRetry {
-				return err
-			}
-			return fmt.Errorf("failed to find the record for dag-run ID %s: %w", dagRunID, err)
-		}
-	} else {
-		attempt, err = ctx.DAGRunStore.FindSubAttempt(ctx, rootRun, dagRunID)
-		if queueDispatchRetry {
-			err = normalizeQueueDispatchRetryLookupError(err)
-		}
-		if err != nil {
-			if queueDispatchRetry {
-				return err
-			}
-			return fmt.Errorf("failed to find the sub DAG record for dag-run ID %s under root %s: %w", dagRunID, rootRun, err)
-		}
-	}
+	attempt, err = findRetryAttempt(ctx, ctx.DAGRunStore, ref, rootRun)
+	if queueDispatchRetry {
+		err = normalizeQueueDispatchRetryLookupError(err)
+	}
+	if err != nil {
+		if queueDispatchRetry {
+			return err
+		}
+		if rootRun.Zero() || rootRun.ID == ref.ID {
+			return fmt.Errorf("failed to find the record for dag-run ID %s: %w", dagRunID, err)
+		}
+		return fmt.Errorf("failed to find the sub DAG record for dag-run ID %s under root %s: %w", dagRunID, rootRun, err)
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmd/retry.go` around lines 94 - 119, The code incorrectly always
calls ctx.DAGRunStore.FindSubAttempt when rootRun is non-zero, bypassing the
existing helper findRetryAttempt that treats rootRun.ID == ref.ID as a top-level
lookup; update the branch so both paths reuse findRetryAttempt(ctx, ctx.DAGRunStore, ref, rootRun) for
the initial attempt lookup (instead of directly calling FindSubAttempt) and then
apply queueDispatchRetryRequested()/normalizeQueueDispatchRetryLookupError to
the returned error exactly as currently done, preserving the existing
error-wrapping behavior and the queue-dispatch short-circuit (i.e., replace the
direct FindSubAttempt call with a call to findRetryAttempt and keep the
subsequent error handling logic).

Comment thread internal/core/exec/queue.go
Comment on lines +128 to 131
func (m *MockQueueStore) ListCursor(ctx context.Context, name, cursor string, limit int) (CursorResult[QueuedItemData], error) {
	args := m.Called(ctx, name, cursor, limit)
	return args.Get(0).(CursorResult[QueuedItemData]), args.Error(1)
}

⚠️ Potential issue | 🟡 Minor

Potential panic on nil return in ListCursor mock.

Unlike other mock methods in this file (e.g., DequeueByName, List), ListCursor does not check for nil before the type assertion. If a test sets up the mock to return nil, this will panic.

🛡️ Proposed fix
 func (m *MockQueueStore) ListCursor(ctx context.Context, name, cursor string, limit int) (CursorResult[QueuedItemData], error) {
 	args := m.Called(ctx, name, cursor, limit)
+	if args.Get(0) == nil {
+		return CursorResult[QueuedItemData]{}, args.Error(1)
+	}
 	return args.Get(0).(CursorResult[QueuedItemData]), args.Error(1)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/exec/queue.go` around lines 128 - 131, The
MockQueueStore.ListCursor method can panic when args.Get(0) is nil; change its
return logic to mirror other mocks (e.g., DequeueByName, List) by first
retrieving v := args.Get(0), if v == nil return zero-value
CursorResult[QueuedItemData] and args.Error(1), otherwise assert and return
v.(CursorResult[QueuedItemData]) and args.Error(1); this avoids a nil
type-assertion panic while preserving the mocked error return.
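
To show why the guard matters without pulling in a mocking library, here is a minimal sketch of the same pattern on a plain `any` value. The `cursorResult` type and `asCursorResult` helper are hypothetical simplifications of the generic `CursorResult[QueuedItemData]` in the excerpt.

```go
package main

import "fmt"

// cursorResult is a hypothetical, non-generic stand-in for CursorResult[T].
type cursorResult struct {
	Items      []string
	NextCursor string
}

// asCursorResult converts a value recorded by a mock into a cursorResult.
// A nil value yields the zero value instead of panicking on the type assertion.
func asCursorResult(v any) cursorResult {
	if v == nil {
		return cursorResult{}
	}
	return v.(cursorResult)
}

func main() {
	// A test that only cares about the error might configure a nil result.
	fmt.Printf("%+v\n", asCursorResult(nil))
	// A configured result passes through unchanged.
	fmt.Printf("%+v\n", asCursorResult(cursorResult{Items: []string{"run-1"}, NextCursor: "c1"}))
}
```

Without the nil check, the single-value assertion `v.(cursorResult)` on a nil interface panics at run time, so an error-path test fails for a reason unrelated to what it is testing.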

Comment thread internal/runtime/subcmd.go
Comment on lines +279 to +291
// QueueDispatchRetry creates a retry command spec for a scheduler-consumed queued run.
func (b *SubCmdBuilder) QueueDispatchRetry(dag *core.DAG, dagRunID string, stepName string) CmdSpec {
	spec := b.Retry(dag, dagRunID, stepName)
	spec.Env = append(spec.Env, exec1.EnvKeyQueueDispatchRetry+"=1")
	return spec
}

// QueueDispatchTaskRetry creates a retry command spec for a worker-consumed queued run.
func (b *SubCmdBuilder) QueueDispatchTaskRetry(task *coordinatorv1.Task, envHints []string, dagName string) CmdSpec {
	spec := b.TaskRetry(task, envHints, dagName)
	spec.Env = append(spec.Env, exec1.EnvKeyQueueDispatchRetry+"=1")
	return spec
}

⚠️ Potential issue | 🟠 Major

Keep the queue-dispatch marker from leaking into plain nested retries.

These wrappers build on Retry / TaskRetry, and those specs inherit os.Environ(). Once a queue-dispatch retry process is running, any later plain retry it spawns can inherit DAGU_QUEUE_DISPATCH_RETRY=1 even when the caller uses the non-queue builders, so the queue-only stale-dispatch guard can start firing outside scheduler dispatch. Please scrub this key in the default retry env path and only add it back in these queue-dispatch wrappers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/runtime/subcmd.go` around lines 279 - 291, The Retry and TaskRetry
builders currently inherit the process environment and can propagate the
DAGU_QUEUE_DISPATCH_RETRY key into plain retries; update the Retry and TaskRetry
implementations to filter their resulting CmdSpec.Env (and any os.Environ-based
env construction) to remove any entry whose key equals
exec1.EnvKeyQueueDispatchRetry before returning the spec, and leave the
QueueDispatchRetry and QueueDispatchTaskRetry wrappers unchanged so they
explicitly append exec1.EnvKeyQueueDispatchRetry+"=1" only for queue-dispatch
retries.
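
One possible shape for the scrubbing step is sketched below. The `withoutEnvKey` helper is a hypothetical name, and the sketch assumes the environment is a slice of "KEY=value" strings as produced by `os.Environ()`; how the real `Retry` and `TaskRetry` builders assemble their environment is not shown in the excerpt.

```go
package main

import (
	"fmt"
	"strings"
)

// withoutEnvKey returns a copy of env with every "key=value" entry for the
// given key removed. Entries for other keys keep their original order.
func withoutEnvKey(env []string, key string) []string {
	prefix := key + "="
	out := make([]string, 0, len(env))
	for _, kv := range env {
		if strings.HasPrefix(kv, prefix) {
			continue
		}
		out = append(out, kv)
	}
	return out
}

func main() {
	// marker mirrors the variable named in the review comment; the real
	// constant lives in the repository.
	const marker = "DAGU_QUEUE_DISPATCH_RETRY"
	inherited := []string{"PATH=/usr/bin", marker + "=1", "HOME=/root"}

	// Plain retries drop the inherited marker.
	plain := withoutEnvKey(inherited, marker)
	// Queue-dispatch retries scrub first, then add the marker back explicitly.
	queueDispatch := append(withoutEnvKey(inherited, marker), marker+"=1")

	fmt.Println(plain)
	fmt.Println(queueDispatch)
}
```

Matching on the `KEY=` prefix rather than a substring avoids removing unrelated variables whose names merely start with the same text.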

Comment thread internal/service/scheduler/queue_processor.go
Comment on lines +593 to +596
	var staleErr *exec.StaleQueueDispatchError
	if errors.As(err, &staleErr) {
		return err
	}

⚠️ Potential issue | 🟠 Major

Make stale dispatches permanent inside the retry loop.

Returning the raw *exec.StaleQueueDispatchError here still looks retryable to backoff.Retry, so a cleared/stale queue item waits through the entire retry schedule before the outer errors.As block finally discards it. Mark it permanent at the source so stale dispatches exit the loop immediately.

Proposed fix
 			if err != nil {
 				var staleErr *exec.StaleQueueDispatchError
 				if errors.As(err, &staleErr) {
-					return err
+					return backoff.PermanentError(err)
 				}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-var staleErr *exec.StaleQueueDispatchError
-if errors.As(err, &staleErr) {
-	return err
-}
+var staleErr *exec.StaleQueueDispatchError
+if errors.As(err, &staleErr) {
+	return backoff.PermanentError(err)
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/scheduler/queue_processor.go` around lines 593 - 596, The
retry loop is returning a raw *exec.StaleQueueDispatchError which backoff.Retry
treats as retryable; detect that error (the existing errors.As(err, &staleErr)
check) and return it wrapped as a permanent error (e.g., using
backoff.Permanent) so the retry loop exits immediately for stale dispatches
instead of waiting through the retry schedule.
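
To illustrate why marking the error permanent matters, here is a self-contained sketch with a toy retry loop. Everything in it is hypothetical: `retry`, `permanent`, `permanentError`, and `staleDispatchError` are stand-ins written for this example and are not the API of the repository's `backoff` or `exec` packages. A real loop would also sleep between attempts; that is omitted here.

```go
package main

import (
	"errors"
	"fmt"
)

// staleDispatchError stands in for exec.StaleQueueDispatchError.
type staleDispatchError struct{ runID string }

func (e *staleDispatchError) Error() string { return "stale queue dispatch: " + e.runID }

// permanentError wraps an error that the retry loop must not retry.
type permanentError struct{ err error }

func (p *permanentError) Error() string { return p.err.Error() }
func (p *permanentError) Unwrap() error { return p.err }

// permanent marks err as non-retryable.
func permanent(err error) error { return &permanentError{err: err} }

// retry calls op up to maxAttempts times. It stops early on success or on a
// permanent error, and returns the number of attempts made plus the last error
// (unwrapped, so callers can still match the underlying type).
func retry(maxAttempts int, op func() error) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err = op()
		if err == nil {
			return attempt, nil
		}
		var perm *permanentError
		if errors.As(err, &perm) {
			return attempt, perm.err
		}
	}
	return maxAttempts, err
}

// dispatch always fails with a stale-dispatch error; markPermanent toggles
// between the behavior before and after the proposed fix.
func dispatch(markPermanent bool) error {
	var err error = &staleDispatchError{runID: "run-1"}
	var staleErr *staleDispatchError
	if markPermanent && errors.As(err, &staleErr) {
		return permanent(err)
	}
	return err
}

func main() {
	n, err := retry(5, func() error { return dispatch(false) })
	fmt.Printf("raw error:       %d attempts, err=%v\n", n, err)

	n, err = retry(5, func() error { return dispatch(true) })
	fmt.Printf("permanent error: %d attempts, err=%v\n", n, err)
}
```

In this sketch the raw error consumes all five attempts, while the permanent one exits after the first, and the caller's `errors.As` check on the stale-dispatch type still matches in both cases.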

Comment on lines +1 to +2
import { cleanup, fireEvent, render, screen, waitFor } from '@testing-library/react';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

⚠️ Potential issue | 🟡 Minor

Add the GPL header at the top of this file.

New source files should carry the repository's license banner too.

As per coding guidelines, "Apply GPL v3 license headers on source files, managed via make addlicense."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/components/__tests__/AgentChatModal.test.tsx` around
lines 1 - 2, The file AgentChatModal.test.tsx is missing the repository GPL v3
license header; add the standard GPL v3 banner at the very top of the file
(above the import statements) — either run the repository tooling `make
addlicense` to apply the canonical header or insert the project's GPL v3 license
comment block manually so the test file has the same license banner as other
source files.

Comment on lines +27 to +33
<div className="min-w-0">
  <h3 className="truncate text-base font-semibold text-foreground">
    {queue.name}
  </h3>
  <p className="mt-1 text-sm text-muted-foreground">
    {formatActivityLine(runningCount, queuedCount)}
  </p>

⚠️ Potential issue | 🟡 Minor

Let queue names wrap instead of truncating them.

The queue name is the only stable identifier on this card, so truncate makes similarly-prefixed queues hard to tell apart. Use whitespace-normal break-words here so long names stay readable without overflowing.

Proposed fix
-          <h3 className="truncate text-base font-semibold text-foreground">
+          <h3 className="whitespace-normal break-words text-base font-semibold text-foreground">
             {queue.name}
           </h3>

As per coding guidelines, ui/**/*.{ts,tsx}: Always handle long text in tables and lists with whitespace-normal break-words to prevent layout overflow.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-<div className="min-w-0">
-  <h3 className="truncate text-base font-semibold text-foreground">
-    {queue.name}
-  </h3>
-  <p className="mt-1 text-sm text-muted-foreground">
-    {formatActivityLine(runningCount, queuedCount)}
-  </p>
+<div className="min-w-0">
+  <h3 className="whitespace-normal break-words text-base font-semibold text-foreground">
+    {queue.name}
+  </h3>
+  <p className="mt-1 text-sm text-muted-foreground">
+    {formatActivityLine(runningCount, queuedCount)}
+  </p>
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/queues/components/QueueCard.tsx` around lines 27 - 33, The h3
in QueueCard currently uses the "truncate" class which hides distinguishing
parts of queue.name; replace "truncate" with "whitespace-normal break-words" on
the h3 that renders {queue.name} so long names wrap and don't overflow (update
the className on the h3 element inside the QueueCard component accordingly to
follow ui/**/*.{ts,tsx} guidelines).

Comment on lines +12 to +14
type ActiveBatch = {
  snapshot: QueueSelectionItem[];
};

⚠️ Potential issue | 🟠 Major

Snapshot remoteNode with the batch.

This hook pins the selected rows, but not the node they came from. If the user switches nodes after opening the dialog and before submitting, the requests are sent to the new node instead of the one that produced selectedRuns, so the batch can report false skips/successes against the wrong queue.

💡 Suggested fix
 type ActiveBatch = {
+  remoteNode: string;
   snapshot: QueueSelectionItem[];
 };
@@
   const openBatchDialog = React.useCallback(() => {
     if (selectedRuns.length === 0) {
       return;
     }
     setActiveBatch({
+      remoteNode,
       snapshot: [...selectedRuns],
     });
     setPhase('confirm');
     setProgress(createEmptyProgress());
-  }, [selectedRuns]);
+  }, [selectedRuns, remoteNode]);
@@
-  const submitBatchItem = React.useCallback(
-    async (dagRun: QueueSelectionItem): Promise<QueueBatchResult> => {
+  const submitBatchItem = React.useCallback(
+    async (
+      dagRun: QueueSelectionItem,
+      batchRemoteNode: string
+    ): Promise<QueueBatchResult> => {
@@
-              remoteNode,
+              remoteNode: batchRemoteNode,
@@
-    [client, remoteNode]
+    [client]
   );
@@
-    const { snapshot } = activeBatch;
+    const { snapshot, remoteNode: batchRemoteNode } = activeBatch;
@@
-      const result = await submitBatchItem(dagRun);
+      const result = await submitBatchItem(dagRun, batchRemoteNode);

As per coding guidelines, "All API calls MUST include the remoteNode parameter to route requests to the correct node in multi-node Dagu deployments".

Also applies to: 96-117, 119-167, 170-205

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/queues/hooks/useQueueBatchDequeue.ts` around lines 12 - 14,
The snapshot currently only saves selectedRuns (ActiveBatch.snapshot) but not
the originating remoteNode, allowing requests to be routed to whatever node is
active when the dialog submits; update the ActiveBatch type to include
remoteNode, capture the remoteNode when creating the snapshot inside
useQueueBatchDequeue, and then use that stored remoteNode when calling the APIs
(the dequeue/bulk action functions invoked from useQueueBatchDequeue) so every
request includes the original remoteNode parameter and routes to the correct
node.

@yottahmd yottahmd changed the title fix: add server-side queue clear API feat: improve queue browsing and dequeue reliability Apr 5, 2026
@yottahmd yottahmd merged commit 33540a8 into main Apr 5, 2026
5 checks passed
@yottahmd yottahmd deleted the fix/server-side-queue-clear branch April 5, 2026 10:53