Skip to content

Commit 785fd30

Browse files
committed
feat(infra): replace file delivery-queue with SQLite message journal
The file-based delivery-queue/ stored pending outbound messages as JSON files: not atomic, not queryable, no status tracking, and no pruning. Inbound dedup was in-memory only, so channel redeliveries were not caught across restarts. In-flight turns at crash time were silently dropped.

Replace both with a SQLite journal (message-journal.db in the state dir):

- outbound_messages replaces delivery-queue/*.json with indexed status/retry tracking and permanent-error quarantine
- inbound_events persists dedup via UNIQUE(channel, account_id, external_id) and tracks per-turn status for orphan recovery
- dispatchInboundMessageInternal now journals every real inbound turn and marks it 'delivered' after the dispatcher fully drains
- Startup: stale queued entries expire (30m TTL), orphaned 'processing' turns are re-dispatched, old delivery-queue files are migrated in place
- Periodic prune every 6h keeps both tables bounded; terminal rows older than 48h are removed

Journal errors are best-effort and never block message processing.

Closes #22376, #9208, #14827, #16555
1 parent 35e40f1 commit 785fd30

22 files changed

+2154
-1042
lines changed

src/auto-reply/dispatch.test.ts

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,22 @@
1-
import { describe, expect, it, vi } from "vitest";
1+
import { beforeEach, describe, expect, it, vi } from "vitest";
22
import type { OpenClawConfig } from "../config/config.js";
3-
import { dispatchInboundMessage, withReplyDispatcher } from "./dispatch.js";
3+
import {
4+
dispatchInboundMessage,
5+
dispatchRecoveredPendingReply,
6+
withReplyDispatcher,
7+
} from "./dispatch.js";
48
import type { ReplyDispatcher } from "./reply/reply-dispatcher.js";
59
import { buildTestCtx } from "./reply/test-ctx.js";
610

11+
vi.mock("../infra/message-journal/inbound.js", () => ({
12+
acceptInboundOrSkip: vi.fn(() => true),
13+
completeInboundTurn: vi.fn(),
14+
}));
15+
16+
// Import after mock registration so we get the spy instances.
17+
const { acceptInboundOrSkip, completeInboundTurn } =
18+
await import("../infra/message-journal/inbound.js");
19+
720
function createDispatcher(record: string[]): ReplyDispatcher {
821
return {
922
sendToolResult: () => true,
@@ -89,3 +102,92 @@ describe("withReplyDispatcher", () => {
89102
expect(order).toEqual(["sendFinalReply", "markComplete", "waitForIdle"]);
90103
});
91104
});
105+
106+
describe("dispatchInboundMessage — journal integration", () => {
107+
function makeDispatcher(): ReplyDispatcher {
108+
return {
109+
sendToolResult: () => true,
110+
sendBlockReply: () => true,
111+
sendFinalReply: () => true,
112+
getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }),
113+
markComplete: vi.fn(),
114+
waitForIdle: vi.fn(async () => {}),
115+
};
116+
}
117+
118+
beforeEach(() => {
119+
vi.mocked(acceptInboundOrSkip).mockClear().mockReturnValue(true);
120+
vi.mocked(completeInboundTurn).mockClear();
121+
});
122+
123+
it("calls acceptInboundOrSkip and assigns PendingReplyId when absent", async () => {
124+
const ctx = buildTestCtx(); // no PendingReplyId set
125+
expect(ctx.PendingReplyId).toBeUndefined();
126+
127+
await dispatchInboundMessage({
128+
ctx,
129+
cfg: {} as OpenClawConfig,
130+
dispatcher: makeDispatcher(),
131+
replyResolver: async () => ({ text: "ok" }),
132+
});
133+
134+
expect(acceptInboundOrSkip).toHaveBeenCalledTimes(1);
135+
// dispatch should have generated a UUID and set it on ctx
136+
const calledCtx = vi.mocked(acceptInboundOrSkip).mock.calls[0][0];
137+
expect(typeof calledCtx.PendingReplyId).toBe("string");
138+
expect(calledCtx.PendingReplyId!.length).toBeGreaterThan(0);
139+
});
140+
141+
it("calls completeInboundTurn('delivered') after successful dispatch", async () => {
142+
await dispatchInboundMessage({
143+
ctx: buildTestCtx(),
144+
cfg: {} as OpenClawConfig,
145+
dispatcher: makeDispatcher(),
146+
replyResolver: async () => ({ text: "ok" }),
147+
});
148+
149+
expect(completeInboundTurn).toHaveBeenCalledTimes(1);
150+
expect(vi.mocked(completeInboundTurn).mock.calls[0][1]).toBe("delivered");
151+
});
152+
153+
it("skips journal tracking for heartbeats", async () => {
154+
await dispatchInboundMessage({
155+
ctx: buildTestCtx(),
156+
cfg: {} as OpenClawConfig,
157+
dispatcher: makeDispatcher(),
158+
replyOptions: { isHeartbeat: true },
159+
replyResolver: async () => ({ text: "ok" }),
160+
});
161+
162+
expect(acceptInboundOrSkip).not.toHaveBeenCalled();
163+
expect(completeInboundTurn).not.toHaveBeenCalled();
164+
});
165+
166+
it("returns early without dispatching when acceptInboundOrSkip returns false (duplicate)", async () => {
167+
vi.mocked(acceptInboundOrSkip).mockReturnValue(false);
168+
const dispatcher = makeDispatcher();
169+
const sendFinalReply = vi.spyOn(dispatcher, "sendFinalReply");
170+
171+
await dispatchInboundMessage({
172+
ctx: buildTestCtx({ MessageSid: "dup-msg-001" }),
173+
cfg: {} as OpenClawConfig,
174+
dispatcher,
175+
replyResolver: async () => ({ text: "ok" }),
176+
});
177+
178+
expect(sendFinalReply).not.toHaveBeenCalled();
179+
expect(completeInboundTurn).not.toHaveBeenCalled();
180+
});
181+
182+
it("skips journal insert for orphan recovery re-dispatches", async () => {
183+
await dispatchRecoveredPendingReply({
184+
ctx: buildTestCtx({ PendingReplyId: "orphan-turn-001" }),
185+
cfg: {} as OpenClawConfig,
186+
dispatcher: makeDispatcher(),
187+
replyResolver: async () => ({ text: "ok" }),
188+
});
189+
190+
// acceptInboundOrSkip must NOT be called — orphan recovery re-uses the existing row
191+
expect(acceptInboundOrSkip).not.toHaveBeenCalled();
192+
});
193+
});

src/auto-reply/dispatch.ts

Lines changed: 88 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
import type { OpenClawConfig } from "../config/config.js";
2+
import { logVerbose } from "../globals.js";
3+
import { acceptInboundOrSkip, completeInboundTurn } from "../infra/message-journal/inbound.js";
4+
import { generateSecureUuid } from "../infra/secure-random.js";
25
import type { DispatchFromConfigResult } from "./reply/dispatch-from-config.js";
36
import { dispatchReplyFromConfig } from "./reply/dispatch-from-config.js";
47
import { finalizeInboundContext } from "./reply/inbound-context.js";
@@ -32,25 +35,99 @@ export async function withReplyDispatcher<T>(params: {
3235
}
3336
}
3437

35-
export async function dispatchInboundMessage(params: {
38+
type DispatchInboundMessageInternalParams = {
3639
ctx: MsgContext | FinalizedMsgContext;
3740
cfg: OpenClawConfig;
3841
dispatcher: ReplyDispatcher;
3942
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
4043
replyResolver?: typeof import("./reply.js").getReplyFromConfig;
41-
}): Promise<DispatchInboundResult> {
42-
const finalized = finalizeInboundContext(params.ctx);
43-
return await withReplyDispatcher({
44-
dispatcher: params.dispatcher,
44+
/** True when re-dispatching a recovered orphan turn — skips dedup + journal insert. */
45+
isOrphanReplyRecovery?: boolean;
46+
};
47+
48+
async function dispatchInboundMessageInternal({
49+
ctx,
50+
cfg,
51+
dispatcher,
52+
replyOptions,
53+
replyResolver,
54+
isOrphanReplyRecovery = false,
55+
}: DispatchInboundMessageInternalParams): Promise<DispatchInboundResult> {
56+
const finalized = finalizeInboundContext(ctx);
57+
58+
// Journal-based dedup + orphan-recovery tracking.
59+
// Only runs for real inbound turns (not heartbeats, not recovery replays).
60+
const shouldTrackTurn = !isOrphanReplyRecovery && replyOptions?.isHeartbeat !== true;
61+
62+
let pendingReplyId: string | undefined;
63+
if (shouldTrackTurn) {
64+
// Generate a stable turn ID if the caller didn't provide one.
65+
if (!finalized.PendingReplyId?.trim()) {
66+
finalized.PendingReplyId = generateSecureUuid();
67+
}
68+
pendingReplyId = finalized.PendingReplyId;
69+
// acceptInboundOrSkip returns false when this external_id was already processed
70+
// (duplicate delivery from the channel). Skip immediately if so.
71+
try {
72+
const accepted = acceptInboundOrSkip(finalized);
73+
if (!accepted) {
74+
const channel =
75+
finalized.OriginatingChannel ?? finalized.Surface ?? finalized.Provider ?? "unknown";
76+
const externalId = finalized.MessageSid ?? "(no message id)";
77+
logVerbose(
78+
`dispatch: deduped inbound turn — channel=${channel} external_id=${externalId} account=${finalized.AccountId ?? ""} turn=${pendingReplyId}`,
79+
);
80+
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
81+
}
82+
} catch (err) {
83+
// Journal errors must not block message processing.
84+
logVerbose(`dispatch: journal accept failed (continuing): ${String(err)}`);
85+
}
86+
}
87+
88+
const result = await withReplyDispatcher({
89+
dispatcher,
4590
run: () =>
4691
dispatchReplyFromConfig({
4792
ctx: finalized,
48-
cfg: params.cfg,
49-
dispatcher: params.dispatcher,
50-
replyOptions: params.replyOptions,
51-
replyResolver: params.replyResolver,
93+
cfg,
94+
dispatcher,
95+
replyOptions,
96+
replyResolver,
5297
}),
5398
});
99+
100+
// Mark turn delivered after dispatcher fully drains (including queued replies).
101+
if (pendingReplyId) {
102+
try {
103+
completeInboundTurn(pendingReplyId, "delivered");
104+
} catch (err) {
105+
logVerbose(`dispatch: journal complete failed: ${String(err)}`);
106+
}
107+
}
108+
109+
return result;
110+
}
111+
112+
export async function dispatchInboundMessage(params: {
113+
ctx: MsgContext | FinalizedMsgContext;
114+
cfg: OpenClawConfig;
115+
dispatcher: ReplyDispatcher;
116+
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
117+
replyResolver?: typeof import("./reply.js").getReplyFromConfig;
118+
}): Promise<DispatchInboundResult> {
119+
return dispatchInboundMessageInternal(params);
120+
}
121+
122+
/** Re-dispatch a recovered orphan turn — skips dedup check and journal insert. */
123+
export async function dispatchRecoveredPendingReply(params: {
124+
ctx: MsgContext | FinalizedMsgContext;
125+
cfg: OpenClawConfig;
126+
dispatcher: ReplyDispatcher;
127+
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
128+
replyResolver?: typeof import("./reply.js").getReplyFromConfig;
129+
}): Promise<DispatchInboundResult> {
130+
return dispatchInboundMessageInternal({ ...params, isOrphanReplyRecovery: true });
54131
}
55132

