Skip to content

Commit 0c0f556

Browse files
steipetetheotarrliuxiaopai-ai
committed
fix(discord): unify reconnect watchdog and land #31025/#30530
Landed follow-up intent from contributor PR #31025 (@theotarr) and PR #30530 (@liuxiaopai-ai). Co-authored-by: theotarr <[email protected]> Co-authored-by: liuxiaopai-ai <[email protected]>
1 parent 0eac494 commit 0c0f556

File tree

12 files changed

+462
-5
lines changed

12 files changed

+462
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ Docs: https://docs.openclaw.ai
9696
- Plugins/Discovery precedence: load bundled plugins before auto-discovered global extensions so bundled channel plugins win duplicate-ID resolution by default (explicit `plugins.load.paths` overrides remain highest precedence), with loader regression coverage. Landed from contributor PR #29710 by @Sid-Qin. Thanks @Sid-Qin.
9797
- Discord/Reconnect integrity: release Discord message listener lane immediately while preserving serialized handler execution, add HELLO-stall resume-first recovery with bounded fresh-identify fallback after repeated stalls, and extend lifecycle/listener regression coverage for forced reconnect scenarios. Landed from contributor PR #29508 by @cgdusek. Thanks @cgdusek.
9898
- Matrix/Conduit compatibility: avoid blocking startup on non-resolving Matrix sync start, preserve startup error propagation, prevent duplicate monitor listener registration, remove unreliable 2-member DM heuristics, accept `!room` IDs without alias resolution, and add matrix monitor/client regression coverage. Landed from contributor PR #31023 by @efe-arv. Thanks @efe-arv.
99+
- Discord/Reconnect watchdog: add a shared armable transport stall-watchdog and wire Discord gateway lifecycle force-stop semantics for silent close/reconnect zombies, with gateway/lifecycle watchdog regression coverage and runtime status liveness updates. Follow-up to contributor PR #31025 by @theotarr and PR #30530 by @liuxiaopai-ai. Thanks @theotarr and @liuxiaopai-ai.
99100
- Security/Skills: harden skill installer metadata parsing by rejecting unsafe installer specs (brew/node/go/uv/download) and constrain plugin-declared skill directories to the plugin root (including symlink-escape checks), with regression coverage.
100101
- Discord/DM command auth: unify DM allowlist + pairing-store authorization across message preflight and native command interactions so DM command gating is consistent for `open`/`pairing`/`allowlist` policies.
101102
- Sessions/Usage accounting: persist `cacheRead`/`cacheWrite` from the latest call snapshot (`lastCallUsage`) instead of accumulated multi-call totals, preventing inflated token/cost reporting in long tool/compaction runs. (#31005)

extensions/discord/src/channel.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,11 @@ export const discordPlugin: ChannelPlugin<ResolvedDiscordAccount> = {
343343
defaultRuntime: {
344344
accountId: DEFAULT_ACCOUNT_ID,
345345
running: false,
346+
connected: false,
347+
reconnectAttempts: 0,
348+
lastConnectedAt: null,
349+
lastDisconnect: null,
350+
lastEventAt: null,
346351
lastStartAt: null,
347352
lastStopAt: null,
348353
lastError: null,
@@ -394,6 +399,11 @@ export const discordPlugin: ChannelPlugin<ResolvedDiscordAccount> = {
394399
lastStartAt: runtime?.lastStartAt ?? null,
395400
lastStopAt: runtime?.lastStopAt ?? null,
396401
lastError: runtime?.lastError ?? null,
402+
connected: runtime?.connected ?? false,
403+
reconnectAttempts: runtime?.reconnectAttempts,
404+
lastConnectedAt: runtime?.lastConnectedAt ?? null,
405+
lastDisconnect: runtime?.lastDisconnect ?? null,
406+
lastEventAt: runtime?.lastEventAt ?? null,
397407
application: app ?? undefined,
398408
bot: bot ?? undefined,
399409
probe,
@@ -445,6 +455,7 @@ export const discordPlugin: ChannelPlugin<ResolvedDiscordAccount> = {
445455
abortSignal: ctx.abortSignal,
446456
mediaMaxMb: account.config.mediaMaxMb,
447457
historyLimit: account.config.historyLimit,
458+
setStatus: (patch) => ctx.setStatus({ accountId: account.accountId, ...patch }),
448459
});
449460
},
450461
},
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
import { createArmableStallWatchdog } from "./stall-watchdog.js";
3+
4+
describe("createArmableStallWatchdog", () => {
5+
it("fires onTimeout once when armed and idle exceeds timeout", async () => {
6+
vi.useFakeTimers();
7+
try {
8+
const onTimeout = vi.fn();
9+
const watchdog = createArmableStallWatchdog({
10+
label: "test-watchdog",
11+
timeoutMs: 1_000,
12+
checkIntervalMs: 100,
13+
onTimeout,
14+
});
15+
16+
watchdog.arm();
17+
await vi.advanceTimersByTimeAsync(1_500);
18+
19+
expect(onTimeout).toHaveBeenCalledTimes(1);
20+
expect(watchdog.isArmed()).toBe(false);
21+
watchdog.stop();
22+
} finally {
23+
vi.useRealTimers();
24+
}
25+
});
26+
27+
it("does not fire when disarmed before timeout", async () => {
28+
vi.useFakeTimers();
29+
try {
30+
const onTimeout = vi.fn();
31+
const watchdog = createArmableStallWatchdog({
32+
label: "test-watchdog",
33+
timeoutMs: 1_000,
34+
checkIntervalMs: 100,
35+
onTimeout,
36+
});
37+
38+
watchdog.arm();
39+
await vi.advanceTimersByTimeAsync(500);
40+
watchdog.disarm();
41+
await vi.advanceTimersByTimeAsync(2_000);
42+
43+
expect(onTimeout).not.toHaveBeenCalled();
44+
watchdog.stop();
45+
} finally {
46+
vi.useRealTimers();
47+
}
48+
});
49+
50+
it("extends timeout window when touched", async () => {
51+
vi.useFakeTimers();
52+
try {
53+
const onTimeout = vi.fn();
54+
const watchdog = createArmableStallWatchdog({
55+
label: "test-watchdog",
56+
timeoutMs: 1_000,
57+
checkIntervalMs: 100,
58+
onTimeout,
59+
});
60+
61+
watchdog.arm();
62+
await vi.advanceTimersByTimeAsync(700);
63+
watchdog.touch();
64+
await vi.advanceTimersByTimeAsync(700);
65+
expect(onTimeout).not.toHaveBeenCalled();
66+
67+
await vi.advanceTimersByTimeAsync(400);
68+
expect(onTimeout).toHaveBeenCalledTimes(1);
69+
watchdog.stop();
70+
} finally {
71+
vi.useRealTimers();
72+
}
73+
});
74+
});
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import type { RuntimeEnv } from "../../runtime.js";
2+
3+
export type StallWatchdogTimeoutMeta = {
4+
idleMs: number;
5+
timeoutMs: number;
6+
};
7+
8+
export type ArmableStallWatchdog = {
9+
arm: (atMs?: number) => void;
10+
touch: (atMs?: number) => void;
11+
disarm: () => void;
12+
stop: () => void;
13+
isArmed: () => boolean;
14+
};
15+
16+
export function createArmableStallWatchdog(params: {
17+
label: string;
18+
timeoutMs: number;
19+
checkIntervalMs?: number;
20+
abortSignal?: AbortSignal;
21+
runtime?: RuntimeEnv;
22+
onTimeout: (meta: StallWatchdogTimeoutMeta) => void;
23+
}): ArmableStallWatchdog {
24+
const timeoutMs = Math.max(1, Math.floor(params.timeoutMs));
25+
const checkIntervalMs = Math.max(
26+
100,
27+
Math.floor(params.checkIntervalMs ?? Math.min(5_000, Math.max(250, timeoutMs / 6))),
28+
);
29+
30+
let armed = false;
31+
let stopped = false;
32+
let lastActivityAt = Date.now();
33+
let timer: ReturnType<typeof setInterval> | null = null;
34+
35+
const clearTimer = () => {
36+
if (!timer) {
37+
return;
38+
}
39+
clearInterval(timer);
40+
timer = null;
41+
};
42+
43+
const disarm = () => {
44+
armed = false;
45+
};
46+
47+
const stop = () => {
48+
if (stopped) {
49+
return;
50+
}
51+
stopped = true;
52+
disarm();
53+
clearTimer();
54+
params.abortSignal?.removeEventListener("abort", stop);
55+
};
56+
57+
const arm = (atMs?: number) => {
58+
if (stopped) {
59+
return;
60+
}
61+
lastActivityAt = atMs ?? Date.now();
62+
armed = true;
63+
};
64+
65+
const touch = (atMs?: number) => {
66+
if (stopped) {
67+
return;
68+
}
69+
lastActivityAt = atMs ?? Date.now();
70+
};
71+
72+
const check = () => {
73+
if (!armed || stopped) {
74+
return;
75+
}
76+
const now = Date.now();
77+
const idleMs = now - lastActivityAt;
78+
if (idleMs < timeoutMs) {
79+
return;
80+
}
81+
disarm();
82+
params.runtime?.error?.(
83+
`[${params.label}] transport watchdog timeout: idle ${Math.round(idleMs / 1000)}s (limit ${Math.round(timeoutMs / 1000)}s)`,
84+
);
85+
params.onTimeout({ idleMs, timeoutMs });
86+
};
87+
88+
if (params.abortSignal?.aborted) {
89+
stop();
90+
} else {
91+
params.abortSignal?.addEventListener("abort", stop, { once: true });
92+
timer = setInterval(check, checkIntervalMs);
93+
timer.unref?.();
94+
}
95+
96+
return {
97+
arm,
98+
touch,
99+
disarm,
100+
stop,
101+
isArmed: () => armed,
102+
};
103+
}

src/discord/monitor.gateway.test.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,48 @@ describe("waitForDiscordGatewayStop", () => {
8181

8282
await expect(promise).resolves.toBeUndefined();
8383
});
84+
85+
it("rejects via registerForceStop and disconnects gateway", async () => {
86+
const emitter = new EventEmitter();
87+
const disconnect = vi.fn();
88+
const abort = new AbortController();
89+
let forceStop: ((err: unknown) => void) | undefined;
90+
91+
const promise = waitForDiscordGatewayStop({
92+
gateway: { emitter, disconnect },
93+
abortSignal: abort.signal,
94+
registerForceStop: (fn) => {
95+
forceStop = fn;
96+
},
97+
});
98+
99+
expect(forceStop).toBeDefined();
100+
101+
forceStop?.(new Error("reconnect watchdog timeout"));
102+
103+
await expect(promise).rejects.toThrow("reconnect watchdog timeout");
104+
expect(disconnect).toHaveBeenCalledTimes(1);
105+
expect(emitter.listenerCount("error")).toBe(0);
106+
});
107+
108+
it("ignores forceStop after promise already settled", async () => {
109+
const emitter = new EventEmitter();
110+
const disconnect = vi.fn();
111+
const abort = new AbortController();
112+
let forceStop: ((err: unknown) => void) | undefined;
113+
114+
const promise = waitForDiscordGatewayStop({
115+
gateway: { emitter, disconnect },
116+
abortSignal: abort.signal,
117+
registerForceStop: (fn) => {
118+
forceStop = fn;
119+
},
120+
});
121+
122+
abort.abort();
123+
await expect(promise).resolves.toBeUndefined();
124+
125+
forceStop?.(new Error("too late"));
126+
expect(disconnect).toHaveBeenCalledTimes(1);
127+
});
84128
});

