Add event feed UI and harden event-driven bot notifications#1946
📝 Walkthrough

This PR implements cursor-based pagination for event logs, refactors the notification monitoring system from DAG run store polling to event-store-based notification streaming with persistent state management, and adds a new Event Logs frontend page. The changes include API schema updates, file-backed event store enhancements, notification monitor core logic restructuring, and Telegram/Slack bot integration updates.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Frontend
    participant API as API Handler
    participant Monitor as NotificationMonitor
    participant EventStore as EventStore Service
    participant StateStore as StateStore (File)
    participant Batcher as NotificationBatcher
    participant Bot as Telegram/Slack Bot
    Client->>API: GET /event-logs (Cursor)
    API->>EventStore: Query(Filter with Cursor)
    EventStore->>EventStore: decodeQueryCursor(Cursor)
    EventStore->>EventStore: readCommittedEventsReverse(File, Offset)
    EventStore-->>API: QueryResult(Entries, NextCursor)
    API-->>Client: EventLogsResponse(Entries, NextCursor)
    Monitor->>EventStore: NotificationHeadCursor()
    EventStore-->>Monitor: NotificationCursor
    Monitor->>StateStore: Load()
    StateStore-->>Monitor: notificationMonitorState
    loop Poll for Events
        Monitor->>EventStore: ReadNotificationEvents(Cursor)
        EventStore-->>Monitor: [Event], UpdatedCursor
        Monitor->>Batcher: Enqueue(destination, NotificationEvent)
        Batcher-->>Monitor: batched
        Batcher->>Bot: SendBatch()
        Bot-->>Batcher: delivered
        Monitor->>Monitor: markBatchDelivered()
        Monitor->>StateStore: Save(UpdatedState)
        StateStore-->>Monitor: persisted
    end
```
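The read path in the diagram treats the cursor as an opaque token that the client echoes back unchanged. A minimal sketch of one common way to implement such tokens (base64-encoded JSON; the `queryCursor` fields here are illustrative, not the PR's actual types):

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// queryCursor is a hypothetical stand-in for the PR's cursor payload.
type queryCursor struct {
	Offset int64  `json:"offset"` // byte offset into the committed log
	File   string `json:"file"`   // log segment the offset refers to
}

// encodeCursor serializes the cursor as URL-safe base64 JSON so it can
// travel as an opaque query parameter.
func encodeCursor(c queryCursor) string {
	raw, _ := json.Marshal(c)
	return base64.RawURLEncoding.EncodeToString(raw)
}

// decodeCursor rejects malformed tokens, which is what lets a handler
// return 400 for bad cursors.
func decodeCursor(s string) (queryCursor, error) {
	raw, err := base64.RawURLEncoding.DecodeString(s)
	if err != nil {
		return queryCursor{}, fmt.Errorf("invalid cursor: %w", err)
	}
	var c queryCursor
	if err := json.Unmarshal(raw, &c); err != nil {
		return queryCursor{}, fmt.Errorf("invalid cursor: %w", err)
	}
	return c, nil
}

func main() {
	tok := encodeCursor(queryCursor{Offset: 4096, File: "events-0001.log"})
	c, err := decodeCursor(tok)
	fmt.Println(c.Offset, c.File, err) // → 4096 events-0001.log <nil>
}
```

Because the token is opaque, the server can change its internal shape later without breaking clients, as long as decode failures map to a 400 response.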
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches: 🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
api/v1/api.yaml (1)
3526-3546: ⚠️ Potential issue | 🟡 Minor

Document the invalid cursor response.

Line 3526 adds a validated cursor input, but /event-logs still omits the 400 that the handler now returns for malformed cursors. That leaves the generated contract behind the implementation.

🧩 Suggested spec update

```diff
       responses:
         "200":
           description: "List of event log entries"
           content:
             application/json:
               schema:
                 $ref: "#/components/schemas/EventLogsResponse"
+        "400":
+          description: "Invalid cursor"
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/Error"
         "401":
```

As per coding guidelines, "Generate REST API server code from OpenAPI spec at api/v1/api.yaml using oapi-codegen via make api".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/v1/api.yaml` around lines 3526 - 3546, The OpenAPI for the /event-logs operation is missing the 400 response for malformed/invalid cursors introduced by the validated EventLogCursor parameter; update the operation (where it references EventLogCursor and EventLogsResponse) to add a "400" response with description like "Malformed cursor" and content application/json using the existing Error schema so the spec matches the handler behavior and regenerated server code via make api.
🧹 Nitpick comments (7)
internal/service/chatbridge/monitor_state.go (1)

55-65: Nil destination entries remain in the map after normalization.

When destination == nil, the loop continues without removing or initializing the entry. This leaves nil pointers in the Destinations map, which could cause nil-pointer dereferences in code that iterates over the map expecting initialized entries.

Consider either removing nil entries or initializing them. Since a plain range loop cannot replace the current element, iterate by key so nil entries can be initialized (or deleted) in place:

♻️ Option: iterate by key and initialize nil entries

```diff
-for _, destination := range s.Destinations {
+for key, destination := range s.Destinations {
 	if destination == nil {
-		continue
+		s.Destinations[key] = &notificationDestinationState{
+			Pending:   make(map[string]NotificationEvent),
+			Delivered: make(map[string]time.Time),
+		}
+		continue
 	}
 	if destination.Pending == nil {
 		destination.Pending = make(map[string]NotificationEvent)
 	}
 	if destination.Delivered == nil {
 		destination.Delivered = make(map[string]time.Time)
 	}
 }
```

🤖 Prompt for AI Agents
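For reference, Go defines deleting or reassigning map entries while ranging over the map, so the key-based variant is safe. A self-contained sketch with simplified types (`destState` here stands in for the PR's destination state; the field types are illustrative):

```go
package main

import "fmt"

// destState is a simplified stand-in for the notification destination state.
type destState struct {
	Pending   map[string]string
	Delivered map[string]int64
}

// normalize replaces nil entries and initializes nil inner maps. Assigning
// to the map during range is well defined in Go.
func normalize(dests map[string]*destState) {
	for key, d := range dests {
		if d == nil {
			dests[key] = &destState{
				Pending:   map[string]string{},
				Delivered: map[string]int64{},
			}
			continue
		}
		if d.Pending == nil {
			d.Pending = map[string]string{}
		}
		if d.Delivered == nil {
			d.Delivered = map[string]int64{}
		}
	}
}

func main() {
	m := map[string]*destState{"slack": nil, "telegram": {}}
	normalize(m)
	fmt.Println(m["slack"].Pending != nil, m["telegram"].Delivered != nil) // → true true
}
```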
Verify each finding against the current code and only fix it if needed. In `@internal/service/chatbridge/monitor_state.go` around lines 55 - 65, The loop over s.Destinations leaves nil pointer entries in the map; change it to iterate with key, destination := range s.Destinations, and when destination == nil call delete(s.Destinations, key) to remove that entry, otherwise ensure destination.Pending and destination.Delivered are initialized (make(map[string]NotificationEvent) and make(map[string]time.Time) respectively); this ensures s.Destinations contains no nil pointers and all remaining entries have initialized Pending and Delivered maps.

internal/service/eventstore/eventstore.go (1)
210-216: Shallow clone shares nested mutable values.

cloneData uses maps.Copy, which is a shallow clone. If data contains nested maps or slices, those are shared with the original. This is typically fine if callers don't mutate data after calling NewDAGRunEvent, but could lead to subtle bugs if they do.

If event data immutability is guaranteed by convention, this is acceptable. Otherwise, consider a deep clone or documenting the ownership transfer.
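If a deep clone is chosen, a dependency-free sketch for JSON-like map[string]any data (hypothetical helper, not the PR's cloneData):

```go
package main

import "fmt"

// deepClone recursively copies JSON-like values (maps, slices, scalars)
// so the result shares no mutable state with the input.
func deepClone(v any) any {
	switch val := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(val))
		for k, item := range val {
			out[k] = deepClone(item)
		}
		return out
	case []any:
		out := make([]any, len(val))
		for i, item := range val {
			out[i] = deepClone(item)
		}
		return out
	default:
		return val // scalars and other values are copied as-is
	}
}

func main() {
	orig := map[string]any{"tags": []any{"a"}, "meta": map[string]any{"n": 1}}
	cloned := deepClone(orig).(map[string]any)
	cloned["meta"].(map[string]any)["n"] = 2 // mutate the clone
	fmt.Println(orig["meta"].(map[string]any)["n"]) // → 1 (original unchanged)
}
```

Note this covers only map/slice/scalar shapes; values holding pointers, channels, or custom types would still be shared, which is another reason documenting ownership transfer may be the simpler fix.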
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/eventstore/eventstore.go` around lines 210 - 216, The current cloneData call performs a shallow copy (via maps.Copy) so nested maps/slices remain shared; update cloneData to perform a deep clone of map[string]any (recursively cloning nested maps and slices) and use that deep clone where data is assigned before attaching the notification snapshot (i.e., keep the existing usage around cloneData in the block that touches newNotificationStatusSnapshot and notificationStatusSnapshotDataKey), or alternatively document in NewDAGRunEvent that ownership is transferred and callers must not mutate the passed-in data; pick one approach and apply consistently.

internal/service/chatbridge/monitor_test.go (1)

252-256: Replace fixed sleep with an eventual assertion to reduce flakiness.

Line 252 uses a fixed time.Sleep(80 * time.Millisecond), which can intermittently fail on slower CI runners. Prefer require.Eventually on the expected monitor behavior.

Suggested change

```diff
-	time.Sleep(80 * time.Millisecond)
-
-	headCalls, readCalls := store.stats()
-	assert.Greater(t, headCalls, 0)
-	assert.Equal(t, 0, readCalls)
+	require.Eventually(t, func() bool {
+		headCalls, readCalls := store.stats()
+		return headCalls > 0 && readCalls == 0
+	}, time.Second, 10*time.Millisecond)
```
Verify each finding against the current code and only fix it if needed. In `@internal/service/chatbridge/monitor_test.go` around lines 252 - 256, Replace the fixed time.Sleep with a polling assertion: call require.Eventually (from testify/require) to repeatedly invoke store.stats() in a closure and assert that headCalls > 0 and readCalls == 0, providing a reasonable timeout (e.g., 200–500ms) and tick interval (e.g., 10–20ms); locate the sleep and the following store.stats() call in monitor_test.go and replace them with this eventual check so the test waits until the monitor behavior is observed instead of sleeping a fixed duration.

ui/src/pages/event-logs/index.tsx (1)

273-277: Effect dependencies may cause extra updates.

The lastUpdatedAt update effect has dependencies [currentNextCursor, firstEntryID, hasHeadResponse, lastEntryID] but only checks hasHeadResponse before updating. The additional dependencies (firstEntryID, lastEntryID, currentNextCursor) will trigger the effect but won't change the outcome since hasHeadResponse is the gating condition.

This is harmless (just sets the same timestamp) but could be simplified:

♻️ Simplified effect dependencies

```diff
 React.useEffect(() => {
   if (hasHeadResponse) {
     setLastUpdatedAt(new Date());
   }
-}, [currentNextCursor, firstEntryID, hasHeadResponse, lastEntryID]);
+}, [hasHeadResponse, data]);
```

Using data as a dependency more directly reflects the intent: update timestamp when new data arrives.
Verify each finding against the current code and only fix it if needed. In `@ui/src/pages/event-logs/index.tsx` around lines 273 - 277, The effect that updates lastUpdatedAt only checks hasHeadResponse but lists extra deps (currentNextCursor, firstEntryID, lastEntryID) causing unnecessary runs; modify the React.useEffect for setLastUpdatedAt so its dependency list reflects the real trigger—either change the deps to [hasHeadResponse] or to the actual data source (e.g., [data]) used to indicate new entries—and keep the body as: if (hasHeadResponse) setLastUpdatedAt(new Date()); targeting the useEffect that references setLastUpdatedAt and hasHeadResponse.

internal/service/chatbridge/monitor.go (1)

582-585: Timestamp comparison may be overly strict.

Line 582 checks if deliveredAt, ok := destState.Delivered[event.Key]; !ok || !deliveredAt.Equal(now). Since now is freshly captured at line 568, the !deliveredAt.Equal(now) condition will almost always be true for existing entries (they would have a different timestamp from a previous delivery).

This means re-marking an already-delivered event will update its timestamp. This is likely intentional (refresh the TTL for eviction), but the condition is confusing. Consider simplifying or adding a comment explaining the intent:

♻️ Clarify the intent

```diff
-	if deliveredAt, ok := destState.Delivered[event.Key]; !ok || !deliveredAt.Equal(now) {
+	// Record only the first delivery
+	if _, ok := destState.Delivered[event.Key]; !ok {
 		destState.Delivered[event.Key] = now
 		changed = true
 	}
```

Or, if refreshing the timestamp is intentional:

```diff
+	// Always update timestamp to refresh eviction TTL
 	if deliveredAt, ok := destState.Delivered[event.Key]; !ok || !deliveredAt.Equal(now) {
```
Verify each finding against the current code and only fix it if needed. In `@internal/service/chatbridge/monitor.go` around lines 582 - 585, The timestamp check for destState.Delivered[event.Key] (using deliveredAt, ok, now) is confusing because it always updates the timestamp; either clarify intent or change behavior: if the goal is only to record first delivery, set destState.Delivered[event.Key]=now and changed=true only when !ok; if the goal is to refresh TTL on every touch, always set destState.Delivered[event.Key]=now but add a clear comment above the block explaining "refreshing delivery timestamp to extend eviction TTL" and decide whether to set changed=true on refresh (keep or remove changed update accordingly). Ensure references to deliveredAt, event.Key, now and changed are adjusted consistently.

internal/service/chatbridge/monitor_state_test.go (1)

506-520: Consider potential test flakiness from fixed sleeps.

The time.Sleep(100 * time.Millisecond) calls at lines 506 and 520 are used to wait for the monitor to attempt delivery and observe the save failure. While understandable for testing persistence failure scenarios, fixed sleeps can cause flakiness on slower CI systems.

Consider whether these could be replaced with more deterministic synchronization, such as waiting for a specific internal state or using a hook/callback mechanism in test mode. However, given the complexity of instrumenting save failure observation, the current approach is acceptable if the timeouts provide sufficient margin.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/chatbridge/monitor_state_test.go` around lines 506 - 520, Replace the two fixed time.Sleep(100 * time.Millisecond) waits with deterministic assertions to avoid CI flakiness: instead of the first sleep before checking delivered, use require.Eventually (or a channel/hook) to assert delivered remains empty for a short interval (checking mu and delivered), and instead of the final sleep use require.Never or another require.Eventually that confirms no additional deliveries occur after the expected "run-save-retry" (using the same mu and delivered variables and the stateDir save-failure scenario); this targets the checks around delivered/mu in monitor_state_test.go and avoids arbitrary sleeps.

internal/service/eventstore/notifications.go (1)

184-193: Consider normalizing at the service boundary for API contract clarity.

All current implementations handle CommittedOffsets safely (fileeventstore normalizes on return or immediately on entry), so this is not a bug fix. However, normalizing cursors in NotificationHeadCursor (line 192) and ReadNotificationEvents (line 203) before delegation would make the contract clearer and prevent future issues if custom reader implementations skip normalization.
Verify each finding against the current code and only fix it if needed. In `@internal/service/eventstore/notifications.go` around lines 184 - 193, Normalize NotificationCursor.CommittedOffsets at the service boundary before delegating to store readers: update Service.NotificationHeadCursor and Service.ReadNotificationEvents to defensively normalize the cursor (e.g., ensure CommittedOffsets is canonical/trimmed/validated) prior to calling reader.NotificationHeadCursor(ctx) and reader.ReadNotificationEvents(ctx, cursor) respectively so custom NotificationReader implementations can rely on a normalized API contract; use the NotificationCursor/CommittedOffsets normalization routine already used by fileeventstore to implement this change.
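A sketch of the kind of boundary normalization being suggested, assuming CommittedOffsets is a map of file names to offsets (the type shape here is illustrative, not the store's actual definition):

```go
package main

import "fmt"

// NotificationCursor is a simplified stand-in for the store's cursor type.
type NotificationCursor struct {
	CommittedOffsets map[string]int64
}

// normalize returns a canonical copy: a nil map becomes an empty map, and
// blank file names or negative offsets are dropped. Returning a copy keeps
// the caller's cursor untouched.
func (c NotificationCursor) normalize() NotificationCursor {
	out := make(map[string]int64, len(c.CommittedOffsets))
	for file, off := range c.CommittedOffsets {
		if file == "" || off < 0 {
			continue
		}
		out[file] = off
	}
	return NotificationCursor{CommittedOffsets: out}
}

func main() {
	c := NotificationCursor{CommittedOffsets: map[string]int64{"": 5, "a.log": -1, "b.log": 42}}
	fmt.Println(c.normalize().CommittedOffsets) // → map[b.log:42]
}
```

Calling such a helper at the top of the service methods means every NotificationReader implementation sees the same canonical cursor, which is the contract clarity the comment asks for.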
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ui/src/pages/event-logs/__tests__/index.test.tsx`:
- Around line 422-430: Update the test assertion for clientGetMock in
index.test.tsx to also assert that the pagination request includes the
remoteNode routing parameter; modify the
expect(...toHaveBeenCalledWith('/event-logs', { params: { query:
expect.objectContaining({ kind: 'dag_run', limit: 50, cursor: 'cursor-1',
remoteNode: <value> }) } })) line so remoteNode is checked (use the specific
variable/constant used in the test if available, or expect.any(String) / the
expected node id) to ensure Load More requests include remoteNode.
In `@ui/src/pages/event-logs/index.tsx`:
- Around line 1-2: Add the GPL v3 license header to the top of this new module
(the file that imports "components" from '@/api/v1/schema' and "Badge" from
'@/components/ui/badge') to match other source files; either insert the
project’s standard GPL v3 header comment block at the very top of the file or
run the repository tool (make addlicense) to apply it automatically so the
header is consistent with existing files.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 29bd3709-0b12-4947-b0f5-51477961424d
📒 Files selected for processing (40)

- api/v1/api.gen.go
- api/v1/api.yaml
- internal/cmd/server.go
- internal/cmd/startall.go
- internal/persis/fileeventstore/collector.go
- internal/persis/fileeventstore/collector_test.go
- internal/persis/fileeventstore/notifications.go
- internal/persis/fileeventstore/notifications_test.go
- internal/persis/fileeventstore/query_cursor.go
- internal/persis/fileeventstore/reverse_line_reader.go
- internal/persis/fileeventstore/store.go
- internal/persis/fileeventstore/store_test.go
- internal/service/chatbridge/monitor.go
- internal/service/chatbridge/monitor_seed_test.go
- internal/service/chatbridge/monitor_state.go
- internal/service/chatbridge/monitor_state_test.go
- internal/service/chatbridge/monitor_test.go
- internal/service/chatbridge/notifications.go
- internal/service/chatbridge/notifications_test.go
- internal/service/eventstore/eventstore.go
- internal/service/eventstore/eventstore_test.go
- internal/service/eventstore/notifications.go
- internal/service/frontend/api/v1/events.go
- internal/service/frontend/api/v1/events_test.go
- internal/service/slack/bot.go
- internal/service/slack/bot_test.go
- internal/service/slack/monitor.go
- internal/service/slack/monitor_test.go
- internal/service/telegram/bot.go
- internal/service/telegram/bot_test.go
- internal/service/telegram/monitor.go
- internal/service/telegram/monitor_test.go
- ui/src/App.tsx
- ui/src/api/v1/schema.ts
- ui/src/contexts/AuthContext.tsx
- ui/src/menu.tsx
- ui/src/pages/api-docs/__tests__/index.test.tsx
- ui/src/pages/api-docs/index.tsx
- ui/src/pages/event-logs/__tests__/index.test.tsx
- ui/src/pages/event-logs/index.tsx
💤 Files with no reviewable changes (2)
- ui/src/pages/api-docs/__tests__/index.test.tsx
- internal/service/chatbridge/monitor_seed_test.go
```diff
   - $ref: "#/components/parameters/LogEndTime"
   - $ref: "#/components/parameters/EventLogLimit"
-  - $ref: "#/components/parameters/LogOffset"
+  - $ref: "#/components/parameters/EventLogCursor"
```
Avoid hard-breaking the existing /event-logs contract.
This swaps offset/total for cursor/nextCursor in place. Existing clients that paginate with offset or deserialize total will stop working correctly after upgrade under the same endpoint. Please keep a compatibility window or version this contract.
As per coding guidelines, "Generate REST API server code from OpenAPI spec at api/v1/api.yaml using oapi-codegen via make api".
Also applies to: 6481-6488, 6742-6756
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@api/v1/api.yaml` at line 3526, The change replaced offset/total with
cursor/nextCursor in the /event-logs contract (the $ref to EventLogCursor) which
will break existing clients; restore backward compatibility by either versioning
the endpoint (e.g., add /v2/event-logs) or keep the existing parameter names in
api/v1/api.yaml and introduce new cursor parameters as optional fields (or a new
component like EventLogCursorV2) so old clients still see offset and total;
update api generation by running make api (oapi-codegen) after modifying the
spec and ensure both parameter shapes are present or the new endpoint is
introduced to avoid breaking consumers.
```go
event, err := s.readInboxNotificationEvent(name)
if err != nil {
	return nil, nextCursor, err
}
```
Malformed inbox files can halt notification delivery for all future events.
Line 84-87 currently aborts the entire read on any inbox file error; combined with Line 201-207 decode/validate hard-fail behavior, one bad inbox file can stall the monitor indefinitely.
Resilient handling approach
```diff
 for _, name := range inboxFiles {
 	if cursor.LastInboxFile != "" && name <= cursor.LastInboxFile {
 		continue
 	}
 	event, err := s.readInboxNotificationEvent(name)
 	if err != nil {
-		return nil, nextCursor, err
+		slog.Warn("fileeventstore: skipping unreadable inbox notification file",
+			slog.String("file", filepath.Join(s.inboxDir, name)),
+			slog.String("error", err.Error()),
+		)
+		nextCursor.LastInboxFile = name
+		continue
 	}
 	nextCursor.LastInboxFile = name
 	selectNewestNotificationEvent(eventsByID, event)
 }
```

Also applies to: 201-207
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/persis/fileeventstore/notifications.go` around lines 84 - 87, The
current code in the inbox reading loop aborts the entire operation on any file
error (see call to s.readInboxNotificationEvent(name)) and the decode/validate
path also hard-fails; change both places to treat malformed inbox files as
non-fatal: when s.readInboxNotificationEvent(name) returns an error, log the
error with context (including the inbox name and cursor), skip that file and
continue the loop instead of returning nil,nextCursor,err; likewise make the
decode/validate logic return a skip result (or nil event) on format/validation
errors, log details, advance nextCursor, and continue so a single bad file
cannot stall the monitor or prevent subsequent notifications from being
processed.
```go
readSize := minInt64(reverseLineReaderBlockSize, r.pos)
r.pos -= readSize

chunk := make([]byte, readSize)
n, err := r.file.ReadAt(chunk, r.pos)
```
🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
# Verify the typed length flow for the failing allocation.
rg -n -C2 'readSize := minInt64|make\(\[\]byte, readSize\)|func minInt64' internal/persis/fileeventstore/reverse_line_reader.go
```

Repository: dagu-org/dagu
Length of output: 351
Avoid the int64 length in make([]byte, readSize).

Line 69 passes readSize (an int64) to make. This does compile — Go's make accepts any integer type for the length — but on 32-bit platforms a value not representable as int panics at run time, and an explicit conversion makes the narrowing visible at the call site.

Suggested fix

```diff
-	chunk := make([]byte, readSize)
+	chunk := make([]byte, int(readSize))
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
readSize := minInt64(reverseLineReaderBlockSize, r.pos)
r.pos -= readSize
chunk := make([]byte, int(readSize))
n, err := r.file.ReadAt(chunk, r.pos)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/persis/fileeventstore/reverse_line_reader.go` around lines 66 - 70,
The code creates chunk := make([]byte, readSize) but readSize is int64; change
allocation to use an int by casting: chunk := make([]byte, int(readSize)), and
ensure subsequent uses respect the actual bytes read (n) from r.file.ReadAt —
e.g., pass chunk[:n] to any processing or slicing logic; touch the block around
reverseLineReaderBlockSize, r.pos, chunk and the r.file.ReadAt call to apply the
cast and use the returned n safely.
```go
var snapshot NotificationStatusSnapshot
if err := json.Unmarshal(payload, &snapshot); err != nil {
	return nil, fmt.Errorf("eventstore: unmarshal notification snapshot: %w", err)
}
return snapshot.DAGRunStatus(), nil
}
```
Validate required notification fields after JSON decode.
json.Unmarshal succeeds for {}/null, so Line 181 can return a zero-value *exec.DAGRunStatus and treat malformed events as valid. Add semantic validation (e.g., DAGRunID, AttemptID, Name, Status) before returning.
Proposed fix

```diff
 func NotificationStatusFromEvent(event *Event) (*exec.DAGRunStatus, error) {
 @@
 	var snapshot NotificationStatusSnapshot
 	if err := json.Unmarshal(payload, &snapshot); err != nil {
 		return nil, fmt.Errorf("eventstore: unmarshal notification snapshot: %w", err)
 	}
+	if snapshot.DAGRunID == "" || snapshot.AttemptID == "" || snapshot.Name == "" || snapshot.Status == "" {
+		return nil, errors.New("eventstore: notification snapshot is invalid")
+	}
 	return snapshot.DAGRunStatus(), nil
 }
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	var snapshot NotificationStatusSnapshot
	if err := json.Unmarshal(payload, &snapshot); err != nil {
		return nil, fmt.Errorf("eventstore: unmarshal notification snapshot: %w", err)
	}
	if snapshot.DAGRunID == "" || snapshot.AttemptID == "" || snapshot.Name == "" || snapshot.Status == "" {
		return nil, errors.New("eventstore: notification snapshot is invalid")
	}
	return snapshot.DAGRunStatus(), nil
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/eventstore/notifications.go` around lines 177 - 182, after `json.Unmarshal` into `NotificationStatusSnapshot`, perform semantic validation on the decoded snapshot (e.g., ensure `DAGRunID`, `AttemptID`, `Name`, and `Status` are non-empty/valid) before calling `snapshot.DAGRunStatus()`; if any required field is missing or invalid, return an error like "eventstore: invalid notification snapshot: missing <field>" so malformed `{}`/`null` payloads don't produce a zero-value `*exec.DAGRunStatus`. Update the block that declares `var snapshot NotificationStatusSnapshot` and returns `snapshot.DAGRunStatus()` to validate those specific fields on `snapshot` and fail early with a clear error when validation fails.
```ts
expect(clientGetMock).toHaveBeenCalledWith('/event-logs', {
  params: {
    query: expect.objectContaining({
      kind: 'dag_run',
      limit: 50,
      cursor: 'cursor-1',
    }),
  },
});
```
Assert remoteNode in the “Load More” request expectation.
This test currently won’t catch regressions where pagination calls omit remote node routing.
Suggested assertion update

```diff
 expect(clientGetMock).toHaveBeenCalledWith('/event-logs', {
   params: {
     query: expect.objectContaining({
+      remoteNode: 'remote-a',
       kind: 'dag_run',
       limit: 50,
       cursor: 'cursor-1',
     }),
   },
 });
```

As per coding guidelines (`ui/**/*.{ts,tsx}`): All API calls MUST include the `remoteNode` parameter to route requests to the correct node in multi-node Dagu deployments.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ui/src/pages/event-logs/__tests__/index.test.tsx` around lines 422 - 430,
Update the test assertion for clientGetMock in index.test.tsx to also assert
that the pagination request includes the remoteNode routing parameter; modify
the expect(...toHaveBeenCalledWith('/event-logs', { params: { query:
expect.objectContaining({ kind: 'dag_run', limit: 50, cursor: 'cursor-1',
remoteNode: <value> }) } })) line so remoteNode is checked (use the specific
variable/constant used in the test if available, or expect.any(String) / the
expected node id) to ensure Load More requests include remoteNode.
```ts
import { components } from '@/api/v1/schema';
import { Badge } from '@/components/ui/badge';
```
Missing GPL v3 license header.
This new file should include the GPL v3 license header at the top, consistent with other source files in the project.
Proposed fix to add license header

```diff
+// Copyright (C) 2026 Yota Hamada
+// SPDX-License-Identifier: GPL-3.0-or-later
+
 import { components } from '@/api/v1/schema';
```

As per coding guidelines: "Apply GPL v3 license headers on source files, managed via make addlicense"
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```ts
// Copyright (C) 2026 Yota Hamada
// SPDX-License-Identifier: GPL-3.0-or-later

import { components } from '@/api/v1/schema';
import { Badge } from '@/components/ui/badge';
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ui/src/pages/event-logs/index.tsx` around lines 1 - 2, add the GPL v3 license header to the top of this new module (the file that imports `components` from '@/api/v1/schema' and `Badge` from '@/components/ui/badge') to match other source files; either insert the project's standard GPL v3 header comment block at the very top of the file or run the repository tool (`make addlicense`) to apply it automatically so the header is consistent with existing files.
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #1946      +/-   ##
==========================================
- Coverage   68.67%   68.66%    -0.02%
==========================================
  Files         463      468        +5
  Lines       58751    59992     +1241
==========================================
+ Hits        40348    41193      +845
- Misses      14673    14942      +269
- Partials     3730     3857      +127
```

... and 12 files with indirect coverage changes

Continue to review full report in Codecov by Sentry.
Summary

- Moved `/event-logs` to opaque cursor pagination with infinite loading

Testing

- `make api`
- `cd ui && pnpm gen:api`
- `go test ./internal/persis/fileeventstore ./internal/service/eventstore ./internal/service/frontend/api/v1 -count=1`
- `go test ./internal/service/chatbridge -count=1`
- `go test ./internal/service/slack ./internal/service/telegram -count=1`
- `cd ui && pnpm test src/pages/event-logs/__tests__/index.test.tsx`
- `cd ui && pnpm typecheck`
- `make fmt`

Notes

- `main`

Summary by CodeRabbit
Release Notes
New Features
Improvements
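The "opaque cursor pagination" called out in the summary can be sketched as a base64-wrapped JSON offset. This is an illustrative assumption, not the event store's actual `decodeQueryCursor` implementation: `queryCursor`, its field set, and both helper names are hypothetical.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// queryCursor is a hypothetical shape for the opaque cursor; the real
// cursor in the event store may carry different fields.
type queryCursor struct {
	Offset int64 `json:"offset"`
}

// encodeQueryCursor serializes the cursor and wraps it in URL-safe base64,
// so clients treat it as an opaque token rather than a raw offset.
func encodeQueryCursor(c queryCursor) string {
	b, _ := json.Marshal(c)
	return base64.RawURLEncoding.EncodeToString(b)
}

// decodeQueryCursor reverses the encoding and rejects malformed tokens.
func decodeQueryCursor(s string) (queryCursor, error) {
	var c queryCursor
	b, err := base64.RawURLEncoding.DecodeString(s)
	if err != nil {
		return c, fmt.Errorf("eventstore: decode cursor: %w", err)
	}
	if err := json.Unmarshal(b, &c); err != nil {
		return c, fmt.Errorf("eventstore: parse cursor: %w", err)
	}
	return c, nil
}

func main() {
	cur := encodeQueryCursor(queryCursor{Offset: 4096})
	got, err := decodeQueryCursor(cur)
	fmt.Println(cur, got.Offset, err)
}
```

Keeping the cursor opaque lets the server change its internal layout (e.g., switch from a byte offset to a sequence number) without breaking "Load More" clients, which simply echo back the last `nextCursor` they received.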