feat(outbox): migrate delivery queue from file-based to SQLite outbox #29998
nohat wants to merge 7 commits into openclaw:main
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: cda23e90e0
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
src/infra/outbound/delivery-queue.ts (outdated)

```ts
  entry.lastAttemptAt ?? null,
);
});
fs.unlinkSync(filePath);
```
Keep legacy queue files when DB is in-memory fallback
If opening message-lifecycle.db fails, getLifecycleDb transparently falls back to :memory:; importLegacyFileQueue still proceeds and then deletes each legacy delivery-queue/*.json file. In that failure mode (for example, transient SQLite open failures), startup migrates pending deliveries into non-durable memory and removes the only durable source, so those unsent messages are lost after the next restart.
Already addressed — importLegacyFileQueue checks isLifecycleDbInMemory(stateDir) at line 359 and only deletes legacy .json files when the DB is persistent (if (!inMemory) at line 414). When the DB falls back to in-memory, legacy files are preserved as the durable source.
```sql
SET status='delivered', delivered_at=?, completed_at=?
WHERE id=?
```
Prune delivered outbox rows after acknowledgement
ackDelivery now marks rows as delivered instead of deleting them, but there is no caller that runs pruneOutbox, so successful deliveries accumulate indefinitely in message_outbox. On busy/long-lived gateways this causes unbounded DB growth and eventually degrades write and recovery performance; either remove rows on ack or wire periodic pruning.
Periodic pruning is wired up in the write-ahead PR (#30009) which calls pruneOutbox on a schedule. Keeping rows on ack is intentional here so the write-ahead layer can track delivery stats per-turn before pruning.
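A minimal sketch of how such periodic pruning might be wired, assuming a `pruneOutbox(ageMs)` helper as described in this thread; the interval and retention values below are made up, and the scheduler is injectable only so the wiring can be exercised without real timers:

```typescript
// Hypothetical wiring: run pruneOutbox on a fixed schedule.
type Scheduler = (fn: () => void, intervalMs: number) => unknown;

function startOutboxPruner(
  pruneOutbox: (ageMs: number) => void,
  ageMs: number,
  intervalMs: number,
  schedule: Scheduler = (fn, ms) => setInterval(fn, ms),
): unknown {
  return schedule(() => pruneOutbox(ageMs), intervalMs);
}

// e.g. keep terminal rows for 7 days, prune hourly:
// startOutboxPruner(pruneOutbox, 7 * 24 * 60 * 60 * 1000, 60 * 60 * 1000);
```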
Greptile Summary

This PR successfully migrates the delivery queue from a file-based system to a SQLite-backed outbox table. The implementation includes proper WAL mode configuration, transaction handling, and fallback to in-memory storage. Legacy file queue entries are imported on startup and deleted after migration.

The migration is backward compatible and handles edge cases well (malformed entries, missing files, concurrent access).

Confidence Score: 4/5
Last reviewed commit: cda23e9
```ts
gifPlayback?: boolean;
silent?: boolean;
mirror?: DeliveryMirrorPayload;
dispatchKind?: string;
```
`dispatchKind` parameter defined but never set anywhere in codebase - all entries will have NULL value. Consider adding test coverage for the `dispatch_kind` filtering behavior once this feature is fully implemented.
(Review comment on src/infra/outbound/delivery-queue.ts, line 45.)
dispatch_kind is set in the write-ahead PR (#30009) which populates it when enqueueing outbox entries. The column is defined here so the schema is ready; test coverage for filtering will land with that PR.
💡 Codex Review
Reviewed commit: a11173bfa9
```ts
const fallbackKey = `memory:${dbPath}`;
const fallbackCached = DB_CACHE.get(fallbackKey);
if (fallbackCached) {
  return fallbackCached;
}
```
Reuse in-memory fallback for all subsequent lifecycle DB opens
When opening message-lifecycle.db fails, the fallback is cached only under memory:${dbPath} and not under the canonical dbPath key, so a later getLifecycleDb call can silently switch to a newly opened file-backed DB once the filesystem issue clears. That mid-process backend switch strands any outbox rows written to the in-memory DB and can drop pending deliveries after restart; it also leaves isLifecycleDbInMemory true because the memory key remains cached. Keep one stable cache key per stateDir (or check the memory key before retrying disk open) to avoid losing queued data.
Fixed — getLifecycleDb() now checks memory:${dbPath} in the cache before attempting to open the disk DB. Once a path falls back to in-memory, all subsequent calls for that stateDir return the in-memory DB for the rest of the process lifetime. See commit 830c6f8.
Force-pushed a11173b to 61c9b84.
💡 Codex Review
Reviewed commit: 830c6f8aff
src/infra/outbound/delivery-queue.ts (outdated)

```ts
const row = db.prepare(`SELECT attempt_count FROM message_outbox WHERE id=?`).get(id) as
  | { attempt_count: number }
```
Restrict failDelivery to non-terminal outbox rows
failDelivery reads and updates by id only, so it can overwrite rows that were already terminalized (including delivered). Startup recovery is kicked off in a detached async task and can overlap with live delivery, so if one path ackDeliverys a row and the other path later calls failDelivery for that same row, this code flips it back to retryable and it gets re-sent on a later recovery despite already being delivered. Guard the select/update with a status predicate (only queued/failed_retryable) so terminal rows cannot be resurrected.
Fixed — the failDelivery SELECT now guards with AND status IN ('queued','failed_retryable') so it is a no-op if the row is already terminal. This prevents the recovery worker from resurrecting rows that the live path already acked/failed.
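The state-machine rule behind that guard can be sketched as follows (the status names come from the SQL shown in this thread; the helper itself is illustrative, not code from the PR):

```typescript
// Terminal rows must never be flipped back to a retryable state. The SQL
// enforces this with: ... WHERE id=? AND status IN ('queued','failed_retryable')
type OutboxStatus =
  | "queued"
  | "failed_retryable"
  | "delivered"
  | "failed_terminal"
  | "expired";

const NON_TERMINAL: ReadonlySet<OutboxStatus> = new Set([
  "queued",
  "failed_retryable",
] as OutboxStatus[]);

// Mirrors the WHERE predicate: only non-terminal rows may transition.
function mayTransition(status: OutboxStatus): boolean {
  return NON_TERMINAL.has(status);
}
```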
Force-pushed 06a50f2 to 2b3a810.
Pull request overview
This PR replaces the legacy file-based outbound delivery queue with a SQLite-backed outbox, adding schema/init code and updating recovery + tests to operate on database rows rather than queue files.
Changes:
- Add a cached SQLite “message lifecycle” DB with an outbox table and indexes.
- Migrate delivery-queue operations (enqueue/ack/fail/load/recover) from filesystem JSON files to SQLite rows, plus a legacy file-queue importer.
- Extend config schema/help/labels for delivery expiry behavior and update gateway startup to run legacy import + recovery.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| src/infra/outbound/outbound.test.ts | Updates delivery queue tests to validate SQLite outbox behavior instead of filesystem queue files. |
| src/infra/outbound/delivery-queue.ts | Reimplements queue persistence/recovery on top of message_outbox (SQLite), adds legacy import + pruning. |
| src/infra/message-lifecycle/index.ts | Re-exports lifecycle DB helpers. |
| src/infra/message-lifecycle/db.ts | Introduces SQLite DB open/cache logic and creates the message_outbox schema/indexes. |
| src/gateway/server.impl.ts | Runs legacy file-queue import and outbox recovery on startup. |
| src/config/zod-schema.session.ts | Adds messages.delivery config validation for expiry settings. |
| src/config/types.messages.ts | Adds typed config for delivery expiry behavior. |
| src/config/schema.labels.ts | Adds UI labels for new delivery config fields. |
| src/config/schema.help.ts | Adds help text for new delivery config fields. |
```ts
export function pruneOutbox(ageMs: number, stateDir?: string): void {
  const db = getLifecycleDb(stateDir);
  const cutoff = Date.now() - ageMs;
  try {
    db.prepare(
      `DELETE FROM message_outbox
       WHERE status IN ('delivered','failed_terminal','expired')
         AND COALESCE(completed_at, delivered_at, queued_at) < ?`,
    ).run(cutoff);
```
pruneOutbox(ageMs) will delete all terminal rows if called with ageMs <= 0 (cutoff becomes now or in the future). Since this is an exported helper, consider validating ageMs (finite, >0) and no-op/log when invalid to avoid accidental mass deletion via misconfiguration or call-site bugs.
Fixed in d9f04d1. pruneOutbox now guards against invalid ageMs (<=0 or non-finite) with an early return and log message.
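The validation described can be sketched as follows (illustrative, not the exact patch):

```typescript
// Guard for pruneOutbox: a non-finite or non-positive ageMs would make the
// cutoff land at or after "now" and delete every terminal row at once.
function isValidPruneAge(ageMs: number): boolean {
  return Number.isFinite(ageMs) && ageMs > 0;
}
```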
src/infra/message-lifecycle/db.ts (outdated)

```ts
db.exec("DROP INDEX IF EXISTS idx_message_outbox_idem");
db.exec(`
  CREATE UNIQUE INDEX IF NOT EXISTS idx_message_outbox_idem
  ON message_outbox(idempotency_key)
  WHERE idempotency_key IS NOT NULL;
`);
```
ensureLifecycleSchema() unconditionally drops idx_message_outbox_idem and then recreates it, which means every process start will perform a DROP+CREATE even when no migration is needed. This can add avoidable lock contention and startup cost; consider only dropping when you detect an old/incorrect index definition (or gate the DROP behind a one-time schema version).
Fixed in d9f04d1. Removed the unconditional DROP INDEX IF EXISTS — CREATE UNIQUE INDEX IF NOT EXISTS is sufficient and the drop/recreate cycle briefly removes the unique constraint.
```ts
import type { OpenClawConfig } from "../../config/config.js";
import { typedCases } from "../../test-utils/typed-cases.js";
import { getLifecycleDb } from "../message-lifecycle/db.js";
import {
  ackDelivery,
```
This test suite now opens a cached SQLite DB via getLifecycleDb(tmpDir), but the afterEach only deletes tmpDir. Because the DB handle is cached globally, this can leak file descriptors and may prevent tmpDir deletion on platforms like Windows. Consider calling clearLifecycleDbCacheForTest() (or otherwise closing the DB) in afterEach when using getLifecycleDb in tests.
Fixed in d9f04d1. Added clearLifecycleDbCacheForTest() in afterEach before rmSync so cached DB handles are released.
```ts
// Age all queued entries to simulate having survived a gateway crash.
// Without aging, entries are not eligible because the outbox worker defers fresh
// entries for 5 s to avoid racing with the direct delivery path.
const makeEntriesEligible = () => {
  const db = getLifecycleDb(tmpDir);
  const past = Date.now() - 10_000;
  db.prepare(
    `UPDATE message_outbox SET queued_at=?, next_attempt_at=? WHERE status='queued'`,
  ).run(past, past);
```
The comment says recovery defers fresh entries for 5s to avoid racing with the direct delivery path, but the SQLite outbox implementation shown here enqueues with next_attempt_at=now and loadPendingDeliveries() has no age cutoff. Either implement the stated deferral (e.g., ignore queued_at newer than a threshold on startup) or adjust the comment/test helper so it matches actual behavior.
Fixed in d9f04d1. Updated the comment to describe the actual next_attempt_at mechanism instead of the stale "5s delay" reference.
```ts
  );
}
db.exec("PRAGMA journal_mode=WAL;");
db.exec("PRAGMA synchronous=NORMAL;");
```
getLifecycleDb() enables WAL/NORMAL but does not configure a busy timeout. Without PRAGMA busy_timeout, concurrent access can fail immediately with SQLITE_BUSY instead of waiting briefly; consider setting a small busy_timeout (consistent with other SQLite usage in the repo) to reduce spurious lock errors under load.
Suggested change:

```ts
db.exec("PRAGMA synchronous=NORMAL;");
db.exec("PRAGMA busy_timeout=5000;");
```
Fixed in d9f04d1. Added PRAGMA busy_timeout=5000; after the WAL/NORMAL pragmas to prevent SQLITE_BUSY during concurrent access.
```ts
// Once a path falls back to in-memory, stick with it for the process lifetime
// to avoid orphaning the in-memory DB if the disk recovers later.
const memCached = DB_CACHE.get(`memory:${dbPath}`);
if (memCached) {
  return memCached;
}

const { DatabaseSync } = requireNodeSqlite();
let db: DatabaseSync;
let cacheKey = dbPath;
try {
  fs.mkdirSync(path.dirname(dbPath), { recursive: true, mode: 0o700 });
  db = new DatabaseSync(dbPath);
} catch (err) {
  const fallbackKey = `memory:${dbPath}`;
  const fallbackCached = DB_CACHE.get(fallbackKey);
  if (fallbackCached) {
    return fallbackCached;
  }
  db = new DatabaseSync(":memory:");
  cacheKey = fallbackKey;
  logVerbose(
    `message-lifecycle/db: failed to open ${dbPath}; using in-memory fallback (${String(err)})`,
  );
```
getLifecycleDb() silently falls back to an in-memory database on open errors. For an outbox intended to provide durable delivery across restarts, this fallback can cause message loss after a crash/restart while appearing to work. Consider failing fast (or at least logging at warn/error and disabling outbox/recovery) when the persistent DB cannot be opened, instead of switching to :memory: in production paths.
Suggested change (fail fast instead of falling back to `:memory:`):

```ts
const { DatabaseSync } = requireNodeSqlite();
let db: DatabaseSync;
const cacheKey = dbPath;
try {
  fs.mkdirSync(path.dirname(dbPath), { recursive: true, mode: 0o700 });
  db = new DatabaseSync(dbPath);
} catch (err) {
  logVerbose(
    `message-lifecycle/db: failed to open persistent database at ${dbPath} (${String(
      err,
    )})`,
  );
  throw err;
```
Intentional design tradeoff documented in the code comment at lines 19-20. An in-memory outbox that loses data on crash is strictly better than crashing the gateway entirely. The isLifecycleDbInMemory() function is exposed for callers that need to know the fallback state. Additionally, logVerbose emits a warning when the fallback triggers.
```ts
): { eligible: true } | { eligible: false; remainingBackoffMs: number } {
  const backoff = computeBackoffMs(entry.retryCount + 1);
  if (backoff <= 0) {
    return { eligible: true };
  }
  const firstReplayAfterCrash = entry.retryCount === 0 && entry.lastAttemptAt === undefined;
  if (firstReplayAfterCrash) {
    return { eligible: true };
  }
  const hasAttemptTimestamp =
    typeof entry.lastAttemptAt === "number" &&
    Number.isFinite(entry.lastAttemptAt) &&
    entry.lastAttemptAt > 0;
  const baseAttemptAt = hasAttemptTimestamp ? entry.lastAttemptAt! : entry.enqueuedAt;
  const nextEligibleAt = baseAttemptAt + backoff;
```
isEntryEligibleForRecoveryRetry() computes backoff using computeBackoffMs(entry.retryCount + 1), but failDelivery() schedules next_attempt_at using computeBackoffMs(attempt_count). This off-by-one makes recovery more conservative than the persisted schedule (e.g., retryCount=1 waits 25s instead of 5s) and can leave entries deferred during the one-time startup recovery. Align the backoff calculation with attempt_count/next_attempt_at (or simply use next_attempt_at as the source of truth).
Verified not a bug. The math is consistent:
- `failDelivery` increments `attempt_count` to `nextCount` and writes `next_attempt_at = now + computeBackoffMs(nextCount)`
- `isEntryEligibleForRecoveryRetry` reads `entry.retryCount` (= stored `attempt_count` = `nextCount`) and computes `computeBackoffMs(entry.retryCount)` — same backoff level

The +1 in `isEntryEligibleForRecoveryRetry` at line 342 was from an earlier version. On the current branch, this reads `computeBackoffMs(entry.retryCount)` directly (the stored incremented value), matching what `failDelivery` used to compute `next_attempt_at`.
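A worked example of that invariant, using a hypothetical `computeBackoffMs` matching the 5 s / 25 s figures mentioned in this thread (the real backoff curve may differ):

```typescript
// Hypothetical backoff: 5s for attempt 1, 25s for attempt 2, and so on.
const computeBackoffMs = (attempt: number): number => 5_000 * 5 ** (attempt - 1);

// failDelivery: increments attempt_count, then schedules from the NEW count.
function scheduleRetry(attemptCount: number, now: number) {
  const nextCount = attemptCount + 1;
  return { attemptCount: nextCount, nextAttemptAt: now + computeBackoffMs(nextCount) };
}

// Recovery reads the stored (already incremented) count, so it must apply the
// same backoff level with no extra +1.
const recoveryBackoff = (storedRetryCount: number): number =>
  computeBackoffMs(storedRetryCount);
```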
```ts
if (isPermanentDeliveryError(error)) {
  db.prepare(
    `UPDATE message_outbox
     SET status='failed_terminal',
         error_class='permanent',
         last_error=?,
         completed_at=?,
         terminal_reason=?
     WHERE id=?`,
  ).run(error, now, error, id);
  return;
}
const nextCount = row.attempt_count + 1;
```
failDelivery(): when classifying an error as permanent, the UPDATE does not increment attempt_count or set last_attempt_at. This makes the outbox row look like it never had an attempt despite being terminally failed, and can skew metrics/debugging. Consider incrementing attempt_count and recording last_attempt_at consistently for all failure paths (permanent, retryable, terminal-by-max-retries).
Suggested change:

```ts
const nextCount = row.attempt_count + 1;
if (isPermanentDeliveryError(error)) {
  db.prepare(
    `UPDATE message_outbox
     SET status='failed_terminal',
         error_class='permanent',
         attempt_count=?,
         last_error=?,
         last_attempt_at=?,
         completed_at=?,
         terminal_reason=?
     WHERE id=?`,
  ).run(nextCount, error, now, now, error, id);
  return;
}
```
Fixed in d9f04d1. The permanent-error path in failDelivery now also sets attempt_count and last_attempt_at for observability.
src/infra/outbound/delivery-queue.ts (outdated)

```ts
if (now >= deadline) {
  const deferred = pending.length - recovered - failed - skippedMaxRetries - deferredBackoff;
  opts.log.warn(`Recovery time budget exceeded — ${deferred} entries deferred to next tick`);
```
recoverPendingDeliveries() logs that remaining entries are "deferred to next tick", but this recovery is invoked only once on gateway startup (no periodic rerun in this PR). The message is misleading; consider changing it back to "next restart" or "next recovery run" (or scheduling a follow-up run if "tick" is intended).
Suggested change:

```ts
opts.log.warn(`Recovery time budget exceeded — ${deferred} entries deferred to next recovery run`);
```
Fixed in d9f04d1. Changed "deferred to next tick" to "deferred to next recovery pass" which is accurate in both startup-only and periodic recovery contexts.
Force-pushed 085588f to d9f04d1.
💡 Codex Review
Reviewed commit: d9f04d1f4a
src/infra/outbound/delivery-queue.ts (outdated)

```sql
error_class='terminal',
terminal_reason=COALESCE(terminal_reason,'moved to failed'),
completed_at=?
WHERE id=?
```
Guard moveToFailed from overwriting terminal outbox rows
moveToFailed updates by id only, so it can overwrite rows that another path already terminalized as delivered. Startup recovery is launched in a detached task from startGatewayServer and can overlap live delivery, so a concurrent ackDelivery followed by this update can flip a successfully delivered row back to failed_terminal, corrupting lifecycle state and downstream delivery accounting; constrain this update to non-terminal statuses (for example queued/failed_retryable) like failDelivery does.
Fixed in fc01448. Added AND status IN ('queued','failed_retryable') predicate to the moveToFailed UPDATE, mirroring the guard in failDelivery. Also added a test that acks a row first and verifies moveToFailed is a no-op for already-delivered rows.
Force-pushed d9f04d1 to 14611e0.
Replace unbounded file-based delivery queue with queryable SQLite message_outbox table. Adds TTL/expiry for stale entries, delivery outcome retention, and one-time legacy file queue import on startup. Closes openclaw#23777, openclaw#16555, openclaw#29128
- failDelivery: terminalize non-final rows immediately (no recovery path)
- failDelivery: set attempt_count/last_attempt_at on permanent errors
- failDelivery: add dispatch_kind to SELECT for non-final detection
- pruneOutbox: guard against invalid ageMs (<=0 or non-finite)
- db: add PRAGMA busy_timeout=5000 to prevent SQLITE_BUSY
- db: remove unconditional DROP INDEX before CREATE IF NOT EXISTS
- test: add clearLifecycleDbCacheForTest in afterEach
- test: update stale comments (5s delay, deferred-to-next-tick)
Force-pushed fc01448 to 1380f4d.
Part of Message Reliability: Durable SQLite Outbox, Recovery Worker, and Unified sendPayload (#32063)
Summary
Change Type (select all)
Scope (select all touched areas)
Linked Issue/PR
User-visible / Behavior Changes
None — internal storage migration. Delivery queue data now in SQLite instead of flat files.
Security Impact (required)
Repro + Verification
Environment
Steps
Expected
Actual
Evidence
Human Verification (required)
Compatibility / Migration
Failure Recovery (if this breaks)
Risks and Mitigations