fix: harden bot session continuity and responsiveness #1793
Important: Review skipped
Auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration.
📝 Walkthrough
Adds comprehensive session lifecycle APIs and runtime configuration for non-mutating assistant generation, external message appends, and automatic session compaction. The bots (Slack, Telegram) and a new chatbridge package adopt batching, delivery tracking, and session rotation; the session manager and loop behaviors are updated to support these flows.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Bot
participant AgentAPI
participant SessionMgr
participant LLM
Client->>Bot: Request assistant message
Bot->>AgentAPI: GenerateAssistantMessage(sessionID?, prompt)
AgentAPI->>SessionMgr: Load session runtime config (memory, skills, DAG)
AgentAPI->>AgentAPI: Build system prompt
AgentAPI->>LLM: One-shot LLM call
LLM-->>AgentAPI: Response
AgentAPI-->>Bot: Return Message (non-mutating)
Bot-->>Client: Deliver assistant message
sequenceDiagram
participant User
participant Bot
participant AgentAPI
participant SessionMgr
participant LLM
User->>Bot: Send message A
Bot->>Bot: Enqueue incoming (batch)
User->>Bot: Send message B (rapid)
Bot->>Bot: Flush batch after delay
Bot->>AgentAPI: CreateSession (batched content)
AgentAPI->>SessionMgr: Create session + record messages
SessionMgr->>LLM: Process input (may call tools)
LLM-->>SessionMgr: Streamed responses / tool calls
SessionMgr-->>AgentAPI: StreamResponse updates
AgentAPI-->>Bot: Streamed updates
Bot->>User: Post replies / thread messages
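The batching step in the diagram above (enqueue rapid messages, then flush them as one session input) can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `incomingBatch`, `Enqueue`, and `Flush` are hypothetical names, and the flush delay timer is left out so the example stays deterministic.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// incomingBatch is a hypothetical buffer that collects rapid messages
// for one conversation until the caller decides to flush them.
type incomingBatch struct {
	mu      sync.Mutex
	pending []string
}

// Enqueue appends one incoming message to the batch.
func (b *incomingBatch) Enqueue(text string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, text)
}

// Flush joins the pending messages into a single prompt and empties the
// buffer. The second result is false when nothing was pending.
func (b *incomingBatch) Flush() (string, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.pending) == 0 {
		return "", false
	}
	content := strings.Join(b.pending, "\n")
	b.pending = nil
	return content, true
}

func main() {
	var b incomingBatch
	b.Enqueue("message A")
	b.Enqueue("message B")
	if content, ok := b.Flush(); ok {
		// In the real bot this is where a single session create/send would happen.
		fmt.Printf("one batched input:\n%s\n", content)
	}
}
```

In a real bot the flush would presumably be triggered by a timer that is reset on each new message; that part is omitted here.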
sequenceDiagram
participant Client
participant Bot
participant AgentAPI
participant SessionMgr
participant LLM
Client->>Bot: CompactSessionIfNeeded(sessionID)
Bot->>AgentAPI: CompactSessionIfNeeded
AgentAPI->>SessionMgr: Evaluate token usage / latestPromptTokens
alt Near token limit
AgentAPI->>AgentAPI: Build compaction transcript
AgentAPI->>LLM: Generate handoff summary
LLM-->>AgentAPI: Summary
AgentAPI->>SessionMgr: Create new session seeded with summary
AgentAPI->>SessionMgr: Mark old session inactive
AgentAPI-->>Bot: Return (newSessionID, rotated=true)
else Enough tokens
AgentAPI-->>Bot: Return (sessionID, rotated=false)
end
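The compaction flow in the last diagram can be approximated by the sketch below. It assumes a simplified in-memory `registry` and a `summarize` callback standing in for the LLM handoff summary; these names, the threshold parameter, and the ID format are hypothetical and are not taken from the PR.

```go
package main

import "fmt"

// session is a hypothetical, simplified stand-in for a managed agent session.
type session struct {
	ID                 string
	LatestPromptTokens int
	Active             bool
	Summary            string // handoff summary the new session is seeded with
}

// registry is a hypothetical in-memory session store.
type registry struct {
	sessions map[string]*session
	created  int
}

// compactIfNeeded keeps the session when it is below tokenLimit. Otherwise it
// creates a new session seeded with a summary, marks the old one inactive,
// and reports rotated=true together with the new ID.
func (r *registry) compactIfNeeded(id string, tokenLimit int, summarize func(*session) string) (string, bool, error) {
	old, ok := r.sessions[id]
	if !ok {
		return "", false, fmt.Errorf("session %q not found", id)
	}
	if old.LatestPromptTokens < tokenLimit {
		return id, false, nil // enough tokens left: no rotation
	}
	r.created++
	next := &session{
		ID:      fmt.Sprintf("%s-continued-%d", id, r.created),
		Active:  true,
		Summary: summarize(old),
	}
	r.sessions[next.ID] = next
	old.Active = false
	return next.ID, true, nil
}

func main() {
	r := &registry{sessions: map[string]*session{
		"s1": {ID: "s1", LatestPromptTokens: 9500, Active: true},
	}}
	newID, rotated, err := r.compactIfNeeded("s1", 8000, func(*session) string {
		return "handoff summary"
	})
	fmt.Println(newID, rotated, err)
}
```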
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
internal/service/telegram/monitor.go (1)
168-191: ⚠️ Potential issue | 🟠 Major
Retry when every Telegram delivery attempt fails.
`notifyCompletion` still returns `true` even if every `notifyChat` call returns `false`. A transient agent/Telegram failure can therefore suppress that DAG run permanently with no notification ever reaching users.
🛠 Possible fix
-	for chatID := range m.bot.allowedChats {
-		m.notifyChat(ctx, chatID, s, prompt)
-	}
-	return true // Mark as seen even on partial failure to avoid duplicates
+	delivered := false
+	for chatID := range m.bot.allowedChats {
+		if m.notifyChat(ctx, chatID, s, prompt) {
+			delivered = true
+		}
+	}
+	return delivered
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/telegram/monitor.go` around lines 168 - 191, notifyCompletion currently ignores notifyChat results and returns true even if every delivery failed; update notifyCompletion to track the boolean result of each m.notifyChat(ctx, chatID, s, prompt) call (and treat sendFallbackNotification as before when m.agentAPI == nil) and return false if all notifyChat calls returned false (return true if at least one succeeded) so transient failures don't permanently mark the DAG run as delivered; use the existing symbols notifyCompletion, notifyChat, sendFallbackNotification, m.agentAPI and m.bot.allowedChats to locate and implement this change.
internal/service/slack/bot.go (1)
329-380: ⚠️ Potential issue | 🔴 Critical
Serialize the empty-session create path.
`flushIncomingMessages` decides to call `createSession` from unsynchronized conversation state. A concurrent notification or reset can install/replace `cs.sessionID` between that decision and the async create, leaving the same Slack conversation split across two agent sessions and dropping the continuity this PR is trying to preserve.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/slack/bot.go` around lines 329 - 380, flushIncomingMessages currently reads cs.sessionID without holding cs.mu and may call createSession concurrently causing duplicate/rotated sessions; fix by rechecking and serializing the "empty-session create" path under cs.mu: after computing thinkingTS and before calling createSession, acquire cs.mu, if cs.sessionID is now non-empty release and follow the non-empty path (call prepareSessionForMessage/sendAgentMessage and handle rotated case), otherwise mark/reserve that you are creating the session (or call createSession while still holding the mutex if safe) and then release; reference functions/fields: flushIncomingMessages, createSession, prepareSessionForMessage, sendAgentMessage, clearPendingIndicators, and chatState.cs.mu/sessionID to locate and implement the recheck/serialization.
internal/service/slack/monitor.go (1)
158-181: ⚠️ Potential issue | 🟠 Major
Retry when every Slack delivery attempt fails.
`notifyCompletion` still returns `true` even if every `notifyChannel` call returns `false`. A transient Slack/agent failure can therefore suppress that DAG run permanently with no notification ever reaching users.
🛠 Possible fix
-	for channelID := range m.bot.allowedChannels {
-		m.notifyChannel(ctx, channelID, s, prompt)
-	}
-	return true // Mark as seen even on partial failure to avoid duplicates
+	delivered := false
+	for channelID := range m.bot.allowedChannels {
+		if m.notifyChannel(ctx, channelID, s, prompt) {
+			delivered = true
+		}
+	}
+	return delivered
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/slack/monitor.go` around lines 158 - 181, notifyCompletion currently ignores notifyChannel results and always returns true (preventing retries); change it to collect the boolean return value from each call to m.notifyChannel(ctx, channelID, s, prompt) and track if any call returned true, then return true only if at least one delivery succeeded; return false when all notifyChannel attempts failed so the caller can retry. Also preserve the existing early fallback when m.agentAPI == nil and keep logging as-is; update the function to use a success flag (e.g., anySuccess) that is set by notifyChannel's true return and used in the final return.
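The "at least one delivery succeeded" rule described in the two monitor comments above can be isolated into a small helper. The sketch below is illustrative only: `deliverAll` and its callback signature are hypothetical, and the real `notifyChannel`/`notifyChat` methods take more arguments.

```go
package main

import "fmt"

// deliverAll calls notify for every target and reports whether at least one
// delivery succeeded. A false result tells the caller to retry later instead
// of marking the run as seen. With no targets it returns false.
func deliverAll(targets []string, notify func(target string) bool) bool {
	delivered := false
	for _, target := range targets {
		if notify(target) {
			delivered = true
		}
	}
	return delivered
}

func main() {
	channels := []string{"C1", "C2"}
	// Simulate a transient outage where every delivery fails.
	seen := deliverAll(channels, func(string) bool { return false })
	fmt.Println("mark run as seen:", seen)
}
```

One design question this leaves open is partial failure: with this rule a run is marked as seen once any single target received it, so targets that failed are not retried.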
🧹 Nitpick comments (2)
internal/agent/mocks_test.go (1)
362-411: Add locking to `mockMemoryStore` to avoid racey test behavior.
The `dag` map and `global` are mutated/read from multiple methods without synchronization. In parallel tests or goroutine-heavy flows this can race.
Proposed thread-safe mock update
 type mockMemoryStore struct {
+	mu     sync.RWMutex
 	global string
 	dag    map[string]string
 }
@@
 func (m *mockMemoryStore) LoadGlobalMemory(_ context.Context) (string, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
 	return m.global, nil
 }
@@
 func (m *mockMemoryStore) LoadDAGMemory(_ context.Context, dagName string) (string, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
 	return m.dag[dagName], nil
 }
@@
 func (m *mockMemoryStore) SaveGlobalMemory(_ context.Context, content string) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.global = content
 	return nil
 }
@@
 func (m *mockMemoryStore) SaveDAGMemory(_ context.Context, dagName string, content string) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.dag[dagName] = content
 	return nil
 }
@@
 func (m *mockMemoryStore) ListDAGMemories(_ context.Context) ([]string, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
 	out := make([]string, 0, len(m.dag))
 	for dagName := range m.dag {
 		out = append(out, dagName)
 	}
 	return out, nil
 }
@@
 func (m *mockMemoryStore) DeleteGlobalMemory(_ context.Context) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.global = ""
 	return nil
 }
@@
 func (m *mockMemoryStore) DeleteDAGMemory(_ context.Context, dagName string) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	delete(m.dag, dagName)
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/agent/mocks_test.go` around lines 362 - 411, The mockMemoryStore is not thread-safe: add a sync.RWMutex field to the mockMemoryStore struct and use it to protect access to both global and dag; acquire RLock/RUnlock in read-only methods (LoadGlobalMemory, LoadDAGMemory, ListDAGMemories, MemoryDir if it ever reads mutable state) and Lock/Unlock in mutating methods (SaveGlobalMemory, SaveDAGMemory, DeleteGlobalMemory, DeleteDAGMemory), ensuring newMockMemoryStore returns the struct with the mutex zero-valued (no special init needed); update each method to wrap the appropriate lock/unlock calls to eliminate data races.
internal/service/telegram/bot.go (1)
732-748: Use `errors.Is()` for error comparison.
Line 734 uses direct equality (`==`) to check for `ErrSessionNotFound`. Using `errors.Is()` is more idiomatic and handles wrapped errors correctly.
♻️ Proposed fix
Add to imports:
"errors"
Then update the comparison:
 newSID, rotated, err := b.agentAPI.CompactSessionIfNeeded(ctx, sid, user)
 if err != nil {
-	if err == agent.ErrSessionNotFound {
+	if errors.Is(err, agent.ErrSessionNotFound) {
 		b.logger.Warn("Session missing during compaction, resetting chat",
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/telegram/bot.go` around lines 732 - 748, Replace the direct equality check on the returned error from b.agentAPI.CompactSessionIfNeeded with errors.Is to correctly handle wrapped errors: add the "errors" import and change the branch that currently does `if err == agent.ErrSessionNotFound` to `if errors.Is(err, agent.ErrSessionNotFound)` (keeping the same logging, b.resetChat(cs) and returns), leaving the other error handling intact; this touches the CompactSessionIfNeeded call site in bot.go where newSID, rotated, err are handled.
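Why the `errors.Is` change matters can be seen in a small standalone example. The sentinel and the wrapping function below are stand-ins defined locally for illustration; they are not the real `agent` package symbols.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSessionNotFound is a local stand-in for the agent package's sentinel error.
var ErrSessionNotFound = errors.New("session not found")

// compact simulates a call site that wraps the sentinel with extra context.
func compact(sessionID string) error {
	return fmt.Errorf("compact %s: %w", sessionID, ErrSessionNotFound)
}

// isNotFound reports whether err is, or wraps, ErrSessionNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, ErrSessionNotFound)
}

func main() {
	err := compact("abc")
	fmt.Println(err == ErrSessionNotFound) // false: the wrapper is a different value
	fmt.Println(isNotFound(err))           // true: errors.Is unwraps the chain
}
```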
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/agent/delegate.go`:
- Around line 294-297: The code calls subMgr.RecordExternalMessage(ctx.Context,
userMsg) but ignores its returned message and forwards the original userMsg,
which can omit assigned metadata; change the call to capture the returned
message (e.g., recordedMsg, err := subMgr.RecordExternalMessage(...)) and pass
recordedMsg to forwardToParent instead of userMsg; if err != nil, still log the
warning but prefer forwarding recordedMsg when non-nil (fall back to userMsg
only if recordedMsg is nil); apply the same change to the other occurrence
around forwardToParent (the 346-350 site) so both places use the recorded
message.
In `@internal/agent/session.go`:
- Around line 470-474: The onMessage persistence error handling returns an empty
Message{} losing appended message metadata; update the error return path inside
the block that calls sm.onMessage (the conditional checking sm.onMessage != nil
and invoking sm.onMessage(ctx, msg)) to return the original msg along with the
error instead of Message{} so callers receive the stored message and its
assigned metadata when persistence fails.
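A minimal sketch of the suggested error path, under the assumption that the message gets its metadata before the persistence hook runs. `recorder`, `record`, and the `ID` format are hypothetical simplifications, not the real session manager.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a simplified stand-in for the agent Message type.
type message struct {
	ID      string
	Content string
}

// errDiskFull is an example persistence failure used by the demo.
var errDiskFull = errors.New("disk full")

// recorder keeps messages in memory and optionally persists them through a
// hook that plays the role of onMessage.
type recorder struct {
	messages []message
	persist  func(message) error
}

// record assigns metadata, stores the message, then persists it. When the
// hook fails it still returns the stored message next to the error instead of
// an empty value, so callers keep the assigned metadata.
func (r *recorder) record(msg message) (message, error) {
	msg.ID = fmt.Sprintf("msg-%d", len(r.messages)+1)
	r.messages = append(r.messages, msg)
	if r.persist != nil {
		if err := r.persist(msg); err != nil {
			return msg, fmt.Errorf("persist message: %w", err)
		}
	}
	return msg, nil
}

func main() {
	r := &recorder{persist: func(message) error { return errDiskFull }}
	msg, err := r.record(message{Content: "DAG finished"})
	fmt.Printf("id=%s err=%v\n", msg.ID, err)
}
```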
In `@internal/service/slack/monitor.go`:
- Around line 213-315: The code currently calls m.bot.setActiveSession (adopting
the new session into chatState) before ensuring AppendExternalMessage succeeded
in both notifyChannelThread and appendNotification; move the setActiveSession
calls so they happen only after AppendExternalMessage returns successfully
(i.e., create or recreate the session with m.agentAPI.CreateEmptySession, then
call AppendExternalMessage, and only on success call m.bot.setActiveSession and
then return/markDelivered). Update notifyChannelThread (threadKey flow) and both
branches in appendNotification (new session and session-recreation paths) to
adopt this ordering and avoid setting chatState.sessionID on failed appends.
- Around line 317-338: In buildNotificationMessage, avoid embedding raw provider
errors into the Slack fallback text: remove err.Error() from the user-facing
message (fallbackNotificationText call) and instead pass a generic sanitized
message like "AI unavailable" or "AI temporarily unavailable, try again later";
keep the detailed err.Error() only in the m.logger.Warn call so the raw error
stays in logs (update references in buildNotificationMessage and the
LLMData/Content payload to use the sanitized text).
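The sanitizing suggestion above amounts to splitting one error into two audiences: full detail for the log, a fixed phrase for the chat. The sketch below assumes simplified helper signatures; `buildFallback`, `fallbackText`, and the message wording are illustrative and do not mirror the real `buildNotificationMessage`.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
)

// genericAIUnavailable is the only AI-related text users ever see.
const genericAIUnavailable = "AI temporarily unavailable, try again later."

// errProvider is an example provider failure with internal detail.
var errProvider = errors.New("provider gpt-x: dial tcp 10.0.0.7:443: connection refused")

// quietLogger discards output; handy for callers that do not want log noise.
var quietLogger = slog.New(slog.NewTextHandler(io.Discard, nil))

// fallbackText is a hypothetical plain-text notification formatter.
func fallbackText(dagName, status, note string) string {
	return fmt.Sprintf("DAG %s finished with status %s. %s", dagName, status, note)
}

// buildFallback logs the raw error for operators and returns user-facing
// text that contains no provider details.
func buildFallback(logger *slog.Logger, dagName, status string, err error) string {
	logger.Warn("Failed to generate AI notification, falling back to plain text",
		slog.String("dag", dagName),
		slog.String("status", status),
		slog.String("error", err.Error()),
	)
	return fallbackText(dagName, status, genericAIUnavailable)
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	fmt.Println(buildFallback(logger, "etl", "failed", errProvider))
}
```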
In `@internal/service/telegram/bot.go`:
- Around line 251-273: In flushIncomingMessages, avoid the data race by reading
cs.sessionID while holding cs.mu: inside the critical section (the block guarded
by cs.mu.Lock()/Unlock()) capture sessionID := cs.sessionID (and keep existing
pendingMessages handling), then unlock and use that local sessionID variable in
the subsequent logic instead of reading cs.sessionID directly; this ensures
flushIncomingMessages, prepareSessionForMessage, createSession and sendMessage
calls use a consistent, race-free sessionID value.
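The locking pattern requested here (capture `sessionID` in the same critical section that drains the pending messages) might look like the sketch below. `chatState` is a reduced stand-in with only the fields needed for the illustration, and `takePending` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"sync"
)

// chatState is a reduced stand-in for the bot's per-chat state.
type chatState struct {
	mu              sync.Mutex
	sessionID       string
	pendingMessages []string
}

// takePending drains the pending messages and snapshots sessionID under one
// lock, so the caller works with a consistent pair of values afterwards.
func (cs *chatState) takePending() (sessionID string, msgs []string) {
	cs.mu.Lock()
	defer cs.mu.Unlock()
	sessionID = cs.sessionID
	msgs = cs.pendingMessages
	cs.pendingMessages = nil
	return sessionID, msgs
}

func main() {
	cs := &chatState{sessionID: "s1", pendingMessages: []string{"hi", "there"}}
	sid, msgs := cs.takePending()
	// Use the local sid from here on; do not re-read cs.sessionID unlocked.
	fmt.Println(sid, len(msgs))
}
```

The snapshot only guarantees a race-free read. Whether the session is still valid by the time the message is sent is a separate concern that the rest of the flow has to handle.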
In `@internal/service/telegram/monitor.go`:
- Around line 219-277: The code in appendNotification sets the chat state's
active session (m.bot.setActiveSession) before ensuring the notification was
successfully stored, risking adoption of an empty replacement session; change
both creation branches in appendNotification so that you first call
CreateEmptySession and then attempt AppendExternalMessage (via appendToSession),
and only call m.bot.setActiveSession(cs, newSessionID, user.UserID) after
appendToSession succeeds; if appendToSession fails, do not update chatState and
return the error path as before. Include the same adjustment for the "session
not found" recreate branch (CreateEmptySession -> appendToSession ->
setActiveSession only on success).
- Around line 279-300: The fallback path in buildNotificationMessage currently
exposes provider error text to users via fallbackNotificationText(err.Error());
change it to keep the detailed error only in logs and send a sanitized
user-facing message instead (e.g. "AI unavailable, please try again later" or
include only a non-sensitive short code). Concretely: inside
buildNotificationMessage, retain the m.logger.Warn call with err.Error() for
server-side diagnostics, but replace "AI unavailable: "+err.Error() passed to
fallbackNotificationText with a generic string; ensure the returned
agent.Message and llm.Message use that sanitized text. Use the existing symbols
buildNotificationMessage, fallbackNotificationText, m.logger.Warn, agent.Message
and llm.Message to locate and update the code.
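The ordering asked for in the first `appendNotification` item above (create, append, and only then adopt the session) can be sketched as follows. The `agentAPI` interface here is a deliberately simplified assumption: the real `CreateEmptySession` and `AppendExternalMessage` take a context, a user, and a message value.

```go
package main

import (
	"errors"
	"fmt"
)

// agentAPI is a simplified, hypothetical view of the two calls involved.
type agentAPI interface {
	CreateEmptySession() (string, error)
	AppendExternalMessage(sessionID, text string) error
}

// chatState holds the session currently adopted by a chat.
type chatState struct{ sessionID string }

// errAppend is an example append failure used by the fake below.
var errAppend = errors.New("append failed")

// fakeAPI is an in-memory test double.
type fakeAPI struct {
	failAppend bool
	created    int
}

func (f *fakeAPI) CreateEmptySession() (string, error) {
	f.created++
	return fmt.Sprintf("session-%d", f.created), nil
}

func (f *fakeAPI) AppendExternalMessage(_, _ string) error {
	if f.failAppend {
		return errAppend
	}
	return nil
}

// appendNotification adopts the new session only after the notification has
// been stored in it; on failure the chat state is left untouched.
func appendNotification(api agentAPI, cs *chatState, text string) error {
	newID, err := api.CreateEmptySession()
	if err != nil {
		return fmt.Errorf("create session: %w", err)
	}
	if err := api.AppendExternalMessage(newID, text); err != nil {
		return fmt.Errorf("append notification: %w", err)
	}
	cs.sessionID = newID // adopt only on success
	return nil
}

func main() {
	cs := &chatState{sessionID: "old"}
	err := appendNotification(&fakeAPI{failAppend: true}, cs, "DAG finished")
	fmt.Println(cs.sessionID, err)
}
```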
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b454b51c-626a-49df-9d33-f9b14484d028
📒 Files selected for processing (17)
internal/agent/api.go
internal/agent/api_test.go
internal/agent/delegate.go
internal/agent/loop.go
internal/agent/mocks_test.go
internal/agent/session.go
internal/agent/session_test.go
internal/agent/system_prompt.txt
internal/agent/system_prompt_test.go
internal/persis/fileagentskill/examples/dagu/SKILL.md
internal/persis/fileagentskill/examples_test.go
internal/service/slack/bot.go
internal/service/slack/bot_test.go
internal/service/slack/monitor.go
internal/service/telegram/bot.go
internal/service/telegram/bot_test.go
internal/service/telegram/monitor.go
@coderabbitai full review
✅ Actions performed
Full review triggered.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/agent/session.go (1)
470-487: ⚠️ Potential issue | 🟠 Major
Store the sequenced message, not the pre-sequence copy.
`msg.SequenceID = sm.appendMessage(msg)` only updates the local variable. `appendMessage` has already appended the older struct to `sm.messages`, so the active-session snapshot keeps `SequenceID == 0` for this external message. `chatbridge.ProcessStreamResponse` treats `0` as untracked, which can replay the same notification on reconnect. Please make the helper assign the sequence under the lock and return the updated `Message`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/agent/session.go` around lines 470 - 487, The stored message in sm.messages keeps SequenceID==0 because sm.appendMessage(msg) appends the pre-sequence copy and only updates the local msg variable; update appendMessage (or create a helper) to perform the SequenceID assignment while holding the session lock and return the updated Message, then replace the call msg.SequenceID = sm.appendMessage(msg) with msg = sm.appendMessage(msg) (or use the new helper) so sm.messages contains the sequenced struct; ensure references to appendMessage, sm.messages, msg.SequenceID, sm.subpub.Publish and sm.onMessage continue to use the returned/updated Message so the persisted snapshot contains the correct SequenceID.
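The value-copy problem described above is easy to reproduce in isolation: Go structs are copied on append, so a sequence number assigned afterwards never reaches the stored element. The sketch below shows the suggested shape of the fix with a hypothetical, stripped-down `sessionManager`; the field names are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a simplified stand-in for the agent Message type.
type message struct {
	SequenceID int64
	Content    string
}

// sessionManager is a stripped-down stand-in with only what the example needs.
type sessionManager struct {
	mu       sync.Mutex
	messages []message
	nextSeq  int64
}

// appendMessage assigns the sequence ID under the lock, stores the sequenced
// copy, and returns it so publish/persist steps use the same value.
func (sm *sessionManager) appendMessage(msg message) message {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.nextSeq++
	msg.SequenceID = sm.nextSeq
	sm.messages = append(sm.messages, msg)
	return msg
}

func main() {
	sm := &sessionManager{}
	msg := sm.appendMessage(message{Content: "DAG finished"})
	// Both the returned value and the stored snapshot carry the sequence ID.
	fmt.Println(msg.SequenceID, sm.messages[0].SequenceID)
}
```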
♻️ Duplicate comments (2)
internal/service/telegram/monitor.go (1)
236-249: ⚠️ Potential issue | 🟠 Major
Sanitize the Telegram fallback text.
`err.Error()` is still copied into the user-facing fallback here. Provider failures can expose model names, transport details, or other internals that should stay in logs.
🛠 Possible fix
 	m.logger.Warn("Failed to generate AI notification, falling back to plain text",
 		slog.String("dag", s.Name),
 		slog.String("status", s.Status.String()),
 		slog.String("error", err.Error()),
 	)
-	text := fallbackNotificationText(s, "AI unavailable: "+err.Error())
+	text := fallbackNotificationText(s, "AI-generated details are unavailable right now.")
 	return agent.Message{
 		Type:      agent.MessageTypeAssistant,
 		Content:   text,
 		CreatedAt: time.Now(),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/telegram/monitor.go` around lines 236 - 249, The fallback notification currently injects err.Error() into user-facing text and llm.Message content; instead, remove the raw error from the message returned to users and replace it with a generic sanitized phrase (e.g., "AI temporarily unavailable") when building the fallback via fallbackNotificationText and agent.Message/llm.Message; keep the full err.Error() only in the internal m.logger.Warn call (or add a separate m.logger.Error) so logs retain details while the user-visible text and llm.Message content do not leak provider internals.
internal/service/slack/monitor.go (1)
262-275: ⚠️ Potential issue | 🟠 Major
Sanitize the Slack fallback text.
`err.Error()` is still copied into the user-facing fallback here. Provider failures can expose model names, transport details, or other internals that should stay in logs.
🛠 Possible fix
 	m.logger.Warn("Failed to generate AI notification, falling back to plain text",
 		slog.String("dag", s.Name),
 		slog.String("status", s.Status.String()),
 		slog.String("error", err.Error()),
 	)
-	text := fallbackNotificationText(s, "AI unavailable: "+err.Error())
+	text := fallbackNotificationText(s, "AI-generated details are unavailable right now.")
 	return agent.Message{
 		Type:      agent.MessageTypeAssistant,
 		Content:   text,
 		CreatedAt: time.Now(),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/slack/monitor.go` around lines 262 - 275, The fallback copies err.Error() into user-facing text; remove or redact raw error details when building the fallback message in the block using m.logger.Warn and fallbackNotificationText so internal/provider details aren't exposed. Keep the detailed error only in the log (m.logger.Warn already includes slog.String("error", err.Error())); change the user text passed to fallbackNotificationText and assigned to agent.Message and llm.Message to a generic/sanitized string (e.g., "AI unavailable" or a short redacted token) or a safely truncated/redacted version of err, ensuring all places that set Content and LLMData.Content use the sanitized value instead of err.Error().
🧹 Nitpick comments (3)
internal/service/chatbridge/chatbridge.go (1)
290-307: Consider logging when `GetSessionDetail` fails.
`MarkSessionSnapshotDelivered` silently ignores errors from `GetSessionDetail`. While this may be intentional (best-effort), consider logging at debug level to aid troubleshooting when delivery tracking misbehaves.
🔧 Optional: Add debug logging for visibility
 func MarkSessionSnapshotDelivered(ctx context.Context, svc AgentService, state *State, sessionID, userID string) {
 	detail, err := svc.GetSessionDetail(ctx, sessionID, userID)
-	if err != nil || detail == nil {
+	if err != nil {
+		// Could log at debug level: "failed to get session detail for delivery marking"
+		return
+	}
+	if detail == nil {
 		return
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/chatbridge/chatbridge.go` around lines 290 - 307, MarkSessionSnapshotDelivered currently swallows errors from svc.GetSessionDetail; add a debug-level log when GetSessionDetail returns an error or nil detail including sessionID, userID and the error to aid troubleshooting. Update the error branch (the err != nil || detail == nil check) to log the failure before returning, using the service or package logger (e.g., svc.Debugf or the package logger) so callers can see why delivery marking was skipped, then return as before; keep the rest of the logic (iterating detail.Messages and calling state.MarkDelivered) unchanged.
internal/service/slack/bot_test.go (1)
237-263: Missing `t.Parallel()` call.
This test is missing `t.Parallel()` unlike other tests in this file. Adding it would maintain consistency and allow concurrent test execution.
Proposed fix
 func TestBot_ProcessIncoming_BatchesRapidMessagesIntoSingleCreate(t *testing.T) {
+	t.Parallel()
+
 	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/slack/bot_test.go` around lines 237 - 263, The test TestBot_ProcessIncoming_BatchesRapidMessagesIntoSingleCreate is missing t.Parallel(); add a call to t.Parallel() at the start of that test function (immediately after the function begins, before shared state setup like logger/client/service/bot creation) to enable parallel execution and match the other tests in the file.
internal/service/telegram/bot_test.go (1)
225-253: Missing `t.Parallel()` call.
This test is missing `t.Parallel()` unlike other tests in this file. Adding it would maintain consistency and allow concurrent test execution.
Proposed fix
 func TestBot_HandleMessage_BatchesRapidMessagesIntoSingleCreate(t *testing.T) {
+	t.Parallel()
+
 	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/telegram/bot_test.go` around lines 225 - 253, Add a t.Parallel() call at the start of the TestBot_HandleMessage_BatchesRapidMessagesIntoSingleCreate test function so it runs concurrently with other tests; locate the function TestBot_HandleMessage_BatchesRapidMessagesIntoSingleCreate and insert t.Parallel() immediately after the function begins (before creating logger, api, service, bot, etc.) to match the pattern used by other tests in this file.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/agent/api.go`:
- Around line 1224-1239: If RecordExternalMessage fails after creating the
persisted continuation via newManagedSession, perform a best-effort removal of
that persisted session (newID) before returning: call the persistent
session-store delete/remove API for newID (in addition to the existing in-memory
a.sessions.Delete(newID)), handle and log any error from that delete, and then
return the original error; ensure this cleanup happens in the error branch that
currently only calls a.sessions.Delete(newID) so no empty continuation remains
in the persistent SessionStore.
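The best-effort cleanup described for `api.go` could take roughly the following shape. Everything here is a simplified assumption: `persistentStore`, `createContinuation`, and the map of active sessions are hypothetical stand-ins, and the real session store API is not shown in this review.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errRecord is an example failure from recording the first message.
var errRecord = errors.New("record failed")

// persistentStore is a hypothetical stand-in for the persistent session store.
type persistentStore struct {
	sessions map[string]bool
}

func (p *persistentStore) Create(id string) { p.sessions[id] = true }

func (p *persistentStore) Delete(id string) error {
	if !p.sessions[id] {
		return fmt.Errorf("session %q not found", id)
	}
	delete(p.sessions, id)
	return nil
}

// createContinuation registers a new session in memory and in the persistent
// store, then records its first message. If recording fails, it removes the
// session from both places (best effort) and returns the original error.
func createContinuation(active map[string]bool, store *persistentStore, newID string, record func(id string) error) error {
	store.Create(newID)
	active[newID] = true
	if err := record(newID); err != nil {
		delete(active, newID)
		if delErr := store.Delete(newID); delErr != nil {
			// Cleanup is best effort: log and keep the original error.
			log.Printf("cleanup of session %s failed: %v", newID, delErr)
		}
		return err
	}
	return nil
}

func main() {
	store := &persistentStore{sessions: map[string]bool{}}
	active := map[string]bool{}
	err := createContinuation(active, store, "new-1", func(string) error { return errRecord })
	fmt.Println(err, len(active), len(store.sessions))
}
```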
In `@internal/service/slack/bot.go`:
- Around line 327-354: The code re-reads cs.SessionID() after
prepareSessionForMessage which can race with subscribeLoop; modify
sendAgentMessage (and ensureSubscription) to accept the prepared sid returned by
prepareSessionForMessage and use that sid instead of calling cs.SessionID()
inside those functions, and update callers in flushIncomingMessages (and the
other block around lines 501-516) to pass the sid; ensure rotated handling and
clearPendingIndicators remain the same and that createSession is still used when
sid == "".
- Around line 827-835: resetChat currently sets cs.thinkingMessage to nil without
removing the Slack placeholder, leaving a stray "_Thinking..._" message; modify
resetChat to, while holding cs.thinkingMu, check if cs.thinkingMessage is
non-nil and call the bot's message-deletion helper (e.g., the Bot method or
client used to delete messages) with the stored channel/timestamp before setting
cs.thinkingMessage = nil, handle/log any deletion error, then unlock; keep the
cs.Reset() cancel logic unchanged.
In `@internal/service/slack/monitor.go`:
- Around line 221-234: The thread is being marked active before the notification
is persisted; change the flow so you only call
m.bot.activeThreads.Store(threadKey, true) after chatbridge.AppendNotification
succeeds and indicates the notification was stored (check the stored return
value), and do not mark the thread active if err != nil or stored == false; use
the existing variables threadKey, chatbridge.AppendNotification,
m.bot.getOrCreateChat, and m.bot.userIdentity to locate and update the logic.
In `@internal/service/telegram/bot.go`:
- Around line 226-244: flushIncomingMessages currently calls
prepareSessionForMessage which returns a session id (sid) but then ignores it
and calls sendMessage which re-reads cs.SessionID(), allowing subscribeLoop to
clear the session concurrently; change flushIncomingMessages (and the other
similar block around lines 384-403) to pass the returned sid into sendMessage so
sendMessage and its call to ensureSubscription use that explicit sid instead of
calling cs.SessionID(); update sendMessage signature/usage to accept an optional
sid (or a concrete sid string) and ensure ensureSubscription uses that sid to
reactivate/reuse the prepared session.
---
Outside diff comments:
In `@internal/agent/session.go`:
- Around line 470-487: The stored message in sm.messages keeps SequenceID==0
because sm.appendMessage(msg) appends the pre-sequence copy and only updates the
local msg variable; update appendMessage (or create a helper) to perform the
SequenceID assignment while holding the session lock and return the updated
Message, then replace the call sm.SequenceID = sm.appendMessage(msg) with msg =
sm.appendMessage(msg) (or use the new helper) so sm.messages contains the
sequenced struct; ensure references to appendMessage, sm.messages,
msg.SequenceID, sm.subpub.Publish and sm.onMessage continue to use the
returned/updated Message so the persisted snapshot contains the correct
SequenceID.
---
Duplicate comments:
In `@internal/service/slack/monitor.go`:
- Around line 262-275: The fallback copies err.Error() into user-facing text;
remove or redact raw error details when building the fallback message in the
block using m.logger.Warn and fallbackNotificationText so internal/provider
details aren't exposed. Keep the detailed error only in the log (m.logger.Warn
already includes slog.String("error", err.Error())); change the user text passed
to fallbackNotificationText and assigned to agent.Message and llm.Message to a
generic/sanitized string (e.g., "AI unavailable" or a short redacted token) or a
safely truncated/redacted version of err, ensuring all places that set Content
and LLMData.Content use the sanitized value instead of err.Error().
In `@internal/service/telegram/monitor.go`:
- Around line 236-249: The fallback notification currently injects err.Error()
into user-facing text and llm.Message content; instead, remove the raw error
from the message returned to users and replace it with a generic sanitized
phrase (e.g., "AI temporarily unavailable") when building the fallback via
fallbackNotificationText and agent.Message/llm.Message; keep the full
err.Error() only in the internal m.logger.Warn call (or add a separate
m.logger.Error) so logs retain details while the user-visible text and
llm.Message content do not leak provider internals.
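The two monitor findings share one pattern, sketched below under assumptions: `sanitizedFallback` is a hypothetical helper, not the repository's `fallbackNotificationText`, and the generic phrase is only an example. The raw error goes to the structured log; the user-facing string never contains it.

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
	"os"
)

// genericUnavailable is the only failure detail shown to users (example wording).
const genericUnavailable = "AI temporarily unavailable"

// sanitizedFallback is a hypothetical helper that builds the user-visible
// fallback text without embedding any error details.
func sanitizedFallback(dagName, status string) string {
	return fmt.Sprintf("DAG '%s' %s\n(%s)", dagName, status, genericUnavailable)
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	err := errors.New("provider returned 503 from an internal host")

	// Full detail stays in the log only.
	logger.Warn("Failed to generate notification", slog.String("error", err.Error()))

	// The same sanitized text would be used for both the chat message
	// content and the LLM message content.
	fmt.Println(sanitizedFallback("nightly-etl", "failed"))
}
```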
---
Nitpick comments:
In `@internal/service/chatbridge/chatbridge.go`:
- Around line 290-307: MarkSessionSnapshotDelivered currently swallows errors
from svc.GetSessionDetail; add a debug-level log when GetSessionDetail returns
an error or nil detail including sessionID, userID and the error to aid
troubleshooting. Update the error branch (the err != nil || detail == nil check)
to log the failure before returning, using the service or package logger (e.g.,
svc.Debugf or the package logger) so callers can see why delivery marking was
skipped, then return as before; keep the rest of the logic (iterating
detail.Messages and calling state.MarkDelivered) unchanged.
In `@internal/service/slack/bot_test.go`:
- Around line 237-263: The test
TestBot_ProcessIncoming_BatchesRapidMessagesIntoSingleCreate is missing
t.Parallel(); add a call to t.Parallel() at the start of that test function
(immediately after the function begins, before shared state setup like
logger/client/service/bot creation) to enable parallel execution and match the
other tests in the file.
In `@internal/service/telegram/bot_test.go`:
- Around line 225-253: Add a t.Parallel() call at the start of the
TestBot_HandleMessage_BatchesRapidMessagesIntoSingleCreate test function so it
runs concurrently with other tests; locate the function
TestBot_HandleMessage_BatchesRapidMessagesIntoSingleCreate and insert
t.Parallel() immediately after the function begins (before creating logger, api,
service, bot, etc.) to match the pattern used by other tests in this file.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 51aa1818-23e3-4c2a-ac64-1a7cc0d05282
📒 Files selected for processing (23)
internal/agent/api.go
internal/agent/api_test.go
internal/agent/delegate.go
internal/agent/loop.go
internal/agent/loop_test.go
internal/agent/mocks_test.go
internal/agent/session.go
internal/agent/session_test.go
internal/agent/system_prompt.txt
internal/agent/system_prompt_test.go
internal/cmd/schema_test.go
internal/cmn/schema/config.schema.json
internal/cmn/schema/config_schema_test.go
internal/persis/fileagentskill/examples/dagu/SKILL.md
internal/persis/fileagentskill/examples_test.go
internal/service/chatbridge/chatbridge.go
internal/service/chatbridge/chatbridge_test.go
internal/service/slack/bot.go
internal/service/slack/bot_test.go
internal/service/slack/monitor.go
internal/service/telegram/bot.go
internal/service/telegram/bot_test.go
internal/service/telegram/monitor.go
newID := uuid.New().String()
newMgr := a.newManagedSession(ctx, newID, user, runtimeCfg, time.Now())
summaryContent := sessionSummaryPrefix + strings.TrimSpace(resp.Content)
if _, err := newMgr.RecordExternalMessage(ctx, Message{
Type: MessageTypeAssistant,
Content: summaryContent,
Usage: &resp.Usage,
CreatedAt: time.Now(),
LLMData: &llm.Message{
Role: llm.RoleAssistant,
Content: summaryContent,
},
}); err != nil {
a.sessions.Delete(newID)
return "", false, err
}
Rollback the new continuation session if the summary append fails.
newManagedSession has already persisted newID. If RecordExternalMessage fails here, only the in-memory session is removed; the empty continuation session stays in SessionStore and can surface later in listings. Please best-effort delete newID from the store before returning.
🛠 Possible fix
if _, err := newMgr.RecordExternalMessage(ctx, Message{
Type: MessageTypeAssistant,
Content: summaryContent,
Usage: &resp.Usage,
CreatedAt: time.Now(),
LLMData: &llm.Message{
Role: llm.RoleAssistant,
Content: summaryContent,
},
}); err != nil {
a.sessions.Delete(newID)
+ if a.store != nil {
+ if delErr := a.store.DeleteSession(ctx, newID); delErr != nil {
+ a.logger.Warn("Failed to remove empty continuation session", "session_id", newID, "error", delErr)
+ }
+ }
return "", false, err
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/agent/api.go` around lines 1224 - 1239, If RecordExternalMessage
fails after creating the persisted continuation via newManagedSession, perform a
best-effort removal of that persisted session (newID) before returning: call the
persistent session-store delete/remove API for newID (in addition to the
existing in-memory a.sessions.Delete(newID)), handle and log any error from that
delete, and then return the original error; ensure this cleanup happens in the
error branch that currently only calls a.sessions.Delete(newID) so no empty
continuation remains in the persistent SessionStore.
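The rollback order can be sketched as follows. This is an illustration under assumptions: `sessionStore`, `memStore`, and `rollbackContinuation` are hypothetical names, and the in-memory map stands in for `a.sessions`. The point is that both copies are removed and the original error is the one returned.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// sessionStore is a hypothetical slice of the persistent store interface.
type sessionStore interface {
	DeleteSession(ctx context.Context, id string) error
}

// memStore is an in-memory fake used only for this sketch.
type memStore struct{ sessions map[string]bool }

func (s *memStore) DeleteSession(_ context.Context, id string) error {
	delete(s.sessions, id)
	return nil
}

// rollbackContinuation removes the in-memory session, best-effort deletes the
// persisted one, logs (here: prints) any delete failure, and returns the
// original cause unchanged.
func rollbackContinuation(ctx context.Context, inMemory map[string]bool, store sessionStore, id string, cause error) error {
	delete(inMemory, id)
	if store != nil {
		if delErr := store.DeleteSession(ctx, id); delErr != nil {
			fmt.Println("warn: failed to remove empty continuation session:", delErr)
		}
	}
	return cause
}

// runRollbackDemo reports whether the session survives in either place.
func runRollbackDemo() (inMemory, persisted bool, err error) {
	id := "continuation-1"
	mem := map[string]bool{id: true}
	store := &memStore{sessions: map[string]bool{id: true}}
	err = rollbackContinuation(context.Background(), mem, store, id, errors.New("summary append failed"))
	return mem[id], store.sessions[id], err
}

func main() {
	fmt.Println(runRollbackDemo())
}
```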
func (b *Bot) flushIncomingMessages(ctx context.Context, cs *chatState, convKey string, gen uint64) {
text, ok := cs.TakePendingMessages(gen, "\n\n")
if !ok {
return
}

thinkingTS := b.postThinking(cs)
user := b.userIdentity(convKey)

cs.mu.Lock()
cs.thinkingMu.Lock()
if thinkingTS != "" {
cs.thinkingMessage = &messageRef{channel: cs.channelID, timestamp: thinkingTS}
}
cs.mu.Unlock()
cs.thinkingMu.Unlock()

if cs.sessionID == "" {
if cs.SessionID() == "" {
b.createSession(ctx, cs, user, text)
} else {
if b.shouldRotateSession(ctx, cs, user.UserID) {
b.rotateSession(ctx, cs, user, text)
} else {
b.sendAgentMessage(ctx, cs, user, text)
sid, rotated := b.prepareSessionForMessage(ctx, cs, user)
if sid == "" {
b.createSession(ctx, cs, user, text)
return
}
if rotated {
b.clearPendingIndicators(cs)
}
b.sendAgentMessage(ctx, cs, user, text)
}
Don’t re-read SessionID() after session prep.
prepareSessionForMessage already selected/rotated the target session, but sendAgentMessage fetches cs.SessionID() again. subscribeLoop can clear that field asynchronously on session end, so the batched text can be sent with an empty ID and get rejected instead of reusing/reactivating the prepared session. Pass the prepared sid through to sendAgentMessage and ensureSubscription.
Also applies to: 501-516
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/slack/bot.go` around lines 327 - 354, The code re-reads
cs.SessionID() after prepareSessionForMessage which can race with subscribeLoop;
modify sendAgentMessage (and ensureSubscription) to accept the prepared sid
returned by prepareSessionForMessage and use that sid instead of calling
cs.SessionID() inside those functions, and update callers in
flushIncomingMessages (and the other block around lines 501-516) to pass the
sid; ensure rotated handling and clearPendingIndicators remain the same and that
createSession is still used when sid == "".
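To illustrate why the prepared ID should be threaded through, here is a small sketch with hypothetical stand-ins: `chatState` is reduced to one field, `clear` simulates `subscribeLoop` ending the session, and `sendWithSID` represents a send that takes the ID as an argument instead of re-reading shared state.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// chatState is a reduced stand-in holding only the session ID.
type chatState struct {
	mu        sync.Mutex
	sessionID string
}

func (cs *chatState) SessionID() string {
	cs.mu.Lock()
	defer cs.mu.Unlock()
	return cs.sessionID
}

// clear simulates the subscription loop resetting the ID when a stream ends.
func (cs *chatState) clear() {
	cs.mu.Lock()
	defer cs.mu.Unlock()
	cs.sessionID = ""
}

var errNoSession = errors.New("empty session id")

// sendWithSID is a hypothetical send that relies only on the explicit sid.
func sendWithSID(sid, text string) (string, error) {
	if sid == "" {
		return "", errNoSession
	}
	return sid + ":" + text, nil
}

// runPinnedDemo prepares a sid, lets the state be cleared, then compares a
// send that re-reads the state with one that uses the prepared sid.
func runPinnedDemo() (rereadErr error, pinned string, pinnedErr error) {
	cs := &chatState{sessionID: "s-1"}
	sid := cs.SessionID() // stands in for prepareSessionForMessage
	cs.clear()            // session ends between prepare and send
	_, rereadErr = sendWithSID(cs.SessionID(), "hi")
	pinned, pinnedErr = sendWithSID(sid, "hi")
	return rereadErr, pinned, pinnedErr
}

func main() {
	fmt.Println(runPinnedDemo())
}
```

In the sketch the re-read path fails with an empty ID while the pinned path still targets the prepared session, which is the behavior the review asks `sendAgentMessage` and `ensureSubscription` to adopt.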
// resetChat clears the session state for a chat.
func (b *Bot) resetChat(cs *chatState) {
cs.mu.Lock()
defer cs.mu.Unlock()

if cs.subCancel != nil {
cs.subCancel()
cs.subCancel = nil
if cancel := cs.Reset(); cancel != nil {
cancel()
}
cs.sessionID = ""
cs.ownerUserID = ""
cs.subSessionID = ""
cs.pendingPromptID = ""
cs.thinkingMu.Lock()
cs.thinkingMessage = nil
cs.thinkingMu.Unlock()
}
Delete the thinking placeholder during reset.
resetChat clears cs.thinkingMessage without deleting the Slack message first, so /dagu-new, /dagu-cancel, or missing-session resets can leave a stray _Thinking..._ message in the channel.
🛠 Possible fix
func (b *Bot) resetChat(cs *chatState) {
+ b.clearPendingIndicators(cs)
if cancel := cs.Reset(); cancel != nil {
cancel()
}
- cs.thinkingMu.Lock()
- cs.thinkingMessage = nil
- cs.thinkingMu.Unlock()
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/slack/bot.go` around lines 827 - 835, resetChat currently
sets cs.thinkingMessage to nil without removing the Slack placeholder, leaving a
stray "_Thinking..._" message; modify resetChat to, while holding cs.thinkingMu,
check if cs.thinkingMessage is non-nil and call the bot's message-deletion
helper (e.g., the Bot method or client used to delete messages) with the stored
channel/timestamp before setting cs.thinkingMessage = nil, handle/log any
deletion error, then unlock; keep the cs.Reset() cancel logic unchanged.
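One way to picture the cleanup, as a sketch only: `placeholderDeleter` is a hypothetical abstraction over whatever client call deletes a Slack message, and `clearThinking` is an invented name. Unlike the wording above, this variant detaches the reference under the lock and performs the delete after unlocking, a design choice that keeps a slow API call from blocking other goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

type messageRef struct{ channel, timestamp string }

// placeholderDeleter is a hypothetical abstraction over the Slack client.
type placeholderDeleter interface {
	Delete(channel, timestamp string) error
}

// recordingDeleter is a fake that records what was deleted.
type recordingDeleter struct{ deleted []messageRef }

func (r *recordingDeleter) Delete(channel, timestamp string) error {
	r.deleted = append(r.deleted, messageRef{channel, timestamp})
	return nil
}

type chatState struct {
	thinkingMu      sync.Mutex
	thinkingMessage *messageRef
}

// clearThinking detaches the placeholder reference under the lock, then
// deletes the message (if any) outside the lock and returns any delete error.
func clearThinking(cs *chatState, d placeholderDeleter) error {
	cs.thinkingMu.Lock()
	ref := cs.thinkingMessage
	cs.thinkingMessage = nil
	cs.thinkingMu.Unlock()
	if ref == nil {
		return nil
	}
	return d.Delete(ref.channel, ref.timestamp)
}

func main() {
	d := &recordingDeleter{}
	cs := &chatState{thinkingMessage: &messageRef{channel: "C1", timestamp: "100.1"}}
	err := clearThinking(cs, d)
	fmt.Println(err, len(d.deleted), cs.thinkingMessage == nil)
}
```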
threadKey := channelID + ":" + threadTS
m.bot.activeThreads.Store(threadKey, true)

cs := m.bot.getOrCreateChat(threadKey, channelID, threadTS)
user := m.bot.userIdentity(threadKey)

sessionID, stored, err := chatbridge.AppendNotification(ctx, m.agentAPI, &cs.State, user, s.Name, m.bot.cfg.SafeMode, msg)
if err != nil {
m.logger.Warn("Failed to create notification session",
m.logger.Warn("Failed to append threaded notification message",
slog.String("session", sessionID),
slog.String("dag", s.Name),
slog.String("error", err.Error()),
)
m.bot.sendText(channelID, fmt.Sprintf("%s DAG '%s' %s\n(AI unavailable: %s)",
statusEmoji(s.Status), s.Name, s.Status.String(), err.Error()))
return false
return true
Only activate the thread after the notification is stored.
activeThreads.Store currently runs before chatbridge.AppendNotification. If that append fails, later replies in this thread still bypass the normal @mention gate and start a fresh session without the seeded notification context.
🛠 Possible fix
threadKey := channelID + ":" + threadTS
- m.bot.activeThreads.Store(threadKey, true)
cs := m.bot.getOrCreateChat(threadKey, channelID, threadTS)
user := m.bot.userIdentity(threadKey)
sessionID, stored, err := chatbridge.AppendNotification(ctx, m.agentAPI, &cs.State, user, s.Name, m.bot.cfg.SafeMode, msg)
if err != nil {
m.logger.Warn("Failed to append threaded notification message",
slog.String("session", sessionID),
slog.String("dag", s.Name),
slog.String("error", err.Error()),
)
return true
}
+ m.bot.activeThreads.Store(threadKey, true)
m.bot.markDelivered(cs, stored.SequenceID)
return true
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/slack/monitor.go` around lines 221 - 234, The thread is
being marked active before the notification is persisted; change the flow so you
only call m.bot.activeThreads.Store(threadKey, true) after
chatbridge.AppendNotification succeeds and indicates the notification was stored
(check the stored return value), and do not mark the thread active if err != nil
or stored == false; use the existing variables threadKey,
chatbridge.AppendNotification, m.bot.getOrCreateChat, and m.bot.userIdentity to
locate and update the logic.
func (b *Bot) flushIncomingMessages(ctx context.Context, cs *chatState, chatID int64, gen uint64) {
text, ok := cs.TakePendingMessages(gen, "\n\n")
if !ok {
return
}

user := b.userIdentity(chatID)
if cs.SessionID() == "" {
b.createSession(ctx, cs, chatID, user, text)
return
}

sid, _ := b.prepareSessionForMessage(ctx, cs, user)
if sid == "" {
b.createSession(ctx, cs, chatID, user, text)
return
}
b.sendMessage(ctx, cs, chatID, user, text)
}
Reuse the prepared session ID for the send.
prepareSessionForMessage returns the session to target, but sendMessage then calls cs.SessionID() again. subscribeLoop can clear the active session concurrently when a stream ends, so this batch can end up being sent with sid == "" and fail instead of reusing/reactivating the prepared session. Please thread the returned sid through to sendMessage and its ensureSubscription call.
Also applies to: 384-403
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/telegram/bot.go` around lines 226 - 244,
flushIncomingMessages currently calls prepareSessionForMessage which returns a
session id (sid) but then ignores it and calls sendMessage which re-reads
cs.SessionID(), allowing subscribeLoop to clear the session concurrently; change
flushIncomingMessages (and the other similar block around lines 384-403) to pass
the returned sid into sendMessage so sendMessage and its call to
ensureSubscription use that explicit sid instead of calling cs.SessionID();
update sendMessage signature/usage to accept an optional sid (or a concrete sid
string) and ensure ensureSubscription uses that sid to reactivate/reuse the
prepared session.
5355af1 to
69da222
0fecee8 to
e762610
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #1793 +/- ##
==========================================
+ Coverage 67.74% 68.57% +0.83%
==========================================
Files 418 422 +4
Lines 49383 50452 +1069
==========================================
+ Hits 33453 34596 +1143
+ Misses 13140 12919 -221
- Partials 2790 2937 +147
... and 14 files with indirect coverage changes
Summary
Testing
Summary by CodeRabbit
New Features
Tests
Documentation