Skip to content

Commit 32c8710

Browse files
committed
feat(lifecycle): enable persistent inbound dedup across restarts
1 parent e5e4c0d commit 32c8710

File tree

2 files changed

+112
-6
lines changed

2 files changed

+112
-6
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import fs from "node:fs";
2+
import os from "node:os";
3+
import path from "node:path";
4+
import { afterEach, beforeEach, describe, expect, it } from "vitest";
5+
import type { MsgContext } from "../../auto-reply/templating.js";
6+
import { clearLifecycleDbCacheForTest, getLifecycleDb } from "./db.js";
7+
import { acceptTurn, finalizeTurn, pruneTurns } from "./turns.js";
8+
9+
function makeCtx(overrides: Partial<MsgContext> = {}): MsgContext {
10+
return {
11+
Body: "hello",
12+
From: "+1555",
13+
To: "+1999",
14+
SessionKey: "session-1",
15+
AccountId: "acct-1",
16+
MessageSid: "msg-001",
17+
Provider: "telegram",
18+
OriginatingChannel: "telegram",
19+
OriginatingTo: "+1999",
20+
...overrides,
21+
} as MsgContext;
22+
}
23+
24+
describe("acceptTurn persistent dedup", () => {
25+
let tmpDir: string;
26+
27+
beforeEach(() => {
28+
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-turns-test-"));
29+
});
30+
31+
afterEach(() => {
32+
clearLifecycleDbCacheForTest();
33+
fs.rmSync(tmpDir, { recursive: true, force: true });
34+
});
35+
36+
it("first accept returns accepted: true", () => {
37+
const result = acceptTurn(makeCtx(), { stateDir: tmpDir });
38+
expect(result.accepted).toBe(true);
39+
expect(result.id).toBeTruthy();
40+
});
41+
42+
it("duplicate dedupe_key returns accepted: false", () => {
43+
const ctx = makeCtx();
44+
const first = acceptTurn(ctx, { stateDir: tmpDir });
45+
expect(first.accepted).toBe(true);
46+
47+
const second = acceptTurn(ctx, { stateDir: tmpDir });
48+
expect(second.accepted).toBe(false);
49+
});
50+
51+
it("different messages are accepted independently", () => {
52+
const ctx1 = makeCtx({ MessageSid: "msg-001" });
53+
const ctx2 = makeCtx({ MessageSid: "msg-002" });
54+
55+
expect(acceptTurn(ctx1, { stateDir: tmpDir }).accepted).toBe(true);
56+
expect(acceptTurn(ctx2, { stateDir: tmpDir }).accepted).toBe(true);
57+
});
58+
59+
it("missing MessageSid falls through to unconditional INSERT", () => {
60+
const ctx = makeCtx({ MessageSid: undefined });
61+
const first = acceptTurn(ctx, { stateDir: tmpDir });
62+
expect(first.accepted).toBe(true);
63+
64+
// Without a dedupe key, a second insert also succeeds (no dedup)
65+
const second = acceptTurn(ctx, { stateDir: tmpDir });
66+
expect(second.accepted).toBe(true);
67+
});
68+
69+
it("missing Provider falls through to unconditional INSERT", () => {
70+
const ctx = makeCtx({
71+
Provider: undefined,
72+
OriginatingChannel: undefined,
73+
Surface: undefined,
74+
});
75+
const first = acceptTurn(ctx, { stateDir: tmpDir });
76+
expect(first.accepted).toBe(true);
77+
78+
const second = acceptTurn(ctx, { stateDir: tmpDir });
79+
expect(second.accepted).toBe(true);
80+
});
81+
82+
it("dedup survives across calls (simulates restart persistence)", () => {
83+
const ctx = makeCtx();
84+
expect(acceptTurn(ctx, { stateDir: tmpDir }).accepted).toBe(true);
85+
86+
// Clear the DB cache to simulate a restart — DB file persists on disk
87+
clearLifecycleDbCacheForTest();
88+
89+
expect(acceptTurn(ctx, { stateDir: tmpDir }).accepted).toBe(false);
90+
});
91+
92+
it("pruned turn allows re-acceptance", () => {
93+
const ctx = makeCtx();
94+
const result = acceptTurn(ctx, { stateDir: tmpDir });
95+
expect(result.accepted).toBe(true);
96+
97+
// Finalize and backdate so prune with age=0 picks it up (strict < comparison)
98+
finalizeTurn(result.id, "delivered", { stateDir: tmpDir });
99+
const db = getLifecycleDb(tmpDir);
100+
const past = Date.now() - 10_000;
101+
db.prepare(
102+
"UPDATE message_turns SET completed_at=?, updated_at=?, accepted_at=? WHERE id=?",
103+
).run(past, past, past, result.id);
104+
pruneTurns(0, { stateDir: tmpDir });
105+
106+
// Same dedupe key should now be accepted again
107+
const reaccepted = acceptTurn(ctx, { stateDir: tmpDir });
108+
expect(reaccepted.accepted).toBe(true);
109+
});
110+
});

src/infra/message-lifecycle/turns.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -303,12 +303,8 @@ export function acceptTurn(
303303
const db = getLifecycleDb(opts?.stateDir);
304304
const id = opts?.turnId?.trim() || generateSecureUuid();
305305
const now = Date.now();
306-
// Keep durable turn tracking enabled while deferring persistent dedupe to the
307-
// existing inbound dedupe path until per-channel message identity semantics
308-
// are fully normalized (for example callback/query ids vs message ids).
309-
const disablePersistentDedupe = true;
310-
const dedupeKey = disablePersistentDedupe ? undefined : (buildInboundDedupeKey(ctx) ?? undefined);
311-
const externalId = disablePersistentDedupe ? null : ctx.MessageSid?.trim() || null;
306+
const dedupeKey = buildInboundDedupeKey(ctx) ?? undefined;
307+
const externalId = ctx.MessageSid?.trim() || null;
312308
const channel = String(ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase();
313309
const accountId = ctx.AccountId?.trim() ?? "";
314310
const sessionKey = ctx.SessionKey?.trim() ?? "";

0 commit comments

Comments
 (0)