Skip to content

Commit f4a2bbe

Browse files
fix(feishu): add early event-level dedup to prevent duplicate replies (openclaw#43762)
* fix(feishu): add early event-level dedup to prevent duplicate replies Add synchronous in-memory dedup at EventDispatcher handler level using message_id as key with 5-minute TTL and 2000-entry cap. This catches duplicate events immediately when they arrive from the Lark SDK — before the inbound debouncer or processing queue — preventing the race condition where two concurrent dispatches enter the pipeline before either records the messageId in the downstream dedup layer. Fixes the root cause reported in openclaw#42687. * fix(feishu): correct inverted dedup condition check() returns false on first call (new key) and true on subsequent calls (duplicate). The previous `!check()` guard was inverted — dropping every first delivery and passing all duplicates. Remove the negation so the guard correctly drops duplicates. * fix(feishu): simplify eventDedup key — drop redundant accountId prefix eventDedup is already scoped per account (one instance per registerEventHandlers call), so the accountId prefix in the cache key is redundant. Use `evt:${messageId}` instead. * fix(feishu): share inbound processing claim dedupe --------- Co-authored-by: Tak Hoffman <[email protected]>
1 parent 2659fc6 commit f4a2bbe

File tree

6 files changed

+210
-45
lines changed

6 files changed

+210
-45
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ Docs: https://docs.openclaw.ai
139139
- Plugins/env-scoped roots: fix plugin discovery/load caches and provenance tracking so same-process `HOME`/`OPENCLAW_HOME` changes no longer reuse stale plugin state or misreport `~/...` plugins as untracked. (#44046) thanks @gumadeiras.
140140
- Gateway/session discovery: discover disk-only and retired ACP session stores under custom templated `session.store` roots so ACP reconciliation, session-id/session-label targeting, and run-id fallback keep working after restart. (#44176) thanks @gumadeiras.
141141
- Models/OpenRouter native ids: canonicalize native OpenRouter model keys across config writes, runtime lookups, fallback management, and `models list --plain`, and migrate legacy duplicated `openrouter/openrouter/...` config entries forward on write.
142+
- Feishu/event dedupe: keep early duplicate suppression aligned with the shared Feishu message-id contract and release the pre-queue dedupe marker after failed dispatch so retried events can recover instead of being dropped until the short TTL expires. (#43762) Thanks @yunweibang.
142143
- Gateway/hooks: bucket hook auth failures by forwarded client IP behind trusted proxies and warn when `hooks.allowedAgentIds` leaves hook routing unrestricted.
143144
- Agents/compaction: skip the post-compaction `cache-ttl` marker write when a compaction completed in the same attempt, preventing the next turn from immediately triggering a second tiny compaction. (#28548) thanks @MoerAI.
144145
- Native chat/macOS: add `/new`, `/reset`, and `/clear` reset triggers, keep shared main-session aliases aligned, and ignore stale model-selection completions so native chat state stays in sync across reset and fast model changes. (#10898) Thanks @Nachx639.

extensions/feishu/src/bot.ts

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
} from "openclaw/plugin-sdk/feishu";
1616
import { resolveFeishuAccount } from "./accounts.js";
1717
import { createFeishuClient } from "./client.js";
18-
import { tryRecordMessage, tryRecordMessagePersistent } from "./dedup.js";
18+
import { finalizeFeishuMessageProcessing, tryRecordMessagePersistent } from "./dedup.js";
1919
import { maybeCreateDynamicAgent } from "./dynamic-agent.js";
2020
import { normalizeFeishuExternalKey } from "./external-keys.js";
2121
import { downloadMessageResourceFeishu } from "./media.js";
@@ -867,8 +867,18 @@ export async function handleFeishuMessage(params: {
867867
runtime?: RuntimeEnv;
868868
chatHistories?: Map<string, HistoryEntry[]>;
869869
accountId?: string;
870+
processingClaimHeld?: boolean;
870871
}): Promise<void> {
871-
const { cfg, event, botOpenId, botName, runtime, chatHistories, accountId } = params;
872+
const {
873+
cfg,
874+
event,
875+
botOpenId,
876+
botName,
877+
runtime,
878+
chatHistories,
879+
accountId,
880+
processingClaimHeld = false,
881+
} = params;
872882

873883
// Resolve account with merged config
874884
const account = resolveFeishuAccount({ cfg, accountId });
@@ -877,16 +887,15 @@ export async function handleFeishuMessage(params: {
877887
const log = runtime?.log ?? console.log;
878888
const error = runtime?.error ?? console.error;
879889

880-
// Dedup: synchronous memory guard prevents concurrent duplicate dispatch
881-
// before the async persistent check completes.
882890
const messageId = event.message.message_id;
883-
const memoryDedupeKey = `${account.accountId}:${messageId}`;
884-
if (!tryRecordMessage(memoryDedupeKey)) {
885-
log(`feishu: skipping duplicate message ${messageId} (memory dedup)`);
886-
return;
887-
}
888-
// Persistent dedup survives restarts and reconnects.
889-
if (!(await tryRecordMessagePersistent(messageId, account.accountId, log))) {
891+
if (
892+
!(await finalizeFeishuMessageProcessing({
893+
messageId,
894+
namespace: account.accountId,
895+
log,
896+
claimHeld: processingClaimHeld,
897+
}))
898+
) {
890899
log(`feishu: skipping duplicate message ${messageId}`);
891900
return;
892901
}

extensions/feishu/src/dedup.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,15 @@ import {
1010
const DEDUP_TTL_MS = 24 * 60 * 60 * 1000;
1111
const MEMORY_MAX_SIZE = 1_000;
1212
const FILE_MAX_ENTRIES = 10_000;
13+
const EVENT_DEDUP_TTL_MS = 5 * 60 * 1000;
14+
const EVENT_MEMORY_MAX_SIZE = 2_000;
1315
type PersistentDedupeData = Record<string, number>;
1416

1517
const memoryDedupe = createDedupeCache({ ttlMs: DEDUP_TTL_MS, maxSize: MEMORY_MAX_SIZE });
18+
const processingClaims = createDedupeCache({
19+
ttlMs: EVENT_DEDUP_TTL_MS,
20+
maxSize: EVENT_MEMORY_MAX_SIZE,
21+
});
1622

1723
function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
1824
const stateOverride = env.OPENCLAW_STATE_DIR?.trim() || env.CLAWDBOT_STATE_DIR?.trim();
@@ -37,6 +43,103 @@ const persistentDedupe = createPersistentDedupe({
3743
resolveFilePath: resolveNamespaceFilePath,
3844
});
3945

46+
function resolveEventDedupeKey(
47+
namespace: string,
48+
messageId: string | undefined | null,
49+
): string | null {
50+
const trimmed = messageId?.trim();
51+
if (!trimmed) {
52+
return null;
53+
}
54+
return `${namespace}:${trimmed}`;
55+
}
56+
57+
function normalizeMessageId(messageId: string | undefined | null): string | null {
58+
const trimmed = messageId?.trim();
59+
return trimmed ? trimmed : null;
60+
}
61+
62+
function resolveMemoryDedupeKey(
63+
namespace: string,
64+
messageId: string | undefined | null,
65+
): string | null {
66+
const trimmed = normalizeMessageId(messageId);
67+
if (!trimmed) {
68+
return null;
69+
}
70+
return `${namespace}:${trimmed}`;
71+
}
72+
73+
export function tryBeginFeishuMessageProcessing(
74+
messageId: string | undefined | null,
75+
namespace = "global",
76+
): boolean {
77+
return !processingClaims.check(resolveEventDedupeKey(namespace, messageId));
78+
}
79+
80+
export function releaseFeishuMessageProcessing(
81+
messageId: string | undefined | null,
82+
namespace = "global",
83+
): void {
84+
processingClaims.delete(resolveEventDedupeKey(namespace, messageId));
85+
}
86+
87+
export async function finalizeFeishuMessageProcessing(params: {
88+
messageId: string | undefined | null;
89+
namespace?: string;
90+
log?: (...args: unknown[]) => void;
91+
claimHeld?: boolean;
92+
}): Promise<boolean> {
93+
const { messageId, namespace = "global", log, claimHeld = false } = params;
94+
const normalizedMessageId = normalizeMessageId(messageId);
95+
const memoryKey = resolveMemoryDedupeKey(namespace, messageId);
96+
if (!memoryKey || !normalizedMessageId) {
97+
return false;
98+
}
99+
if (!claimHeld && !tryBeginFeishuMessageProcessing(normalizedMessageId, namespace)) {
100+
return false;
101+
}
102+
if (!tryRecordMessage(memoryKey)) {
103+
releaseFeishuMessageProcessing(normalizedMessageId, namespace);
104+
return false;
105+
}
106+
if (!(await tryRecordMessagePersistent(normalizedMessageId, namespace, log))) {
107+
releaseFeishuMessageProcessing(normalizedMessageId, namespace);
108+
return false;
109+
}
110+
return true;
111+
}
112+
113+
export async function recordProcessedFeishuMessage(
114+
messageId: string | undefined | null,
115+
namespace = "global",
116+
log?: (...args: unknown[]) => void,
117+
): Promise<boolean> {
118+
const normalizedMessageId = normalizeMessageId(messageId);
119+
const memoryKey = resolveMemoryDedupeKey(namespace, messageId);
120+
if (!memoryKey || !normalizedMessageId) {
121+
return false;
122+
}
123+
tryRecordMessage(memoryKey);
124+
return await tryRecordMessagePersistent(normalizedMessageId, namespace, log);
125+
}
126+
127+
export async function hasProcessedFeishuMessage(
128+
messageId: string | undefined | null,
129+
namespace = "global",
130+
log?: (...args: unknown[]) => void,
131+
): Promise<boolean> {
132+
const normalizedMessageId = normalizeMessageId(messageId);
133+
const memoryKey = resolveMemoryDedupeKey(namespace, messageId);
134+
if (!memoryKey || !normalizedMessageId) {
135+
return false;
136+
}
137+
if (hasRecordedMessage(memoryKey)) {
138+
return true;
139+
}
140+
return hasRecordedMessagePersistent(normalizedMessageId, namespace, log);
141+
}
142+
40143
/**
41144
* Synchronous dedup — memory only.
42145
* Kept for backward compatibility; prefer {@link tryRecordMessagePersistent}.

extensions/feishu/src/monitor.account.ts

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import {
1212
import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js";
1313
import { createEventDispatcher } from "./client.js";
1414
import {
15-
hasRecordedMessage,
16-
hasRecordedMessagePersistent,
17-
tryRecordMessage,
18-
tryRecordMessagePersistent,
15+
hasProcessedFeishuMessage,
16+
recordProcessedFeishuMessage,
17+
releaseFeishuMessageProcessing,
18+
tryBeginFeishuMessageProcessing,
1919
warmupDedupFromDisk,
2020
} from "./dedup.js";
2121
import { isMentionForwardRequest } from "./mention.js";
@@ -264,6 +264,7 @@ function registerEventHandlers(
264264
runtime,
265265
chatHistories,
266266
accountId,
267+
processingClaimHeld: true,
267268
});
268269
await enqueue(chatId, task);
269270
};
@@ -291,10 +292,8 @@ function registerEventHandlers(
291292
return;
292293
}
293294
for (const messageId of suppressedIds) {
294-
// Keep in-memory dedupe in sync with handleFeishuMessage's keying.
295-
tryRecordMessage(`${accountId}:${messageId}`);
296295
try {
297-
await tryRecordMessagePersistent(messageId, accountId, log);
296+
await recordProcessedFeishuMessage(messageId, accountId, log);
298297
} catch (err) {
299298
error(
300299
`feishu[${accountId}]: failed to record merged dedupe id ${messageId}: ${String(err)}`,
@@ -303,15 +302,7 @@ function registerEventHandlers(
303302
}
304303
};
305304
const isMessageAlreadyProcessed = async (entry: FeishuMessageEvent): Promise<boolean> => {
306-
const messageId = entry.message.message_id?.trim();
307-
if (!messageId) {
308-
return false;
309-
}
310-
const memoryKey = `${accountId}:${messageId}`;
311-
if (hasRecordedMessage(memoryKey)) {
312-
return true;
313-
}
314-
return hasRecordedMessagePersistent(messageId, accountId, log);
305+
return await hasProcessedFeishuMessage(entry.message.message_id, accountId, log);
315306
};
316307
const inboundDebouncer = core.channel.debounce.createInboundDebouncer<FeishuMessageEvent>({
317308
debounceMs: inboundDebounceMs,
@@ -384,26 +375,36 @@ function registerEventHandlers(
384375
},
385376
});
386377
},
387-
onError: (err) => {
378+
onError: (err, entries) => {
379+
for (const entry of entries) {
380+
releaseFeishuMessageProcessing(entry.message.message_id, accountId);
381+
}
388382
error(`feishu[${accountId}]: inbound debounce flush failed: ${String(err)}`);
389383
},
390384
});
391385

392386
eventDispatcher.register({
393387
"im.message.receive_v1": async (data) => {
388+
const event = data as unknown as FeishuMessageEvent;
389+
const messageId = event.message?.message_id?.trim();
390+
if (!tryBeginFeishuMessageProcessing(messageId, accountId)) {
391+
log(`feishu[${accountId}]: dropping duplicate event for message ${messageId}`);
392+
return;
393+
}
394394
const processMessage = async () => {
395-
const event = data as unknown as FeishuMessageEvent;
396395
await inboundDebouncer.enqueue(event);
397396
};
398397
if (fireAndForget) {
399398
void processMessage().catch((err) => {
399+
releaseFeishuMessageProcessing(messageId, accountId);
400400
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
401401
});
402402
return;
403403
}
404404
try {
405405
await processMessage();
406406
} catch (err) {
407+
releaseFeishuMessageProcessing(messageId, accountId);
407408
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
408409
}
409410
},

extensions/feishu/src/monitor.reaction.test.ts

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,9 @@ function expectParsedFirstDispatchedEvent(botOpenId = "ou_bot") {
212212
}
213213

214214
function setDedupPassThroughMocks(): void {
215-
vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true);
216-
vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true);
217-
vi.spyOn(dedup, "hasRecordedMessage").mockReturnValue(false);
218-
vi.spyOn(dedup, "hasRecordedMessagePersistent").mockResolvedValue(false);
215+
vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
216+
vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
217+
vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false);
219218
}
220219

221220
function createMention(params: { openId: string; name: string; key?: string }): FeishuMention {
@@ -236,8 +235,7 @@ async function enqueueDebouncedMessage(
236235
}
237236

238237
function setStaleRetryMocks(messageId = "om_old") {
239-
vi.spyOn(dedup, "hasRecordedMessage").mockImplementation((key) => key.endsWith(`:${messageId}`));
240-
vi.spyOn(dedup, "hasRecordedMessagePersistent").mockImplementation(
238+
vi.spyOn(dedup, "hasProcessedFeishuMessage").mockImplementation(
241239
async (currentMessageId) => currentMessageId === messageId,
242240
);
243241
}
@@ -475,10 +473,9 @@ describe("Feishu inbound debounce regressions", () => {
475473
});
476474

477475
it("passes prefetched botName through to handleFeishuMessage", async () => {
478-
vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true);
479-
vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true);
480-
vi.spyOn(dedup, "hasRecordedMessage").mockReturnValue(false);
481-
vi.spyOn(dedup, "hasRecordedMessagePersistent").mockResolvedValue(false);
476+
vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
477+
vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
478+
vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false);
482479
const onMessage = await setupDebounceMonitor({ botName: "OpenClaw Bot" });
483480

484481
await onMessage(
@@ -560,8 +557,8 @@ describe("Feishu inbound debounce regressions", () => {
560557
});
561558

562559
it("excludes previously processed retries from combined debounce text", async () => {
563-
vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true);
564-
vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true);
560+
vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
561+
vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
565562
setStaleRetryMocks();
566563
const onMessage = await setupDebounceMonitor();
567564

@@ -586,8 +583,8 @@ describe("Feishu inbound debounce regressions", () => {
586583
});
587584

588585
it("uses latest fresh message id when debounce batch ends with stale retry", async () => {
589-
const recordSpy = vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true);
590-
vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true);
586+
vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
587+
const recordSpy = vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
591588
setStaleRetryMocks();
592589
const onMessage = await setupDebounceMonitor();
593590

@@ -603,7 +600,54 @@ describe("Feishu inbound debounce regressions", () => {
603600
expect(dispatched.message.message_id).toBe("om_new");
604601
const combined = JSON.parse(dispatched.message.content) as { text?: string };
605602
expect(combined.text).toBe("fresh");
606-
expect(recordSpy).toHaveBeenCalledWith("default:om_old");
607-
expect(recordSpy).not.toHaveBeenCalledWith("default:om_new");
603+
expect(recordSpy).toHaveBeenCalledWith("om_old", "default", expect.any(Function));
604+
expect(recordSpy).not.toHaveBeenCalledWith("om_new", "default", expect.any(Function));
605+
});
606+
607+
it("releases early event dedupe when debounced dispatch fails", async () => {
608+
setDedupPassThroughMocks();
609+
const enqueueMock = vi.fn();
610+
setFeishuRuntime(
611+
createPluginRuntimeMock({
612+
channel: {
613+
debounce: {
614+
createInboundDebouncer: <T>(params: {
615+
onError?: (err: unknown, items: T[]) => void;
616+
}) => ({
617+
enqueue: async (item: T) => {
618+
enqueueMock(item);
619+
params.onError?.(new Error("dispatch failed"), [item]);
620+
},
621+
flushKey: async () => {},
622+
}),
623+
resolveInboundDebounceMs,
624+
},
625+
text: {
626+
hasControlCommand,
627+
},
628+
},
629+
}),
630+
);
631+
const onMessage = await setupDebounceMonitor();
632+
const event = createTextEvent({ messageId: "om_retryable", text: "hello" });
633+
634+
await enqueueDebouncedMessage(onMessage, event);
635+
expect(enqueueMock).toHaveBeenCalledTimes(1);
636+
637+
await enqueueDebouncedMessage(onMessage, event);
638+
expect(enqueueMock).toHaveBeenCalledTimes(2);
639+
expect(handleFeishuMessageMock).not.toHaveBeenCalled();
640+
});
641+
642+
it("drops duplicate inbound events before they re-enter the debounce pipeline", async () => {
643+
const onMessage = await setupDebounceMonitor();
644+
const event = createTextEvent({ messageId: "om_duplicate", text: "hello" });
645+
646+
await enqueueDebouncedMessage(onMessage, event);
647+
await vi.advanceTimersByTimeAsync(25);
648+
await enqueueDebouncedMessage(onMessage, event);
649+
await vi.advanceTimersByTimeAsync(25);
650+
651+
expect(handleFeishuMessageMock).toHaveBeenCalledTimes(1);
608652
});
609653
});

0 commit comments

Comments
 (0)