
Add event feed UI and harden event-driven bot notifications #1946

Merged
yottahmd merged 10 commits into main from eventfeed-ui
Apr 2, 2026

Conversation

Collaborator

@yottahmd commented Apr 1, 2026

Summary

  • add a manager/admin event feed page in the web UI and switch /event-logs to opaque cursor pagination with infinite loading
  • refactor Slack and Telegram DAG-run notifications to consume persisted event-store data instead of polling DAG-run status
  • harden notification durability with trusted watermark bootstrap, single-writer monitor locking, and atomic state transitions
  • clean up notification destination lifecycle and event-feed applied filter persistence
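The opaque cursor mentioned above is, conceptually, a resume position bound to the query that minted it. A minimal sketch of that idea — the `queryCursor` shape, `filterHash`, and all field names here are illustrative, not this PR's actual types:

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
)

// queryCursor is a hypothetical cursor payload: a resume position plus a
// hash of the active filter, so a cursor cannot be replayed against a
// different query.
type queryCursor struct {
	FileOffset int64  `json:"fileOffset"`
	FilterHash string `json:"filterHash"`
}

func filterHash(filter string) string {
	sum := sha256.Sum256([]byte(filter))
	return hex.EncodeToString(sum[:8])
}

// encodeCursor serializes the cursor to JSON and base64url-encodes it so
// clients treat it as an opaque token.
func encodeCursor(c queryCursor) string {
	raw, _ := json.Marshal(c)
	return base64.RawURLEncoding.EncodeToString(raw)
}

// decodeCursor reverses encodeCursor and rejects cursors minted for a
// different filter; the API layer would map these errors to HTTP 400.
func decodeCursor(token, filter string) (queryCursor, error) {
	raw, err := base64.RawURLEncoding.DecodeString(token)
	if err != nil {
		return queryCursor{}, errors.New("invalid query cursor")
	}
	var c queryCursor
	if err := json.Unmarshal(raw, &c); err != nil {
		return queryCursor{}, errors.New("invalid query cursor")
	}
	if c.FilterHash != filterHash(filter) {
		return queryCursor{}, errors.New("cursor does not match current filter")
	}
	return c, nil
}

func main() {
	token := encodeCursor(queryCursor{FileOffset: 4096, FilterHash: filterHash("kind=dag_run")})
	c, err := decodeCursor(token, "kind=dag_run")
	fmt.Println(c.FileOffset, err)
	_, err = decodeCursor(token, "kind=audit")
	fmt.Println(err)
}
```

Binding the filter hash into the token is what lets the server return 400 for a cursor reused under different filters instead of silently returning wrong pages.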

Testing

  • make api
  • cd ui && pnpm gen:api
  • go test ./internal/persis/fileeventstore ./internal/service/eventstore ./internal/service/frontend/api/v1 -count=1
  • go test ./internal/service/chatbridge -count=1
  • go test ./internal/service/slack ./internal/service/telegram -count=1
  • cd ui && pnpm test src/pages/event-logs/__tests__/index.test.tsx
  • cd ui && pnpm typecheck
  • make fmt

Notes

  • the branch includes the full event feed and event-store notification series on top of main
  • unrelated untracked files in the local repo root were not included in the branch

Summary by CodeRabbit

Release Notes

  • New Features

    • Added Event Logs page with filtering by DAG name, outcome type, and time ranges, plus auto-refresh capability and cursor-based pagination for viewing historical events.
    • Implemented cursor-based pagination across event log APIs for improved performance and reliability.
  • Improvements

    • Enhanced notification monitoring with persistent state tracking and multi-instance coordination.
    • Strengthened event deduplication within the file-based event store.


coderabbitai Bot commented Apr 1, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9db074d8-ef15-49d5-9ec3-65543c4af92a

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough


This PR implements cursor-based pagination for event logs, refactors the notification monitoring system from DAG run store polling to event-store-based notification streaming with persistent state management, and adds a new Event Logs frontend page. The changes include API schema updates, file-backed event store enhancements, notification monitor core logic restructuring, and Telegram/Slack bot integration updates.

Changes

Cohort / File(s) Summary
API & OpenAPI Spec
api/v1/api.gen.go, api/v1/api.yaml
Changed event logs pagination from offset-based (Offset/Total) to cursor-based (Cursor/NextCursor). Added EventLogCursor type as string alias. Updated schema to reflect new pagination model.
Event Store Core & Cursor Support
internal/service/eventstore/eventstore.go, internal/service/eventstore/notifications.go, internal/persis/fileeventstore/query_cursor.go, internal/persis/fileeventstore/reverse_line_reader.go
Replaced QueryFilter.Offset with Cursor, replaced QueryResult.Total with NextCursor. Added NotificationCursor and NotificationReader interface for event-stream reading. Introduced cursor encoding/decoding with filter hash validation and reverse-line-reading utility for backward pagination.
File Event Store Query & Notification Implementation
internal/persis/fileeventstore/store.go, internal/persis/fileeventstore/notifications.go
Refactored Query to use cursor-aware pagination with reverse-reading from committed files. Implemented file-based NotificationReader for streaming notification events with cursor tracking across committed logs and inbox files.
Event Store Testing
internal/persis/fileeventstore/store_test.go, internal/persis/fileeventstore/collector_test.go, internal/persis/fileeventstore/notifications_test.go, internal/service/eventstore/eventstore_test.go
Updated tests for cursor-based pagination, duplicate inbox event tracking, and notification event reading with offset/inbox file cursor management.
Notification Monitor & State Management
internal/service/chatbridge/monitor.go, internal/service/chatbridge/monitor_state.go, internal/service/chatbridge/notifications.go
Major refactor: monitor now polls eventstore.Service for notification events instead of DAGRunStore, maintains durable state in notificationStateStore with cross-process locking, and enqueues notifications via persisted cursor. Updated NotificationBatcher to accept NotificationEvent with key deduplication and added DiscardDestinations method. Added versioned state persistence with quarantining of corrupt state files.
Chatbridge Testing
internal/service/chatbridge/monitor_test.go, internal/service/chatbridge/monitor_state_test.go, internal/service/chatbridge/notifications_test.go, internal/service/chatbridge/monitor_seed_test.go
Deleted monitor_seed_test.go. Added comprehensive state/lock coordination, quarantine handling, and restart resilience tests in monitor_state_test.go. Updated existing tests to use NotificationEvent and new monitor constructor signature. Added stubNotificationStore for bootstrap failure testing.
Bot Configuration & Wiring
internal/cmd/server.go, internal/cmd/startall.go
Updated bot initialization to pass EventService and computed NotificationStateFile paths instead of DAGRunStore.
Slack & Telegram Bot Integration
internal/service/slack/bot.go, internal/service/slack/monitor.go, internal/service/telegram/bot.go, internal/service/telegram/monitor.go
Changed bot Config to accept EventService and NotificationStateFile instead of DAGRunStore. Updated NewDAGRunMonitor constructors to accept event service and state file, delegating to chatbridge.NewNotificationMonitor with new parameters. Modified Run to initialize monitors only when EventService is non-nil with warning logging.
Bot Testing
internal/service/slack/bot_test.go, internal/service/slack/monitor_test.go, internal/service/telegram/bot_test.go, internal/service/telegram/monitor_test.go
Updated monitor constructor calls to pass event service parameter (empty string in most tests) and removed stubNotificationDAGRunStore test double. Adjusted parameter positions to reflect new signature.
API Handler
internal/service/frontend/api/v1/events.go, internal/service/frontend/api/v1/events_test.go
Updated ListEventLogs to use Cursor instead of Offset in query filter, added explicit error mapping for ErrInvalidQueryCursor (HTTP 400), and changed response to include NextCursor instead of Total.
Frontend Schema & Types
ui/src/api/v1/schema.ts
Added EventLogEntry and EventLogsResponse schemas, new shared parameter types for log pagination/filtering, and listEventLogs operation definition returning event entries with optional cursor. Refactored audit log parameters to use shared parameter references.
Frontend Components & Pages
ui/src/App.tsx, ui/src/pages/event-logs/index.tsx, ui/src/contexts/AuthContext.tsx, ui/src/menu.tsx
Added new EventLogsPage route protected by manager role. Implemented feature-rich event log viewer with filtering (type/DAG/run/attempt), date range selection (preset/specific/custom), auto-refresh, pagination via opaque cursor, and raw event dialog. Added useCanViewEventLogs() hook and sidebar menu item for event logs access control.
Frontend Page Testing & Utilities
ui/src/pages/event-logs/__tests__/index.test.tsx
Added comprehensive test suite for EventLogsPage covering loading states, initial queries, URL filter restoration, user interactions, filter persistence, pagination, and raw event display. Includes mock hooks and context providers.
Frontend API Docs
ui/src/pages/api-docs/index.tsx, ui/src/pages/api-docs/__tests__/index.test.tsx
Replaced lazy-loaded ScalarViewer with direct import to eliminate runtime code-splitting and React.Suspense wrapper. Removed preferredBearerToken assertion from test expectations.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Frontend
    participant API as API Handler
    participant Monitor as NotificationMonitor
    participant EventStore as EventStore Service
    participant StateStore as StateStore (File)
    participant Batcher as NotificationBatcher
    participant Bot as Telegram/Slack Bot

    Client->>API: GET /event-logs (Cursor)
    API->>EventStore: Query(Filter with Cursor)
    EventStore->>EventStore: decodeQueryCursor(Cursor)
    EventStore->>EventStore: readCommittedEventsReverse(File, Offset)
    EventStore-->>API: QueryResult(Entries, NextCursor)
    API-->>Client: EventLogsResponse(Entries, NextCursor)

    Monitor->>EventStore: NotificationHeadCursor()
    EventStore-->>Monitor: NotificationCursor
    Monitor->>StateStore: Load()
    StateStore-->>Monitor: notificationMonitorState
    loop Poll for Events
        Monitor->>EventStore: ReadNotificationEvents(Cursor)
        EventStore-->>Monitor: [Event], UpdatedCursor
        Monitor->>Batcher: Enqueue(destination, NotificationEvent)
        Batcher-->>Monitor: batched
        Batcher->>Bot: SendBatch()
        Bot-->>Batcher: delivered
        Monitor->>Monitor: markBatchDelivered()
        Monitor->>StateStore: Save(UpdatedState)
        StateStore-->>Monitor: persisted
    end
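The delivery loop in the diagram can be sketched with in-memory stand-ins. All types and names below are hypothetical simplifications of the real monitor, event service, and state store; the key property shown is that the cursor only advances after delivery succeeds:

```go
package main

import "fmt"

// event is a hypothetical minimal notification event.
type event struct{ Key, Dest string }

// eventSource stands in for the event store's notification reader.
type eventSource struct {
	events []event
}

// read returns events at or after the cursor, plus the updated cursor.
func (s *eventSource) read(cursor int) ([]event, int) {
	return s.events[cursor:], len(s.events)
}

type monitor struct {
	src       *eventSource
	cursor    int             // durable in the real system: saved only after delivery
	delivered map[string]bool // dedup by event key across restarts
	send      func(event) error
}

// pollOnce mimics one loop iteration: read from the cursor, deliver,
// then advance the cursor (persisted to the state file in the real code).
func (m *monitor) pollOnce() {
	evs, next := m.src.read(m.cursor)
	for _, ev := range evs {
		if m.delivered[ev.Key] {
			continue // already sent in a previous run
		}
		if err := m.send(ev); err != nil {
			return // cursor not advanced; events are retried next poll
		}
		m.delivered[ev.Key] = true
	}
	m.cursor = next
}

func main() {
	src := &eventSource{events: []event{{"run-1:succeeded", "slack"}}}
	m := &monitor{src: src, delivered: map[string]bool{}, send: func(e event) error {
		fmt.Println("deliver", e.Key, "->", e.Dest)
		return nil
	}}
	m.pollOnce()
}
```

Because the cursor is persisted only after a successful send, a crash between delivery and save re-reads the same events, and the `delivered` key set absorbs the duplicates — the at-least-once-plus-dedup shape the state store supports.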

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes


🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 7.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Description Check — ✅ Passed: check skipped because CodeRabbit’s high-level summary is enabled.
  • Title Check — ✅ Passed: the title 'Add event feed UI and harden event-driven bot notifications' directly and clearly summarizes the primary changes in the changeset: introducing an event feed page UI and refactoring bot notifications to be event-driven with improved durability.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



coderabbitai Bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
api/v1/api.yaml (1)

3526-3546: ⚠️ Potential issue | 🟡 Minor

Document the invalid cursor response.

Line 3526 adds a validated cursor input, but /event-logs still omits the 400 that the handler now returns for malformed cursors. That leaves the generated contract behind the implementation.

🧩 Suggested spec update
       responses:
         "200":
           description: "List of event log entries"
           content:
             application/json:
               schema:
                 $ref: "#/components/schemas/EventLogsResponse"
+        "400":
+          description: "Invalid cursor"
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/Error"
         "401":

As per coding guidelines, "Generate REST API server code from OpenAPI spec at api/v1/api.yaml using oapi-codegen via make api".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/v1/api.yaml` around lines 3526 - 3546, The OpenAPI for the /event-logs
operation is missing the 400 response for malformed/invalid cursors introduced
by the validated EventLogCursor parameter; update the operation (where it
references EventLogCursor and EventLogsResponse) to add a "400" response with
description like "Malformed cursor" and content application/json using the
existing Error schema so the spec matches the handler behavior and regenerated
server code via make api.
🧹 Nitpick comments (7)
internal/service/chatbridge/monitor_state.go (1)

55-65: Nil destination entries remain in the map after normalization.

When destination == nil, the loop continues without removing or initializing the entry. This leaves nil pointers in the Destinations map, which could cause nil-pointer dereferences in code that iterates over the map expecting initialized entries.

Consider either removing nil entries or initializing them:

♻️ Option: Initialize nil destination entries
 for _, destination := range s.Destinations {
   if destination == nil {
-    continue
+    // Note: Can't fix in-place during range; consider using a key-based loop
+    // or removing nil entries after iteration
   }
   if destination.Pending == nil {
     destination.Pending = make(map[string]NotificationEvent)
   }
   if destination.Delivered == nil {
     destination.Delivered = make(map[string]time.Time)
   }
 }

Alternative approach using keys:

+for key, destination := range s.Destinations {
+  if destination == nil {
+    s.Destinations[key] = &notificationDestinationState{
+      Pending:   make(map[string]NotificationEvent),
+      Delivered: make(map[string]time.Time),
+    }
+    continue
+  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/chatbridge/monitor_state.go` around lines 55 - 65, The loop
over s.Destinations leaves nil pointer entries in the map; change it to iterate
with key, destination := range s.Destinations, and when destination == nil call
delete(s.Destinations, key) to remove that entry, otherwise ensure
destination.Pending and destination.Delivered are initialized
(make(map[string]NotificationEvent) and make(map[string]time.Time)
respectively); this ensures s.Destinations contains no nil pointers and all
remaining entries have initialized Pending and Delivered maps.
internal/service/eventstore/eventstore.go (1)

210-216: Shallow clone shares nested mutable values.

cloneData uses maps.Copy, which is a shallow clone. If data contains nested maps or slices, those are shared with the original. This is typically fine if callers don't mutate data after calling NewDAGRunEvent, but could lead to subtle bugs if they do.

If event data immutability is guaranteed by convention, this is acceptable. Otherwise, consider a deep-clone or documenting the ownership transfer.
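If the deep-clone option is chosen, one way to sketch it for `map[string]any` payloads (the helper name `deepCloneValue` is illustrative, not the package's actual function):

```go
package main

import "fmt"

// deepCloneValue recursively copies maps and slices so the clone shares
// no mutable state with the original; scalars are copied by value.
// JSON-decoded data only ever contains map[string]any, []any, and
// scalars, so these two cases cover it.
func deepCloneValue(v any) any {
	switch val := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(val))
		for k, item := range val {
			out[k] = deepCloneValue(item)
		}
		return out
	case []any:
		out := make([]any, len(val))
		for i, item := range val {
			out[i] = deepCloneValue(item)
		}
		return out
	default:
		return val
	}
}

func main() {
	orig := map[string]any{"run": map[string]any{"status": "running"}}
	clone := deepCloneValue(orig).(map[string]any)
	// Mutating the clone must not leak into the original.
	clone["run"].(map[string]any)["status"] = "failed"
	fmt.Println(orig["run"].(map[string]any)["status"]) // still "running"
}
```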

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/eventstore/eventstore.go` around lines 210 - 216, The
current cloneData call performs a shallow copy (via maps.Copy) so nested
maps/slices remain shared; update cloneData to perform a deep clone of
map[string]any (recursively cloning nested maps and slices) and use that deep
clone where data is assigned before attaching the notification snapshot (i.e.,
keep the existing usage around cloneData in the block that touches
newNotificationStatusSnapshot and notificationStatusSnapshotDataKey), or
alternatively document in NewDAGRunEvent that ownership is transferred and
callers must not mutate the passed-in data; pick one approach and apply
consistently.
internal/service/chatbridge/monitor_test.go (1)

252-256: Replace fixed sleep with an eventual assertion to reduce flakiness.

Line 252 uses a fixed time.Sleep(80 * time.Millisecond), which can intermittently fail on slower CI runners. Prefer require.Eventually on the expected monitor behavior.

Suggested change
-	time.Sleep(80 * time.Millisecond)
-
-	headCalls, readCalls := store.stats()
-	assert.Greater(t, headCalls, 0)
-	assert.Equal(t, 0, readCalls)
+	require.Eventually(t, func() bool {
+		headCalls, readCalls := store.stats()
+		return headCalls > 0 && readCalls == 0
+	}, time.Second, 10*time.Millisecond)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/chatbridge/monitor_test.go` around lines 252 - 256, Replace
the fixed time.Sleep with a polling assertion: call require.Eventually (from
testify/require) to repeatedly invoke store.stats() in a closure and assert that
headCalls > 0 and readCalls == 0, providing a reasonable timeout (e.g.,
200–500ms) and tick interval (e.g., 10–20ms); locate the sleep and the following
store.stats() call in monitor_test.go and replace them with this eventual check
so the test waits until the monitor behavior is observed instead of sleeping a
fixed duration.
ui/src/pages/event-logs/index.tsx (1)

273-277: Effect dependencies may cause extra updates.

The lastUpdatedAt update effect has dependencies [currentNextCursor, firstEntryID, hasHeadResponse, lastEntryID] but only checks hasHeadResponse before updating. The additional dependencies (firstEntryID, lastEntryID, currentNextCursor) will trigger the effect but won't change the outcome since hasHeadResponse is the gating condition.

This is harmless (just sets the same timestamp) but could be simplified:

♻️ Simplified effect dependencies
   React.useEffect(() => {
     if (hasHeadResponse) {
       setLastUpdatedAt(new Date());
     }
-  }, [currentNextCursor, firstEntryID, hasHeadResponse, lastEntryID]);
+  }, [hasHeadResponse, data]);

Using data as a dependency more directly reflects the intent: update timestamp when new data arrives.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/pages/event-logs/index.tsx` around lines 273 - 277, The effect that
updates lastUpdatedAt only checks hasHeadResponse but lists extra deps
(currentNextCursor, firstEntryID, lastEntryID) causing unnecessary runs; modify
the React.useEffect for setLastUpdatedAt so its dependency list reflects the
real trigger—either change the deps to [hasHeadResponse] or to the actual data
source (e.g., [data]) used to indicate new entries—and keep the body as: if
(hasHeadResponse) setLastUpdatedAt(new Date()); targeting the useEffect that
references setLastUpdatedAt and hasHeadResponse.
internal/service/chatbridge/monitor.go (1)

582-585: Timestamp comparison may be overly strict.

Line 582 checks if deliveredAt, ok := destState.Delivered[event.Key]; !ok || !deliveredAt.Equal(now). Since now is freshly captured at line 568, the !deliveredAt.Equal(now) condition will almost always be true for existing entries (they would have a different timestamp from a previous delivery).

This means re-marking an already-delivered event will update its timestamp. This is likely intentional (refresh the TTL for eviction), but the condition is confusing. Consider simplifying or adding a comment explaining the intent:

♻️ Clarify the intent
-			if deliveredAt, ok := destState.Delivered[event.Key]; !ok || !deliveredAt.Equal(now) {
+			// Update timestamp on each delivery to refresh TTL for eviction
+			if _, ok := destState.Delivered[event.Key]; !ok {
 				destState.Delivered[event.Key] = now
 				changed = true
 			}

Or if refreshing the timestamp is intentional:

+			// Always update timestamp to refresh eviction TTL
 			if deliveredAt, ok := destState.Delivered[event.Key]; !ok || !deliveredAt.Equal(now) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/chatbridge/monitor.go` around lines 582 - 585, The timestamp
check for destState.Delivered[event.Key] (using deliveredAt, ok, now) is
confusing because it always updates the timestamp; either clarify intent or
change behavior: if the goal is only to record first delivery, set
destState.Delivered[event.Key]=now and changed=true only when !ok; if the goal
is to refresh TTL on every touch, always set destState.Delivered[event.Key]=now
but add a clear comment above the block explaining “refreshing delivery
timestamp to extend eviction TTL” and decide whether to set changed=true on
refresh (keep or remove changed update accordingly). Ensure references to
deliveredAt, event.Key, now and changed are adjusted consistently.
internal/service/chatbridge/monitor_state_test.go (1)

506-520: Consider potential test flakiness from fixed sleeps.

The time.Sleep(100 * time.Millisecond) calls at lines 506 and 520 are used to wait for the monitor to attempt delivery and observe the save failure. While understandable for testing persistence failure scenarios, fixed sleeps can cause flakiness on slower CI systems.

Consider whether these could be replaced with more deterministic synchronization, such as waiting for a specific internal state or using a hook/callback mechanism in test mode. However, given the complexity of instrumenting save failure observation, the current approach is acceptable if the timeouts provide sufficient margin.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/chatbridge/monitor_state_test.go` around lines 506 - 520,
Replace the two fixed time.Sleep(100 * time.Millisecond) waits with
deterministic assertions to avoid CI flakiness: instead of the first sleep
before checking delivered, use require.Eventually (or a channel/hook) to assert
delivered remains empty for a short interval (checking mu and delivered), and
instead of the final sleep use require.Never or another require.Eventually that
confirms no additional deliveries occur after the expected "run-save-retry"
(using the same mu and delivered variables and the stateDir save-failure
scenario); this targets the checks around delivered/mu in monitor_state_test.go
and avoids arbitrary sleeps.
internal/service/eventstore/notifications.go (1)

184-193: Consider normalizing at the service boundary for API contract clarity. All current implementations handle CommittedOffsets safely (fileeventstore normalizes on return or immediately on entry), so this is not a bug fix. However, normalizing cursors in NotificationHeadCursor (line 192) and ReadNotificationEvents (line 203) before delegation would make the contract clearer and prevent future issues if custom reader implementations skip normalization.
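One possible shape for such boundary normalization, assuming a cursor of per-file committed offsets — the field and method names here are illustrative, not the package's actual API:

```go
package main

import "fmt"

// notificationCursor is a hypothetical cursor shape: per-file committed
// offsets that readers expect in canonical form (non-nil map, no empty
// file names, no negative offsets).
type notificationCursor struct {
	CommittedOffsets map[string]int64
}

// normalize returns a canonical copy, so every NotificationReader sees
// the same invariants regardless of where the cursor came from.
func (c notificationCursor) normalize() notificationCursor {
	out := notificationCursor{
		CommittedOffsets: make(map[string]int64, len(c.CommittedOffsets)),
	}
	for file, off := range c.CommittedOffsets {
		if file == "" {
			continue // drop entries with no file name
		}
		if off < 0 {
			off = 0 // clamp corrupt offsets instead of passing them through
		}
		out.CommittedOffsets[file] = off
	}
	return out
}

func main() {
	c := notificationCursor{CommittedOffsets: map[string]int64{"events-0001.log": -5, "": 9}}
	fmt.Println(c.normalize().CommittedOffsets)
}
```

Calling `normalize` once in the service methods, before delegating, is what lets custom reader implementations skip their own defensive checks.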

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/eventstore/notifications.go` around lines 184 - 193,
Normalize NotificationCursor.CommittedOffsets at the service boundary before
delegating to store readers: update Service.NotificationHeadCursor and
Service.ReadNotificationEvents to defensively normalize the cursor (e.g., ensure
CommittedOffsets is canonical/trimmed/validated) prior to calling
reader.NotificationHeadCursor(ctx) and reader.ReadNotificationEvents(ctx,
cursor) respectively so custom NotificationReader implementations can rely on a
normalized API contract; use the NotificationCursor/CommittedOffsets
normalization routine already used by fileeventstore to implement this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api/v1/api.yaml`:
- Line 3526: The change replaced offset/total with cursor/nextCursor in the
/event-logs contract (the $ref to EventLogCursor) which will break existing
clients; restore backward compatibility by either versioning the endpoint (e.g.,
add /v2/event-logs) or keep the existing parameter names in api/v1/api.yaml and
introduce new cursor parameters as optional fields (or a new component like
EventLogCursorV2) so old clients still see offset and total; update api
generation by running make api (oapi-codegen) after modifying the spec and
ensure both parameter shapes are present or the new endpoint is introduced to
avoid breaking consumers.

In `@internal/persis/fileeventstore/notifications.go`:
- Around line 84-87: The current code in the inbox reading loop aborts the
entire operation on any file error (see call to
s.readInboxNotificationEvent(name)) and the decode/validate path also
hard-fails; change both places to treat malformed inbox files as non-fatal: when
s.readInboxNotificationEvent(name) returns an error, log the error with context
(including the inbox name and cursor), skip that file and continue the loop
instead of returning nil,nextCursor,err; likewise make the decode/validate logic
return a skip result (or nil event) on format/validation errors, log details,
advance nextCursor, and continue so a single bad file cannot stall the monitor
or prevent subsequent notifications from being processed.
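The skip-and-continue behavior described in that prompt can be sketched as follows; the file names, the `readInbox` helper, and the event shape are all hypothetical:

```go
package main

import (
	"fmt"
	"log"
)

// inboxEvent is a hypothetical decoded inbox entry.
type inboxEvent struct{ Key string }

// readInbox stands in for per-file read+decode; here one file is
// hard-wired to fail to simulate corruption.
func readInbox(name string) (inboxEvent, error) {
	if name == "corrupt.json" {
		return inboxEvent{}, fmt.Errorf("decode %s: unexpected end of JSON", name)
	}
	return inboxEvent{Key: name}, nil
}

// readAll treats a malformed inbox file as non-fatal: it logs the error
// with context and advances past the bad file, so one corrupt file
// cannot stall the notification monitor.
func readAll(names []string) []inboxEvent {
	var events []inboxEvent
	for _, name := range names {
		ev, err := readInbox(name)
		if err != nil {
			log.Printf("skipping malformed inbox file %s: %v", name, err)
			continue
		}
		events = append(events, ev)
	}
	return events
}

func main() {
	fmt.Println(len(readAll([]string{"a.json", "corrupt.json", "b.json"})))
}
```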

In `@internal/persis/fileeventstore/reverse_line_reader.go`:
- Around line 66-70: The code creates chunk := make([]byte, readSize) but
readSize is int64; change allocation to use an int by casting: chunk :=
make([]byte, int(readSize)), and ensure subsequent uses respect the actual bytes
read (n) from r.file.ReadAt — e.g., pass chunk[:n] to any processing or slicing
logic; touch the block around reverseLineReaderBlockSize, r.pos, chunk and the
r.file.ReadAt call to apply the cast and use the returned n safely.

In `@internal/service/eventstore/notifications.go`:
- Around line 177-182: After json.Unmarshal into NotificationStatusSnapshot,
perform semantic validation on the decoded snapshot (e.g., ensure DAGRunID,
AttemptID, Name and Status are non-empty/valid) before calling
snapshot.DAGRunStatus(); if any required field is missing or invalid, return an
error like "eventstore: invalid notification snapshot: missing <field>" so
malformed `{}`/`null` payloads don't produce zero-value *exec.DAGRunStatus.
Update the function that processes payload (the block that creates var snapshot
NotificationStatusSnapshot and returns snapshot.DAGRunStatus()) to validate
those specific fields on snapshot and fail early with a clear error when
validation fails.
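A sketch of the suggested semantic validation; the `statusSnapshot` fields shown are a guess at the required subset, and the real `NotificationStatusSnapshot` has more fields:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statusSnapshot is a hypothetical slimmed-down snapshot type.
type statusSnapshot struct {
	Name     string `json:"name"`
	DAGRunID string `json:"dagRunId"`
	Status   string `json:"status"`
}

// decodeSnapshot rejects structurally valid but semantically empty
// payloads such as {} or null, so callers never see a zero-value status.
func decodeSnapshot(payload []byte) (statusSnapshot, error) {
	var s statusSnapshot
	if err := json.Unmarshal(payload, &s); err != nil {
		return statusSnapshot{}, err
	}
	switch {
	case s.Name == "":
		return statusSnapshot{}, fmt.Errorf("invalid notification snapshot: missing name")
	case s.DAGRunID == "":
		return statusSnapshot{}, fmt.Errorf("invalid notification snapshot: missing dagRunId")
	case s.Status == "":
		return statusSnapshot{}, fmt.Errorf("invalid notification snapshot: missing status")
	}
	return s, nil
}

func main() {
	_, err := decodeSnapshot([]byte(`{}`))
	fmt.Println(err)
	s, err := decodeSnapshot([]byte(`{"name":"etl","dagRunId":"r1","status":"succeeded"}`))
	fmt.Println(s.Name, err)
}
```

Note that `json.Unmarshal` of `null` into a struct succeeds and leaves the zero value, which is exactly why the field checks after decoding are needed.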

In `@ui/src/pages/event-logs/__tests__/index.test.tsx`:
- Around line 422-430: Update the test assertion for clientGetMock in
index.test.tsx to also assert that the pagination request includes the
remoteNode routing parameter; modify the
expect(...toHaveBeenCalledWith('/event-logs', { params: { query:
expect.objectContaining({ kind: 'dag_run', limit: 50, cursor: 'cursor-1',
remoteNode: <value> }) } })) line so remoteNode is checked (use the specific
variable/constant used in the test if available, or expect.any(String) / the
expected node id) to ensure Load More requests include remoteNode.

In `@ui/src/pages/event-logs/index.tsx`:
- Around line 1-2: Add the GPL v3 license header to the top of this new module
(the file that imports "components" from '@/api/v1/schema' and "Badge" from
'@/components/ui/badge') to match other source files; either insert the
project’s standard GPL v3 header comment block at the very top of the file or
run the repository tool (make addlicense) to apply it automatically so the
header is consistent with existing files.

---

Outside diff comments:
In `@api/v1/api.yaml`:
- Around line 3526-3546: The OpenAPI for the /event-logs operation is missing
the 400 response for malformed/invalid cursors introduced by the validated
EventLogCursor parameter; update the operation (where it references
EventLogCursor and EventLogsResponse) to add a "400" response with description
like "Malformed cursor" and content application/json using the existing Error
schema so the spec matches the handler behavior and regenerated server code via
make api.

---

Nitpick comments:
In `@internal/service/chatbridge/monitor_state_test.go`:
- Around line 506-520: Replace the two fixed time.Sleep(100 * time.Millisecond)
waits with deterministic assertions to avoid CI flakiness: instead of the first
sleep before checking delivered, use require.Eventually (or a channel/hook) to
assert delivered remains empty for a short interval (checking mu and delivered),
and instead of the final sleep use require.Never or another require.Eventually
that confirms no additional deliveries occur after the expected "run-save-retry"
(using the same mu and delivered variables and the stateDir save-failure
scenario); this targets the checks around delivered/mu in monitor_state_test.go
and avoids arbitrary sleeps.

In `@internal/service/chatbridge/monitor_state.go`:
- Around line 55-65: The loop over s.Destinations leaves nil pointer entries in
the map; change it to iterate with key, destination := range s.Destinations, and
when destination == nil call delete(s.Destinations, key) to remove that entry,
otherwise ensure destination.Pending and destination.Delivered are initialized
(make(map[string]NotificationEvent) and make(map[string]time.Time)
respectively); this ensures s.Destinations contains no nil pointers and all
remaining entries have initialized Pending and Delivered maps.

In `@internal/service/chatbridge/monitor_test.go`:
- Around line 252-256: Replace the fixed time.Sleep with a polling assertion:
call require.Eventually (from testify/require) to repeatedly invoke
store.stats() in a closure and assert that headCalls > 0 and readCalls == 0,
providing a reasonable timeout (e.g., 200–500ms) and tick interval (e.g.,
10–20ms); locate the sleep and the following store.stats() call in
monitor_test.go and replace them with this eventual check so the test waits
until the monitor behavior is observed instead of sleeping a fixed duration.

In `@internal/service/chatbridge/monitor.go`:
- Around line 582-585: The timestamp check for destState.Delivered[event.Key]
(using deliveredAt, ok, now) is confusing because it always updates the
timestamp; either clarify intent or change behavior: if the goal is only to
record first delivery, set destState.Delivered[event.Key]=now and changed=true
only when !ok; if the goal is to refresh TTL on every touch, always set
destState.Delivered[event.Key]=now but add a clear comment above the block
explaining “refreshing delivery timestamp to extend eviction TTL” and decide
whether to set changed=true on refresh (keep or remove changed update
accordingly). Ensure references to deliveredAt, event.Key, now and changed are
adjusted consistently.

In `@internal/service/eventstore/eventstore.go`:
- Around line 210-216: The current cloneData call performs a shallow copy (via
maps.Copy) so nested maps/slices remain shared; update cloneData to perform a
deep clone of map[string]any (recursively cloning nested maps and slices) and
use that deep clone where data is assigned before attaching the notification
snapshot (i.e., keep the existing usage around cloneData in the block that
touches newNotificationStatusSnapshot and notificationStatusSnapshotDataKey), or
alternatively document in NewDAGRunEvent that ownership is transferred and
callers must not mutate the passed-in data; pick one approach and apply
consistently.

In `@internal/service/eventstore/notifications.go`:
- Around line 184-193: Normalize NotificationCursor.CommittedOffsets at the
service boundary before delegating to store readers: update
Service.NotificationHeadCursor and Service.ReadNotificationEvents to defensively
normalize the cursor (e.g., ensure CommittedOffsets is
canonical/trimmed/validated) prior to calling reader.NotificationHeadCursor(ctx)
and reader.ReadNotificationEvents(ctx, cursor) respectively so custom
NotificationReader implementations can rely on a normalized API contract; use
the NotificationCursor/CommittedOffsets normalization routine already used by
fileeventstore to implement this change.

In `@ui/src/pages/event-logs/index.tsx`:
- Around lines 273-277: The effect that updates lastUpdatedAt only checks
hasHeadResponse but lists extra dependencies (currentNextCursor, firstEntryID,
lastEntryID), causing unnecessary runs. Change the dependency list of the
React.useEffect that calls setLastUpdatedAt so it reflects the real trigger:
either [hasHeadResponse], or the actual data source (e.g., [data]) used to
indicate new entries. Keep the body as: if (hasHeadResponse)
setLastUpdatedAt(new Date());
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 29bd3709-0b12-4947-b0f5-51477961424d

📥 Commits

Reviewing files that changed from the base of the PR and between b6c214f and 8618653.

📒 Files selected for processing (40)
  • api/v1/api.gen.go
  • api/v1/api.yaml
  • internal/cmd/server.go
  • internal/cmd/startall.go
  • internal/persis/fileeventstore/collector.go
  • internal/persis/fileeventstore/collector_test.go
  • internal/persis/fileeventstore/notifications.go
  • internal/persis/fileeventstore/notifications_test.go
  • internal/persis/fileeventstore/query_cursor.go
  • internal/persis/fileeventstore/reverse_line_reader.go
  • internal/persis/fileeventstore/store.go
  • internal/persis/fileeventstore/store_test.go
  • internal/service/chatbridge/monitor.go
  • internal/service/chatbridge/monitor_seed_test.go
  • internal/service/chatbridge/monitor_state.go
  • internal/service/chatbridge/monitor_state_test.go
  • internal/service/chatbridge/monitor_test.go
  • internal/service/chatbridge/notifications.go
  • internal/service/chatbridge/notifications_test.go
  • internal/service/eventstore/eventstore.go
  • internal/service/eventstore/eventstore_test.go
  • internal/service/eventstore/notifications.go
  • internal/service/frontend/api/v1/events.go
  • internal/service/frontend/api/v1/events_test.go
  • internal/service/slack/bot.go
  • internal/service/slack/bot_test.go
  • internal/service/slack/monitor.go
  • internal/service/slack/monitor_test.go
  • internal/service/telegram/bot.go
  • internal/service/telegram/bot_test.go
  • internal/service/telegram/monitor.go
  • internal/service/telegram/monitor_test.go
  • ui/src/App.tsx
  • ui/src/api/v1/schema.ts
  • ui/src/contexts/AuthContext.tsx
  • ui/src/menu.tsx
  • ui/src/pages/api-docs/__tests__/index.test.tsx
  • ui/src/pages/api-docs/index.tsx
  • ui/src/pages/event-logs/__tests__/index.test.tsx
  • ui/src/pages/event-logs/index.tsx
💤 Files with no reviewable changes (2)
  • ui/src/pages/api-docs/__tests__/index.test.tsx
  • internal/service/chatbridge/monitor_seed_test.go

Comment thread api/v1/api.yaml
- $ref: "#/components/parameters/LogEndTime"
- $ref: "#/components/parameters/EventLogLimit"
- $ref: "#/components/parameters/LogOffset"
- $ref: "#/components/parameters/EventLogCursor"
⚠️ Potential issue | 🟠 Major

Avoid hard-breaking the existing /event-logs contract.

This swaps offset/total for cursor/nextCursor in place. Existing clients that paginate with offset or deserialize total will stop working correctly after upgrade under the same endpoint. Please keep a compatibility window or version this contract.

As per coding guidelines, "Generate REST API server code from OpenAPI spec at api/v1/api.yaml using oapi-codegen via make api".

Also applies to: 6481-6488, 6742-6756

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/v1/api.yaml` at line 3526, The change replaced offset/total with
cursor/nextCursor in the /event-logs contract (the $ref to EventLogCursor) which
will break existing clients; restore backward compatibility by either versioning
the endpoint (e.g., add /v2/event-logs) or keep the existing parameter names in
api/v1/api.yaml and introduce new cursor parameters as optional fields (or a new
component like EventLogCursorV2) so old clients still see offset and total;
update api generation by running make api (oapi-codegen) after modifying the
spec and ensure both parameter shapes are present or the new endpoint is
introduced to avoid breaking consumers.

Comment on lines +84 to +87
event, err := s.readInboxNotificationEvent(name)
if err != nil {
return nil, nextCursor, err
}
⚠️ Potential issue | 🟠 Major

Malformed inbox files can halt notification delivery for all future events.

Lines 84-87 currently abort the entire read on any inbox-file error; combined with the decode/validate hard-fail behavior at lines 201-207, one bad inbox file can stall the monitor indefinitely.

Resilient handling approach
 	for _, name := range inboxFiles {
 		if cursor.LastInboxFile != "" && name <= cursor.LastInboxFile {
 			continue
 		}
 		event, err := s.readInboxNotificationEvent(name)
 		if err != nil {
-			return nil, nextCursor, err
+			slog.Warn("fileeventstore: skipping unreadable inbox notification file",
+				slog.String("file", filepath.Join(s.inboxDir, name)),
+				slog.String("error", err.Error()),
+			)
+			nextCursor.LastInboxFile = name
+			continue
 		}
 		nextCursor.LastInboxFile = name
 		selectNewestNotificationEvent(eventsByID, event)
 	}

Also applies to: 201-207

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/fileeventstore/notifications.go` around lines 84 - 87, The
current code in the inbox reading loop aborts the entire operation on any file
error (see call to s.readInboxNotificationEvent(name)) and the decode/validate
path also hard-fails; change both places to treat malformed inbox files as
non-fatal: when s.readInboxNotificationEvent(name) returns an error, log the
error with context (including the inbox name and cursor), skip that file and
continue the loop instead of returning nil,nextCursor,err; likewise make the
decode/validate logic return a skip result (or nil event) on format/validation
errors, log details, advance nextCursor, and continue so a single bad file
cannot stall the monitor or prevent subsequent notifications from being
processed.

Comment on lines +66 to +70
readSize := minInt64(reverseLineReaderBlockSize, r.pos)
r.pos -= readSize

chunk := make([]byte, readSize)
n, err := r.file.ReadAt(chunk, r.pos)
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the typed length flow for the failing allocation.
rg -n -C2 'readSize := minInt64|make\(\[\]byte, readSize\)|func minInt64' internal/persis/fileeventstore/reverse_line_reader.go

Repository: dagu-org/dagu

Length of output: 351


Questionable length type: make([]byte, readSize) with an int64 length.

Line 69 passes readSize (int64) to make. Per the Go spec, make accepts a size argument of any integer type, so this does compile; still, an explicit int(readSize) conversion is reasonable hardening against truncation on 32-bit platforms.

Suggested fix
-		chunk := make([]byte, readSize)
+		chunk := make([]byte, int(readSize))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/fileeventstore/reverse_line_reader.go` around lines 66 - 70,
The code creates chunk := make([]byte, readSize) but readSize is int64; change
allocation to use an int by casting: chunk := make([]byte, int(readSize)), and
ensure subsequent uses respect the actual bytes read (n) from r.file.ReadAt —
e.g., pass chunk[:n] to any processing or slicing logic; touch the block around
reverseLineReaderBlockSize, r.pos, chunk and the r.file.ReadAt call to apply the
cast and use the returned n safely.

Comment on lines +177 to +182
var snapshot NotificationStatusSnapshot
if err := json.Unmarshal(payload, &snapshot); err != nil {
return nil, fmt.Errorf("eventstore: unmarshal notification snapshot: %w", err)
}
return snapshot.DAGRunStatus(), nil
}
⚠️ Potential issue | 🟠 Major

Validate required notification fields after JSON decode.

json.Unmarshal succeeds for {}/null, so Line 181 can return a zero-value *exec.DAGRunStatus and treat malformed events as valid. Add semantic validation (e.g., DAGRunID, AttemptID, Name, Status) before returning.

Proposed fix
 func NotificationStatusFromEvent(event *Event) (*exec.DAGRunStatus, error) {
@@
 	var snapshot NotificationStatusSnapshot
 	if err := json.Unmarshal(payload, &snapshot); err != nil {
 		return nil, fmt.Errorf("eventstore: unmarshal notification snapshot: %w", err)
 	}
+	if snapshot.DAGRunID == "" || snapshot.AttemptID == "" || snapshot.Name == "" || snapshot.Status == "" {
+		return nil, errors.New("eventstore: notification snapshot is invalid")
+	}
 	return snapshot.DAGRunStatus(), nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/eventstore/notifications.go` around lines 177 - 182, After
json.Unmarshal into NotificationStatusSnapshot, perform semantic validation on
the decoded snapshot (e.g., ensure DAGRunID, AttemptID, Name and Status are
non-empty/valid) before calling snapshot.DAGRunStatus(); if any required field
is missing or invalid, return an error like "eventstore: invalid notification
snapshot: missing <field>" so malformed `{}`/`null` payloads don't produce
zero-value *exec.DAGRunStatus. Update the function that processes payload (the
block that creates var snapshot NotificationStatusSnapshot and returns
snapshot.DAGRunStatus()) to validate those specific fields on snapshot and fail
early with a clear error when validation fails.

Comment on lines +422 to +430
expect(clientGetMock).toHaveBeenCalledWith('/event-logs', {
params: {
query: expect.objectContaining({
kind: 'dag_run',
limit: 50,
cursor: 'cursor-1',
}),
},
});
⚠️ Potential issue | 🟡 Minor

Assert remoteNode in the “Load More” request expectation.

This test currently won’t catch regressions where pagination calls omit remote node routing.

Suggested assertion update
       expect(clientGetMock).toHaveBeenCalledWith('/event-logs', {
         params: {
           query: expect.objectContaining({
+            remoteNode: 'remote-a',
             kind: 'dag_run',
             limit: 50,
             cursor: 'cursor-1',
           }),
         },
       });

As per coding guidelines ui/**/*.{ts,tsx}: All API calls MUST include the remoteNode parameter to route requests to the correct node in multi-node Dagu deployments.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/pages/event-logs/__tests__/index.test.tsx` around lines 422 - 430,
Update the test assertion for clientGetMock in index.test.tsx to also assert
that the pagination request includes the remoteNode routing parameter; modify
the expect(...toHaveBeenCalledWith('/event-logs', { params: { query:
expect.objectContaining({ kind: 'dag_run', limit: 50, cursor: 'cursor-1',
remoteNode: <value> }) } })) line so remoteNode is checked (use the specific
variable/constant used in the test if available, or expect.any(String) / the
expected node id) to ensure Load More requests include remoteNode.

Comment thread ui/src/pages/event-logs/index.tsx Outdated
Comment on lines +1 to +2
import { components } from '@/api/v1/schema';
import { Badge } from '@/components/ui/badge';
⚠️ Potential issue | 🟡 Minor

Missing GPL v3 license header.

This new file should include the GPL v3 license header at the top, consistent with other source files in the project.

Proposed fix to add license header
+// Copyright (C) 2026 Yota Hamada
+// SPDX-License-Identifier: GPL-3.0-or-later
+
 import { components } from '@/api/v1/schema';

As per coding guidelines: "Apply GPL v3 license headers on source files, managed via make addlicense"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/pages/event-logs/index.tsx` around lines 1 - 2, Add the GPL v3 license
header to the top of this new module (the file that imports "components" from
'@/api/v1/schema' and "Badge" from '@/components/ui/badge') to match other
source files; either insert the project’s standard GPL v3 header comment block
at the very top of the file or run the repository tool (make addlicense) to
apply it automatically so the header is consistent with existing files.

@yottahmd yottahmd merged commit 5f5f73a into main Apr 2, 2026
6 checks passed
@yottahmd yottahmd deleted the eventfeed-ui branch April 2, 2026 01:46
@codecov

codecov Bot commented Apr 2, 2026

Codecov Report

❌ Patch coverage is 64.95360% with 491 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.66%. Comparing base (7e91b40) to head (d6a64e2).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
internal/service/chatbridge/monitor.go 72.30% 126 Missing and 51 partials ⚠️
internal/persis/fileeventstore/notifications.go 57.39% 46 Missing and 26 partials ⚠️
internal/service/eventstore/notifications.go 66.92% 24 Missing and 18 partials ⚠️
internal/service/eventstore/eventstore.go 36.20% 36 Missing and 1 partial ⚠️
internal/service/chatbridge/monitor_state.go 62.76% 21 Missing and 14 partials ⚠️
internal/persis/fileeventstore/store.go 71.57% 20 Missing and 7 partials ⚠️
internal/service/chatbridge/notifications.go 67.34% 8 Missing and 8 partials ⚠️
internal/persis/fileeventstore/query_cursor.go 75.86% 7 Missing and 7 partials ⚠️
internal/service/telegram/bot.go 0.00% 14 Missing ⚠️
internal/service/slack/bot.go 0.00% 13 Missing ⚠️
... and 6 more
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #1946      +/-   ##
==========================================
- Coverage   68.67%   68.66%   -0.02%     
==========================================
  Files         463      468       +5     
  Lines       58751    59992    +1241     
==========================================
+ Hits        40348    41193     +845     
- Misses      14673    14942     +269     
- Partials     3730     3857     +127     
Files with missing lines Coverage Δ
internal/service/slack/monitor.go 63.73% <25.00%> (-0.71%) ⬇️
internal/service/telegram/monitor.go 65.78% <25.00%> (-0.88%) ⬇️
internal/persis/fileeventstore/collector.go 48.99% <50.00%> (+0.03%) ⬆️
...ernal/persis/fileeventstore/reverse_line_reader.go 77.77% <77.77%> (ø)
internal/cmd/server.go 25.00% <0.00%> (-0.31%) ⬇️
internal/cmd/startall.go 50.51% <0.00%> (-0.53%) ⬇️
internal/service/slack/bot.go 26.13% <0.00%> (-0.15%) ⬇️
internal/persis/fileeventstore/query_cursor.go 75.86% <75.86%> (ø)
internal/service/telegram/bot.go 36.03% <0.00%> (-0.26%) ⬇️
internal/service/chatbridge/notifications.go 63.82% <67.34%> (-0.07%) ⬇️
... and 6 more

... and 12 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 7e91b40...d6a64e2. Read the comment docs.

