Skip to content

Commit 5edcab2

Browse files
steipeterylena
andcommitted
fix(queue): land openclaw#33168 from @rylena
Landed from contributor PR openclaw#33168 by @rylena. Co-authored-by: Rylen Anil <[email protected]>
1 parent 149ae45 commit 5edcab2

File tree

4 files changed

+93
-3
lines changed

4 files changed

+93
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ Docs: https://docs.openclaw.ai
318318
- Exec approvals/gateway-node policy: honor explicit `ask=off` from `exec-approvals.json` even when runtime defaults are stricter, so trusted full/off setups stop re-prompting on gateway and node exec paths. Landed from contributor PR #26789 by @pandego. Thanks @pandego.
319319
- Exec approvals/config fallback: inherit `ask` from `exec-approvals.json` when `tools.exec.ask` is unset, so local full/off defaults no longer fall back to `on-miss` for exec tool and `nodes run`. Landed from contributor PR #29187 by @Bartok9. Thanks @Bartok9.
320320
- Exec approvals/allow-always shell scripts: persist and match script paths for wrapper invocations like `bash scripts/foo.sh` while still blocking `-c`/`-s` wrapper bypasses. Landed from contributor PR #35137 by @yuweuii. Thanks @yuweuii.
321+
- Queue/followup dedupe across drain restarts: dedupe queued redelivery `message_id` values after queue recreation so busy-session followups no longer duplicate on replayed inbound events. Landed from contributor PR #33168 by @rylena. Thanks @rylena.
321322

322323
## 2026.3.2
323324

src/auto-reply/reply/queue.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ export { extractQueueDirective } from "./queue/directive.js";
22
export { clearSessionQueues } from "./queue/cleanup.js";
33
export type { ClearSessionQueueResult } from "./queue/cleanup.js";
44
export { scheduleFollowupDrain } from "./queue/drain.js";
5-
export { enqueueFollowupRun, getFollowupQueueDepth } from "./queue/enqueue.js";
5+
export {
6+
enqueueFollowupRun,
7+
getFollowupQueueDepth,
8+
resetRecentQueuedMessageIdDedupe,
9+
} from "./queue/enqueue.js";
610
export { resolveQueueSettings } from "./queue/settings.js";
711
export { clearFollowupQueue } from "./queue/state.js";
812
export type {

src/auto-reply/reply/queue/enqueue.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,28 @@
1+
import { createDedupeCache } from "../../../infra/dedupe.js";
12
import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
23
import { kickFollowupDrainIfIdle } from "./drain.js";
34
import { getExistingFollowupQueue, getFollowupQueue } from "./state.js";
45
import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js";
56

7+
const RECENT_QUEUE_MESSAGE_IDS = createDedupeCache({
8+
ttlMs: 5 * 60 * 1000,
9+
maxSize: 10_000,
10+
});
11+
12+
function buildRecentMessageIdKey(run: FollowupRun, queueKey: string): string | undefined {
13+
const messageId = run.messageId?.trim();
14+
if (!messageId) {
15+
return undefined;
16+
}
17+
const route = [
18+
run.originatingChannel ?? "",
19+
run.originatingTo ?? "",
20+
run.originatingAccountId ?? "",
21+
run.originatingThreadId == null ? "" : String(run.originatingThreadId),
22+
].join("|");
23+
return `${queueKey}|${route}|${messageId}`;
24+
}
25+
626
function isRunAlreadyQueued(
727
run: FollowupRun,
828
items: FollowupRun[],
@@ -31,6 +51,11 @@ export function enqueueFollowupRun(
3151
dedupeMode: QueueDedupeMode = "message-id",
3252
): boolean {
3353
const queue = getFollowupQueue(key, settings);
54+
const recentMessageIdKey = dedupeMode !== "none" ? buildRecentMessageIdKey(run, key) : undefined;
55+
if (recentMessageIdKey && RECENT_QUEUE_MESSAGE_IDS.peek(recentMessageIdKey)) {
56+
return false;
57+
}
58+
3459
const dedupe =
3560
dedupeMode === "none"
3661
? undefined
@@ -54,6 +79,9 @@ export function enqueueFollowupRun(
5479
}
5580

5681
queue.items.push(run);
82+
if (recentMessageIdKey) {
83+
RECENT_QUEUE_MESSAGE_IDS.check(recentMessageIdKey);
84+
}
5785
// If drain finished and deleted the queue before this item arrived, a new queue
5886
// object was created (draining: false) but nobody scheduled a drain for it.
5987
// Use the cached callback to restart the drain now.
@@ -70,3 +98,7 @@ export function getFollowupQueueDepth(key: string): number {
7098
}
7199
return queue.items.length;
72100
}
101+
102+
export function resetRecentQueuedMessageIdDedupe(): void {
103+
RECENT_QUEUE_MESSAGE_IDS.clear();
104+
}

src/auto-reply/reply/reply-flow.test.ts

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
1+
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
22
import { expectInboundContextContract } from "../../../test/helpers/inbound-contract.js";
33
import type { OpenClawConfig } from "../../config/config.js";
44
import { defaultRuntime } from "../../runtime.js";
@@ -8,7 +8,11 @@ import { finalizeInboundContext } from "./inbound-context.js";
88
import { normalizeInboundTextNewlines } from "./inbound-text.js";
99
import { parseLineDirectives, hasLineDirectives } from "./line-directives.js";
1010
import type { FollowupRun, QueueSettings } from "./queue.js";
11-
import { enqueueFollowupRun, scheduleFollowupDrain } from "./queue.js";
11+
import {
12+
enqueueFollowupRun,
13+
resetRecentQueuedMessageIdDedupe,
14+
scheduleFollowupDrain,
15+
} from "./queue.js";
1216
import { createReplyDispatcher } from "./reply-dispatcher.js";
1317
import { createReplyToModeFilter, resolveReplyToMode } from "./reply-threading.js";
1418

@@ -627,6 +631,10 @@ function createRun(params: {
627631
}
628632

629633
describe("followup queue deduplication", () => {
634+
beforeEach(() => {
635+
resetRecentQueuedMessageIdDedupe();
636+
});
637+
630638
it("deduplicates messages with same Discord message_id", async () => {
631639
const key = `test-dedup-message-id-${Date.now()}`;
632640
const calls: FollowupRun[] = [];
@@ -690,6 +698,51 @@ describe("followup queue deduplication", () => {
690698
expect(calls[0]?.prompt).toContain("[Queued messages while agent was busy]");
691699
});
692700

701+
it("deduplicates same message_id after queue drain restarts", async () => {
702+
const key = `test-dedup-after-drain-${Date.now()}`;
703+
const calls: FollowupRun[] = [];
704+
const done = createDeferred<void>();
705+
const runFollowup = async (run: FollowupRun) => {
706+
calls.push(run);
707+
done.resolve();
708+
};
709+
const settings: QueueSettings = {
710+
mode: "collect",
711+
debounceMs: 0,
712+
cap: 50,
713+
dropPolicy: "summarize",
714+
};
715+
716+
const first = enqueueFollowupRun(
717+
key,
718+
createRun({
719+
prompt: "first",
720+
messageId: "same-id",
721+
originatingChannel: "signal",
722+
originatingTo: "+10000000000",
723+
}),
724+
settings,
725+
);
726+
expect(first).toBe(true);
727+
728+
scheduleFollowupDrain(key, runFollowup);
729+
await done.promise;
730+
731+
const redelivery = enqueueFollowupRun(
732+
key,
733+
createRun({
734+
prompt: "first-redelivery",
735+
messageId: "same-id",
736+
originatingChannel: "signal",
737+
originatingTo: "+10000000000",
738+
}),
739+
settings,
740+
);
741+
742+
expect(redelivery).toBe(false);
743+
expect(calls).toHaveLength(1);
744+
});
745+
693746
it("deduplicates exact prompt when routing matches and no message id", async () => {
694747
const key = `test-dedup-whatsapp-${Date.now()}`;
695748
const settings: QueueSettings = {

0 commit comments

Comments
 (0)