Skip to content

fix: harden agent chat SSE transport#1781

Merged
yottahmd merged 12 commits intomainfrom
bugfix/sse-manager
Mar 16, 2026
Merged

fix: harden agent chat SSE transport#1781
yottahmd merged 12 commits intomainfrom
bugfix/sse-manager

Conversation

@yottahmd
Copy link
Copy Markdown
Collaborator

@yottahmd yottahmd commented Mar 16, 2026

Summary

  • move live agent chat off multiplexed /api/v1/events/stream onto the dedicated /api/v1/agent/sessions/{id}/stream transport
  • reactivate persisted agent sessions for direct streaming, merge stream updates incrementally, and harden reconnect handling in the UI
  • fix stale SSE topic mutation races and remove the legacy multiplexed agent: topic compatibility path

Testing

  • go test ./internal/agent ./internal/service/frontend/sse -count=1
  • go test ./internal/service/frontend/... -run ^ -count=1
  • pnpm vitest run src/hooks/tests/SSEManager.test.ts src/features/agent/hooks/tests/useSSEConnection.test.tsx src/features/agent/hooks/tests/useAgentChat.test.tsx
  • pnpm typecheck
  • pnpm build

Summary by CodeRabbit

  • New Features

    • Sessions now show custom titles, creation/last-activity timestamps, and per-session total cost in lists and details.
    • Cockpit: new consolidated DAG run hook provides per-date Kanban columns and loading state.
  • Improvements

    • Streaming/polling: more reliable fallback and multiplexed stream handling for session updates.
    • UI: submit buttons show loading state; chat auto-scroll improved; reduced unnecessary background revalidation.
  • Bug Fixes

    • More accurate cost accounting including delegate/sub-session costs.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Mar 16, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b32c48d0-8804-4663-af0c-ea226b4627e5

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Refactors SSE streaming and multiplexing by removing the hub/watcher/client/proxy pieces, replaces agent SSE with a multiplexed/polling flow, and extends session management to persist and surface delegate snapshots and cumulative session costs (TotalCost), plus related tests and UI hook adjustments.

Changes

