Skip to content

Commit 1890089

Browse files
committed
fix: serialize duplicate channel starts (#49583) (thanks @sudie-codes)
1 parent 1040ae5 commit 1890089

File tree

3 files changed

+204
-107
lines changed

3 files changed

+204
-107
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ Docs: https://docs.openclaw.ai
128128
- Plugins/subagents: forward per-run provider and model overrides through gateway plugin subagent dispatch so plugin-launched agent delegations honor explicit model selection again. (#48277) Thanks @jalehman.
129129
- Agents/compaction: write minimal boundary summaries for empty preparations while keeping split-turn prefixes on the normal path, so no-summarizable-message sessions stop retriggering the safeguard loop. (#42215) thanks @lml2468.
130130
- Models/chat commands: keep `/model ...@YYYYMMDD` version suffixes intact by default, but still honor matching stored numeric auth-profile overrides for the same provider. (#48896) Thanks @Alix-007.
131+
- Gateway/channels: serialize per-account channel startup so overlapping starts do not boot the same provider twice, preventing MS Teams `EADDRINUSE` crash loops during startup and restart. (#49583) Thanks @sudie-codes.
131132

132133
### Fixes
133134

src/gateway/server-channels.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,15 @@ function createTestPlugin(params?: {
4545
startAccount?: NonNullable<ChannelPlugin<TestAccount>["gateway"]>["startAccount"];
4646
includeDescribeAccount?: boolean;
4747
resolveAccount?: ChannelPlugin<TestAccount>["config"]["resolveAccount"];
48+
isConfigured?: ChannelPlugin<TestAccount>["config"]["isConfigured"];
4849
}): ChannelPlugin<TestAccount> {
4950
const account = params?.account ?? { enabled: true, configured: true };
5051
const includeDescribeAccount = params?.includeDescribeAccount !== false;
5152
const config: ChannelPlugin<TestAccount>["config"] = {
5253
listAccountIds: () => [DEFAULT_ACCOUNT_ID],
5354
resolveAccount: params?.resolveAccount ?? (() => account),
5455
isEnabled: (resolved) => resolved.enabled !== false,
56+
...(params?.isConfigured ? { isConfigured: params.isConfigured } : {}),
5557
};
5658
if (includeDescribeAccount) {
5759
config.describeAccount = (resolved) => ({
@@ -79,6 +81,14 @@ function createTestPlugin(params?: {
7981
};
8082
}
8183

84+
function createDeferred(): { promise: Promise<void>; resolve: () => void } {
85+
let resolvePromise = () => {};
86+
const promise = new Promise<void>((resolve) => {
87+
resolvePromise = resolve;
88+
});
89+
return { promise, resolve: resolvePromise };
90+
}
91+
8292
function installTestRegistry(plugin: ChannelPlugin<TestAccount>) {
8393
const registry = createEmptyPluginRegistry();
8494
registry.channels.push({
@@ -189,6 +199,52 @@ describe("server-channels auto restart", () => {
189199
expect(startAccount).toHaveBeenCalledTimes(1);
190200
});
191201

202+
it("deduplicates concurrent start requests for the same account", async () => {
203+
const startupGate = createDeferred();
204+
const isConfigured = vi.fn(async () => {
205+
await startupGate.promise;
206+
return true;
207+
});
208+
const startAccount = vi.fn(async () => {});
209+
210+
installTestRegistry(createTestPlugin({ startAccount, isConfigured }));
211+
const manager = createManager();
212+
213+
const firstStart = manager.startChannel("discord", DEFAULT_ACCOUNT_ID);
214+
const secondStart = manager.startChannel("discord", DEFAULT_ACCOUNT_ID);
215+
216+
await Promise.resolve();
217+
expect(isConfigured).toHaveBeenCalledTimes(1);
218+
expect(startAccount).not.toHaveBeenCalled();
219+
220+
startupGate.resolve();
221+
await Promise.all([firstStart, secondStart]);
222+
223+
expect(startAccount).toHaveBeenCalledTimes(1);
224+
});
225+
226+
it("cancels a pending startup when the account is stopped mid-boot", async () => {
227+
const startupGate = createDeferred();
228+
const isConfigured = vi.fn(async () => {
229+
await startupGate.promise;
230+
return true;
231+
});
232+
const startAccount = vi.fn(async () => {});
233+
234+
installTestRegistry(createTestPlugin({ startAccount, isConfigured }));
235+
const manager = createManager();
236+
237+
const startTask = manager.startChannel("discord", DEFAULT_ACCOUNT_ID);
238+
await Promise.resolve();
239+
240+
const stopTask = manager.stopChannel("discord", DEFAULT_ACCOUNT_ID);
241+
startupGate.resolve();
242+
243+
await Promise.all([startTask, stopTask]);
244+
245+
expect(startAccount).not.toHaveBeenCalled();
246+
});
247+
192248
it("does not resolve channelRuntime until a channel starts", async () => {
193249
const channelRuntime = {
194250
marker: "lazy-channel-runtime",

src/gateway/server-channels.ts

Lines changed: 147 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
3232

3333
type ChannelRuntimeStore = {
3434
aborts: Map<string, AbortController>;
35+
starting: Map<string, Promise<void>>;
3536
tasks: Map<string, Promise<unknown>>;
3637
runtimes: Map<string, ChannelAccountSnapshot>;
3738
};
@@ -49,6 +50,7 @@ type ChannelHealthMonitorConfig = HealthMonitorConfig & {
4950
function createRuntimeStore(): ChannelRuntimeStore {
5051
return {
5152
aborts: new Map(),
53+
starting: new Map(),
5254
tasks: new Map(),
5355
runtimes: new Map(),
5456
};
@@ -256,137 +258,174 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
256258
if (store.tasks.has(id)) {
257259
return;
258260
}
259-
const account = plugin.config.resolveAccount(cfg, id);
260-
const enabled = plugin.config.isEnabled
261-
? plugin.config.isEnabled(account, cfg)
262-
: isAccountEnabled(account);
263-
if (!enabled) {
264-
setRuntime(channelId, id, {
265-
accountId: id,
266-
enabled: false,
267-
configured: true,
268-
running: false,
269-
restartPending: false,
270-
lastError: plugin.config.disabledReason?.(account, cfg) ?? "disabled",
271-
});
272-
return;
273-
}
274-
275-
let configured = true;
276-
if (plugin.config.isConfigured) {
277-
configured = await plugin.config.isConfigured(account, cfg);
278-
}
279-
if (!configured) {
280-
setRuntime(channelId, id, {
281-
accountId: id,
282-
enabled: true,
283-
configured: false,
284-
running: false,
285-
restartPending: false,
286-
lastError: plugin.config.unconfiguredReason?.(account, cfg) ?? "not configured",
287-
});
261+
const existingStart = store.starting.get(id);
262+
if (existingStart) {
263+
await existingStart;
288264
return;
289265
}
290266

291-
const rKey = restartKey(channelId, id);
292-
if (!preserveManualStop) {
293-
manuallyStopped.delete(rKey);
294-
}
267+
let resolveStart: (() => void) | undefined;
268+
const startGate = new Promise<void>((resolve) => {
269+
resolveStart = resolve;
270+
});
271+
store.starting.set(id, startGate);
295272

273+
// Reserve the account before the first await so overlapping start calls
274+
// cannot race into duplicate provider boots for the same account.
296275
const abort = new AbortController();
297276
store.aborts.set(id, abort);
298-
if (!preserveRestartAttempts) {
299-
restartAttempts.delete(rKey);
300-
}
301-
setRuntime(channelId, id, {
302-
accountId: id,
303-
enabled: true,
304-
configured: true,
305-
running: true,
306-
restartPending: false,
307-
lastStartAt: Date.now(),
308-
lastError: null,
309-
reconnectAttempts: preserveRestartAttempts ? (restartAttempts.get(rKey) ?? 0) : 0,
310-
});
277+
let handedOffTask = false;
311278

312-
const log = channelLogs[channelId];
313-
const resolvedChannelRuntime = getChannelRuntime();
314-
const task = startAccount({
315-
cfg,
316-
accountId: id,
317-
account,
318-
runtime: channelRuntimeEnvs[channelId],
319-
abortSignal: abort.signal,
320-
log,
321-
getStatus: () => getRuntime(channelId, id),
322-
setStatus: (next) => setRuntime(channelId, id, next),
323-
...(resolvedChannelRuntime ? { channelRuntime: resolvedChannelRuntime } : {}),
324-
});
325-
const trackedPromise = Promise.resolve(task)
326-
.catch((err) => {
327-
const message = formatErrorMessage(err);
328-
setRuntime(channelId, id, { accountId: id, lastError: message });
329-
log.error?.(`[${id}] channel exited: ${message}`);
330-
})
331-
.finally(() => {
279+
try {
280+
const account = plugin.config.resolveAccount(cfg, id);
281+
const enabled = plugin.config.isEnabled
282+
? plugin.config.isEnabled(account, cfg)
283+
: isAccountEnabled(account);
284+
if (!enabled) {
332285
setRuntime(channelId, id, {
333286
accountId: id,
287+
enabled: false,
288+
configured: true,
334289
running: false,
290+
restartPending: false,
291+
lastError: plugin.config.disabledReason?.(account, cfg) ?? "disabled",
292+
});
293+
return;
294+
}
295+
296+
let configured = true;
297+
if (plugin.config.isConfigured) {
298+
configured = await plugin.config.isConfigured(account, cfg);
299+
}
300+
if (!configured) {
301+
setRuntime(channelId, id, {
302+
accountId: id,
303+
enabled: true,
304+
configured: false,
305+
running: false,
306+
restartPending: false,
307+
lastError: plugin.config.unconfiguredReason?.(account, cfg) ?? "not configured",
308+
});
309+
return;
310+
}
311+
312+
const rKey = restartKey(channelId, id);
313+
if (!preserveManualStop) {
314+
manuallyStopped.delete(rKey);
315+
}
316+
317+
if (abort.signal.aborted || manuallyStopped.has(rKey)) {
318+
setRuntime(channelId, id, {
319+
accountId: id,
320+
running: false,
321+
restartPending: false,
335322
lastStopAt: Date.now(),
336323
});
337-
})
338-
.then(async () => {
339-
if (manuallyStopped.has(rKey)) {
340-
return;
341-
}
342-
const attempt = (restartAttempts.get(rKey) ?? 0) + 1;
343-
restartAttempts.set(rKey, attempt);
344-
if (attempt > MAX_RESTART_ATTEMPTS) {
324+
return;
325+
}
326+
327+
if (!preserveRestartAttempts) {
328+
restartAttempts.delete(rKey);
329+
}
330+
setRuntime(channelId, id, {
331+
accountId: id,
332+
enabled: true,
333+
configured: true,
334+
running: true,
335+
restartPending: false,
336+
lastStartAt: Date.now(),
337+
lastError: null,
338+
reconnectAttempts: preserveRestartAttempts ? (restartAttempts.get(rKey) ?? 0) : 0,
339+
});
340+
341+
const log = channelLogs[channelId];
342+
const resolvedChannelRuntime = getChannelRuntime();
343+
const task = startAccount({
344+
cfg,
345+
accountId: id,
346+
account,
347+
runtime: channelRuntimeEnvs[channelId],
348+
abortSignal: abort.signal,
349+
log,
350+
getStatus: () => getRuntime(channelId, id),
351+
setStatus: (next) => setRuntime(channelId, id, next),
352+
...(resolvedChannelRuntime ? { channelRuntime: resolvedChannelRuntime } : {}),
353+
});
354+
const trackedPromise = Promise.resolve(task)
355+
.catch((err) => {
356+
const message = formatErrorMessage(err);
357+
setRuntime(channelId, id, { accountId: id, lastError: message });
358+
log.error?.(`[${id}] channel exited: ${message}`);
359+
})
360+
.finally(() => {
345361
setRuntime(channelId, id, {
346362
accountId: id,
347-
restartPending: false,
348-
reconnectAttempts: attempt,
363+
running: false,
364+
lastStopAt: Date.now(),
349365
});
350-
log.error?.(`[${id}] giving up after ${MAX_RESTART_ATTEMPTS} restart attempts`);
351-
return;
352-
}
353-
const delayMs = computeBackoff(CHANNEL_RESTART_POLICY, attempt);
354-
log.info?.(
355-
`[${id}] auto-restart attempt ${attempt}/${MAX_RESTART_ATTEMPTS} in ${Math.round(delayMs / 1000)}s`,
356-
);
357-
setRuntime(channelId, id, {
358-
accountId: id,
359-
restartPending: true,
360-
reconnectAttempts: attempt,
361-
});
362-
try {
363-
await sleepWithAbort(delayMs, abort.signal);
366+
})
367+
.then(async () => {
364368
if (manuallyStopped.has(rKey)) {
365369
return;
366370
}
371+
const attempt = (restartAttempts.get(rKey) ?? 0) + 1;
372+
restartAttempts.set(rKey, attempt);
373+
if (attempt > MAX_RESTART_ATTEMPTS) {
374+
setRuntime(channelId, id, {
375+
accountId: id,
376+
restartPending: false,
377+
reconnectAttempts: attempt,
378+
});
379+
log.error?.(`[${id}] giving up after ${MAX_RESTART_ATTEMPTS} restart attempts`);
380+
return;
381+
}
382+
const delayMs = computeBackoff(CHANNEL_RESTART_POLICY, attempt);
383+
log.info?.(
384+
`[${id}] auto-restart attempt ${attempt}/${MAX_RESTART_ATTEMPTS} in ${Math.round(delayMs / 1000)}s`,
385+
);
386+
setRuntime(channelId, id, {
387+
accountId: id,
388+
restartPending: true,
389+
reconnectAttempts: attempt,
390+
});
391+
try {
392+
await sleepWithAbort(delayMs, abort.signal);
393+
if (manuallyStopped.has(rKey)) {
394+
return;
395+
}
396+
if (store.tasks.get(id) === trackedPromise) {
397+
store.tasks.delete(id);
398+
}
399+
if (store.aborts.get(id) === abort) {
400+
store.aborts.delete(id);
401+
}
402+
await startChannelInternal(channelId, id, {
403+
preserveRestartAttempts: true,
404+
preserveManualStop: true,
405+
});
406+
} catch {
407+
// abort or startup failure — next crash will retry
408+
}
409+
})
410+
.finally(() => {
367411
if (store.tasks.get(id) === trackedPromise) {
368412
store.tasks.delete(id);
369413
}
370414
if (store.aborts.get(id) === abort) {
371415
store.aborts.delete(id);
372416
}
373-
await startChannelInternal(channelId, id, {
374-
preserveRestartAttempts: true,
375-
preserveManualStop: true,
376-
});
377-
} catch {
378-
// abort or startup failure — next crash will retry
379-
}
380-
})
381-
.finally(() => {
382-
if (store.tasks.get(id) === trackedPromise) {
383-
store.tasks.delete(id);
384-
}
385-
if (store.aborts.get(id) === abort) {
386-
store.aborts.delete(id);
387-
}
388-
});
389-
store.tasks.set(id, trackedPromise);
417+
});
418+
handedOffTask = true;
419+
store.tasks.set(id, trackedPromise);
420+
} finally {
421+
resolveStart?.();
422+
if (store.starting.get(id) === startGate) {
423+
store.starting.delete(id);
424+
}
425+
if (!handedOffTask && store.aborts.get(id) === abort) {
426+
store.aborts.delete(id);
427+
}
428+
}
390429
}),
391430
);
392431
};
@@ -405,6 +444,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
405444
const cfg = loadConfig();
406445
const knownIds = new Set<string>([
407446
...store.aborts.keys(),
447+
...store.starting.keys(),
408448
...store.tasks.keys(),
409449
...(plugin ? plugin.config.listAccountIds(cfg) : []),
410450
]);

0 commit comments

Comments
 (0)