fix(gateway): message delivery reliability with SQLite journal (inbound dedup + orphan recovery)#27939
nohat wants to merge 6 commits into openclaw:main
Conversation
The file-based delivery-queue/ stored pending outbound messages as JSON files: not atomic, not queryable, no status tracking, and no pruning. Inbound dedup was in-memory only, so channel redeliveries were not caught across restarts. In-flight turns at crash time were silently dropped. Replace both with a SQLite journal (message-journal.db in the state dir):

- outbound_messages replaces delivery-queue/*.json with indexed status/retry tracking and permanent-error quarantine
- inbound_events persists dedup via UNIQUE(channel, account_id, external_id) and tracks per-turn status for orphan recovery
- dispatchInboundMessageInternal now journals every real inbound turn and marks it 'delivered' after the dispatcher fully drains
- Startup: stale queued entries expire (30m TTL), orphaned 'processing' turns are re-dispatched, and old delivery-queue files are migrated in place
- A periodic prune every 6h keeps both tables bounded; terminal rows older than 48h are removed

Journal errors are best-effort and never block message processing.

Closes openclaw#22376, openclaw#9208, openclaw#14827, openclaw#16555
Greptile Summary

Replaced file-based message queuing with a SQLite journal to fix duplicate message delivery and orphaned-turn recovery issues. The implementation adds persistent deduplication for inbound messages and reliable retry logic for outbound deliveries.
Confidence Score: 4/5
Last reviewed commit: 785fd30
```ts
const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{
  name?: string | null;
}>;
const hasColumn = rows.some((row) => row.name === columnName);
if (hasColumn) {
  return;
}
db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${definition}`);
```
string interpolation in SQL with ${tableName}, ${columnName}, and ${definition} could allow injection if called with user input
currently safe since only called with hardcoded literals, but add parameter binding or validation to prevent future misuse
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/infra/message-journal/db.ts
Line: 109-116
Comment:
string interpolation in SQL with `${tableName}`, `${columnName}`, and `${definition}` could allow injection if called with user input
currently safe since only called with hardcoded literals, but add parameter binding or validation to prevent future misuse
```suggestion
// Note: SQLite cannot bind identifiers (in PRAGMA or DDL); validate inputs instead
if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(tableName) || !/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(columnName)) {
  throw new Error('Invalid table or column name');
}
const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{
  name?: string | null;
}>;
const hasColumn = rows.some((row) => row.name === columnName);
if (hasColumn) {
  return;
}
db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${definition}`);
```
How can I resolve this? If you propose a fix, please make it concise.
Addressed in 7e0351e. Added identifier validation via SQL_IDENTIFIER regex before interpolating tableName and columnName into DDL. Throws on invalid identifiers. The definition parameter remains allowlisted at call sites (INTEGER, TEXT, etc.).
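A minimal sketch of that guard, assuming the shape described above (the helper name and exact regex here are illustrative; the real SQL_IDENTIFIER constant lives in db.ts):

```typescript
// Illustrative identifier guard; not the actual db.ts implementation.
const SQL_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/;

// Throws before any interpolation into DDL, since SQLite cannot bind identifiers.
function assertSqlIdentifier(name: string): string {
  if (!SQL_IDENTIFIER.test(name)) {
    throw new Error(`Invalid SQL identifier: ${name}`);
  }
  return name;
}
```

Validating and returning the name lets call sites wrap interpolated identifiers inline, e.g. `` db.exec(`ALTER TABLE ${assertSqlIdentifier(tableName)} ...`) ``.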
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 785fd30f05
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```ts
logVerbose(
  `dispatch: deduped inbound turn — channel=${channel} external_id=${externalId} account=${finalized.AccountId ?? ""} turn=${pendingReplyId}`,
);
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
```
Release dispatcher reservations when dedupe skips a turn
When acceptInboundOrSkip returns false, this early return bypasses withReplyDispatcher, so markComplete()/waitForIdle() are never called for that dispatcher instance. Since createReplyDispatcher starts with a reserved pending slot, duplicate inbound deliveries will leave permanently registered dispatchers and inflate getTotalPendingReplies(), which can continuously defer restart/reload logic and leak registry entries over time in channels that redeliver duplicates.
Fixed in 7e0351e. When acceptInboundOrSkip returns false, we now call dispatcher.markComplete() and await dispatcher.waitForIdle() before returning, so deduped turns release their reservation and unregister from the dispatcher registry. Added an assertion in the dispatch test to cover this path.
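The skip-path fix can be sketched as follows. The dispatcher interface and factory here are hypothetical stand-ins for the real gateway code; the point is only the release pattern on the dedupe branch:

```typescript
// Hypothetical dispatcher shapes, for illustration only.
interface ReplyDispatcher {
  pending: number;
  markComplete(): void;
  waitForIdle(): Promise<void>;
}

function createReplyDispatcher(): ReplyDispatcher {
  // Starts with one reserved pending slot, matching the behavior the review describes.
  const d: ReplyDispatcher = {
    pending: 1,
    markComplete() { this.pending -= 1; },
    async waitForIdle() { /* resolves once pending work drains; trivial in this sketch */ },
  };
  return d;
}

async function handleInbound(accepted: boolean, dispatcher: ReplyDispatcher): Promise<boolean> {
  if (!accepted) {
    // Dedupe skip: release the reservation so the registry does not leak.
    dispatcher.markComplete();
    await dispatcher.waitForIdle();
    return false;
  }
  // ...normal dispatch path would run here...
  dispatcher.markComplete();
  return true;
}
```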
src/infra/message-journal/db.ts
```sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_inbound_dedup
  ON inbound_events(channel, account_id, external_id)
```
Scope inbound dedupe index by peer/thread, not message id alone
This unique index treats (channel, account_id, external_id) as globally unique, but some providers do not guarantee that scope (for example Telegram maps MessageSid to msg.message_id in src/telegram/bot-message-context.ts, where IDs are per chat). Two different chats in the same account can therefore share external_id and the second message will be incorrectly dropped as a duplicate; dedupe needs peer/thread context (like OriginatingTo/thread) or a stored composite key equivalent to buildInboundDedupeKey.
Fixed in 7e0351e. Replaced the unique index on (channel, account_id, external_id) with one on dedupe_key, which comes from buildInboundDedupeKey and includes peer/thread (OriginatingTo, MessageThreadId, etc.). This correctly scopes dedup for providers like Telegram where message IDs are per-chat. Added a test: "different peers with same message_id are not duplicates".
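A sketch of such a composite key, assuming the fields mentioned above; the real buildInboundDedupeKey may differ in field order and encoding:

```typescript
// Illustrative composite dedupe key; not the real buildInboundDedupeKey.
function buildDedupeKey(parts: {
  channel: string;
  accountId: string;
  peer: string;      // e.g. OriginatingTo
  threadId?: string; // e.g. MessageThreadId
  externalId: string;
}): string {
  // Including peer/thread scopes dedup correctly for providers (like Telegram)
  // whose message ids are only unique per chat.
  return [parts.channel, parts.accountId, parts.peer, parts.threadId ?? "", parts.externalId].join("|");
}
```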
💡 Codex Review
Reviewed commit: 7e0351e154
src/gateway/server.impl.ts
```ts
await deliverOutboundPayloads({
  cfg: cfgAtStart,
  channel: row.channel,
  to: ctx.To ?? "",
```
Route recovered replies to originating peer
Orphan recovery rebuilds context and then sends recovered output to ctx.To, but inbound contexts can legitimately have To set to the bot endpoint while the actual peer lives in OriginatingTo (for example WhatsApp inbound mapping in src/web/auto-reply/monitor/process-message.ts). In those cases, a recovered turn after crash/restart is delivered to the wrong target (or fails), so the user never receives the reply the recovery path is meant to restore.
Fixed in 573ab2d. Orphan recovery now uses ctx.OriginatingTo ?? ctx.To ?? "" for the delivery target, so channels where To is the bot endpoint (e.g. WhatsApp) deliver recovered replies to the correct peer.
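The target selection can be sketched as a small helper (the function name is illustrative; the context field names follow the discussion above):

```typescript
// Sketch of the recovered-reply target selection described above.
function resolveRecoveryTarget(ctx: { To?: string; OriginatingTo?: string }): string {
  // Prefer the originating peer: on some channels (e.g. WhatsApp) To is the bot's own endpoint.
  return ctx.OriginatingTo ?? ctx.To ?? "";
}
```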
src/gateway/server.impl.ts
```ts
// Find turns still marked 'processing' — these are orphans from a crash.
// Skip very recent rows (minAgeMs=5s) in case a turn is still actively running.
const orphans = findProcessingInbound({ minAgeMs: 5_000 });
```
Recover fresh orphan turns on immediate restarts
This startup path runs orphan recovery once and filters candidates with minAgeMs: 5_000; if the gateway is restarted quickly after a crash (common with process supervisors), in-flight rows younger than 5s are excluded and never retried in this process. Because those rows remain status='processing' and there is no subsequent orphan-recovery pass, the affected turn can be dropped indefinitely until another restart happens much later.
Fixed in 573ab2d. Reduced minAgeMs from 5_000 to 0. After a crash, the process is gone, so there’s no overlap. All processing rows are orphans; quick restarts (e.g. process supervisor) now recover in-flight turns instead of skipping them.
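The scan can be sketched as a pure filter; the row shape and function name here are illustrative, not the real journal API:

```typescript
// Sketch of the startup orphan scan with a configurable minimum age.
interface ProcessingRow { id: string; status: string; startedAt: number; }

function findProcessingOrphans(rows: ProcessingRow[], now: number, minAgeMs: number): ProcessingRow[] {
  // With minAgeMs = 0, every 'processing' row counts as an orphan, which is safe
  // at startup: the previous process is gone, so nothing can still be running.
  return rows.filter((r) => r.status === "processing" && now - r.startedAt >= minAgeMs);
}
```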
💡 Codex Review
Reviewed commit: 68f9cd5246
```ts
  });
},
});
await dispatchRecoveredPendingReply({ ctx, cfg: cfgAtStart, dispatcher });
```
Prevent double replay when outbound retry already exists
This startup path re-dispatches every processing inbound row without checking whether that turn already has a queued outbound retry. A crash between enqueueDelivery(...) and completeInboundTurn(...) leaves both artifacts (queued outbound + processing inbound), so restart will recover the queued outbound message and then replay the full turn again here, producing duplicate user replies. Please gate orphan replay on outbound journal state for the same turn (for example by linking outbound_messages.inbound_id and skipping re-dispatch when queued retries exist).
Fixed. We now:

- Add inbound_id to outbound journal rows when enqueueing (via routeReply → deliverOutboundPayloads → enqueueDelivery).
- Run delivery recovery before orphan recovery so queued outbound is delivered first.
- Before re-dispatching each orphan, call hasQueuedOutboundForInbound(row.id). If true, skip re-dispatch and log that the queued outbound will deliver.
- When delivery recovery acks a delivery, ackDelivery reads inbound_id and calls completeInboundTurn(inbound_id, "delivered").

inboundId is passed from dispatch-from-config and dispatch-acp into all routeReply calls that originate from an inbound turn.
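The reconciliation above can be sketched as a pure decision function. The status-count shape is an assumption for illustration; the branch order matches the behavior described in this thread:

```typescript
// Sketch of the per-orphan reconciliation decision.
type OrphanAction = "skip" | "mark_delivered" | "mark_failed" | "replay";

function decideOrphanAction(outbound: { queued: number; delivered: number; failed: number }): OrphanAction {
  if (outbound.queued > 0) return "skip";              // delivery recovery will send the queued reply
  if (outbound.delivered > 0) return "mark_delivered"; // the reply already reached the user
  if (outbound.failed > 0) return "mark_failed";       // terminal outbound failure, do not replay
  return "replay";                                     // no outbound artifacts: a true orphan
}
```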
```sql
(id, channel, account_id, target, payload, queued_at, status, retry_count, last_error)
VALUES (?, ?, ?, ?, ?, ?, 'queued', ?, ?)
```
Preserve retry attempt timestamp during queue migration
The migration inserts retry_count but drops entry.lastAttemptAt, even though recovery backoff relies on last_attempt_at and falls back to enqueuedAt when it is missing. For migrated entries that already retried before upgrade, this makes them look older than their real last failure and can trigger immediate retries that bypass the intended backoff window. Include last_attempt_at in the migrated insert so post-upgrade recovery honors existing retry timing.
Fixed. The migration now includes last_attempt_at in the INSERT, using entry.lastAttemptAt ?? null. Recovery backoff now respects the last attempt time for migrated entries. The test "preserves lastAttemptAt so recovery honors backoff" covers this path.
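The restart-safe backoff check can be sketched as follows; the field names mirror the journal columns discussed above but the function itself is illustrative:

```typescript
// Sketch of the backoff check that honors last_attempt_at across restarts.
function retryWindowElapsed(
  row: { lastAttemptAt: number | null; enqueuedAt: number },
  backoffMs: number,
  now: number,
): boolean {
  // Fall back to enqueuedAt only when no attempt was ever recorded; migrated
  // entries must keep lastAttemptAt, or they would look idle and retry immediately.
  const reference = row.lastAttemptAt ?? row.enqueuedAt;
  return now - reference >= backoffMs;
}
```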
💡 Codex Review
Reviewed commit: 1b13a48cf6
```ts
for (const row of orphans) {
  const outboundStatus = getOutboundStatusForInbound(row.id);
  if (outboundStatus.queued > 0) {
    logOrphan.info(
      `Skipping orphan ${row.id} — queued outbound exists (will deliver without re-dispatch)`,
```
Avoid replaying orphans that have no outbound journal coverage
This recovery path treats outboundStatus == 0 as safe to replay, but some channel handlers still send replies without deliverOutboundPayloads/enqueueDelivery (for example Slack dispatch uses a direct deliver callback in src/slack/monitor/message-handler/dispatch.ts), so they never create outbound_messages rows. If the process crashes after such a send but before completeInboundTurn runs, restart will see a processing row with no outbound history and call dispatchRecoveredPendingReply again, causing a duplicate user-visible reply. Even after linking inbound_id for journaled sends, this branch still replays non-journaled channels; replay should be gated to turns with reliable outbound journaling (or another persisted post-send marker).
💡 Codex Review
Reviewed commit: 07ea6367a2
```ts
  },
});
await dispatchRecoveredPendingReply({ ctx, cfg: cfgAtStart, dispatcher });
completeInboundTurn(row.id, "delivered");
```
Only mark recovered turn delivered after a successful send
dispatchRecoveredPendingReply can resolve even when provider delivery fails because ReplyDispatcher swallows deliver(...) rejections in its internal .catch(...) path, so this unconditional completeInboundTurn(..., "delivered") can run during startup recovery after transient send errors (for example, API/network outages). In that case the orphan row is finalized as delivered instead of incrementing recovery attempts, and the turn is never retried, causing silent message loss.
Closing — superseded by the full lifecycle stack. The inbound dedup + orphan recovery goals from this PR are now covered by the turn-tracking and persistent-dedup layers.
Summary
Problem: OpenClaw has two reliability gaps that have surfaced repeatedly in bug reports and support threads:
Outbound: The delivery queue stores pending messages as delivery-queue/*.json files. Files are not atomic, not queryable, and accumulate silently — there is no status tracking, no per-entry retry state, and no pruning. A crash mid-write produces a corrupt entry; entries that will never succeed (blocked bot, deleted chat) are retried forever.

Inbound: Duplicate-delivery suppression is in-memory only. Channels like Telegram and WhatsApp redeliver unacknowledged messages; after a gateway restart the seen-set is cleared and the bot replies twice. There is also no record of turns that were accepted for dispatch but not yet completed when the gateway stopped — those are silently dropped, and the user sees no reply.
Why it matters: Silent message loss and duplicate replies are the most-reported reliability issues.
What changed:
Both queues are replaced with a single SQLite journal (message-journal.db, in the state dir). It uses node:sqlite — a Node 22 built-in, so no new dependency.

inbound_events — one row per accepted inbound turn. A UNIQUE index on dedupe_key (built from provider + account + peer/thread + message id) enforces dedup across restarts with correct per-chat/per-thread scope. When DB writes fail, an in-memory fallback with throttled warnings prevents duplicate spam. Status transitions: processing → delivered/aborted/failed. On startup, rows still in processing are recovered with bounded attempts (max 3), and stale processing rows are marked failed.

outbound_messages — replaces delivery-queue/*.json. Indexed status/retry/error tracking with permanent-error quarantine (entries that match known-unrecoverable patterns are moved to failed immediately rather than cycling through max retries). Retry backoff uses last_attempt_at so elapsed time carries across restarts — entries whose window hasn't elapsed are skipped rather than blocking recovery with an inline sleep. Abort errors are acked immediately without retry. Outbound rows are linked to inbound turns via inbound_id, and startup reconciliation uses outbound status (queued/delivered/failed) to prevent double replay while still recovering true orphans.

dispatchInboundMessageInternal now journals every real inbound turn (heartbeats and recovery replays excluded), releases dispatcher reservations on dedupe-skip paths, and marks the inbound turn delivered only after the dispatcher drain completes. Journal writes are synchronous and best-effort; errors are logged at verbose level and never block message processing. Atomic transactions protect multi-step operations (failDelivery read+update, recovery failure accounting).
Migration: On first startup after upgrade, any remaining delivery-queue/*.json files are imported into outbound_messages and deleted. Migrated entries preserve retry metadata, including lastAttemptAt, so backoff timing remains correct post-upgrade. No manual steps required.

Pruning: Startup expires stale queued outbound rows (>30 min old) to failed and marks stale inbound processing rows as failed (max recovery age 24h). The existing dedupeCleanup maintenance timer runs a prune every 6h, removing terminal rows older than 48h from both tables.

What did NOT change (scope boundary): Message routing, channel adapters, plugin hooks, agent logic, session management.
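The 48h prune policy described above can be sketched as a predicate; the constants match the description, while the row shape and name are illustrative:

```typescript
// Sketch of the terminal-row prune predicate: only finished work ages out.
const PRUNE_AGE_MS = 48 * 60 * 60 * 1000;
const TERMINAL_STATUSES = new Set(["delivered", "aborted", "failed"]);

function isPrunable(row: { status: string; updatedAt: number }, now: number): boolean {
  // Queued and processing rows are never pruned here; recovery handles those.
  return TERMINAL_STATUSES.has(row.status) && now - row.updatedAt > PRUNE_AGE_MS;
}
```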
Change Type (select all)
Scope (select all touched areas)
Linked Issue/PR
User-visible / Behavior Changes
None. This is an internal reliability improvement. No config changes required. Migration from old file queue happens automatically on first startup.
Security Impact (required)
All items answered No.
Environment
Steps
completeInboundTurn), kill -9 the gateway.
Actual
- Log: [orphan-recovery] Recovered orphan turn <id> (session=<key>)
- Orphan row transitions processing → delivered.
- Stale rows are marked failed and skipped.
Unit tests in inbound.test.ts, outbound.test.ts, and dispatch.test.ts, covering dedupe scope, orphan-replay gating, migration retry-timestamp preservation, malformed queued-row quarantine, and stale processing-row terminalization.

- run-journal-e2e.sh — 6 before/after scenarios: migration, inbound dedup, abort, orphan recovery, delivery recovery, pruning
- test-orphan-recovery-e2e.sh — automated kill-9 + restart + log assertion

Human Verification (required)
Verified: migration of delivery-queue/ → journal; inbound dedup persists across restart (no duplicate reply); orphan turn recovered after kill -9 + restart; delivery retry after crash; journal pruning removes old rows; abort mid-turn recorded correctly. Also verified: delivery-queue/ absent after migration; same message id across different peers is not deduped incorrectly; permanent error patterns quarantine immediately; backoff window not elapsed on restart (entry deferred, not replayed inline); DB write failure uses in-memory dedupe fallback; abort errors skip retry queue; startup does not double-replay when outbound state already exists for an inbound turn; malformed queued payload rows are quarantined without blocking recovery of valid rows.
Yes) No) No) — automatic on first startup via migrateFileQueueToJournal.

Downgrade: if reverted, migrated delivery-queue/ files are gone, but the failure mode is identical to pre-patch (no queue).
- ~/.openclaw/message-journal.db can be removed (gateway degrades to no-queue behavior), or revert this commit.
- delivery-queue/ entries are migrated and deleted on first startup.
- Warning signs: getJournalDb() throwing on every message (SQLite file corrupt or missing write permission); startup log showing no journal operations where expected.
- DatabaseSync singleton per process. Journal writes are best-effort (errors logged, never thrown to callers). Atomic transactions for multi-step operations.
- node:sqlite unavailable in environments running Node < 22.5: requireNodeSqlite() throws early at first use with a clear error; the memory subsystem already requires this. No silent degradation.
- Double replay: outbound rows are linked to inbound turns via inbound_id; startup reconciliation inspects outbound status for each orphan inbound row (queued → skip replay, delivered → mark inbound delivered, failed → mark inbound failed, none → replay).