Skip to content

Commit ea71a59

Browse files
committed
fix(imessage): repair monitor retry type checks
1 parent e4841d7 commit ea71a59

2 files changed

Lines changed: 29 additions & 11 deletions

File tree

extensions/imessage/src/monitor.watch-subscribe-retry.test.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,27 @@
1+
import type { waitForTransportReady } from "openclaw/plugin-sdk/infra-runtime";
12
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
3+
import type { createIMessageRpcClient } from "./client.js";
24
import { monitorIMessageProvider } from "./monitor.js";
5+
import type { attachIMessageMonitorAbortHandler } from "./monitor/abort-handler.js";
36

47
const waitForTransportReadyMock = vi.hoisted(() =>
5-
vi.fn<(...args: unknown[]) => Promise<void>>(async () => {}),
8+
vi.fn<typeof waitForTransportReady>(async () => {}),
69
);
7-
const createIMessageRpcClientMock = vi.hoisted(() => vi.fn<(...args: unknown[]) => unknown>());
10+
const createIMessageRpcClientMock = vi.hoisted(() => vi.fn<typeof createIMessageRpcClient>());
811
const attachIMessageMonitorAbortHandlerMock = vi.hoisted(() =>
9-
vi.fn<(...args: unknown[]) => () => void>(() => () => {}),
12+
vi.fn<typeof attachIMessageMonitorAbortHandler>(() => () => {}),
1013
);
1114

1215
vi.mock("openclaw/plugin-sdk/infra-runtime", () => ({
13-
waitForTransportReady: (...args: unknown[]) => waitForTransportReadyMock(...args),
16+
waitForTransportReady: waitForTransportReadyMock,
1417
}));
1518

1619
vi.mock("./client.js", () => ({
17-
createIMessageRpcClient: (...args: unknown[]) => createIMessageRpcClientMock(...args),
20+
createIMessageRpcClient: createIMessageRpcClientMock,
1821
}));
1922

2023
vi.mock("./monitor/abort-handler.js", () => ({
21-
attachIMessageMonitorAbortHandler: (...args: unknown[]) =>
22-
attachIMessageMonitorAbortHandlerMock(...args),
24+
attachIMessageMonitorAbortHandler: attachIMessageMonitorAbortHandlerMock,
2325
}));
2426

2527
function createRuntime() {

extensions/imessage/src/monitor/monitor-provider.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,15 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
548548
},
549549
});
550550

551+
const requireWatchClient = (
552+
watchClient: IMessageRpcClient | null | undefined,
553+
): IMessageRpcClient => {
554+
if (!watchClient) {
555+
throw new Error("imessage monitor client not initialized");
556+
}
557+
return watchClient;
558+
};
559+
551560
for (let attempt = 1; attempt <= WATCH_SUBSCRIBE_MAX_ATTEMPTS; attempt++) {
552561
if (abort?.aborted) {
553562
return;
@@ -556,7 +565,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
556565
let attemptDetachAbortHandler = () => {};
557566
let keepAttemptClient = false;
558567
try {
559-
attemptClient = await createWatchClient();
568+
attemptClient = requireWatchClient(await createWatchClient());
560569
let attemptSubscriptionId: number | null = null;
561570
attemptDetachAbortHandler = attachIMessageMonitorAbortHandler({
562571
abortSignal: abort,
@@ -590,6 +599,12 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
590599
`imessage: watch.subscribe startup failed (attempt ${attempt}/${WATCH_SUBSCRIBE_MAX_ATTEMPTS}): ${String(err)}; retrying`,
591600
),
592601
);
602+
// Tear down the failed client before waiting so a slow subscribe attempt
603+
// cannot keep emitting notifications into the next retry window.
604+
attemptDetachAbortHandler();
605+
attemptDetachAbortHandler = () => {};
606+
await attemptClient?.stop();
607+
attemptClient = undefined;
593608
await waitForWatchSubscribeRetryDelay({
594609
ms: WATCH_SUBSCRIBE_RETRY_DELAY_MS,
595610
abortSignal: abort,
@@ -605,12 +620,13 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
605620
}
606621
}
607622

608-
if (!client) {
623+
const activeClient = client;
624+
if (!activeClient) {
609625
return;
610626
}
611627

612628
try {
613-
await client.waitForClose();
629+
await activeClient.waitForClose();
614630
} catch (err) {
615631
if (abort?.aborted) {
616632
return;
@@ -619,7 +635,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
619635
throw err;
620636
} finally {
621637
detachAbortHandler();
622-
await client.stop();
638+
await activeClient.stop();
623639
}
624640
}
625641

0 commit comments

Comments
 (0)