Skip to content

Commit 6bc1c52

Browse files
KoPangKoPang
authored andcommitted
fix(discord): dedupe inbound messages by message id
1 parent 98ea71a commit 6bc1c52

File tree

2 files changed

+73
-2
lines changed

2 files changed

+73
-2
lines changed

src/discord/monitor/message-handler.queue.test.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { describe, expect, it, vi } from "vitest";
1+
import { beforeEach, describe, expect, it, vi } from "vitest";
22
import {
33
createDiscordHandlerParams,
44
createDiscordPreflightContext,
@@ -17,7 +17,7 @@ vi.mock("./message-handler.process.js", () => ({
1717
processDiscordMessage: processDiscordMessageMock,
1818
}));
1919

20-
const { createDiscordMessageHandler } = await import("./message-handler.js");
20+
const { createDiscordMessageHandler, resetDiscordInboundMessageDedupe } = await import("./message-handler.js");
2121

2222
function createDeferred<T = void>() {
2323
let resolve: (value: T | PromiseLike<T>) => void = () => {};
@@ -83,6 +83,10 @@ async function createLifecycleStopScenario(params: {
8383
}
8484

8585
describe("createDiscordMessageHandler queue behavior", () => {
86+
beforeEach(() => {
87+
resetDiscordInboundMessageDedupe();
88+
});
89+
8690
it("resets busy counters when the handler is created", () => {
8791
preflightDiscordMessageMock.mockReset();
8892
processDiscordMessageMock.mockReset();
@@ -98,6 +102,30 @@ describe("createDiscordMessageHandler queue behavior", () => {
98102
);
99103
});
100104

105+
it("drops duplicate inbound Discord messages with the same message id", async () => {
106+
preflightDiscordMessageMock.mockReset();
107+
processDiscordMessageMock.mockReset();
108+
109+
preflightDiscordMessageMock.mockImplementation(
110+
async (params: { data: { channel_id: string } }) =>
111+
createPreflightContext(params.data.channel_id),
112+
);
113+
processDiscordMessageMock.mockResolvedValue(undefined);
114+
115+
const handler = createDiscordMessageHandler(createDiscordHandlerParams());
116+
117+
await expect(handler(createMessageData("dup-1") as never, {} as never)).resolves.toBeUndefined();
118+
await vi.waitFor(() => {
119+
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
120+
});
121+
122+
await expect(handler(createMessageData("dup-1") as never, {} as never)).resolves.toBeUndefined();
123+
await Promise.resolve();
124+
125+
expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1);
126+
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
127+
});
128+
101129
it("returns immediately and tracks busy status while queued runs execute", async () => {
102130
preflightDiscordMessageMock.mockReset();
103131
processDiscordMessageMock.mockReset();

src/discord/monitor/message-handler.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
} from "../../channels/inbound-debounce-policy.js";
66
import { resolveOpenProviderRuntimeGroupPolicy } from "../../config/runtime-group-policy.js";
77
import { danger } from "../../globals.js";
8+
import { createDedupeCache } from "../../infra/dedupe.js";
89
import { buildDiscordInboundJob } from "./inbound-job.js";
910
import { createDiscordInboundWorker } from "./inbound-worker.js";
1011
import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js";
@@ -30,6 +31,37 @@ export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & {
3031
deactivate: () => void;
3132
};
3233

34+
const discordInboundMessageDedupe = createDedupeCache({
35+
ttlMs: 2 * 60_000,
36+
maxSize: 10_000,
37+
});
38+
39+
export function resetDiscordInboundMessageDedupe(): void {
40+
discordInboundMessageDedupe.clear();
41+
}
42+
43+
function buildDiscordInboundMessageDedupeKey(params: {
44+
accountId?: string;
45+
data: DiscordMessageEvent;
46+
}): string | null {
47+
const messageId = params.data.message?.id?.trim() || params.data.id?.trim();
48+
const authorId = params.data.message?.author?.id ?? params.data.author?.id;
49+
const channelId = resolveDiscordMessageChannelId({
50+
message: params.data.message,
51+
eventChannelId: params.data.channel_id,
52+
});
53+
if (!messageId || !channelId) {
54+
return null;
55+
}
56+
return JSON.stringify([
57+
"discord-inbound",
58+
params.accountId ?? "",
59+
channelId,
60+
authorId ?? "",
61+
messageId,
62+
]);
63+
}
64+
3365
export function createDiscordMessageHandler(
3466
params: DiscordMessageHandlerParams,
3567
): DiscordMessageHandlerWithLifecycle {
@@ -174,6 +206,17 @@ export function createDiscordMessageHandler(
174206
return;
175207
}
176208

209+
const dedupeKey = buildDiscordInboundMessageDedupeKey({
210+
accountId: params.accountId,
211+
data,
212+
});
213+
if (dedupeKey && discordInboundMessageDedupe.peek(dedupeKey)) {
214+
return;
215+
}
216+
if (dedupeKey) {
217+
discordInboundMessageDedupe.check(dedupeKey);
218+
}
219+
177220
await debouncer.enqueue({ data, client, abortSignal: options?.abortSignal });
178221
} catch (err) {
179222
params.runtime.error?.(danger(`handler failed: ${String(err)}`));

0 commit comments

Comments
 (0)