Skip to content

Commit d839de4

Browse files
committed
fix(msteams): keep monitor alive until shutdown
1 parent c1b75ab commit d839de4

File tree

2 files changed

+221
-6
lines changed

2 files changed

+221
-6
lines changed
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
import { EventEmitter } from "node:events";
2+
import type { OpenClawConfig, RuntimeEnv } from "openclaw/plugin-sdk";
3+
import { afterEach, describe, expect, it, vi } from "vitest";
4+
import type { MSTeamsConversationStore } from "./conversation-store.js";
5+
import type { MSTeamsPollStore } from "./polls.js";
6+
7+
type FakeServer = EventEmitter & {
8+
close: (callback?: (err?: Error | null) => void) => void;
9+
};
10+
11+
const expressControl = vi.hoisted(() => ({
12+
mode: { value: "listening" as "listening" | "error" },
13+
}));
14+
15+
vi.mock("openclaw/plugin-sdk", () => ({
16+
DEFAULT_WEBHOOK_MAX_BODY_BYTES: 1024 * 1024,
17+
mergeAllowlist: (params: { existing?: string[]; additions?: string[] }) =>
18+
Array.from(new Set([...(params.existing ?? []), ...(params.additions ?? [])])),
19+
summarizeMapping: vi.fn(),
20+
}));
21+
22+
vi.mock("express", () => {
23+
const json = vi.fn(() => {
24+
return (_req: unknown, _res: unknown, next?: (err?: unknown) => void) => {
25+
next?.();
26+
};
27+
});
28+
29+
const factory = () => ({
30+
use: vi.fn(),
31+
post: vi.fn(),
32+
listen: vi.fn((_port: number) => {
33+
const server = new EventEmitter() as FakeServer;
34+
server.close = (callback?: (err?: Error | null) => void) => {
35+
queueMicrotask(() => {
36+
server.emit("close");
37+
callback?.(null);
38+
});
39+
};
40+
queueMicrotask(() => {
41+
if (expressControl.mode.value === "error") {
42+
server.emit("error", new Error("listen EADDRINUSE"));
43+
return;
44+
}
45+
server.emit("listening");
46+
});
47+
return server;
48+
}),
49+
});
50+
51+
return {
52+
default: factory,
53+
json,
54+
};
55+
});
56+
57+
const registerMSTeamsHandlers = vi.hoisted(() =>
58+
vi.fn(() => ({
59+
run: vi.fn(async () => {}),
60+
})),
61+
);
62+
const createMSTeamsAdapter = vi.hoisted(() =>
63+
vi.fn(() => ({
64+
process: vi.fn(async () => {}),
65+
})),
66+
);
67+
const loadMSTeamsSdkWithAuth = vi.hoisted(() =>
68+
vi.fn(async () => ({
69+
sdk: {
70+
ActivityHandler: class {},
71+
MsalTokenProvider: class {},
72+
authorizeJWT:
73+
() => (_req: unknown, _res: unknown, next: ((err?: unknown) => void) | undefined) =>
74+
next?.(),
75+
},
76+
authConfig: {},
77+
})),
78+
);
79+
80+
vi.mock("./monitor-handler.js", () => ({
81+
registerMSTeamsHandlers: (...args: unknown[]) => registerMSTeamsHandlers(...args),
82+
}));
83+
84+
vi.mock("./resolve-allowlist.js", () => ({
85+
resolveMSTeamsChannelAllowlist: vi.fn(async () => []),
86+
resolveMSTeamsUserAllowlist: vi.fn(async () => []),
87+
}));
88+
89+
vi.mock("./sdk.js", () => ({
90+
createMSTeamsAdapter: (...args: unknown[]) => createMSTeamsAdapter(...args),
91+
loadMSTeamsSdkWithAuth: (...args: unknown[]) => loadMSTeamsSdkWithAuth(...args),
92+
}));
93+
94+
vi.mock("./runtime.js", () => ({
95+
getMSTeamsRuntime: () => ({
96+
logging: {
97+
getChildLogger: () => ({
98+
info: vi.fn(),
99+
error: vi.fn(),
100+
debug: vi.fn(),
101+
}),
102+
},
103+
channel: {
104+
text: {
105+
resolveTextChunkLimit: () => 4000,
106+
},
107+
},
108+
}),
109+
}));
110+
111+
import { monitorMSTeamsProvider } from "./monitor.js";
112+
113+
function createConfig(port: number): OpenClawConfig {
114+
return {
115+
channels: {
116+
msteams: {
117+
enabled: true,
118+
appId: "app-id",
119+
appPassword: "app-password",
120+
tenantId: "tenant-id",
121+
webhook: {
122+
port,
123+
path: "/api/messages",
124+
},
125+
},
126+
},
127+
} as OpenClawConfig;
128+
}
129+
130+
function createRuntime(): RuntimeEnv {
131+
return {
132+
log: vi.fn(),
133+
error: vi.fn(),
134+
exit: (code: number): never => {
135+
throw new Error(`exit ${code}`);
136+
},
137+
};
138+
}
139+
140+
function createStores() {
141+
return {
142+
conversationStore: {} as MSTeamsConversationStore,
143+
pollStore: {} as MSTeamsPollStore,
144+
};
145+
}
146+
147+
describe("monitorMSTeamsProvider lifecycle", () => {
148+
afterEach(() => {
149+
vi.clearAllMocks();
150+
expressControl.mode.value = "listening";
151+
});
152+
153+
it("stays active until aborted", async () => {
154+
const abort = new AbortController();
155+
const stores = createStores();
156+
const task = monitorMSTeamsProvider({
157+
cfg: createConfig(0),
158+
runtime: createRuntime(),
159+
abortSignal: abort.signal,
160+
conversationStore: stores.conversationStore,
161+
pollStore: stores.pollStore,
162+
});
163+
164+
const early = await Promise.race([
165+
task.then(() => "resolved"),
166+
new Promise<"pending">((resolve) => setTimeout(() => resolve("pending"), 50)),
167+
]);
168+
expect(early).toBe("pending");
169+
170+
abort.abort();
171+
await expect(task).resolves.toEqual(
172+
expect.objectContaining({
173+
shutdown: expect.any(Function),
174+
}),
175+
);
176+
});
177+
178+
it("rejects startup when webhook port is already in use", async () => {
179+
expressControl.mode.value = "error";
180+
await expect(
181+
monitorMSTeamsProvider({
182+
cfg: createConfig(3978),
183+
runtime: createRuntime(),
184+
abortSignal: new AbortController().signal,
185+
conversationStore: createStores().conversationStore,
186+
pollStore: createStores().pollStore,
187+
}),
188+
).rejects.toThrow(/EADDRINUSE/);
189+
});
190+
});

