feat: Telegram bot for AI agent interaction#1783
Conversation
Add a Telegram bot service that bridges Telegram chats with the Dagu AI agent, allowing users to interact with the agent via Telegram messages. - New `dagu telegram` CLI command to run the bot standalone - Integrated into `dagu start-all` (auto-starts when token is configured) - Config via DAGU_TELEGRAM_TOKEN env var or telegram section in config.yaml - Chat ID allowlist for access control - Agent prompts rendered as Telegram inline keyboard buttons - Long messages auto-split at paragraph boundaries
The bot now monitors all DAG runs and sends AI-generated notifications when they complete. Instead of fixed template messages, the AI agent analyzes the run results (status, errors, step details) and writes a contextual, helpful notification message. - New DAGRunMonitor polls for completed DAG runs every 10 seconds - On completion, creates a temporary agent session to generate the message - Falls back to a simple status message if the agent is unavailable - Notifications sent to all allowed Telegram chats - Seeds seen-set on startup to avoid notifying about old runs
Add 'waiting' to monitored statuses so the bot proactively notifies when a DAG needs human approval. The AI agent generates an urgent message highlighting which steps are waiting for action.
Show why the AI agent couldn't generate a notification so the user can diagnose configuration issues (e.g., no model configured).
- Increase agent response timeout from 30s to 60s - Disable safe mode for notification sessions since the agent only needs to write a message, not execute tools. Safe mode could cause the agent to hang waiting for command approval with no one to approve.
The monitor now runs in the server process and uses the same agent API instance as the web UI. This ensures the agent has proper model config, tools, and session management. Previously it created a separate agent API that wasn't working correctly. - Add Server.AgentAPI() getter to expose the agent API - Remove initTelegramBot() which created a duplicate agent API - Initialize Telegram bot after server, using server.AgentAPI() - Handle nil agentAPI gracefully (falls back to simple notifications)
The latestAssistant variable was declared inside the poll loop, so it reset to empty on every tick. If the assistant message arrived while Working was still true, the next poll (when Working became false) saw no new messages and latestAssistant stayed empty — hanging forever. Fix: move latestAssistant outside the loop and scan all messages on each poll instead of only new ones.
Two fixes for 'prompt expired or already answered' error: 1. Don't process historical messages when subscription starts. Previously, subscribeLoop replayed ALL session messages on startup, including already-answered prompts. Now it only tracks new messages. 2. Only show prompt buttons when HasPendingPrompt is true in the session state, preventing stale prompts from being displayed.
Previously the monitor created a throwaway session with a system user for each notification, so the user couldn't ask follow-up questions. Now each notification creates a session with the Telegram user's identity and adopts it as the chat's active session. After receiving the notification, the user can immediately ask "show me the logs", "retry it", etc. and the agent remembers the DAG context.
Automatically start a fresh session when total token usage reaches 50% of the context limit (default 200k tokens). Prevents the agent from hitting context window errors during long conversations.
Instead of losing all context on rotation, the last 3 user/assistant exchanges are extracted from the old session and prepended to the user's new message. This gives the agent enough context to continue the conversation naturally without hitting the context limit.
- Define AgentService interface in telegram package with the 5 methods the bot uses, replacing direct *agent.API dependency - Replace Server.AgentAPI() getter with WithAgentAPICallback option so the server no longer permanently exposes its internal agent API - Require AllowedChatIDs to be non-empty, preventing open access
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis pull request adds comprehensive Telegram bot integration to Dagu, enabling users to interact with the AI agent via Telegram chats. The implementation includes a new CLI command to run the Telegram bot, configuration structures for tokens and permissions, a bot service that bridges Telegram to the agent API, DAG run monitoring with AI-powered notifications, and a callback mechanism for exposing the agent API to external services. The integration maintains backward compatibility when Telegram is not configured. Changes
Sequence Diagram(s)sequenceDiagram
participant CLI as CLI Init
participant Server as Web Server
participant Callback as Agent API Callback
participant Bot as Telegram Bot
participant AgentAPI as Agent Service
participant TgAPI as Telegram API
CLI->>Server: NewServer(resourceService, serverOpts...)
Server->>Callback: Apply WithAgentAPICallback option
Server->>AgentAPI: Create Agent API instance
Server->>Callback: Invoke callback with Agent API
Callback->>Bot: Initialize Bot with captured Agent API
Bot->>TgAPI: Create BotAPI with token
Bot->>AgentAPI: Ready for interactions
sequenceDiagram
participant TgAPI as Telegram API
participant Bot as Telegram Bot
participant Session as Session Manager
participant AgentAPI as Agent Service
participant Monitor as Subscription Loop
TgAPI->>Bot: User sends message
Bot->>Bot: Authorization check
Bot->>Session: createSession or sendMessage
Session->>AgentAPI: CreateSession or SendMessage
AgentAPI->>Session: Return session ID or accept
Bot->>Monitor: startSubscription
loop Streaming updates
AgentAPI->>Monitor: Stream responses
Monitor->>Bot: processStreamResponse
Bot->>TgAPI: Send message to chat
end
Bot->>Bot: shouldRotateSession?
Bot->>Session: rotateSession (with context summary)
sequenceDiagram
participant Monitor as DAG Run Monitor
participant DAGStore as DAG Run Store
participant AgentAPI as Agent Service
participant Bot as Telegram Bot
participant TgAPI as Telegram API
loop Every 10s
Monitor->>DAGStore: Poll for completed DAG runs
DAGStore->>Monitor: Return new completions
Monitor->>Monitor: Check seen tracking
alt New completion
Monitor->>AgentAPI: CreateSession for each allowed chat
Monitor->>Monitor: buildNotificationPrompt
AgentAPI->>Monitor: Stream response
Monitor->>Bot: Adopt session into chat
Bot->>TgAPI: Send AI notification + offer follow-up
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Replace the fragile polling-based subscription with the agent's built-in pub-sub mechanism (SubscribeWithSnapshot). This ensures messages are delivered reliably in real time instead of being missed due to sequence ID tracking race conditions. - Add API.SubscribeSession() to expose pub-sub for non-HTTP consumers - Add SubscribeSession to AgentService interface - Rewrite subscribeLoop to use blocking next() instead of polling
Previously each sendMessage call cancelled the existing subscription and started a new one, causing responses to be lost when messages were sent consecutively. Now ensureSubscription() checks if a subscription is already running for the session and skips if so. Only session creation and adoption force a new subscription.
There was a problem hiding this comment.
Actionable comments posted: 7
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cmd/startall.go`:
- Around line 152-165: The code is passing a potentially typed-nil *agent.API
(agentAPI) into telegram.New which results in a non-nil interface value that
will panic when its methods are invoked; update the startup flow to avoid that
by checking agentAPI for nil before calling telegram.New (skip creating the bot
and log that agent API is unavailable), or modify telegram.New to validate the
incoming AgentService and return an error if it receives a typed-nil (reject the
service) so callers must not pass a nil *agent.API; reference the agentAPI
variable, telegram.New constructor, and the telegram.AgentService methods
(CreateSession, CancelSession, SendMessage, GetSessionDetail) when making the
change.
In `@internal/cmd/telegram.go`:
- Around line 76-96: The agent.APIConfig instantiation in telegram.go is missing
DAGStore which causes DAG context resolution to be skipped; call
ctx.dagStore(dagStoreConfig{}) and handle the error (returning
fmt.Errorf("failed to initialize DAG store: %w", err) on failure) to create
dagStore, then add DAGStore: dagStore to the agent.NewAPI(agent.APIConfig{...})
call so the DAG store is initialized before constructing the agent API.
In `@internal/cmn/config/loader.go`:
- Around line 1088-1110: The loadTelegramConfig function reads telegram.token,
telegram.allowed_chat_ids and telegram.safe_mode from Viper but there are no
BindEnv entries, so add explicit environment bindings to the envBindings slice
for these keys: bind "telegram.token" to DAGU_TELEGRAM_TOKEN,
"telegram.allowed_chat_ids" to DAGU_TELEGRAM_ALLOWED_CHAT_IDS, and
"telegram.safe_mode" to DAGU_TELEGRAM_SAFE_MODE so Viper will pick up those env
vars; update the envBindings array where other BindEnv entries are declared and
ensure the keys match those used in loadTelegramConfig.
In `@internal/service/telegram/bot.go`:
- Around line 52-57: chatState currently only stores sessionID which causes
identity mix-ups; add an owner field (e.g., ownerUserID string) to chatState and
set it when creating or adopting a session (where sessions are created/adopted
in the Telegram flows you touched). Update submitPromptResponse and the
monitor/adopt logic to read chatState.ownerUserID and use that user ID for all
agent/session operations instead of reconstructing identity from chatID; ensure
any re-keying uses the ownerUserID to look up or maintain the session. Also
update any codepaths referenced around the other regions (the block around lines
277-337 and the 574-580 area) to populate/consume the new ownerUserID field
consistently so group chats use the original session owner rather than chatID.
- Around line 294-315: The current flow in sendMessage (and similarly in
createSession) starts a new subscription only after sending, which lets fast
agent replies be missed because subscribeLoop seeds lastSeqID from the snapshot
taken on start; fix by ensuring the subscription is created/attached before
sending or by seeding the subscription with the pre-send last sequence ID:
acquire cs.mu, check if a subscriber for cs.sessionID already exists and reuse
it (avoid starting a second subscriber), or read the current latest sequence ID
from the session state (the pre-send snapshot) and pass that into
startSubscription/subscribeLoop so the subscription will include any replies
that arrive immediately after SendMessage; ensure cs.sessionID and cs.mu are
used to coordinate single-subscriber semantics.
- Around line 472-496: The prompt text (prompt.Question and prompt.Command) may
contain unescaped markdown characters, so stop sending these messages with
Markdown parsing enabled: in the block that builds msg :=
tgbotapi.NewMessage(chatID, text) (and before calling b.botAPI.Send), remove or
clear msg.ParseMode (do not set tgbotapi.ModeMarkdown) so the bot sends plain
text; keep the inline keyboard construction (tgbotapi.NewInlineKeyboardRow /
NewInlineKeyboardButtonData) and the Send call (b.botAPI.Send(msg)) unchanged.
In `@internal/service/telegram/monitor.go`:
- Around line 130-135: The loop currently calls m.markSeen(s) before attempting
notification, which hides transient send/session errors; change notifyCompletion
to return an error (or boolean success) that reflects whether all configured
chats were successfully notified (propagate errors from AI session creation and
telegram send in notifyCompletion and in the bot send path), and modify the loop
to call m.notifyCompletion(ctx, s) first and only call m.markSeen(s) if
notifyCompletion indicates success; update callers and tests for the new return
value and ensure notifyCompletion logs failures but does not mark seen on
partial/complete failure.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 81779571-c544-4551-8ecf-c396c16a903a
⛔ Files ignored due to path filters (1)
go.sumis excluded by!**/*.sum
📒 Files selected for processing (11)
cmd/main.gogo.modinternal/cmd/context.gointernal/cmd/startall.gointernal/cmd/telegram.gointernal/cmn/config/config.gointernal/cmn/config/definition.gointernal/cmn/config/loader.gointernal/service/frontend/server.gointernal/service/telegram/bot.gointernal/service/telegram/monitor.go
…ings When a user asks the agent to do something, it now: 1. Checks memory for saved task-to-DAG mappings 2. Lists existing DAGs to find relevant ones 3. Asks the user to confirm before executing 4. Saves the mapping to memory for future use This is a default behavior for all channels (web UI and Telegram).
The pub-sub delivers StreamResponse without SessionState, so hasPendingPrompt was always false and prompts were never shown. Fix by showing prompts whenever a UserPrompt message arrives.
1. Skip Telegram bot when agentAPI is nil to avoid typed-nil panic 2. Add missing DAGStore initialization in standalone telegram command 3. Add env var bindings (DAGU_TELEGRAM_TOKEN, etc.) for config loader 4. Store owning userID in chatState and reuse for all agent API calls instead of rebuilding identity from chat/callback IDs 5. Remove Markdown parse mode from prompt messages to avoid breaking on special characters in agent output 6. Move markSeen after notification delivery succeeds so transient failures are retried on the next poll 7. Remove unused userIdentityFromCallback method
The Telegram bot now starts in both `dagu server` and `dagu start-all`, using the server's agent API. This removes the standalone `dagu telegram` command which created a separate agent API that was missing DAGStore and other configuration. - Add Telegram bot startup to runServer using WithAgentAPICallback - Remove internal/cmd/telegram.go - Remove Telegram command registration from main.go - Remove ServiceTelegram from config loader
Restructure config from `telegram.*` to `bots.*` so future bot integrations (Slack, Discord, etc.) share a common namespace. `safe_mode` is now a shared field under `bots`, while Telegram-specific fields live under `bots.telegram`. Env vars change from DAGU_TELEGRAM_* to DAGU_BOTS_TELEGRAM_*.
Only one bot can be active at a time. The `bots.provider` field
selects which bot to start ("telegram", etc.). If empty, no bot
starts. Env var: DAGU_BOTS_PROVIDER.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1783 +/- ##
==========================================
- Coverage 69.71% 68.87% -0.84%
==========================================
Files 414 416 +2
Lines 47970 48602 +632
==========================================
+ Hits 33443 33477 +34
- Misses 11740 12339 +599
+ Partials 2787 2786 -1
... and 11 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
Summary
dagu start-all(auto-starts whentelegram.tokenis configured)Features
/newto start fresh,/cancelto stop, auto-rotation at context limitallowed_chat_idsto restrict accessConfiguration
Test plan
dagu start-all, verify bot starts/newand/cancelcommandsallowed_chat_idsrestriction worksSummary by CodeRabbit
Release Notes
New Features
Chores