|
1 | 1 | import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; |
| 2 | +import { importFreshModule } from "../../../test/helpers/import-fresh.js"; |
2 | 3 | import { expectInboundContextContract } from "../../../test/helpers/inbound-contract.js"; |
3 | 4 | import type { OpenClawConfig } from "../../config/config.js"; |
4 | 5 | import { defaultRuntime } from "../../runtime.js"; |
@@ -743,6 +744,71 @@ describe("followup queue deduplication", () => { |
743 | 744 | expect(calls).toHaveLength(1); |
744 | 745 | }); |
745 | 746 |
|
| 747 | + it("deduplicates same message_id across distinct enqueue module instances", async () => { |
| 748 | + const enqueueA = await importFreshModule<typeof import("./queue/enqueue.js")>( |
| 749 | + import.meta.url, |
| 750 | + "./queue/enqueue.js?scope=dedupe-a", |
| 751 | + ); |
| 752 | + const enqueueB = await importFreshModule<typeof import("./queue/enqueue.js")>( |
| 753 | + import.meta.url, |
| 754 | + "./queue/enqueue.js?scope=dedupe-b", |
| 755 | + ); |
| 756 | + const { clearSessionQueues } = await import("./queue.js"); |
| 757 | + const key = `test-dedup-cross-module-${Date.now()}`; |
| 758 | + const calls: FollowupRun[] = []; |
| 759 | + const done = createDeferred<void>(); |
| 760 | + const runFollowup = async (run: FollowupRun) => { |
| 761 | + calls.push(run); |
| 762 | + done.resolve(); |
| 763 | + }; |
| 764 | + const settings: QueueSettings = { |
| 765 | + mode: "collect", |
| 766 | + debounceMs: 0, |
| 767 | + cap: 50, |
| 768 | + dropPolicy: "summarize", |
| 769 | + }; |
| 770 | + |
| 771 | + enqueueA.resetRecentQueuedMessageIdDedupe(); |
| 772 | + enqueueB.resetRecentQueuedMessageIdDedupe(); |
| 773 | + |
| 774 | + try { |
| 775 | + expect( |
| 776 | + enqueueA.enqueueFollowupRun( |
| 777 | + key, |
| 778 | + createRun({ |
| 779 | + prompt: "first", |
| 780 | + messageId: "same-id", |
| 781 | + originatingChannel: "signal", |
| 782 | + originatingTo: "+10000000000", |
| 783 | + }), |
| 784 | + settings, |
| 785 | + ), |
| 786 | + ).toBe(true); |
| 787 | + |
| 788 | + scheduleFollowupDrain(key, runFollowup); |
| 789 | + await done.promise; |
| 790 | + await new Promise<void>((resolve) => setImmediate(resolve)); |
| 791 | + |
| 792 | + expect( |
| 793 | + enqueueB.enqueueFollowupRun( |
| 794 | + key, |
| 795 | + createRun({ |
| 796 | + prompt: "first-redelivery", |
| 797 | + messageId: "same-id", |
| 798 | + originatingChannel: "signal", |
| 799 | + originatingTo: "+10000000000", |
| 800 | + }), |
| 801 | + settings, |
| 802 | + ), |
| 803 | + ).toBe(false); |
| 804 | + expect(calls).toHaveLength(1); |
| 805 | + } finally { |
| 806 | + clearSessionQueues([key]); |
| 807 | + enqueueA.resetRecentQueuedMessageIdDedupe(); |
| 808 | + enqueueB.resetRecentQueuedMessageIdDedupe(); |
| 809 | + } |
| 810 | + }); |
| 811 | + |
746 | 812 | it("does not collide recent message-id keys when routing contains delimiters", async () => { |
747 | 813 | const key = `test-dedup-key-collision-${Date.now()}`; |
748 | 814 | const calls: FollowupRun[] = []; |
@@ -1264,6 +1330,55 @@ describe("followup queue drain restart after idle window", () => { |
1264 | 1330 | expect(calls[1]?.prompt).toBe("after-idle"); |
1265 | 1331 | }); |
1266 | 1332 |
|
| 1333 | + it("restarts an idle drain across distinct enqueue and drain module instances", async () => { |
| 1334 | + const drainA = await importFreshModule<typeof import("./queue/drain.js")>( |
| 1335 | + import.meta.url, |
| 1336 | + "./queue/drain.js?scope=restart-a", |
| 1337 | + ); |
| 1338 | + const enqueueB = await importFreshModule<typeof import("./queue/enqueue.js")>( |
| 1339 | + import.meta.url, |
| 1340 | + "./queue/enqueue.js?scope=restart-b", |
| 1341 | + ); |
| 1342 | + const { clearSessionQueues } = await import("./queue.js"); |
| 1343 | + const key = `test-idle-window-cross-module-${Date.now()}`; |
| 1344 | + const calls: FollowupRun[] = []; |
| 1345 | + const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 }; |
| 1346 | + const firstProcessed = createDeferred<void>(); |
| 1347 | + |
| 1348 | + enqueueB.resetRecentQueuedMessageIdDedupe(); |
| 1349 | + |
| 1350 | + try { |
| 1351 | + const runFollowup = async (run: FollowupRun) => { |
| 1352 | + calls.push(run); |
| 1353 | + if (calls.length === 1) { |
| 1354 | + firstProcessed.resolve(); |
| 1355 | + } |
| 1356 | + }; |
| 1357 | + |
| 1358 | + enqueueB.enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings); |
| 1359 | + drainA.scheduleFollowupDrain(key, runFollowup); |
| 1360 | + await firstProcessed.promise; |
| 1361 | + |
| 1362 | + await new Promise<void>((resolve) => setImmediate(resolve)); |
| 1363 | + |
| 1364 | + enqueueB.enqueueFollowupRun(key, createRun({ prompt: "after-idle" }), settings); |
| 1365 | + |
| 1366 | + await vi.waitFor( |
| 1367 | + () => { |
| 1368 | + expect(calls).toHaveLength(2); |
| 1369 | + }, |
| 1370 | + { timeout: 1_000 }, |
| 1371 | + ); |
| 1372 | + |
| 1373 | + expect(calls[0]?.prompt).toBe("before-idle"); |
| 1374 | + expect(calls[1]?.prompt).toBe("after-idle"); |
| 1375 | + } finally { |
| 1376 | + clearSessionQueues([key]); |
| 1377 | + drainA.clearFollowupDrainCallback(key); |
| 1378 | + enqueueB.resetRecentQueuedMessageIdDedupe(); |
| 1379 | + } |
| 1380 | + }); |
| 1381 | + |
1267 | 1382 | it("does not double-drain when a message arrives while drain is still running", async () => { |
1268 | 1383 | const key = `test-no-double-drain-${Date.now()}`; |
1269 | 1384 | const calls: FollowupRun[] = []; |
|
0 commit comments