extensions/msteams/src/monitor.ts

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,21 @@ export async function monitorMSTeamsProvider(
273273
fallback: "/api/messages",
274274
});
275275

276-
// Start listening and capture the HTTP server handle
277-
const httpServer = expressApp.listen(port, () => {
278-
log.info(`msteams provider started on port ${port}`);
276+
// Start listening and fail fast if bind/listen fails.
277+
const httpServer = expressApp.listen(port);
278+
await new Promise<void>((resolve, reject) => {
279+
const onListening = () => {
280+
httpServer.off("error", onError);
281+
log.info(`msteams provider started on port ${port}`);
282+
resolve();
283+
};
284+
const onError = (err: unknown) => {
285+
httpServer.off("listening", onListening);
286+
log.error("msteams server error", { error: String(err) });
287+
reject(err);
288+
};
289+
httpServer.once("listening", onListening);
290+
httpServer.once("error", onError);
279291
});
280292

281293
httpServer.on("error", (err) => {
@@ -295,11 +307,24 @@ export async function monitorMSTeamsProvider(
295307
};
296308

297309
// Handle abort signal
310+
const onAbort = () => {
311+
void shutdown();
312+
};
298313
if (opts.abortSignal) {
299-
opts.abortSignal.addEventListener("abort", () => {
300-
void shutdown();
301-
});
314+
if (opts.abortSignal.aborted) {
315+
onAbort();
316+
} else {
317+
opts.abortSignal.addEventListener("abort", onAbort, { once: true });
318+
}
302319
}
303320

321+
// Keep this task alive until shutdown/close so gateway runtime does not treat startup as exit.
322+
await new Promise<void>((resolve) => {
323+
httpServer.once("close", () => {
324+
resolve();
325+
});
326+
});
327+
opts.abortSignal?.removeEventListener("abort", onAbort);
328+
304329
return { app: expressApp, shutdown };
305330
}

0 commit comments

Comments
 (0)