Skip to content

Commit abd948f

Browse files
committed
fix(whatsapp): preserve watchdog message age across reconnects
1 parent 3e10d4c commit abd948f

File tree

3 files changed

+93
-3
lines changed

3 files changed

+93
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ Docs: https://docs.openclaw.ai
250250
- Discord/ACP: forward worker abort signals into ACP turns so timed-out Discord jobs cancel the running turn instead of silently leaving the bound ACP session working in the background.
251251
- Gateway/openresponses: preserve assistant commentary and session continuity across hosted-tool `/v1/responses` turns, and emit streamed tool-call payloads before finalization so client tool loops stay resumable. (#52171) Thanks @CharZhou.
252252
- Android/Talk: serialize `TalkModeManager` player teardown so rapid interrupt/restart cycles stop double-releasing or overlapping TTS playback. (#52310) Thanks @Kaneki-x.
253+
- WhatsApp/reconnect: preserve the last inbound timestamp across reconnect attempts so the watchdog can still recycle linked-but-dead listeners after a restart instead of leaving them stuck connected forever.
253254

254255
### Breaking
255256

extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ describe("web auto-reply connection", () => {
184184
if (!completedQuickly) {
185185
await vi.waitFor(
186186
() => {
187-
expect(listenerFactory).toHaveBeenCalledTimes(2);
187+
expect(listenerFactory.mock.calls.length).toBeGreaterThanOrEqual(2);
188188
},
189189
{ timeout: 250, interval: 2 },
190190
);
@@ -264,7 +264,7 @@ describe("web auto-reply connection", () => {
264264
await Promise.resolve();
265265
await vi.waitFor(
266266
() => {
267-
expect(listenerFactory).toHaveBeenCalledTimes(2);
267+
expect(listenerFactory.mock.calls.length).toBeGreaterThanOrEqual(2);
268268
},
269269
{ timeout: 250, interval: 2 },
270270
);
@@ -278,6 +278,92 @@ describe("web auto-reply connection", () => {
278278
}
279279
});
280280

281+
it("keeps watchdog message age across reconnects", async () => {
282+
vi.useFakeTimers();
283+
try {
284+
const sleep = vi.fn(async () => {});
285+
const closeResolvers: Array<(reason: unknown) => void> = [];
286+
let capturedOnMessage:
287+
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
288+
| undefined;
289+
const listenerFactory = vi.fn(
290+
async (opts: {
291+
onMessage: (msg: import("./inbound.js").WebInboundMessage) => Promise<void>;
292+
}) => {
293+
capturedOnMessage = opts.onMessage;
294+
let resolveClose: (reason: unknown) => void = () => {};
295+
const onClose = new Promise<unknown>((res) => {
296+
resolveClose = res;
297+
closeResolvers.push(res);
298+
});
299+
return {
300+
close: vi.fn(),
301+
onClose,
302+
signalClose: (reason?: unknown) => resolveClose(reason),
303+
};
304+
},
305+
);
306+
const { controller, run } = startMonitorWebChannel({
307+
monitorWebChannelFn: monitorWebChannel as never,
308+
listenerFactory,
309+
sleep,
310+
heartbeatSeconds: 60,
311+
messageTimeoutMs: 30,
312+
watchdogCheckMs: 5,
313+
});
314+
315+
await Promise.resolve();
316+
expect(listenerFactory).toHaveBeenCalledTimes(1);
317+
await vi.waitFor(
318+
() => {
319+
expect(capturedOnMessage).toBeTypeOf("function");
320+
},
321+
{ timeout: 250, interval: 2 },
322+
);
323+
324+
const reply = vi.fn().mockResolvedValue(undefined);
325+
const sendComposing = vi.fn();
326+
const sendMedia = vi.fn();
327+
328+
void capturedOnMessage?.(
329+
makeInboundMessage({
330+
body: "hi",
331+
from: "+1",
332+
to: "+2",
333+
id: "m1",
334+
sendComposing,
335+
reply,
336+
sendMedia,
337+
}),
338+
);
339+
await Promise.resolve();
340+
341+
closeResolvers.shift()?.({ status: 499, isLoggedOut: false, error: "first-close" });
342+
await vi.waitFor(
343+
() => {
344+
expect(listenerFactory).toHaveBeenCalledTimes(2);
345+
},
346+
{ timeout: 250, interval: 2 },
347+
);
348+
349+
await vi.advanceTimersByTimeAsync(200);
350+
await Promise.resolve();
351+
await vi.waitFor(
352+
() => {
353+
expect(listenerFactory.mock.calls.length).toBeGreaterThanOrEqual(3);
354+
},
355+
{ timeout: 250, interval: 2 },
356+
);
357+
358+
controller.abort();
359+
closeResolvers.at(-1)?.({ status: 499, isLoggedOut: false, error: "aborted" });
360+
await Promise.resolve();
361+
await run;
362+
} finally {
363+
vi.useRealTimers();
364+
}
365+
});
366+
281367
it("processes inbound messages without batching and preserves timestamps", async () => {
282368
await withEnvAsync({ TZ: "Europe/Vienna" }, async () => {
283369
const originalMax = process.getMaxListeners();

extensions/whatsapp/src/auto-reply/monitor.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,10 @@ export async function monitorWebChannel(
151151
const startedAt = Date.now();
152152
let heartbeat: NodeJS.Timeout | null = null;
153153
let watchdogTimer: NodeJS.Timeout | null = null;
154-
let lastMessageAt: number | null = null;
154+
// Preserve the last known inbound timestamp across reconnects so the watchdog
155+
// can still detect a listener that comes back "connected" but never receives
156+
// another message after a restart cycle.
157+
let lastMessageAt: number | null = status.lastMessageAt ?? null;
155158
let handledMessages = 0;
156159
let _lastInboundMsg: WebInboundMsg | null = null;
157160
let unregisterUnhandled: (() => void) | null = null;

0 commit comments

Comments
 (0)