src/discord/monitor.gateway.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export async function waitForDiscordGatewayStop(params: {
1414
abortSignal?: AbortSignal;
1515
onGatewayError?: (err: unknown) => void;
1616
shouldStopOnError?: (err: unknown) => boolean;
17+
registerForceStop?: (forceStop: (err: unknown) => void) => void;
1718
}): Promise<void> {
1819
const { gateway, abortSignal, onGatewayError, shouldStopOnError } = params;
1920
const emitter = gateway?.emitter;
@@ -57,6 +58,9 @@ export async function waitForDiscordGatewayStop(params: {
5758
finishReject(err);
5859
}
5960
};
61+
const onForceStop = (err: unknown) => {
62+
finishReject(err);
63+
};
6064

6165
if (abortSignal?.aborted) {
6266
onAbort();
@@ -65,5 +69,6 @@ export async function waitForDiscordGatewayStop(params: {
6569

6670
abortSignal?.addEventListener("abort", onAbort, { once: true });
6771
emitter?.on("error", onGatewayErrorEvent);
72+
params.registerForceStop?.(onForceStop);
6873
});
6974
}

src/discord/monitor/listeners.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type DiscordReactionListenerParams = {
5454
allowNameMatching: boolean;
5555
guildEntries?: Record<string, import("./allow-list.js").DiscordGuildEntryResolved>;
5656
logger: Logger;
57+
onEvent?: () => void;
5758
};
5859

5960
const DISCORD_SLOW_LISTENER_THRESHOLD_MS = 30_000;
@@ -123,11 +124,13 @@ export class DiscordMessageListener extends MessageCreateListener {
123124
constructor(
124125
private handler: DiscordMessageHandler,
125126
private logger?: Logger,
127+
private onEvent?: () => void,
126128
) {
127129
super();
128130
}
129131

130132
async handle(data: DiscordMessageEvent, client: Client) {
133+
this.onEvent?.();
131134
// Release Carbon's dispatch lane immediately, but keep our message handler
132135
// serialized to avoid unbounded parallel model/IO work on traffic bursts.
133136
this.messageQueue = this.messageQueue
@@ -157,6 +160,7 @@ export class DiscordReactionListener extends MessageReactionAddListener {
157160
}
158161

159162
async handle(data: DiscordReactionEvent, client: Client) {
163+
this.params.onEvent?.();
160164
await runDiscordReactionHandler({
161165
data,
162166
client,
@@ -174,6 +178,7 @@ export class DiscordReactionRemoveListener extends MessageReactionRemoveListener
174178
}
175179

176180
async handle(data: DiscordReactionEvent, client: Client) {
181+
this.params.onEvent?.();
177182
await runDiscordReactionHandler({
178183
data,
179184
client,

src/discord/monitor/native-command.model-picker.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ describe("Discord model picker interactions", () => {
310310
.mockResolvedValue();
311311
const dispatchSpy = vi
312312
.spyOn(dispatcherModule, "dispatchReplyWithDispatcher")
313-
.mockImplementation(() => new Promise(() => {}) as never);
313+
.mockResolvedValue({} as never);
314314
const withTimeoutSpy = vi
315315
.spyOn(timeoutModule, "withTimeout")
316316
.mockRejectedValue(new Error("timeout"));

0 commit comments

Comments
 (0)