56133
export async function dispatchInboundMessageWithBufferedDispatcher(params: {
@@ -64,7 +141,7 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: {
64141
params.dispatcherOptions,
65142
);
66143
try {
67-
return await dispatchInboundMessage({
144+
return await dispatchInboundMessageInternal({
68145
ctx: params.ctx,
69146
cfg: params.cfg,
70147
dispatcher,
@@ -87,7 +164,7 @@ export async function dispatchInboundMessageWithDispatcher(params: {
87164
replyResolver?: typeof import("./reply.js").getReplyFromConfig;
88165
}): Promise<DispatchInboundResult> {
89166
const dispatcher = createReplyDispatcher(params.dispatcherOptions);
90-
return await dispatchInboundMessage({
167+
return await dispatchInboundMessageInternal({
91168
ctx: params.ctx,
92169
cfg: params.cfg,
93170
dispatcher,

src/auto-reply/inbound.test.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,9 @@ describe("inbound dedupe", () => {
166166
expect(buildInboundDedupeKey(ctx)).toBe("telegram|telegram:123|42");
167167
});
168168

169-
it("skips duplicates with the same key", () => {
169+
it("shouldSkipDuplicateInbound is deprecated and always returns false (dedup is now SQLite-backed)", () => {
170+
// shouldSkipDuplicateInbound has been replaced by acceptInboundOrSkip in the message journal.
171+
// The stub always returns false to avoid breaking callers until they are removed.
170172
resetInboundDedupe();
171173
const ctx: MsgContext = {
172174
Provider: "whatsapp",
@@ -175,7 +177,7 @@ describe("inbound dedupe", () => {
175177
MessageSid: "msg-1",
176178
};
177179
expect(shouldSkipDuplicateInbound(ctx, { now: 100 })).toBe(false);
178-
expect(shouldSkipDuplicateInbound(ctx, { now: 200 })).toBe(true);
180+
expect(shouldSkipDuplicateInbound(ctx, { now: 200 })).toBe(false); // deprecated: always false
179181
});
180182

181183
it("does not dedupe when the peer changes", () => {
@@ -193,7 +195,9 @@ describe("inbound dedupe", () => {
193195
).toBe(false);
194196
});
195197

196-
it("does not dedupe across session keys", () => {
198+
it("shouldSkipDuplicateInbound does not dedupe across session keys (deprecated stub)", () => {
199+
// shouldSkipDuplicateInbound is a deprecated no-op; it always returns false.
200+
// Real per-session dedup is performed by acceptInboundOrSkip in the message journal.
197201
resetInboundDedupe();
198202
const base: MsgContext = {
199203
Provider: "whatsapp",
@@ -209,7 +213,7 @@ describe("inbound dedupe", () => {
209213
).toBe(false);
210214
expect(
211215
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:alpha:main" }, { now: 300 }),
212-
).toBe(true);
216+
).toBe(false); // deprecated: always false
213217
});
214218
});
215219

src/auto-reply/reply/abort.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ vi.mock("../../agents/pi-embedded.js", () => ({
2525
resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
2626
}));
2727

28+
// Stub journal so abort tests don't attempt to open a real SQLite DB.
29+
vi.mock("../../infra/message-journal/inbound.js", () => ({
30+
abortProcessingInboundForSession: vi.fn(),
31+
}));
32+
2833
const commandQueueMocks = vi.hoisted(() => ({
2934
clearCommandLane: vi.fn(),
3035
}));

src/auto-reply/reply/abort.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
updateSessionStore,
1717
} from "../../config/sessions.js";
1818
import { logVerbose } from "../../globals.js";
19+
import { abortProcessingInboundForSession } from "../../infra/message-journal/inbound.js";
1920
import { parseAgentSessionKey } from "../../routing/session-key.js";
2021
import { resolveCommandAuthorization } from "../command-auth.js";
2122
import { normalizeCommandBody, type CommandNormalizeOptions } from "../commands-registry.js";
@@ -330,6 +331,9 @@ export async function tryFastAbortFromMessage(params: {
330331
nextEntry.updatedAt = Date.now();
331332
nextStore[key] = nextEntry;
332333
});
334+
// Mark any in-flight inbound journal entries for this session as 'aborted'
335+
// so orphan recovery won't re-dispatch them after a restart.
336+
abortProcessingInboundForSession(key);
333337
} else if (abortKey) {
334338
setAbortMemory(abortKey, true);
335339
}

0 commit comments

Comments
 (0)