Skip to content

Commit c7cebd6

Browse files
committed
test: add Feishu broadcast lifecycle regression
1 parent 7d50e7f commit c7cebd6

File tree

1 file changed

+391
-0
lines changed

1 file changed

+391
-0
lines changed
Lines changed: 391 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,391 @@
1+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
2+
import { createPluginRuntimeMock } from "../../../test/helpers/extensions/plugin-runtime-mock.js";
3+
import type { ClawdbotConfig, PluginRuntime, RuntimeEnv } from "../runtime-api.js";
4+
import { monitorSingleAccount } from "./monitor.account.js";
5+
import { setFeishuRuntime } from "./runtime.js";
6+
import type { ResolvedFeishuAccount } from "./types.js";
7+
8+
const createEventDispatcherMock = vi.hoisted(() => vi.fn());
9+
const monitorWebSocketMock = vi.hoisted(() => vi.fn(async () => {}));
10+
const monitorWebhookMock = vi.hoisted(() => vi.fn(async () => {}));
11+
const createFeishuThreadBindingManagerMock = vi.hoisted(() => vi.fn(() => ({ stop: vi.fn() })));
12+
const createFeishuReplyDispatcherMock = vi.hoisted(() => vi.fn());
13+
const resolveBoundConversationMock = vi.hoisted(() => vi.fn(() => null));
14+
const touchBindingMock = vi.hoisted(() => vi.fn());
15+
const resolveAgentRouteMock = vi.hoisted(() => vi.fn());
16+
const dispatchReplyFromConfigMock = vi.hoisted(() => vi.fn());
17+
const withReplyDispatcherMock = vi.hoisted(() => vi.fn());
18+
const finalizeInboundContextMock = vi.hoisted(() => vi.fn((ctx) => ctx));
19+
const getMessageFeishuMock = vi.hoisted(() => vi.fn(async () => null));
20+
const listFeishuThreadMessagesMock = vi.hoisted(() => vi.fn(async () => []));
21+
const sendMessageFeishuMock = vi.hoisted(() =>
22+
vi.fn(async () => ({ messageId: "om_sent", chatId: "oc_broadcast_group" })),
23+
);
24+
25+
let handlersByAccount = new Map<string, Record<string, (data: unknown) => Promise<void>>>();
26+
let runtimesByAccount = new Map<string, RuntimeEnv>();
27+
const originalStateDir = process.env.OPENCLAW_STATE_DIR;
28+
29+
vi.mock("./client.js", async () => {
30+
const actual = await vi.importActual<typeof import("./client.js")>("./client.js");
31+
return {
32+
...actual,
33+
createEventDispatcher: createEventDispatcherMock,
34+
};
35+
});
36+
37+
vi.mock("./monitor.transport.js", () => ({
38+
monitorWebSocket: monitorWebSocketMock,
39+
monitorWebhook: monitorWebhookMock,
40+
}));
41+
42+
vi.mock("./thread-bindings.js", () => ({
43+
createFeishuThreadBindingManager: createFeishuThreadBindingManagerMock,
44+
}));
45+
46+
vi.mock("./reply-dispatcher.js", () => ({
47+
createFeishuReplyDispatcher: createFeishuReplyDispatcherMock,
48+
}));
49+
50+
vi.mock("./send.js", () => ({
51+
getMessageFeishu: getMessageFeishuMock,
52+
listFeishuThreadMessages: listFeishuThreadMessagesMock,
53+
sendMessageFeishu: sendMessageFeishuMock,
54+
}));
55+
56+
vi.mock("openclaw/plugin-sdk/conversation-runtime", async (importOriginal) => {
57+
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/conversation-runtime")>();
58+
return {
59+
...actual,
60+
getSessionBindingService: () => ({
61+
resolveByConversation: resolveBoundConversationMock,
62+
touch: touchBindingMock,
63+
}),
64+
};
65+
});
66+
67+
vi.mock("../../../src/infra/outbound/session-binding-service.js", () => ({
68+
getSessionBindingService: () => ({
69+
resolveByConversation: resolveBoundConversationMock,
70+
touch: touchBindingMock,
71+
}),
72+
}));
73+
74+
function createLifecycleConfig(): ClawdbotConfig {
75+
return {
76+
broadcast: {
77+
oc_broadcast_group: ["susan", "main"],
78+
},
79+
agents: {
80+
list: [{ id: "main" }, { id: "susan" }],
81+
},
82+
channels: {
83+
feishu: {
84+
enabled: true,
85+
groupPolicy: "open",
86+
requireMention: false,
87+
resolveSenderNames: false,
88+
accounts: {
89+
"account-A": {
90+
enabled: true,
91+
appId: "cli_a",
92+
appSecret: "secret_a", // pragma: allowlist secret
93+
connectionMode: "websocket",
94+
groupPolicy: "open",
95+
requireMention: false,
96+
resolveSenderNames: false,
97+
groups: {
98+
oc_broadcast_group: {
99+
requireMention: false,
100+
},
101+
},
102+
},
103+
"account-B": {
104+
enabled: true,
105+
appId: "cli_b",
106+
appSecret: "secret_b", // pragma: allowlist secret
107+
connectionMode: "websocket",
108+
groupPolicy: "open",
109+
requireMention: false,
110+
resolveSenderNames: false,
111+
groups: {
112+
oc_broadcast_group: {
113+
requireMention: false,
114+
},
115+
},
116+
},
117+
},
118+
},
119+
},
120+
messages: {
121+
inbound: {
122+
debounceMs: 0,
123+
byChannel: {
124+
feishu: 0,
125+
},
126+
},
127+
},
128+
} as ClawdbotConfig;
129+
}
130+
131+
function createLifecycleAccount(accountId: "account-A" | "account-B"): ResolvedFeishuAccount {
132+
return {
133+
accountId,
134+
enabled: true,
135+
configured: true,
136+
appId: accountId === "account-A" ? "cli_a" : "cli_b",
137+
appSecret: accountId === "account-A" ? "secret_a" : "secret_b", // pragma: allowlist secret
138+
domain: "feishu",
139+
config: {
140+
enabled: true,
141+
connectionMode: "websocket",
142+
groupPolicy: "open",
143+
requireMention: false,
144+
resolveSenderNames: false,
145+
groups: {
146+
oc_broadcast_group: {
147+
requireMention: false,
148+
},
149+
},
150+
},
151+
} as ResolvedFeishuAccount;
152+
}
153+
154+
function createRuntimeEnv(): RuntimeEnv {
155+
return {
156+
log: vi.fn(),
157+
error: vi.fn(),
158+
exit: vi.fn(),
159+
} as RuntimeEnv;
160+
}
161+
162+
function createBroadcastEvent(messageId: string) {
163+
return {
164+
sender: {
165+
sender_id: { open_id: "ou_sender_1" },
166+
sender_type: "user",
167+
},
168+
message: {
169+
message_id: messageId,
170+
chat_id: "oc_broadcast_group",
171+
chat_type: "group" as const,
172+
message_type: "text",
173+
content: JSON.stringify({ text: "hello broadcast" }),
174+
create_time: "1710000000000",
175+
},
176+
};
177+
}
178+
179+
async function settleAsyncWork(): Promise<void> {
180+
for (let i = 0; i < 6; i += 1) {
181+
await Promise.resolve();
182+
await new Promise((resolve) => setTimeout(resolve, 0));
183+
}
184+
}
185+
186+
async function setupLifecycleMonitor(accountId: "account-A" | "account-B") {
187+
const register = vi.fn((registered: Record<string, (data: unknown) => Promise<void>>) => {
188+
handlersByAccount.set(accountId, registered);
189+
});
190+
createEventDispatcherMock.mockReturnValueOnce({ register });
191+
192+
const runtime = createRuntimeEnv();
193+
runtimesByAccount.set(accountId, runtime);
194+
195+
await monitorSingleAccount({
196+
cfg: createLifecycleConfig(),
197+
account: createLifecycleAccount(accountId),
198+
runtime,
199+
botOpenIdSource: {
200+
kind: "prefetched",
201+
botOpenId: "ou_bot_1",
202+
botName: "Bot",
203+
},
204+
});
205+
206+
const onMessage = handlersByAccount.get(accountId)?.["im.message.receive_v1"];
207+
if (!onMessage) {
208+
throw new Error(`missing im.message.receive_v1 handler for ${accountId}`);
209+
}
210+
return onMessage;
211+
}
212+
213+
describe("Feishu broadcast reply-once lifecycle", () => {
214+
beforeEach(() => {
215+
vi.clearAllMocks();
216+
handlersByAccount = new Map();
217+
runtimesByAccount = new Map();
218+
process.env.OPENCLAW_STATE_DIR = `/tmp/openclaw-feishu-broadcast-${Date.now()}-${Math.random().toString(36).slice(2)}`;
219+
220+
const activeDispatcher = {
221+
sendToolResult: vi.fn(() => false),
222+
sendBlockReply: vi.fn(() => false),
223+
sendFinalReply: vi.fn(async () => true),
224+
waitForIdle: vi.fn(async () => {}),
225+
getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
226+
markComplete: vi.fn(),
227+
};
228+
229+
createFeishuReplyDispatcherMock.mockReturnValue({
230+
dispatcher: activeDispatcher,
231+
replyOptions: {},
232+
markDispatchIdle: vi.fn(),
233+
});
234+
235+
resolveBoundConversationMock.mockReturnValue(null);
236+
resolveAgentRouteMock.mockReturnValue({
237+
agentId: "main",
238+
channel: "feishu",
239+
accountId: "account-A",
240+
sessionKey: "agent:main:feishu:group:oc_broadcast_group",
241+
mainSessionKey: "agent:main:main",
242+
matchedBy: "default",
243+
});
244+
245+
dispatchReplyFromConfigMock.mockImplementation(async ({ ctx, dispatcher }) => {
246+
if (
247+
typeof ctx?.SessionKey === "string" &&
248+
ctx.SessionKey.includes("agent:main:") &&
249+
typeof dispatcher?.sendFinalReply === "function"
250+
) {
251+
await dispatcher.sendFinalReply({ text: "broadcast reply once" });
252+
}
253+
return {
254+
queuedFinal: false,
255+
counts: {
256+
final:
257+
typeof ctx?.SessionKey === "string" && ctx.SessionKey.includes("agent:main:") ? 1 : 0,
258+
},
259+
};
260+
});
261+
262+
withReplyDispatcherMock.mockImplementation(async ({ run }) => await run());
263+
264+
setFeishuRuntime(
265+
createPluginRuntimeMock({
266+
channel: {
267+
debounce: {
268+
resolveInboundDebounceMs: vi.fn(() => 0),
269+
createInboundDebouncer: <T>(params: {
270+
onFlush?: (items: T[]) => Promise<void>;
271+
onError?: (err: unknown, items: T[]) => void;
272+
}) => ({
273+
enqueue: async (item: T) => {
274+
try {
275+
await params.onFlush?.([item]);
276+
} catch (err) {
277+
params.onError?.(err, [item]);
278+
}
279+
},
280+
flushKey: async () => {},
281+
}),
282+
},
283+
text: {
284+
hasControlCommand: vi.fn(() => false),
285+
},
286+
routing: {
287+
resolveAgentRoute:
288+
resolveAgentRouteMock as unknown as PluginRuntime["channel"]["routing"]["resolveAgentRoute"],
289+
},
290+
reply: {
291+
resolveEnvelopeFormatOptions: vi.fn(() => ({})),
292+
formatAgentEnvelope: vi.fn((params: { body: string }) => params.body),
293+
finalizeInboundContext:
294+
finalizeInboundContextMock as unknown as PluginRuntime["channel"]["reply"]["finalizeInboundContext"],
295+
dispatchReplyFromConfig:
296+
dispatchReplyFromConfigMock as unknown as PluginRuntime["channel"]["reply"]["dispatchReplyFromConfig"],
297+
withReplyDispatcher:
298+
withReplyDispatcherMock as unknown as PluginRuntime["channel"]["reply"]["withReplyDispatcher"],
299+
},
300+
commands: {
301+
shouldComputeCommandAuthorized: vi.fn(() => false),
302+
resolveCommandAuthorizedFromAuthorizers: vi.fn(() => false),
303+
},
304+
session: {
305+
readSessionUpdatedAt: vi.fn(),
306+
resolveStorePath: vi.fn(() => "/tmp/feishu-broadcast-sessions.json"),
307+
},
308+
pairing: {
309+
readAllowFromStore: vi.fn().mockResolvedValue([]),
310+
upsertPairingRequest: vi.fn(),
311+
buildPairingReply: vi.fn(),
312+
},
313+
},
314+
media: {
315+
detectMime: vi.fn(async () => "text/plain"),
316+
},
317+
}) as unknown as PluginRuntime,
318+
);
319+
});
320+
321+
afterEach(() => {
322+
if (originalStateDir === undefined) {
323+
delete process.env.OPENCLAW_STATE_DIR;
324+
return;
325+
}
326+
process.env.OPENCLAW_STATE_DIR = originalStateDir;
327+
});
328+
329+
it("uses one active reply path when the same broadcast event reaches two accounts", async () => {
330+
const onMessageA = await setupLifecycleMonitor("account-A");
331+
const onMessageB = await setupLifecycleMonitor("account-B");
332+
const event = createBroadcastEvent("om_broadcast_once");
333+
334+
await onMessageA(event);
335+
await settleAsyncWork();
336+
await onMessageB(event);
337+
await settleAsyncWork();
338+
339+
expect(runtimesByAccount.get("account-A")?.error).not.toHaveBeenCalled();
340+
expect(runtimesByAccount.get("account-B")?.error).not.toHaveBeenCalled();
341+
342+
expect(dispatchReplyFromConfigMock).toHaveBeenCalledTimes(2);
343+
expect(createFeishuReplyDispatcherMock).toHaveBeenCalledTimes(1);
344+
expect(createFeishuReplyDispatcherMock).toHaveBeenCalledWith(
345+
expect.objectContaining({
346+
accountId: "account-a",
347+
chatId: "oc_broadcast_group",
348+
replyToMessageId: "om_broadcast_once",
349+
}),
350+
);
351+
352+
const sessionKeys = finalizeInboundContextMock.mock.calls.map(
353+
(call) => (call[0] as { SessionKey?: string }).SessionKey,
354+
);
355+
expect(sessionKeys).toContain("agent:main:feishu:group:oc_broadcast_group");
356+
expect(sessionKeys).toContain("agent:susan:feishu:group:oc_broadcast_group");
357+
358+
const activeDispatcher = createFeishuReplyDispatcherMock.mock.results[0]?.value.dispatcher as {
359+
sendFinalReply: ReturnType<typeof vi.fn>;
360+
};
361+
expect(activeDispatcher.sendFinalReply).toHaveBeenCalledTimes(1);
362+
});
363+
364+
it("does not duplicate delivery after a post-send failure on the first account", async () => {
365+
const onMessageA = await setupLifecycleMonitor("account-A");
366+
const onMessageB = await setupLifecycleMonitor("account-B");
367+
const event = createBroadcastEvent("om_broadcast_retry");
368+
369+
dispatchReplyFromConfigMock.mockImplementationOnce(async ({ ctx, dispatcher }) => {
370+
if (typeof ctx?.SessionKey === "string" && ctx.SessionKey.includes("agent:susan:")) {
371+
return { queuedFinal: false, counts: { final: 0 } };
372+
}
373+
await dispatcher.sendFinalReply({ text: "broadcast reply once" });
374+
throw new Error("post-send failure");
375+
});
376+
377+
await onMessageA(event);
378+
await settleAsyncWork();
379+
await onMessageB(event);
380+
await settleAsyncWork();
381+
382+
expect(runtimesByAccount.get("account-A")?.error).not.toHaveBeenCalled();
383+
expect(runtimesByAccount.get("account-B")?.error).not.toHaveBeenCalled();
384+
expect(dispatchReplyFromConfigMock).toHaveBeenCalledTimes(2);
385+
386+
const activeDispatcher = createFeishuReplyDispatcherMock.mock.results[0]?.value.dispatcher as {
387+
sendFinalReply: ReturnType<typeof vi.fn>;
388+
};
389+
expect(activeDispatcher.sendFinalReply).toHaveBeenCalledTimes(1);
390+
});
391+
});

0 commit comments

Comments
 (0)