Cohort / File(s) Summary
Agent API & Session Costing
internal/agent/api.go, internal/agent/session.go, internal/agent/session_test.go, internal/agent/api_test.go
Add userIdentityFromContext; change reactivation to load delegate snapshots and pricing (Input/OutputCostPer1M); add cost-summing helpers (getStoredSessionCost, sum*MessageCosts); populate TotalCost for stored sessions and attach Delegates; extend SessionManager/SessionManagerConfig and session snapshots with Title/CreatedAt/LastActivity/Delegates.
SSE Core Removal (server-side)
internal/service/frontend/server.go, internal/service/frontend/sse/... (many files removed: hub.go, handler.go, client.go, proxy.go, watcher.go, metrics.go, tests under internal/service/frontend/sse/*)
Remove the SSE hub, watcher, client, handler, proxying and many tests; replace hub usage with a Multiplexer in server wiring; remove TopicTypeAgent constant and legacy deprecation headers; add ErrStreamingNotSupported, streamResponse, parseAndSanitizeQuery, and other small utilities.
SSE fetcher/multiplex tests & topic parsing
internal/service/frontend/sse/multiplex_test.go, internal/service/frontend/sse/proxy_test.go, internal/service/frontend/sse/topic_parse.go, internal/service/frontend/sse/types_test.go
Update tests to use queueitems:default and TopicTypeQueueItems; remove or adapt agent topic parsing; delete proxy tests for removed proxy logic; adjust canonicalizeTopicIdentifier to treat agent via default case.
Frontend SSE & hooks (UI)
ui/src/hooks/SSEManager.ts, ui/src/hooks/__tests__/SSEManager.test.ts, ui/src/features/agent/hooks/useSSEConnection.ts, ui/src/features/agent/hooks/__tests__/useSSEConnection.test.tsx
Change SSEManager: remove buildAgentSessionTopic export, add stale-mutation detection to mutation flow; replace SSE streaming in agent hook with disabled-agent approach returning isSessionLive:false and unify callbacks to onEvent; update tests accordingly.
Agent chat & delegate management (UI)
ui/src/features/agent/hooks/useAgentChat.ts, ui/src/features/agent/hooks/useDelegateManager.ts, ui/src/features/agent/hooks/__tests__/useAgentChat.test.tsx
Introduce incremental event handling (applySessionEvent, mergeMessages), replace handleDelegateSnapshots with reconcileDelegateSnapshots, remove restoreDelegates, add refs for polling and apply/ref wrappers, and add tests for reactivation, merging, and pricing propagation.
SSE cache/polling sync (UI)
ui/src/hooks/useSSECacheSync.ts, ui/src/App.tsx
Add SSE "settling" detection to suppress polling during SSE handshake; disable SWR revalidation on focus/reconnect via SWRConfig changes in AppInner.
Cockpit UI: Kanban & hooks
ui/src/features/cockpit/hooks/useCockpitDagRuns.ts, ui/src/features/cockpit/components/DateKanbanList.tsx, ui/src/features/cockpit/components/DateKanbanSection.tsx, ui/src/features/cockpit/components/TemplateSelector.tsx
Add useCockpitDagRuns hook to consolidate per-date DAG run fetching and partitioning; refactor DateKanbanList/Section to accept columns and loading state; defer TemplateSelector queries until dropdown opened.
Minor UI component tweaks
ui/src/features/agent/components/*, ui/src/features/agent/hooks/useDelegateManager.ts
UI improvements: loading states for approval/prompt submit buttons, auto-scroll dependency expansion, and export renames (reconcileDelegateSnapshots exported).

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant EventSource
    participant Server
    participant SessionStore
    participant SessionMgr

    Client->>EventSource: open(stream URL)
    EventSource->>Server: GET /api/v1/events/stream (multiplex)
    Server->>SessionStore: getOrReactivateSession(sessionID)
    SessionStore->>SessionMgr: construct SessionManager(cfg: Title, CreatedAt, LastActivity, Delegates, pricing)
    SessionMgr->>Server: return active session manager
    Server->>EventSource: initial snapshot (includes Delegates, Title, TotalCost for stored)
    loop streaming events
        Server->>EventSource: StreamResponse (event or incremental)
        EventSource->>Client: onEvent(payload)
        Client->>Client: applySessionEvent / mergeMessages / reconcileDelegateSnapshots
    end
    Client->>EventSource: close
    EventSource->>Server: connection closed
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.45% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title "fix: harden agent chat SSE transport" directly and clearly describes the primary change—hardening the agent chat SSE transport layer by moving to a dedicated endpoint and improving reconnect handling.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch bugfix/sse-manager
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
ui/src/features/agent/hooks/__tests__/useSSEConnection.test.tsx (1)

5-52: MockEventSource is missing removeEventListener method.

If the hook under test calls removeEventListener during cleanup or reconnection, the mock will throw. Consider adding it for completeness, even if currently unused.

♻️ Suggested addition
   addEventListener(
     type: string,
     listener: (event: MessageEvent<string>) => void
   ) {
     const listeners = this.listeners.get(type) ?? new Set();
     listeners.add(listener);
     this.listeners.set(type, listeners);
   }
+
+  removeEventListener(
+    type: string,
+    listener: (event: MessageEvent<string>) => void
+  ) {
+    this.listeners.get(type)?.delete(listener);
+  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/hooks/__tests__/useSSEConnection.test.tsx` around lines
5 - 52, The MockEventSource class used in tests lacks a removeEventListener
method which will cause failures if the hook calls it during cleanup or
reconnection; add a removeEventListener(type: string, listener: (event:
MessageEvent<string>) => void) that looks up the Set in the private listeners
map and removes the provided listener (and deletes the Set if empty), ensuring
behavior mirrors addEventListener and preventing thrown errors when
useSSEConnection or tests call removeEventListener on MockEventSource.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/agent/api.go`:
- Around line 553-556: The struct is initializing LastActivity from persisted
sess.UpdatedAt which lets reactivated sessions appear idle and be evicted;
change the runtime initialization to set LastActivity to the current time (e.g.,
time.Now()) when creating/reactivating the session instead of sess.UpdatedAt so
reactivated sessions start with fresh activity. Update the initializer that sets
LastActivity (near Title/CreatedAt/OnMessage and the createMessageCallback(id)
usage) to use the current timestamp when bringing a session back into an active
manager.

In `@ui/src/hooks/SSEManager.ts`:
- Around line 589-595: The stale-check currently performed using conn.sessionId
!== mutationSessionId || conn.eventSource !== mutationEventSource happens before
awaiting response.json(), so if a reconnect occurs during parsing the old
mutation may still be applied; after awaiting response.json() (the JSON parse of
the SSE message) re-run the same staleness check (comparing conn.sessionId and
conn.eventSource to mutationSessionId/mutationEventSource) and bail out if stale
before mutating connection state in the code that updates the connection (the
mutation block that follows the parse). Ensure you apply this same re-check
pattern for the other similar section that handles parsed messages (the earlier
block around the other response.json() usage) so no stale subscription state is
applied after async parsing.

---

Nitpick comments:
In `@ui/src/features/agent/hooks/__tests__/useSSEConnection.test.tsx`:
- Around line 5-52: The MockEventSource class used in tests lacks a
removeEventListener method which will cause failures if the hook calls it during
cleanup or reconnection; add a removeEventListener(type: string, listener:
(event: MessageEvent<string>) => void) that looks up the Set in the private
listeners map and removes the provided listener (and deletes the Set if empty),
ensuring behavior mirrors addEventListener and preventing thrown errors when
useSSEConnection or tests call removeEventListener on MockEventSource.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c0d2c41d-68e5-4142-b99c-0537ad4dc9c1

📥 Commits

Reviewing files that changed from the base of the PR and between c10df50 and 5891193.

📒 Files selected for processing (16)
  • internal/agent/api.go
  • internal/agent/api_test.go
  • internal/agent/session.go
  • internal/service/frontend/server.go
  • internal/service/frontend/sse/multiplex_test.go
  • internal/service/frontend/sse/proxy.go
  • internal/service/frontend/sse/proxy_test.go
  • internal/service/frontend/sse/topic_parse.go
  • internal/service/frontend/sse/types.go
  • ui/src/features/agent/hooks/__tests__/useAgentChat.test.tsx
  • ui/src/features/agent/hooks/__tests__/useSSEConnection.test.tsx
  • ui/src/features/agent/hooks/useAgentChat.ts
  • ui/src/features/agent/hooks/useDelegateManager.ts
  • ui/src/features/agent/hooks/useSSEConnection.ts
  • ui/src/hooks/SSEManager.ts
  • ui/src/hooks/__tests__/SSEManager.test.ts
💤 Files with no reviewable changes (4)
  • internal/service/frontend/sse/proxy_test.go
  • internal/service/frontend/sse/types.go
  • internal/service/frontend/server.go
  • internal/service/frontend/sse/proxy.go

Comment thread internal/agent/api.go
Comment thread ui/src/hooks/SSEManager.ts Outdated
@yottahmd
Copy link
Copy Markdown
Collaborator Author

@coderabbitai full review

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Mar 16, 2026

✅ Actions performed

Full review triggered.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
ui/src/features/agent/components/UserPromptMessage.tsx (1)

66-94: ⚠️ Potential issue | 🟠 Major

Missing error handling leaves buttons permanently disabled on failure.

setIsSubmitting(true) is called but never reset to false. If onRespond throws or rejects, the component remains mounted with isAnswered=false (per the parent rendering logic in ChatMessages.tsx), leaving both buttons permanently disabled.

🛠️ Proposed fix: wrap in try/catch or handle async rejection

If onRespond is synchronous and may throw:

 const handleSubmit = () => {
   setIsSubmitting(true);
+  try {
     const response: UserPromptResponse = {
       prompt_id: prompt.prompt_id,
     };

     let displayValue = '';
     if (selectedOptions.size > 0) {
       response.selected_option_ids = Array.from(selectedOptions);
       const selectedLabels = prompt.options
         ?.filter(opt => selectedOptions.has(opt.id))
         .map(opt => opt.label) ?? [];
       displayValue = selectedLabels.join(', ');
     }
     if (freeText) {
       response.free_text_response = freeText;
       displayValue = displayValue ? `${displayValue}; ${freeText}` : freeText;
     }

     onRespond(response, displayValue || 'Submitted');
+  } catch {
+    setIsSubmitting(false);
+  }
 };

 const handleSkip = () => {
   setIsSubmitting(true);
+  try {
     onRespond({
       prompt_id: prompt.prompt_id,
       cancelled: true,
     }, 'Skipped');
+  } catch {
+    setIsSubmitting(false);
+  }
 };

If onRespond returns a Promise, use .catch() or .finally() instead:

const handleSubmit = () => {
  setIsSubmitting(true);
  // ...build response...
  Promise.resolve(onRespond(response, displayValue || 'Submitted'))
    .catch(() => setIsSubmitting(false));
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/components/UserPromptMessage.tsx` around lines 66 - 94,
handleSubmit and handleSkip call setIsSubmitting(true) but never reset it, so if
onRespond throws or rejects the buttons remain disabled; update both
handleSubmit and handleSkip to handle synchronous exceptions and Promise
rejections from onRespond and always call setIsSubmitting(false) in a finally
branch (or in .catch/.finally) after onRespond completes/fails. Specifically,
wrap the onRespond call inside handleSubmit and handleSkip with
try/catch/finally (or use Promise.resolve(onRespond(...)).finally(...)) so that
setIsSubmitting(false) is executed on error or success while preserving existing
response construction logic that uses selectedOptions and freeText. Ensure the
identifiers handleSubmit, handleSkip, setIsSubmitting, and onRespond are the
ones updated.
ui/src/features/agent/components/CommandApprovalMessage.tsx (1)

8-41: ⚠️ Potential issue | 🟠 Major

Fix type signature mismatch and properly handle the async submission response

Line 8 declares onRespond as returning void, but the actual callback (respondToPrompt in useAgentChat.ts) returns Promise<void>. Lines 21-30 and 32-41 call onRespond without awaiting it. Update the type signature to reflect the async nature and use try/finally to ensure consistent state handling:

💡 Proposed fix
interface CommandApprovalMessageProps {
  prompt: UserPrompt;
-  onRespond: (response: UserPromptResponse, displayValue: string) => void;
+  onRespond: (response: UserPromptResponse, displayValue: string) => Promise<void>;
  isAnswered: boolean;
  answeredValue?: string;
}

-  const handleApprove = () => {
+  const handleApprove = async () => {
+    if (isSubmitting || isAnswered) return;
     setIsSubmitting(true);
-    onRespond(
+    try {
+      await onRespond(
+        {
+          prompt_id: prompt.prompt_id,
+          selected_option_ids: ['approve'],
+        },
+        'Approved'
+      );
+    } finally {
+      setIsSubmitting(false);
+    }
+  };
+
+  const handleReject = async () => {
+    if (isSubmitting || isAnswered) return;
+    setIsSubmitting(true);
+    try {
+      await onRespond(
+        {
+          prompt_id: prompt.prompt_id,
+          selected_option_ids: ['reject'],
+        },
+        'Rejected'
+      );
+    } finally {
+      setIsSubmitting(false);
+    }
-      {
-        prompt_id: prompt.prompt_id,
-        selected_option_ids: ['approve'],
-      },
-      'Approved'
-    );
-  };
-
-  const handleReject = () => {
-    setIsSubmitting(true);
-    onRespond(
-      {
-        prompt_id: prompt.prompt_id,
-        selected_option_ids: ['reject'],
-      },
-      'Rejected'
-    );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/components/CommandApprovalMessage.tsx` around lines 8 -
41, The onRespond type is declared as returning void but the real callback
(respondToPrompt in useAgentChat.ts) is async; change the
CommandApprovalMessageProps onRespond signature to return Promise<void>, then
make handleApprove and handleReject async, await onRespond calls and wrap them
in try/finally so setIsSubmitting(true) is paired with setIsSubmitting(false)
even on error; update references to prompt.prompt_id and selected_option_ids as
before and ensure the handlers still pass the displayValue strings
('Approved'/'Rejected').
♻️ Duplicate comments (2)
ui/src/hooks/SSEManager.ts (1)

596-603: ⚠️ Potential issue | 🟠 Major

Re-check staleness after JSON parse before mutating connection state.

A reconnect can happen between the pre-parse stale check and await response.json(), allowing stale mutation data to overwrite the current session state.

🔧 Suggested fix
-      const isStaleMutation =
-        conn.sessionId !== mutationSessionId ||
-        conn.eventSource !== mutationEventSource;
+      const isStaleMutation = () =>
+        conn.sessionId !== mutationSessionId ||
+        conn.eventSource !== mutationEventSource;

-      if (isStaleMutation) {
+      if (isStaleMutation()) {
         return;
       }
@@
       const body = (await response.json()) as
         | TopicMutationResponse
         | { error?: string; message?: string };
+
+      if (isStaleMutation()) {
+        return;
+      }

Run this read-only check to confirm there is no stale guard after response.json() today:

#!/bin/bash
set -euo pipefail

nl -ba ui/src/hooks/SSEManager.ts | sed -n '588,640p'

python - <<'PY'
from pathlib import Path
text = Path("ui/src/hooks/SSEManager.ts").read_text()
start = text.find("const body = (await response.json())")
end = text.find("if ('subscribed' in body)")
segment = text[start:end]
print("\n--- Segment between JSON parse and state mutation ---")
print(segment)
print("\nHas post-parse stale guard:", "isStaleMutation()" in segment or "conn.sessionId !== mutationSessionId" in segment)
PY

Also applies to: 615-639

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/hooks/SSEManager.ts` around lines 596 - 603, The pre-parse stale check
using conn.sessionId !== mutationSessionId || conn.eventSource !==
mutationEventSource can be bypassed if a reconnect occurs before awaiting
response.json(); re-check staleness immediately after parsing the JSON (right
after const body = (await response.json())) and before any mutation of
connection state (the block that tests 'subscribed' in body and subsequent calls
that mutate conn). Specifically, re-evaluate the same condition (using
conn.sessionId, mutationSessionId, conn.eventSource, mutationEventSource) and
return early if stale to prevent overwriting the current session state.
internal/agent/api.go (1)

616-624: ⚠️ Potential issue | 🟠 Major

Reactivated sessions still come back already idle.

Using sess.UpdatedAt as runtime LastActivity means an older stored session can be evicted on the next cleanup tick immediately after reactivation or stream attach.

Suggested fix
+	reactivatedAt := time.Now()
 	mgr := NewSessionManager(SessionManagerConfig{
 		ID:                 id,
 		User:               user,
 		Logger:             a.logger,
 		WorkingDir:         a.workingDir,
 		Title:              sess.Title,
 		CreatedAt:          sess.CreatedAt,
-		LastActivity:       sess.UpdatedAt,
+		LastActivity:       reactivatedAt,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/agent/api.go` around lines 616 - 624, The session manager is being
initialized with LastActivity set to the stored sess.UpdatedAt, which causes
reactivated sessions to appear idle and be evicted immediately; update the
SessionManagerConfig passed to NewSessionManager so LastActivity is set to the
current time (e.g., time.Now()) when creating a manager for a reactivated or
attached session instead of using sess.UpdatedAt (refer to NewSessionManager,
SessionManagerConfig, LastActivity, and sess.UpdatedAt).
🧹 Nitpick comments (1)
ui/src/features/agent/components/CommandApprovalMessage.tsx (1)

108-117: Use action-specific pending UI to avoid misleading feedback

Line 109 shows Sending... on the Approve button for both approve and reject submissions because isSubmitting is shared. Consider tracking which action is pending ('approve' | 'reject' | null) so the clicked button reflects the loading state accurately.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/components/CommandApprovalMessage.tsx` around lines 108
- 117, The approve/reject UI uses a shared isSubmitting flag causing both
buttons to show "Sending..."; modify the CommandApprovalMessage component to
track a pending action (e.g., pendingAction: 'approve' | 'reject' | null)
instead of only isSubmitting, set pendingAction = 'approve' in the approve
handler and 'reject' in handleReject, update the Approve button to show its
loader/text when pendingAction === 'approve' and the Reject button when
pendingAction === 'reject', keep disabling logic using pendingAction !== null,
and clear pendingAction (and isSubmitting if still used) in the success/finally
paths so only the clicked button shows the pending state.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/agent/session.go`:
- Around line 128-150: The reconstructed totalCost ignores costs stored on
reactivated delegates, so reactivateSession/SessionManagerConfig.Delegates leads
to an underreported total; update the totalCost reconstruction loop to also sum
any per-delegate restored costs by iterating cfg.Delegates (the DelegateSnapshot
entries) and adding their Cost/Cost pointer values (e.g., delegateSnapshot.Cost)
to totalCost (guarding nil pointers) in addition to the existing loop over
messages, ensuring you reference cfg.Delegates, DelegateSnapshot, and totalCost
so the restored delegate costs are included.

In `@internal/service/frontend/sse/types.go`:
- Around line 125-127: The current code truncates the encoded query (result :=
values.Encode()) to maxQueryLength, which mutates the topic identifier and can
collapse distinct filter sets; instead detect when len(result) > maxQueryLength
and return an explicit error (e.g., ErrQueryTooLong) rather than slicing; update
the function that builds topic IDs (the block using values.Encode(), result and
maxQueryLength) to validate length and propagate the error so callers can reject
or handle overly long queries without changing identifiers.
- Around line 95-99: The current loop drains the entire upstream body with
io.Copy(io.Discard, body) when a downstream write fails, but body can be a
long-lived SSE stream and blocking on that drain will leak the goroutine and
connection; instead, on writeErr stop attempting to drain and cleanly terminate
the upstream stream by closing it (if it implements io.Closer) and/or cancelling
the upstream request context, then return. Locate the code using variables body,
w, buf (in the read/write loop) and replace the io.Copy(io.Discard, body) call
with a safe shutdown: if body implements io.Closer call body.Close(), otherwise
invoke the upstream request cancel function (or close the transport) so the
upstream connection is released, then return immediately. Ensure no blocking
draining is performed.

In `@ui/src/features/agent/components/CommandApprovalMessage.tsx`:
- Around line 1-2: The file CommandApprovalMessage.tsx is missing the required
GPL v3 license header; add the repository's standard GPL v3 header to the very
top of the file (above the existing import lines) — either run the repo license
tooling (e.g., make addlicense) to apply the header automatically or paste the
canonical GPL v3 header used across the project into CommandApprovalMessage.tsx
before the imports (ensure it matches other .tsx files and the project's license
template).

In `@ui/src/features/agent/hooks/useAgentChat.ts`:
- Around line 251-258: The current applySessionSnapshot callback treats any
historical user message as an ACK because it uses nextMessages.some(msg =>
msg.type === 'user'); change this to detect acknowledgement of the specific
pendingUserMessage instead: when pendingUserMessage exists, look for a message
in snapshot.messages that matches that pendingUserMessage's unique identifier
(e.g. pendingUserMessage.id, client_id, or a dedicated echo/client token) or, if
none exists, a robust match on content+timestamp, and only then call
setPendingUserMessage(null). Update the condition in applySessionSnapshot to
compare message.id or message.client_id (or the chosen unique field) against
pendingUserMessage's identifier rather than just message.type.

In `@ui/src/features/agent/hooks/useSSEConnection.ts`:
- Around line 12-33: The exported useSSEConnection currently returns a stub and
ignores its inputs (sessionId, remoteNode, SSECallbacks) which disables agent
streaming; fix by wiring this hook to the agent stream endpoint or removing it
entirely: if you want streaming, inside useSSEConnection use sessionId and
remoteNode to build the EventSource URL (/api/v1/agent/sessions/{id}/stream),
create an EventSource when sessionId is non-null, attach handlers to call the
provided SSECallbacks (onMessage/onOpen/onError as appropriate), update and
return AgentSSEStatus.isSessionLive = true while open, and ensure you close the
EventSource on parameter change/unmount to avoid leaks; if polling-only is
intentional, remove the useSSEConnection export and any callers instead of
leaving a no-op stub.

In `@ui/src/features/cockpit/components/TemplateSelector.tsx`:
- Around line 31-32: The write to hasOpenedRef currently happens during render
(hasOpenedRef.current = true when isOpen), which violates React render purity;
remove that mutation from the render path in TemplateSelector and instead set
hasOpenedRef.current = true inside the handler that opens the selector (e.g.,
handleOpen) at the moment you actually transition to open; ensure handleOpen is
the function invoked by the open trigger and that it performs the ref write
before any side effects that unpause queries, leaving the ref read-only during
render.

In `@ui/src/features/cockpit/hooks/useCockpitDagRuns.ts`:
- Around line 134-137: The loop currently drops any DAGRunSummary with no
startedAt (in useCockpitDagRuns.ts where it checks run.startedAt and does `if
(!startedAt) continue;` before `dayjs(startedAt).unix()`), which removes
queued/not-started runs; instead, when startedAt is missing, compute the bucket
timestamp using the same date field the backend uses for /dag-runs range
filtering (e.g., the DAG run's backend date field such as
logicalDate/executionDate or the configured dagRuns range date field) and use
that value for `dayjs(...).unix()` so Status.Queued and Status.NotStarted runs
are included in the correct day bucket rather than being skipped.
- Around line 1-9: Add the project's GPL v3 license header to the top of the new
source file useCockpitDagRuns.ts: insert the standard multi-line GPL banner (as
used elsewhere in the repo and added by make addlicense) before the import
statements so the file complies with the repository's licensing guidelines and
CI checks; you can run or emulate make addlicense to obtain the exact header and
place it above the first import in useCockpitDagRuns.ts.
- Around line 153-159: The hook currently returns a global isLoading = !data
which causes all DateKanbanSection components to show loading when the query key
(e.g., loadedDates) expands; change this so existing boards remain visible by
either enabling keepPreviousData on the useQuery that loads columns (so data
stays available while refetching) or by exposing a per-date loading indicator
(add a getIsLoadingForDate(date) that returns true only if the date is part of
the newly requested range and columnsByDate.has(date) is false). Update the hook
return to include getIsLoadingForDate (or replace isLoading) and reference
getColumnsForDate, columnsByDate and loadedDates when computing per-date loading
state so only newly requested days show placeholders.

---

Outside diff comments:
In `@ui/src/features/agent/components/CommandApprovalMessage.tsx`:
- Around line 8-41: The onRespond type is declared as returning void but the
real callback (respondToPrompt in useAgentChat.ts) is async; change the
CommandApprovalMessageProps onRespond signature to return Promise<void>, then
make handleApprove and handleReject async, await onRespond calls and wrap them
in try/finally so setIsSubmitting(true) is paired with setIsSubmitting(false)
even on error; update references to prompt.prompt_id and selected_option_ids as
before and ensure the handlers still pass the displayValue strings
('Approved'/'Rejected').

In `@ui/src/features/agent/components/UserPromptMessage.tsx`:
- Around line 66-94: handleSubmit and handleSkip call setIsSubmitting(true) but
never reset it, so if onRespond throws or rejects the buttons remain disabled;
update both handleSubmit and handleSkip to handle synchronous exceptions and
Promise rejections from onRespond and always call setIsSubmitting(false) in a
finally branch (or in .catch/.finally) after onRespond completes/fails.
Specifically, wrap the onRespond call inside handleSubmit and handleSkip with
try/catch/finally (or use Promise.resolve(onRespond(...)).finally(...)) so that
setIsSubmitting(false) is executed on error or success while preserving existing
response construction logic that uses selectedOptions and freeText. Ensure the
identifiers handleSubmit, handleSkip, setIsSubmitting, and onRespond are the
ones updated.

---

Duplicate comments:
In `@internal/agent/api.go`:
- Around line 616-624: The session manager is being initialized with
LastActivity set to the stored sess.UpdatedAt, which causes reactivated sessions
to appear idle and be evicted immediately; update the SessionManagerConfig
passed to NewSessionManager so LastActivity is set to the current time (e.g.,
time.Now()) when creating a manager for a reactivated or attached session
instead of using sess.UpdatedAt (refer to NewSessionManager,
SessionManagerConfig, LastActivity, and sess.UpdatedAt).

In `@ui/src/hooks/SSEManager.ts`:
- Around line 596-603: The pre-parse stale check using conn.sessionId !==
mutationSessionId || conn.eventSource !== mutationEventSource can be bypassed if
a reconnect occurs before awaiting response.json(); re-check staleness
immediately after parsing the JSON (right after const body = (await
response.json())) and before any mutation of connection state (the block that
tests 'subscribed' in body and subsequent calls that mutate conn). Specifically,
re-evaluate the same condition (using conn.sessionId, mutationSessionId,
conn.eventSource, mutationEventSource) and return early if stale to prevent
overwriting the current session state.

---

Nitpick comments:
In `@ui/src/features/agent/components/CommandApprovalMessage.tsx`:
- Around line 108-117: The approve/reject UI uses a shared isSubmitting flag
causing both buttons to show "Sending..."; modify the CommandApprovalMessage
component to track a pending action (e.g., pendingAction: 'approve' | 'reject' |
null) instead of only isSubmitting, set pendingAction = 'approve' in the approve
handler and 'reject' in handleReject, update the Approve button to show its
loader/text when pendingAction === 'approve' and the Reject button when
pendingAction === 'reject', keep disabling logic using pendingAction !== null,
and clear pendingAction (and isSubmitting if still used) in the success/finally
paths so only the clicked button shows the pending state.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d58c9ec8-0808-459a-8fa1-85ee56453ab9

📥 Commits

Reviewing files that changed from the base of the PR and between c10df50 and 2131572.

📒 Files selected for processing (40)
  • internal/agent/api.go
  • internal/agent/api_test.go
  • internal/agent/session.go
  • internal/agent/session_test.go
  • internal/service/frontend/api/v1/auth_test.go
  • internal/service/frontend/server.go
  • internal/service/frontend/server_test.go
  • internal/service/frontend/sse/client.go
  • internal/service/frontend/sse/client_test.go
  • internal/service/frontend/sse/handler.go
  • internal/service/frontend/sse/handler_test.go
  • internal/service/frontend/sse/hub.go
  • internal/service/frontend/sse/hub_test.go
  • internal/service/frontend/sse/integration_test.go
  • internal/service/frontend/sse/metrics.go
  • internal/service/frontend/sse/metrics_test.go
  • internal/service/frontend/sse/multiplex_test.go
  • internal/service/frontend/sse/proxy.go
  • internal/service/frontend/sse/proxy_test.go
  • internal/service/frontend/sse/topic_parse.go
  • internal/service/frontend/sse/types.go
  • internal/service/frontend/sse/types_test.go
  • internal/service/frontend/sse/watcher.go
  • internal/service/frontend/sse/watcher_test.go
  • ui/src/App.tsx
  • ui/src/features/agent/components/ChatMessages.tsx
  • ui/src/features/agent/components/CommandApprovalMessage.tsx
  • ui/src/features/agent/components/UserPromptMessage.tsx
  • ui/src/features/agent/hooks/__tests__/useAgentChat.test.tsx
  • ui/src/features/agent/hooks/__tests__/useSSEConnection.test.tsx
  • ui/src/features/agent/hooks/useAgentChat.ts
  • ui/src/features/agent/hooks/useDelegateManager.ts
  • ui/src/features/agent/hooks/useSSEConnection.ts
  • ui/src/features/cockpit/components/DateKanbanList.tsx
  • ui/src/features/cockpit/components/DateKanbanSection.tsx
  • ui/src/features/cockpit/components/TemplateSelector.tsx
  • ui/src/features/cockpit/hooks/useCockpitDagRuns.ts
  • ui/src/hooks/SSEManager.ts
  • ui/src/hooks/__tests__/SSEManager.test.ts
  • ui/src/hooks/useSSECacheSync.ts
💤 Files with no reviewable changes (14)
  • internal/service/frontend/sse/client_test.go
  • internal/service/frontend/server_test.go
  • internal/service/frontend/sse/watcher.go
  • internal/service/frontend/sse/watcher_test.go
  • internal/service/frontend/sse/types_test.go
  • internal/service/frontend/sse/metrics.go
  • internal/service/frontend/sse/proxy.go
  • internal/service/frontend/sse/proxy_test.go
  • internal/service/frontend/sse/handler.go
  • internal/service/frontend/sse/client.go
  • internal/service/frontend/sse/hub_test.go
  • internal/service/frontend/sse/hub.go
  • internal/service/frontend/sse/integration_test.go
  • internal/service/frontend/sse/handler_test.go

Comment thread internal/agent/session.go Outdated
Comment on lines +128 to +150
// Reconstruct accumulated cost from historical messages so that
// reactivated sessions display the correct total.
var totalCost float64
for _, msg := range messages {
if msg.Cost != nil {
totalCost += *msg.Cost
}
}

now := time.Now()
createdAt := cfg.CreatedAt
if createdAt.IsZero() {
createdAt = now
}
lastActivity := cfg.LastActivity
if lastActivity.IsZero() {
lastActivity = createdAt
}

delegates := make(map[string]DelegateSnapshot, len(cfg.Delegates))
for _, delegateSnapshot := range cfg.Delegates {
delegates[delegateSnapshot.ID] = delegateSnapshot
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Restored delegate costs are missing from totalCost.

reactivateSession() now seeds SessionManagerConfig.Delegates, but this constructor only rebuilds totalCost from cfg.History. A reopened parent session will report a smaller total than the inactive detail/list views, and future costs will accumulate from the wrong base.

Suggested fix
 	var totalCost float64
 	for _, msg := range messages {
 		if msg.Cost != nil {
 			totalCost += *msg.Cost
 		}
 	}
+	for _, delegateSnapshot := range cfg.Delegates {
+		totalCost += delegateSnapshot.Cost
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/agent/session.go` around lines 128 - 150, The reconstructed
totalCost ignores costs stored on reactivated delegates, so
reactivateSession/SessionManagerConfig.Delegates leads to an underreported
total; update the totalCost reconstruction loop to also sum any per-delegate
restored costs by iterating cfg.Delegates (the DelegateSnapshot entries) and
adding their Cost/Cost pointer values (e.g., delegateSnapshot.Cost) to totalCost
(guarding nil pointers) in addition to the existing loop over messages, ensuring
you reference cfg.Delegates, DelegateSnapshot, and totalCost so the restored
delegate costs are included.

Comment on lines +95 to +99
n, readErr := body.Read(buf)
if n > 0 {
if _, writeErr := w.Write(buf[:n]); writeErr != nil {
_, _ = io.Copy(io.Discard, body) // Drain remaining to allow connection reuse
return
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't drain a long-lived stream after the client write fails.

body can be an upstream SSE response that never ends. io.Copy(io.Discard, body) can then block forever after the downstream disconnect, leaking the proxy goroutine and the upstream connection.

Suggested fix
-			if _, writeErr := w.Write(buf[:n]); writeErr != nil {
-				_, _ = io.Copy(io.Discard, body) // Drain remaining to allow connection reuse
-				return
-			}
+			if _, writeErr := w.Write(buf[:n]); writeErr != nil {
+				return
+			}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
n, readErr := body.Read(buf)
if n > 0 {
if _, writeErr := w.Write(buf[:n]); writeErr != nil {
_, _ = io.Copy(io.Discard, body) // Drain remaining to allow connection reuse
return
n, readErr := body.Read(buf)
if n > 0 {
if _, writeErr := w.Write(buf[:n]); writeErr != nil {
return
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/sse/types.go` around lines 95 - 99, The current
loop drains the entire upstream body with io.Copy(io.Discard, body) when a
downstream write fails, but body can be a long-lived SSE stream and blocking on
that drain will leak the goroutine and connection; instead, on writeErr stop
attempting to drain and cleanly terminate the upstream stream by closing it (if
it implements io.Closer) and/or cancelling the upstream request context, then
return. Locate the code using variables body, w, buf (in the read/write loop)
and replace the io.Copy(io.Discard, body) call with a safe shutdown: if body
implements io.Closer call body.Close(), otherwise invoke the upstream request
cancel function (or close the transport) so the upstream connection is released,
then return immediately. Ensure no blocking draining is performed.

Comment thread internal/service/frontend/sse/types.go Outdated
Comment on lines +125 to +127
result := values.Encode()
if len(result) > maxQueryLength {
return result[:maxQueryLength], nil
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject overlong queries instead of truncating topic IDs.

Slicing result[:maxQueryLength] changes the identifier. Two distinct filter sets with the same prefix now collapse into one multiplex topic and can receive the wrong cached updates/authorization result.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/sse/types.go` around lines 125 - 127, The current
code truncates the encoded query (result := values.Encode()) to maxQueryLength,
which mutates the topic identifier and can collapse distinct filter sets;
instead detect when len(result) > maxQueryLength and return an explicit error
(e.g., ErrQueryTooLong) rather than slicing; update the function that builds
topic IDs (the block using values.Encode(), result and maxQueryLength) to
validate length and propagate the error so callers can reject or handle overly
long queries without changing identifiers.

Comment on lines +1 to +2
import { useState } from 'react';
import { AlertTriangle, Check, FolderOpen, Loader2, Terminal, X } from 'lucide-react';
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add the required GPL v3 license header to this source file

Line 1 starts with imports and no license header is present. Please run the repository license tooling for this file (for example via make addlicense).

As per coding guidelines "**/*.{go,ts,tsx,js}: Apply GPL v3 license headers on source files, managed via make addlicense."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/components/CommandApprovalMessage.tsx` around lines 1 -
2, The file CommandApprovalMessage.tsx is missing the required GPL v3 license
header; add the repository's standard GPL v3 header to the very top of the file
(above the existing import lines) — either run the repo license tooling (e.g.,
make addlicense) to apply the header automatically or paste the canonical GPL v3
header used across the project into CommandApprovalMessage.tsx before the
imports (ensure it matches other .tsx files and the project's license template).

Comment on lines +251 to +258
const applySessionSnapshot = useCallback((snapshot: StreamResponse) => {
const nextMessages = snapshot.messages || [];
if (
pendingUserMessage &&
(nextMessages.some((message) => message.type === 'user') ||
snapshot.session_state?.working)
) {
setPendingUserMessage(null);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't treat any historical user turn as an ACK.

applySessionSnapshot() receives the full transcript. Once a chat already has older user messages, nextMessages.some((message) => message.type === 'user') is always true, so a brand-new pendingUserMessage can disappear before the new turn is actually acknowledged by the server.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/hooks/useAgentChat.ts` around lines 251 - 258, The
current applySessionSnapshot callback treats any historical user message as an
ACK because it uses nextMessages.some(msg => msg.type === 'user'); change this
to detect acknowledgement of the specific pendingUserMessage instead: when
pendingUserMessage exists, look for a message in snapshot.messages that matches
that pendingUserMessage's unique identifier (e.g. pendingUserMessage.id,
client_id, or a dedicated echo/client token) or, if none exists, a robust match
on content+timestamp, and only then call setPendingUserMessage(null). Update the
condition in applySessionSnapshot to compare message.id or message.client_id (or
the chosen unique field) against pendingUserMessage's identifier rather than
just message.type.

Comment on lines +12 to 33
/**
* Agent SSE is intentionally disabled. The polling fallback in useAgentChat
* handles all session updates via periodic GET requests.
*
* Why: Each EventSource holds a permanent HTTP/1.1 connection slot (browsers
* allow only 6 per origin). The multiplexed SSE for dashboard/cockpit already
* uses one slot. Adding a second for the agent leaves only 4 slots for all
* fetch requests, which causes deadlocks when any request is slow (AI
* generation, server lag). Removing the agent EventSource frees a slot and
* eliminates the entire class of connection-starvation bugs.
*
* The 2s polling interval is imperceptible in a chat UI where the user is
* already waiting for AI responses.
*/
export function useSSEConnection(
sessionId: string | null,
delegateSessionIds: string[],
apiURL: string,
remoteNode: string,
callbacks: SSECallbacks
_sessionId: string | null,
_apiURL: string,
_remoteNode: string,
_callbacks: SSECallbacks
): AgentSSEStatus {
const handledNavigateIdsRef = useRef<Set<string>>(new Set());
const cbRef = useRef(callbacks);
cbRef.current = callbacks;
const [isSessionLive, setIsSessionLive] = useState(false);
const [liveDelegateSessions, setLiveDelegateSessions] = useState<
Record<string, boolean>
>({});

const delegateKey = useMemo(
() => [...delegateSessionIds].sort().join('|'),
[delegateSessionIds]
);
const stableDelegateSessionIds = useMemo(
() => (delegateKey ? delegateKey.split('|') : []),
[delegateKey]
);

useEffect(() => {
handledNavigateIdsRef.current = new Set();
setIsSessionLive(false);
}, [sessionId]);

useEffect(() => {
setLiveDelegateSessions((prev) => {
const next: Record<string, boolean> = {};
for (const delegateId of stableDelegateSessionIds) {
next[delegateId] = prev[delegateId] ?? false;
}
return next;
});
}, [stableDelegateSessionIds]);

useEffect(() => {
if (!sessionId) {
setIsSessionLive(false);
return;
}

return sseManager.subscribeTopic(
buildAgentSessionTopic(sessionId),
remoteNode,
apiURL,
{
onData: (data) => {
const snapshot = data as StreamResponse;
cbRef.current.onSnapshot(snapshot);

for (const msg of snapshot.messages ?? []) {
if (
msg.id &&
msg.type === 'ui_action' &&
msg.ui_action?.type === 'navigate' &&
msg.ui_action.path &&
!handledNavigateIdsRef.current.has(msg.id)
) {
handledNavigateIdsRef.current.add(msg.id);
cbRef.current.onNavigate(msg.ui_action.path);
}
}
},
onStateChange: (state) => {
setIsSessionLive(state.isConnected && !state.shouldUseFallback);
},
}
);
}, [sessionId, remoteNode, apiURL]);

useEffect(() => {
if (!delegateKey) {
setLiveDelegateSessions({});
return;
}

const unsubscribes = stableDelegateSessionIds.map((delegateId) =>
sseManager.subscribeTopic(
buildAgentSessionTopic(delegateId),
remoteNode,
apiURL,
{
onData: (data) => {
cbRef.current.onDelegateSnapshot(
delegateId,
data as StreamResponse
);
},
onStateChange: (state) => {
setLiveDelegateSessions((prev) => {
const nextValue = state.isConnected && !state.shouldUseFallback;
if (prev[delegateId] === nextValue) {
return prev;
}
return {
...prev,
[delegateId]: nextValue,
};
});
},
}
)
);

return () => {
for (const unsubscribe of unsubscribes) {
unsubscribe();
}
};
}, [delegateKey, stableDelegateSessionIds, remoteNode, apiURL]);

return {
isSessionLive,
liveDelegateSessions,
};
return { isSessionLive: false };
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

This disables agent streaming instead of migrating it.

The hook now ignores sessionId, remoteNode, and both callbacks, so the UI never subscribes to the dedicated agent session stream and all live updates devolve to polling. If polling-only is intentional, remove this SSE hook surface entirely; otherwise this needs to be wired to /api/v1/agent/sessions/{id}/stream.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/hooks/useSSEConnection.ts` around lines 12 - 33, The
exported useSSEConnection currently returns a stub and ignores its inputs
(sessionId, remoteNode, SSECallbacks) which disables agent streaming; fix by
wiring this hook to the agent stream endpoint or removing it entirely: if you
want streaming, inside useSSEConnection use sessionId and remoteNode to build
the EventSource URL (/api/v1/agent/sessions/{id}/stream), create an EventSource
when sessionId is non-null, attach handlers to call the provided SSECallbacks
(onMessage/onOpen/onError as appropriate), update and return
AgentSSEStatus.isSessionLive = true while open, and ensure you close the
EventSource on parameter change/unmount to avoid leaks; if polling-only is
intentional, remove the useSSEConnection export and any callers instead of
leaving a no-op stub.

Comment on lines +31 to +32
const hasOpenedRef = useRef(false);
if (isOpen) hasOpenedRef.current = true;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Check if the file exists and get its line count
ls -la ui/src/features/cockpit/components/TemplateSelector.tsx 2>/dev/null && wc -l ui/src/features/cockpit/components/TemplateSelector.tsx

Repository: dagu-org/dagu

Length of output: 222


🏁 Script executed:

# Read the file to verify lines 31-32 and surrounding context
cat -n ui/src/features/cockpit/components/TemplateSelector.tsx | head -70

Repository: dagu-org/dagu

Length of output: 3401


🏁 Script executed:

# Continue reading to find the handleOpen function
cat -n ui/src/features/cockpit/components/TemplateSelector.tsx | sed -n '70,150p'

Repository: dagu-org/dagu

Length of output: 3094


🏁 Script executed:

# Search for handleOpen function specifically
grep -n "handleOpen" ui/src/features/cockpit/components/TemplateSelector.tsx

Repository: dagu-org/dagu

Length of output: 122


🏁 Script executed:

# Read the handleOpen function and surrounding context
cat -n ui/src/features/cockpit/components/TemplateSelector.tsx | sed -n '180,210p'

Repository: dagu-org/dagu

Length of output: 1193


🏁 Script executed:

# Check all occurrences of setIsOpen to ensure handleOpen is the only place where isOpen becomes true
grep -n "setIsOpen\|isOpen\s*=" ui/src/features/cockpit/components/TemplateSelector.tsx

Repository: dagu-org/dagu

Length of output: 219


🏁 Script executed:

# Verify the remoteNode parameter usage in the API calls
cat -n ui/src/features/cockpit/components/TemplateSelector.tsx | sed -n '42,59p'

Repository: dagu-org/dagu

Length of output: 871


Move the hasOpenedRef write out of render to handleOpen.

Mutating the ref during render (line 32) violates React's render phase purity. React can abandon renders, so if a render where isOpen becomes true is abandoned, the ref mutation persists even though the render was discarded. This can cause the deferred API queries to unpause earlier than intended.

Safer approach
 const hasOpenedRef = useRef(false);
-if (isOpen) hasOpenedRef.current = true;
@@
 const handleOpen = () => {
+  hasOpenedRef.current = true;
   setSearchTerm('');
   setDebouncedTerm('');
   setSelectedTags([]);
   setHighlightedIndex(-1);
   setIsOpen(true);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/cockpit/components/TemplateSelector.tsx` around lines 31 -
32, The write to hasOpenedRef currently happens during render
(hasOpenedRef.current = true when isOpen), which violates React render purity;
remove that mutation from the render path in TemplateSelector and instead set
hasOpenedRef.current = true inside the handler that opens the selector (e.g.,
handleOpen) at the moment you actually transition to open; ensure handleOpen is
the function invoked by the open trigger and that it performs the ref write
before any side effects that unpause queries, leaving the ref read-only during
render.

Comment on lines +1 to +9
import { useCallback, useContext, useMemo } from 'react';
import { useQuery } from '@/hooks/api';
import { useDAGRunsListSSE } from '@/hooks/useDAGRunsListSSE';
import { sseFallbackOptions, useSSECacheSync } from '@/hooks/useSSECacheSync';
import { AppBarContext } from '@/contexts/AppBarContext';
import { useConfig } from '@/contexts/ConfigContext';
import dayjs from '@/lib/dayjs';
import { components, Status } from '@/api/v1/schema';
import type { KanbanColumns } from './useDateKanbanData';
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add the standard GPL header before merging.

This new source file starts directly with imports, so it misses the repo's required license banner.

Suggested header
+// Copyright (C) 2026 Yota Hamada
+// SPDX-License-Identifier: GPL-3.0-or-later
+
 import { useCallback, useContext, useMemo } from 'react';

As per coding guidelines, **/*.{go,ts,tsx,js}: Apply GPL v3 license headers on source files, managed via make addlicense.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import { useCallback, useContext, useMemo } from 'react';
import { useQuery } from '@/hooks/api';
import { useDAGRunsListSSE } from '@/hooks/useDAGRunsListSSE';
import { sseFallbackOptions, useSSECacheSync } from '@/hooks/useSSECacheSync';
import { AppBarContext } from '@/contexts/AppBarContext';
import { useConfig } from '@/contexts/ConfigContext';
import dayjs from '@/lib/dayjs';
import { components, Status } from '@/api/v1/schema';
import type { KanbanColumns } from './useDateKanbanData';
// Copyright (C) 2026 Yota Hamada
// SPDX-License-Identifier: GPL-3.0-or-later
import { useCallback, useContext, useMemo } from 'react';
import { useQuery } from '@/hooks/api';
import { useDAGRunsListSSE } from '@/hooks/useDAGRunsListSSE';
import { sseFallbackOptions, useSSECacheSync } from '@/hooks/useSSECacheSync';
import { AppBarContext } from '@/contexts/AppBarContext';
import { useConfig } from '@/contexts/ConfigContext';
import dayjs from '@/lib/dayjs';
import { components, Status } from '@/api/v1/schema';
import type { KanbanColumns } from './useDateKanbanData';
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/cockpit/hooks/useCockpitDagRuns.ts` around lines 1 - 9, Add
the project's GPL v3 license header to the top of the new source file
useCockpitDagRuns.ts: insert the standard multi-line GPL banner (as used
elsewhere in the repo and added by make addlicense) before the import statements
so the file complies with the repository's licensing guidelines and CI checks;
you can run or emulate make addlicense to obtain the exact header and place it
above the first import in useCockpitDagRuns.ts.

Comment on lines +134 to +137
for (const run of allRuns) {
const startedAt = run.startedAt;
if (!startedAt) continue;
const ts = dayjs(startedAt).unix();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't drop runs that haven't started yet.

This loop skips every DAGRunSummary without startedAt, but the same hook still groups Status.Queued and Status.NotStarted. Those runs can legitimately have no start time, so they disappear from every day bucket. Bucket with the same date field the backend uses for /dag-runs range filtering instead of unconditionally continuing here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/cockpit/hooks/useCockpitDagRuns.ts` around lines 134 - 137,
The loop currently drops any DAGRunSummary with no startedAt (in
useCockpitDagRuns.ts where it checks run.startedAt and does `if (!startedAt)
continue;` before `dayjs(startedAt).unix()`), which removes queued/not-started
runs; instead, when startedAt is missing, compute the bucket timestamp using the
same date field the backend uses for /dag-runs range filtering (e.g., the DAG
run's backend date field such as logicalDate/executionDate or the configured
dagRuns range date field) and use that value for `dayjs(...).unix()` so
Status.Queued and Status.NotStarted runs are included in the correct day bucket
rather than being skipped.

Comment on lines +153 to +159
const getColumnsForDate = useCallback(
(date: string): KanbanColumns => columnsByDate.get(date) ?? EMPTY_COLUMNS,
[columnsByDate]
);

return { getColumnsForDate, isLoading: !data };
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Keep existing boards visible while fetching a wider range.

isLoading: !data is global to the hook. When loadedDates grows and the query key changes, every DateKanbanSection flips to the loading state until the broader request returns, hiding data that was already on screen. Please retain the previous result or expose per-date loading so only newly requested days show a placeholder. As per coding guidelines, NEVER use full-page loading overlays or LoadingIndicator components that hide content; instead show stale data while fetching updates.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/cockpit/hooks/useCockpitDagRuns.ts` around lines 153 - 159,
The hook currently returns a global isLoading = !data which causes all
DateKanbanSection components to show loading when the query key (e.g.,
loadedDates) expands; change this so existing boards remain visible by either
enabling keepPreviousData on the useQuery that loads columns (so data stays
available while refetching) or by exposing a per-date loading indicator (add a
getIsLoadingForDate(date) that returns true only if the date is part of the
newly requested range and columnsByDate.has(date) is false). Update the hook
return to include getIsLoadingForDate (or replace isLoading) and reference
getColumnsForDate, columnsByDate and loadedDates when computing per-date loading
state so only newly requested days show placeholders.

- Use time.Now() for reactivated session LastActivity to prevent immediate eviction
- Include delegate snapshot costs in totalCost reconstruction
- Remove blocking io.Copy drain on long-lived SSE streams
- Return error instead of truncating overlong topic query strings
- Add GPL headers to CommandApprovalMessage.tsx and useCockpitDagRuns.ts
- Make onRespond async with proper error handling in prompt components
- Track per-button pending state in CommandApprovalMessage
- Match pending message content instead of any user message type for ACK
- Move hasOpenedRef mutation from render to handleOpen handler
- Fall back to queuedAt/scheduleTime for runs without startedAt
- Add keepPreviousData to prevent loading flash on date range expansion
- Re-check SSE mutation staleness after async JSON parse
@yottahmd yottahmd merged commit 98c3725 into main Mar 16, 2026
6 checks passed
@yottahmd yottahmd deleted the bugfix/sse-manager branch March 16, 2026 05:51
@codecov
Copy link
Copy Markdown

codecov Bot commented Mar 16, 2026

Codecov Report

❌ Patch coverage is 79.41176% with 21 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.66%. Comparing base (c10df50) to head (aa4bae6).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
internal/agent/api.go 73.75% 12 Missing and 9 partials ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1781      +/-   ##
==========================================
+ Coverage   69.64%   69.66%   +0.02%     
==========================================
  Files         414      414              
  Lines       47907    47970      +63     
==========================================
+ Hits        33364    33418      +54     
- Misses      11762    11766       +4     
- Partials     2781     2786       +5     
Files with missing lines Coverage Δ
internal/agent/session.go 84.85% <100.00%> (+0.52%) ⬆️
internal/agent/api.go 72.49% <73.75%> (+3.18%) ⬆️

... and 15 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c10df50...aa4bae6. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant