Skip to content

Commit 4ca84ac

Browse files
vincentkocngutman
andauthored
fix(runtime): duplicate messages, share singleton state across bundled chunks (openclaw#43683)
* Tests: add fresh module import helper * Process: share command queue runtime state * Agents: share embedded run runtime state * Reply: share followup queue runtime state * Reply: share followup drain callback state * Reply: share queued message dedupe state * Reply: share inbound dedupe state * Tests: cover shared command queue runtime state * Tests: cover shared embedded run runtime state * Tests: cover shared followup queue runtime state * Tests: cover shared inbound dedupe state * Tests: cover shared Slack thread participation state * Slack: share sent thread participation state * Tests: document fresh import helper * Telegram: share draft stream runtime state * Tests: cover shared Telegram draft stream state * Telegram: share sent message cache state * Tests: cover shared Telegram sent message cache * Telegram: share thread binding runtime state * Tests: cover shared Telegram thread binding state * Tests: avoid duplicate shared queue reset * refactor(runtime): centralize global singleton access * refactor(runtime): preserve undefined global singleton values * test(runtime): cover undefined global singleton values --------- Co-authored-by: Nimrod Gutman <[email protected]>
1 parent 08aa57a commit 4ca84ac

21 files changed

+569
-38
lines changed

src/agents/pi-embedded-runner/runs.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { afterEach, describe, expect, it, vi } from "vitest";
2+
import { importFreshModule } from "../../../test/helpers/import-fresh.js";
23
import {
34
__testing,
45
abortEmbeddedPiRun,
@@ -105,4 +106,35 @@ describe("pi-embedded runner run registry", () => {
105106
vi.useRealTimers();
106107
}
107108
});
109+
110+
it("shares active run state across distinct module instances", async () => {
111+
const runsA = await importFreshModule<typeof import("./runs.js")>(
112+
import.meta.url,
113+
"./runs.js?scope=shared-a",
114+
);
115+
const runsB = await importFreshModule<typeof import("./runs.js")>(
116+
import.meta.url,
117+
"./runs.js?scope=shared-b",
118+
);
119+
const handle = {
120+
queueMessage: async () => {},
121+
isStreaming: () => true,
122+
isCompacting: () => false,
123+
abort: vi.fn(),
124+
};
125+
126+
runsA.__testing.resetActiveEmbeddedRuns();
127+
runsB.__testing.resetActiveEmbeddedRuns();
128+
129+
try {
130+
runsA.setActiveEmbeddedRun("session-shared", handle);
131+
expect(runsB.isEmbeddedPiRunActive("session-shared")).toBe(true);
132+
133+
runsB.clearActiveEmbeddedRun("session-shared", handle);
134+
expect(runsA.isEmbeddedPiRunActive("session-shared")).toBe(false);
135+
} finally {
136+
runsA.__testing.resetActiveEmbeddedRuns();
137+
runsB.__testing.resetActiveEmbeddedRuns();
138+
}
139+
});
108140
});

src/agents/pi-embedded-runner/runs.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
logMessageQueued,
44
logSessionStateChange,
55
} from "../../logging/diagnostic.js";
6+
import { resolveGlobalSingleton } from "../../shared/global-singleton.js";
67

78
type EmbeddedPiQueueHandle = {
89
queueMessage: (text: string) => Promise<void>;
@@ -11,12 +12,23 @@ type EmbeddedPiQueueHandle = {
1112
abort: () => void;
1213
};
1314

14-
const ACTIVE_EMBEDDED_RUNS = new Map<string, EmbeddedPiQueueHandle>();
1515
type EmbeddedRunWaiter = {
1616
resolve: (ended: boolean) => void;
1717
timer: NodeJS.Timeout;
1818
};
19-
const EMBEDDED_RUN_WAITERS = new Map<string, Set<EmbeddedRunWaiter>>();
19+
20+
/**
21+
* Use global singleton state so busy/streaming checks stay consistent even
22+
* when the bundler emits multiple copies of this module into separate chunks.
23+
*/
24+
const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState");
25+
26+
const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({
27+
activeRuns: new Map<string, EmbeddedPiQueueHandle>(),
28+
waiters: new Map<string, Set<EmbeddedRunWaiter>>(),
29+
}));
30+
const ACTIVE_EMBEDDED_RUNS = embeddedRunState.activeRuns;
31+
const EMBEDDED_RUN_WAITERS = embeddedRunState.waiters;
2032

2133
export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean {
2234
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { afterEach, describe, expect, it } from "vitest";
2+
import { importFreshModule } from "../../../test/helpers/import-fresh.js";
3+
import type { MsgContext } from "../templating.js";
4+
import { resetInboundDedupe } from "./inbound-dedupe.js";
5+
6+
const sharedInboundContext: MsgContext = {
7+
Provider: "discord",
8+
Surface: "discord",
9+
From: "discord:user-1",
10+
To: "channel:c1",
11+
OriginatingChannel: "discord",
12+
OriginatingTo: "channel:c1",
13+
SessionKey: "agent:main:discord:channel:c1",
14+
MessageSid: "msg-1",
15+
};
16+
17+
describe("inbound dedupe", () => {
18+
afterEach(() => {
19+
resetInboundDedupe();
20+
});
21+
22+
it("shares dedupe state across distinct module instances", async () => {
23+
const inboundA = await importFreshModule<typeof import("./inbound-dedupe.js")>(
24+
import.meta.url,
25+
"./inbound-dedupe.js?scope=shared-a",
26+
);
27+
const inboundB = await importFreshModule<typeof import("./inbound-dedupe.js")>(
28+
import.meta.url,
29+
"./inbound-dedupe.js?scope=shared-b",
30+
);
31+
32+
inboundA.resetInboundDedupe();
33+
inboundB.resetInboundDedupe();
34+
35+
try {
36+
expect(inboundA.shouldSkipDuplicateInbound(sharedInboundContext)).toBe(false);
37+
expect(inboundB.shouldSkipDuplicateInbound(sharedInboundContext)).toBe(true);
38+
} finally {
39+
inboundA.resetInboundDedupe();
40+
inboundB.resetInboundDedupe();
41+
}
42+
});
43+
});

src/auto-reply/reply/inbound-dedupe.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11
import { logVerbose, shouldLogVerbose } from "../../globals.js";
22
import { createDedupeCache, type DedupeCache } from "../../infra/dedupe.js";
33
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
4+
import { resolveGlobalSingleton } from "../../shared/global-singleton.js";
45
import type { MsgContext } from "../templating.js";
56

67
const DEFAULT_INBOUND_DEDUPE_TTL_MS = 20 * 60_000;
78
const DEFAULT_INBOUND_DEDUPE_MAX = 5000;
89

9-
const inboundDedupeCache = createDedupeCache({
10-
ttlMs: DEFAULT_INBOUND_DEDUPE_TTL_MS,
11-
maxSize: DEFAULT_INBOUND_DEDUPE_MAX,
12-
});
10+
/**
11+
* Keep inbound dedupe shared across bundled chunks so the same provider
12+
* message cannot bypass dedupe by entering through a different chunk copy.
13+
*/
14+
const INBOUND_DEDUPE_CACHE_KEY = Symbol.for("openclaw.inboundDedupeCache");
15+
16+
const inboundDedupeCache = resolveGlobalSingleton<DedupeCache>(INBOUND_DEDUPE_CACHE_KEY, () =>
17+
createDedupeCache({
18+
ttlMs: DEFAULT_INBOUND_DEDUPE_TTL_MS,
19+
maxSize: DEFAULT_INBOUND_DEDUPE_MAX,
20+
}),
21+
);
1322

1423
const normalizeProvider = (value?: string | null) => value?.trim().toLowerCase() || "";
1524

src/auto-reply/reply/queue/drain.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { defaultRuntime } from "../../../runtime.js";
2+
import { resolveGlobalMap } from "../../../shared/global-singleton.js";
23
import {
34
buildCollectPrompt,
45
beginQueueDrain,
@@ -15,7 +16,11 @@ import type { FollowupRun } from "./types.js";
1516

1617
// Persists the most recent runFollowup callback per queue key so that
1718
// enqueueFollowupRun can restart a drain that finished and deleted the queue.
18-
const FOLLOWUP_RUN_CALLBACKS = new Map<string, (run: FollowupRun) => Promise<void>>();
19+
const FOLLOWUP_DRAIN_CALLBACKS_KEY = Symbol.for("openclaw.followupDrainCallbacks");
20+
21+
const FOLLOWUP_RUN_CALLBACKS = resolveGlobalMap<string, (run: FollowupRun) => Promise<void>>(
22+
FOLLOWUP_DRAIN_CALLBACKS_KEY,
23+
);
1924

2025
export function clearFollowupDrainCallback(key: string): void {
2126
FOLLOWUP_RUN_CALLBACKS.delete(key);

src/auto-reply/reply/queue/enqueue.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
11
import { createDedupeCache } from "../../../infra/dedupe.js";
2+
import { resolveGlobalSingleton } from "../../../shared/global-singleton.js";
23
import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
34
import { kickFollowupDrainIfIdle } from "./drain.js";
45
import { getExistingFollowupQueue, getFollowupQueue } from "./state.js";
56
import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js";
67

7-
const RECENT_QUEUE_MESSAGE_IDS = createDedupeCache({
8-
ttlMs: 5 * 60 * 1000,
9-
maxSize: 10_000,
10-
});
8+
/**
9+
* Keep queued message-id dedupe shared across bundled chunks so redeliveries
10+
* are rejected no matter which chunk receives the enqueue call.
11+
*/
12+
const RECENT_QUEUE_MESSAGE_IDS_KEY = Symbol.for("openclaw.recentQueueMessageIds");
13+
14+
const RECENT_QUEUE_MESSAGE_IDS = resolveGlobalSingleton(RECENT_QUEUE_MESSAGE_IDS_KEY, () =>
15+
createDedupeCache({
16+
ttlMs: 5 * 60 * 1000,
17+
maxSize: 10_000,
18+
}),
19+
);
1120

1221
function buildRecentMessageIdKey(run: FollowupRun, queueKey: string): string | undefined {
1322
const messageId = run.messageId?.trim();

src/auto-reply/reply/queue/state.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { resolveGlobalMap } from "../../../shared/global-singleton.js";
12
import { applyQueueRuntimeSettings } from "../../../utils/queue-helpers.js";
23
import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./types.js";
34

@@ -18,7 +19,13 @@ export const DEFAULT_QUEUE_DEBOUNCE_MS = 1000;
1819
export const DEFAULT_QUEUE_CAP = 20;
1920
export const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize";
2021

21-
export const FOLLOWUP_QUEUES = new Map<string, FollowupQueueState>();
22+
/**
23+
* Share followup queues across bundled chunks so busy-session enqueue/drain
24+
* logic observes one queue registry per process.
25+
*/
26+
const FOLLOWUP_QUEUES_KEY = Symbol.for("openclaw.followupQueues");
27+
28+
export const FOLLOWUP_QUEUES = resolveGlobalMap<string, FollowupQueueState>(FOLLOWUP_QUEUES_KEY);
2229

2330
export function getExistingFollowupQueue(key: string): FollowupQueueState | undefined {
2431
const cleaned = key.trim();

src/auto-reply/reply/reply-flow.test.ts

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
2+
import { importFreshModule } from "../../../test/helpers/import-fresh.js";
23
import { expectInboundContextContract } from "../../../test/helpers/inbound-contract.js";
34
import type { OpenClawConfig } from "../../config/config.js";
45
import { defaultRuntime } from "../../runtime.js";
@@ -743,6 +744,71 @@ describe("followup queue deduplication", () => {
743744
expect(calls).toHaveLength(1);
744745
});
745746

747+
it("deduplicates same message_id across distinct enqueue module instances", async () => {
748+
const enqueueA = await importFreshModule<typeof import("./queue/enqueue.js")>(
749+
import.meta.url,
750+
"./queue/enqueue.js?scope=dedupe-a",
751+
);
752+
const enqueueB = await importFreshModule<typeof import("./queue/enqueue.js")>(
753+
import.meta.url,
754+
"./queue/enqueue.js?scope=dedupe-b",
755+
);
756+
const { clearSessionQueues } = await import("./queue.js");
757+
const key = `test-dedup-cross-module-${Date.now()}`;
758+
const calls: FollowupRun[] = [];
759+
const done = createDeferred<void>();
760+
const runFollowup = async (run: FollowupRun) => {
761+
calls.push(run);
762+
done.resolve();
763+
};
764+
const settings: QueueSettings = {
765+
mode: "collect",
766+
debounceMs: 0,
767+
cap: 50,
768+
dropPolicy: "summarize",
769+
};
770+
771+
enqueueA.resetRecentQueuedMessageIdDedupe();
772+
enqueueB.resetRecentQueuedMessageIdDedupe();
773+
774+
try {
775+
expect(
776+
enqueueA.enqueueFollowupRun(
777+
key,
778+
createRun({
779+
prompt: "first",
780+
messageId: "same-id",
781+
originatingChannel: "signal",
782+
originatingTo: "+10000000000",
783+
}),
784+
settings,
785+
),
786+
).toBe(true);
787+
788+
scheduleFollowupDrain(key, runFollowup);
789+
await done.promise;
790+
await new Promise<void>((resolve) => setImmediate(resolve));
791+
792+
expect(
793+
enqueueB.enqueueFollowupRun(
794+
key,
795+
createRun({
796+
prompt: "first-redelivery",
797+
messageId: "same-id",
798+
originatingChannel: "signal",
799+
originatingTo: "+10000000000",
800+
}),
801+
settings,
802+
),
803+
).toBe(false);
804+
expect(calls).toHaveLength(1);
805+
} finally {
806+
clearSessionQueues([key]);
807+
enqueueA.resetRecentQueuedMessageIdDedupe();
808+
enqueueB.resetRecentQueuedMessageIdDedupe();
809+
}
810+
});
811+
746812
it("does not collide recent message-id keys when routing contains delimiters", async () => {
747813
const key = `test-dedup-key-collision-${Date.now()}`;
748814
const calls: FollowupRun[] = [];
@@ -1264,6 +1330,55 @@ describe("followup queue drain restart after idle window", () => {
12641330
expect(calls[1]?.prompt).toBe("after-idle");
12651331
});
12661332

1333+
it("restarts an idle drain across distinct enqueue and drain module instances", async () => {
1334+
const drainA = await importFreshModule<typeof import("./queue/drain.js")>(
1335+
import.meta.url,
1336+
"./queue/drain.js?scope=restart-a",
1337+
);
1338+
const enqueueB = await importFreshModule<typeof import("./queue/enqueue.js")>(
1339+
import.meta.url,
1340+
"./queue/enqueue.js?scope=restart-b",
1341+
);
1342+
const { clearSessionQueues } = await import("./queue.js");
1343+
const key = `test-idle-window-cross-module-${Date.now()}`;
1344+
const calls: FollowupRun[] = [];
1345+
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
1346+
const firstProcessed = createDeferred<void>();
1347+
1348+
enqueueB.resetRecentQueuedMessageIdDedupe();
1349+
1350+
try {
1351+
const runFollowup = async (run: FollowupRun) => {
1352+
calls.push(run);
1353+
if (calls.length === 1) {
1354+
firstProcessed.resolve();
1355+
}
1356+
};
1357+
1358+
enqueueB.enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings);
1359+
drainA.scheduleFollowupDrain(key, runFollowup);
1360+
await firstProcessed.promise;
1361+
1362+
await new Promise<void>((resolve) => setImmediate(resolve));
1363+
1364+
enqueueB.enqueueFollowupRun(key, createRun({ prompt: "after-idle" }), settings);
1365+
1366+
await vi.waitFor(
1367+
() => {
1368+
expect(calls).toHaveLength(2);
1369+
},
1370+
{ timeout: 1_000 },
1371+
);
1372+
1373+
expect(calls[0]?.prompt).toBe("before-idle");
1374+
expect(calls[1]?.prompt).toBe("after-idle");
1375+
} finally {
1376+
clearSessionQueues([key]);
1377+
drainA.clearFollowupDrainCallback(key);
1378+
enqueueB.resetRecentQueuedMessageIdDedupe();
1379+
}
1380+
});
1381+
12671382
it("does not double-drain when a message arrives while drain is still running", async () => {
12681383
const key = `test-no-double-drain-${Date.now()}`;
12691384
const calls: FollowupRun[] = [];

src/process/command-queue.test.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { beforeEach, describe, expect, it, vi } from "vitest";
2+
import { importFreshModule } from "../../test/helpers/import-fresh.js";
23

34
const diagnosticMocks = vi.hoisted(() => ({
45
logLaneEnqueue: vi.fn(),
@@ -334,4 +335,42 @@ describe("command queue", () => {
334335
resetAllLanes();
335336
await expect(enqueueCommand(async () => "ok")).resolves.toBe("ok");
336337
});
338+
339+
it("shares lane state across distinct module instances", async () => {
340+
const commandQueueA = await importFreshModule<typeof import("./command-queue.js")>(
341+
import.meta.url,
342+
"./command-queue.js?scope=shared-a",
343+
);
344+
const commandQueueB = await importFreshModule<typeof import("./command-queue.js")>(
345+
import.meta.url,
346+
"./command-queue.js?scope=shared-b",
347+
);
348+
const lane = `shared-state-${Date.now()}-${Math.random().toString(16).slice(2)}`;
349+
350+
let release!: () => void;
351+
const blocker = new Promise<void>((resolve) => {
352+
release = resolve;
353+
});
354+
355+
commandQueueA.resetAllLanes();
356+
357+
try {
358+
const task = commandQueueA.enqueueCommandInLane(lane, async () => {
359+
await blocker;
360+
return "done";
361+
});
362+
363+
await vi.waitFor(() => {
364+
expect(commandQueueB.getQueueSize(lane)).toBe(1);
365+
expect(commandQueueB.getActiveTaskCount()).toBe(1);
366+
});
367+
368+
release();
369+
await expect(task).resolves.toBe("done");
370+
expect(commandQueueB.getQueueSize(lane)).toBe(0);
371+
} finally {
372+
release();
373+
commandQueueA.resetAllLanes();
374+
}
375+
});
337376
});

0 commit comments

Comments
 (0)