fix: harden and consolidate bot notifications #1798
Conversation
📝 Walkthrough

This pull request introduces a centralized notification monitor and batcher in the chatbridge package, shared by the Slack and Telegram bots: DAG-run completion statuses are collected, classified as urgent or digest, deduplicated and buffered per destination, and then delivered in batches.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Monitor as NotificationMonitor
    participant Store as DAGRunStore
    participant Batcher as NotificationBatcher
    participant Transport as NotificationTransport<br/>(SlackBot/TelegramBot)
    participant Dest as Destination<br/>(Slack/Telegram)

    rect rgba(100, 150, 200, 0.5)
    Note over Monitor,Store: Initialization & Seeding
    Monitor->>Store: ListStatuses (last 24h, non-active)
    Store-->>Monitor: historical statuses
    Monitor->>Monitor: seedDelivered: mark all as delivered
    end

    rect rgba(100, 200, 150, 0.5)
    Note over Monitor,Batcher: Polling Loop
    Monitor->>Store: ListStatuses (last 1h, active/inactive)
    Store-->>Monitor: current statuses
    Monitor->>Monitor: checkForCompletions: filter by NotificationStatuses
    Monitor->>Batcher: Enqueue(destination, status)
    Batcher->>Batcher: classify status into bucket (urgent/digest)
    Batcher->>Batcher: deduplicate & buffer by destination+class
    Batcher-->>Monitor: signal ready when bucket timer fires
    end

    rect rgba(200, 150, 100, 0.5)
    Note over Monitor,Transport: Flushing & Delivery
    Monitor->>Batcher: TakeReady() returns NotificationPendingBatches
    Monitor->>Transport: FlushNotificationBatch(ctx, dest, batch, allowLLM)
    Transport->>Dest: deliver batch (with LLM-generated or formatted message)
    Dest-->>Transport: success/failure
    Transport-->>Monitor: bool (acknowledged)
    Monitor->>Monitor: IsDelivered: mark batch events delivered for destination
    end

    rect rgba(150, 100, 200, 0.5)
    Note over Monitor,Batcher: Shutdown & Drain
    Monitor->>Batcher: DrainAndStop() returns pending + buffered batches
    Batcher-->>Monitor: []NotificationPendingBatch (urgent first)
    loop for each pending batch
        Monitor->>Transport: FlushNotificationBatch(ctx, dest, batch, allowLLM=false)
        Transport->>Dest: deliver without LLM
        Dest-->>Transport: acknowledged
        Monitor->>Monitor: mark delivered
    end
    end
```
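For readers new to the flow, here is a compact sketch of the polling/flush/drain loop the diagram describes. It is illustrative only: the type names mirror the diagram, but the concrete signatures (`Enqueue`, `TakeReady`, `FlushNotificationBatch`, `DrainAndStop`), the `Status`/`Batch` shapes, the destination key, and the poll interval are assumptions rather than the real chatbridge types.

```go
// Sketch only: names follow the sequence diagram above, but signatures,
// fields, and the 30s poll interval are assumptions, not the chatbridge API.
package chatbridgesketch

import (
	"context"
	"fmt"
	"time"
)

// Status stands in for exec.DAGRunStatus in this sketch.
type Status struct{ Name, RunID, State string }

// Batch is a group of statuses buffered for one destination.
type Batch struct {
	Destination string
	Events      []Status
}

// Batcher groups statuses per destination and urgency class.
type Batcher interface {
	Enqueue(dest string, st Status) bool
	TakeReady() []Batch    // batches whose class timer has fired
	DrainAndStop() []Batch // whatever is still buffered at shutdown, urgent first
}

// Transport delivers a batch to Slack/Telegram and reports acknowledgement.
type Transport interface {
	FlushNotificationBatch(ctx context.Context, dest string, b Batch, allowLLM bool) bool
}

// pollLoop mirrors the Polling Loop, Flushing & Delivery, and Shutdown & Drain
// phases of the diagram.
func pollLoop(ctx context.Context, batcher Batcher, transport Transport, poll func() []Status) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Drain without LLM summaries so shutdown stays fast.
			for _, b := range batcher.DrainAndStop() {
				transport.FlushNotificationBatch(context.Background(), b.Destination, b, false)
			}
			return
		case <-ticker.C:
			for _, st := range poll() { // recent statuses from the DAG-run store
				batcher.Enqueue("dest-1", st)
			}
			for _, b := range batcher.TakeReady() {
				if transport.FlushNotificationBatch(ctx, b.Destination, b, true) {
					fmt.Println("delivered; mark events delivered for", b.Destination)
				}
			}
		}
	}
}
```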
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
🧹 Nitpick comments (7)
internal/service/telegram/bot_test.go (1)
158-166: Consider consistent lock scope for `generatedErr` access.

Same concern as the Slack fake: `generatedErr` and `generated` are read outside the lock. While safe for current test usage, capturing values under the lock would be more robust.

🔧 Suggested fix for consistent locking
```diff
 func (s *fakeTelegramAgentService) GenerateAssistantMessage(context.Context, string, agent.UserIdentity, string, string) (agent.Message, error) {
 	s.mu.Lock()
 	s.generateCalls++
-	s.mu.Unlock()
-	if s.generatedErr != nil {
-		return agent.Message{}, s.generatedErr
+	err := s.generatedErr
+	msg := s.generated
+	s.mu.Unlock()
+	if err != nil {
+		return agent.Message{}, err
 	}
-	return s.generated, nil
+	return msg, nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/telegram/bot_test.go` around lines 158 - 166, in GenerateAssistantMessage, avoid reading s.generatedErr and s.generated outside the mutex; acquire the lock, increment s.generateCalls and copy s.generatedErr and s.generated to local vars while holding s.mu, then release the lock and use those locals for the error check and return. This keeps access to the shared fields (s.generatedErr, s.generated, s.generateCalls) consistent and race-free in the fakeTelegramAgentService.

internal/service/slack/monitor_test.go (1)
22-40: Consider extracting `startTestMonitor` to a shared test helper.

This helper is duplicated verbatim in internal/service/telegram/monitor_test.go. While acceptable for now, extracting it to a shared test utility (e.g., in `chatbridge` or a `testutil` package) would reduce maintenance burden.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/slack/monitor_test.go` around lines 22 - 40, the startTestMonitor helper is duplicated; extract it into a shared test helper (e.g., package testutil or chatbridge/testhelpers) so both internal/service/slack/monitor_test.go and internal/service/telegram/monitor_test.go import and call the common function; move the startTestMonitor implementation (context creation, goroutine running monitor.Run(ctx), cancellation and shutdown wait logic) into the shared package, export it if needed (StartTestMonitor) or provide a wrapper, update both test files to remove the local copy and call the shared helper, and run tests to ensure no behavior changes.
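For illustration, a minimal sketch of what such a shared helper could look like; the `testutil` package name, the `runnable` interface, and the assumption that the monitor exposes a blocking `Run(ctx)` method are all hypothetical and would need to match the real monitor API.

```go
// Hypothetical shared helper; the testutil package name and the Run(ctx)
// signature are assumptions based on the review prompt above.
package testutil

import (
	"context"
	"testing"
	"time"
)

// runnable is any monitor with a blocking Run method.
type runnable interface {
	Run(ctx context.Context) error
}

// StartTestMonitor runs m in a goroutine and returns a stop function that
// cancels the context and waits for Run to return before the test continues.
func StartTestMonitor(t *testing.T, m runnable) (stop func()) {
	t.Helper()
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		_ = m.Run(ctx)
	}()
	return func() {
		cancel()
		select {
		case <-done:
		case <-time.After(5 * time.Second):
			t.Fatal("monitor did not shut down in time")
		}
	}
}
```

Both test packages would then call `stop := testutil.StartTestMonitor(t, monitor)` and `defer stop()` in place of their local copies.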
internal/service/chatbridge/notifications_test.go (2)

175-185: Potential issue with DAG name generation for large `maxNotificationGroups`.

`string(rune('a'+i))` produces valid lowercase letters only for `i` in 0-25. If `maxNotificationGroups` exceeds 24, the generated names will contain non-letter characters (e.g., `{`, `|`). While the test may still pass, the names become confusing. Consider using `fmt.Sprintf("dag-%d", i)` for clarity and robustness.

🔧 Suggested fix
```diff
 	for i := range maxNotificationGroups + 2 {
 		events = append(events, NotificationEvent{
 			Status: &exec.DAGRunStatus{
-				Name:      "dag-" + string(rune('a'+i)),
-				DAGRunID:  "run-" + string(rune('a'+i)),
+				Name:      fmt.Sprintf("dag-%d", i),
+				DAGRunID:  fmt.Sprintf("run-%d", i),
 				AttemptID: "a1",
 				Status:    core.Succeeded,
 			},
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/chatbridge/notifications_test.go` around lines 175 - 185, The test currently constructs DAG names using string(rune('a'+i)) which breaks for i>25; update the name generation in the loop that builds events (the NotificationEvent / exec.DAGRunStatus entries) to use numeric formatting instead, e.g. build Name and DAGRunID with fmt.Sprintf("dag-%d", i) and fmt.Sprintf("run-%d", i) so names remain clear and robust for any value of maxNotificationGroups.
52-55: Unused mutex-protected slice `flushed`.

The `flushed` slice is declared and mutex-protected but only written to, never read for assertions after the final check at line 79. Lines 64-67 check `currentFlushes`, but `flushed` is empty at that point since batches are only appended after `waitForReadyBatch`. Consider removing the dead code or clarifying its purpose.

🔧 Suggested simplification
```diff
 func TestNotificationBatcher_ReplacesWaitingWithSuccessBeforeFlush(t *testing.T) {
 	t.Parallel()

-	var (
-		mu      sync.Mutex
-		flushed []NotificationBatch
-	)
 	batcher := NewNotificationBatcher(15*time.Millisecond, 25*time.Millisecond)
 	defer batcher.Stop()

 	require.True(t, batcher.Enqueue("dest-1", &exec.DAGRunStatus{Name: "briefing", DAGRunID: "run-1", AttemptID: "a1", Status: core.Waiting}))
 	time.Sleep(5 * time.Millisecond)
 	require.True(t, batcher.Enqueue("dest-1", &exec.DAGRunStatus{Name: "briefing", DAGRunID: "run-1", AttemptID: "a1", Status: core.Succeeded}))
 	time.Sleep(20 * time.Millisecond)

-	mu.Lock()
-	currentFlushes := len(flushed)
-	mu.Unlock()
-	assert.Zero(t, currentFlushes, "waiting batch should have been replaced before urgent flush")

 	ready := waitForReadyBatch(t, batcher)
-	mu.Lock()
-	flushed = append(flushed, ready.Batch)
-	mu.Unlock()
-
-	mu.Lock()
-	defer mu.Unlock()
-	require.Len(t, flushed, 1)
-	assert.Equal(t, NotificationClassSuccessDigest, flushed[0].Class)
-	require.Len(t, flushed[0].Events, 1)
-	assert.Equal(t, core.Succeeded, flushed[0].Events[0].Status.Status)
+	assert.Equal(t, NotificationClassSuccessDigest, ready.Batch.Class)
+	require.Len(t, ready.Batch.Events, 1)
+	assert.Equal(t, core.Succeeded, ready.Batch.Events[0].Status.Status)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/chatbridge/notifications_test.go` around lines 52 - 55, the mutex-protected slice `flushed` (and `mu`) is dead code: it's only appended to but never read, causing confusion in tests; remove the declarations `mu sync.Mutex` and `flushed []NotificationBatch` and any code that appends to `flushed`, and instead rely on the existing `currentFlushes` checks and `waitForReadyBatch` assertions to validate flush behavior (or if the intent was to assert flushed contents, update the test to read/assert `flushed` after the final `waitForReadyBatch`). Locate uses around `waitForReadyBatch`, `currentFlushes`, and any append operations to `flushed` to remove or convert them into proper assertions.

internal/service/slack/bot_test.go (1)
172-180: Consider consistent lock scope for `generatedErr` access.

The lock is released before checking `generatedErr` (line 176), but `generatedErr` is read outside the lock. If tests ever concurrently modify `generatedErr`, this could race. Since tests currently set it before calling, this works, but holding the lock through the check would be safer.

🔧 Suggested fix for consistent locking
```diff
 func (s *fakeSlackAgentService) GenerateAssistantMessage(context.Context, string, agent.UserIdentity, string, string) (agent.Message, error) {
 	s.mu.Lock()
 	s.generateCalls++
-	s.mu.Unlock()
-	if s.generatedErr != nil {
-		return agent.Message{}, s.generatedErr
+	err := s.generatedErr
+	msg := s.generated
+	s.mu.Unlock()
+	if err != nil {
+		return agent.Message{}, err
 	}
-	return s.generated, nil
+	return msg, nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/slack/bot_test.go` around lines 172 - 180, in GenerateAssistantMessage, the mutex is unlocked before reading s.generatedErr which can race if tests modify it concurrently; hold s.mu during the check and return so the read of s.generatedErr and access to s.generated are protected. Modify GenerateAssistantMessage (function name) to keep the lock around the generatedErr check and the return of s.generated, then unlock the mutex afterwards (protecting s.mu, s.generatedErr, and s.generated).

internal/service/chatbridge/notifications.go (1)
619-625: Shallow clone may not isolate all mutable state.

The `cloneNotificationStatus` function performs a shallow copy. If `exec.DAGRunStatus` contains pointer fields (like the `Nodes`, `OnFailure`, `OnExit` slices/pointers visible in the formatting functions), modifications to the original status after enqueuing could affect the batched snapshot.

However, since the monitor polls completed statuses that are unlikely to be mutated further, this is likely acceptable in practice.
💡 Consider deep clone if status mutation is possible
If there's any possibility of the original status being mutated after enqueuing, consider a deeper clone:
```go
func cloneNotificationStatus(status *exec.DAGRunStatus) *exec.DAGRunStatus {
	if status == nil {
		return nil
	}
	clone := *status
	if status.Nodes != nil {
		clone.Nodes = make([]*exec.Node, len(status.Nodes))
		copy(clone.Nodes, status.Nodes)
	}
	return &clone
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/chatbridge/notifications.go` around lines 619 - 625, the current cloneNotificationStatus function does a shallow copy which can leave pointer/slice fields shared (e.g., Nodes, OnFailure, OnExit or any pointer/map/slice inside exec.DAGRunStatus), so modify cloneNotificationStatus to perform a deep copy of those mutable fields: after cloning the struct value, allocate and copy any non-nil slices (e.g., make and copy clone.Nodes, clone.OnFailure, clone.OnExit) and, if needed, clone the elements (e.g., copy node pointers into new slice or deep-copy node structs) and copy any maps by creating new maps and copying entries; keep the nil checks and return the new pointer as before.

internal/service/chatbridge/monitor.go (1)
149-182: Consider potential startup delay with large DAG history.

The `seedDelivered` method queries the last 24 hours of DAG runs without a limit. For repositories with high DAG activity, this could result in loading a large number of statuses at startup.

💡 Consider adding a limit to the seedDelivered query
If startup performance becomes an issue with very active DAG repositories, consider adding a limit:
```diff
 	statuses, err := m.dagRunStore.ListStatuses(ctx,
 		exec.WithFrom(from),
 		exec.WithTo(to),
+		exec.WithLimit(10000), // Cap seeding to avoid slow startup
 	)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/chatbridge/monitor.go` around lines 149 - 182, The seedDelivered method can load an unbounded number of DAG statuses; update seedDelivered to pass a limit and ordering to dagRunStore.ListStatuses (e.g., add exec.WithLimit(N) and an order option such as exec.WithOrder or exec.WithSortRecent) so you only fetch the most recent N runs (make N configurable on NotificationMonitor or use a sensible default like 1000); keep the existing time window (exec.WithFrom/exec.WithTo) but combine it with the limit and ensure the store call still returns the most recent entries first before calling m.markDelivered for each destination.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4403fd8e-a8cd-4da9-9c1e-dee2c0550834
📒 Files selected for processing (12)
- internal/service/chatbridge/chatbridge_test.go
- internal/service/chatbridge/monitor.go
- internal/service/chatbridge/monitor_test.go
- internal/service/chatbridge/notifications.go
- internal/service/chatbridge/notifications_test.go
- internal/service/slack/bot_test.go
- internal/service/slack/monitor.go
- internal/service/slack/monitor_test.go
- internal/service/telegram/bot.go
- internal/service/telegram/bot_test.go
- internal/service/telegram/monitor.go
- internal/service/telegram/monitor_test.go
Codecov Report

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #1798      +/-   ##
==========================================
+ Coverage   68.69%   68.98%   +0.29%
==========================================
  Files         422      424       +2
  Lines       50800    51230     +430
==========================================
+ Hits        34896    35343     +447
+ Misses      12953    12877      -76
- Partials     2951     3010      +59
```

... and 17 files with indirect coverage changes. Continue to review the full report in Codecov by Sentry.