Skip to content

Commit 35a784c

Browse files
authored
fix(imessage): retry watch.subscribe startup failures (#65482)
* fix(imessage): retry watch.subscribe startup failures * fix(imessage): sanitize watch error logging
1 parent 0fd9aa8 commit 35a784c

5 files changed

Lines changed: 298 additions & 25 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ Docs: https://docs.openclaw.ai
5252
- Memory/QMD: stop registering the legacy lowercase root memory file as a separate default collection, so QMD now prefers `MEMORY.md` and the `memory/` tree without duplicate collection-add warnings.
5353
- Memory/memory-core: watch the `memory` directory directly and ignore non-markdown churn so nested note changes still sync on macOS + Node 25 environments where recursive `memory/**/*.md` glob watching fails. (#64711) Thanks @jasonxargs-boop and @vincentkoc.
5454
- WhatsApp: centralize per-account connection ownership so reconnects, login recovery, and outbound readiness stay attached to the live socket instead of drifting across monitor and login paths. (#65290) Thanks @mcaxtr and @vincentkoc.
55+
- iMessage: retry transient `watch.subscribe` startup failures before tearing down the monitor, and sanitize startup error logging so brief local transport stalls do not immediately bounce the channel or leak raw imsg RPC payloads into logs. (#65393) Thanks @vincentkoc.
5556

5657
## 2026.4.11
5758

@@ -227,6 +228,7 @@ Docs: https://docs.openclaw.ai
227228
- Agents/locks: unregister the session write-lock `exit` cleanup handler during teardown so repeated lock lifecycle resets stop stacking process listeners in long-running gateway processes. (#65391) Thanks @adminfedres and @vincentkoc.
228229
- CLI/Claude: rename the trusted inbound metadata schema to `openclaw.inbound_meta.v2` so Claude CLI no longer trips Anthropic's blocked `openclaw.inbound_meta.v1` filter on channel-originated turns. (#65399) Thanks @SzyMig and @vincentkoc.
229230
- Agents/inbound metadata: strip NUL bytes from serialized inbound context blocks before they reach backend spawn args, so malformed message metadata cannot crash agent spawn with `ERR_INVALID_ARG_VALUE`. (#65389) Thanks @adminfedres and @vincentkoc.
231+
- iMessage: retry transient `watch.subscribe` startup failures before tearing down the monitor, so brief local transport stalls do not immediately bounce the channel. (#65393) Thanks @vincentkoc.
230232

231233
## 2026.4.9
232234

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
2+
import { monitorIMessageProvider } from "./monitor.js";
3+
4+
const waitForTransportReadyMock = vi.hoisted(() => vi.fn(async () => {}));
5+
const createIMessageRpcClientMock = vi.hoisted(() => vi.fn());
6+
const attachIMessageMonitorAbortHandlerMock = vi.hoisted(() => vi.fn(() => () => {}));
7+
8+
vi.mock("openclaw/plugin-sdk/infra-runtime", () => ({
9+
waitForTransportReady: (...args: unknown[]) => waitForTransportReadyMock(...args),
10+
}));
11+
12+
vi.mock("./client.js", () => ({
13+
createIMessageRpcClient: (...args: unknown[]) => createIMessageRpcClientMock(...args),
14+
}));
15+
16+
vi.mock("./monitor/abort-handler.js", () => ({
17+
attachIMessageMonitorAbortHandler: (...args: unknown[]) =>
18+
attachIMessageMonitorAbortHandlerMock(...args),
19+
}));
20+
21+
function createRuntime() {
22+
return {
23+
log: vi.fn(),
24+
error: vi.fn(),
25+
};
26+
}
27+
28+
function createRpcClient(overrides?: {
29+
request?: (method: string) => Promise<unknown>;
30+
waitForClose?: () => Promise<void>;
31+
}) {
32+
return {
33+
request: vi.fn(
34+
overrides?.request ??
35+
(async () => {
36+
return { subscription: 1 };
37+
}),
38+
),
39+
waitForClose: vi.fn(
40+
overrides?.waitForClose ??
41+
(async () => {
42+
return undefined;
43+
}),
44+
),
45+
stop: vi.fn(async () => {}),
46+
};
47+
}
48+
49+
describe("monitorIMessageProvider watch.subscribe startup retry", () => {
50+
beforeEach(() => {
51+
vi.useFakeTimers();
52+
waitForTransportReadyMock.mockReset().mockResolvedValue(undefined);
53+
createIMessageRpcClientMock.mockReset();
54+
attachIMessageMonitorAbortHandlerMock.mockReset().mockReturnValue(() => {});
55+
});
56+
57+
afterEach(() => {
58+
vi.useRealTimers();
59+
});
60+
61+
it("retries a transient watch.subscribe startup timeout without tearing down the monitor", async () => {
62+
const runtime = createRuntime();
63+
const firstClient = createRpcClient({
64+
request: async () => {
65+
throw new Error("imsg rpc timeout (watch.subscribe)");
66+
},
67+
});
68+
const secondClient = createRpcClient();
69+
70+
createIMessageRpcClientMock
71+
.mockResolvedValueOnce(firstClient)
72+
.mockResolvedValueOnce(secondClient);
73+
74+
const monitorPromise = monitorIMessageProvider({
75+
config: { channels: { imessage: {} } } as never,
76+
runtime: runtime as never,
77+
});
78+
79+
await vi.runAllTimersAsync();
80+
await monitorPromise;
81+
82+
expect(createIMessageRpcClientMock).toHaveBeenCalledTimes(2);
83+
expect(firstClient.stop).toHaveBeenCalledTimes(1);
84+
expect(secondClient.waitForClose).toHaveBeenCalledTimes(1);
85+
expect(secondClient.stop).toHaveBeenCalledTimes(1);
86+
expect(runtime.log).toHaveBeenCalledWith(
87+
expect.stringContaining("watch.subscribe startup failed"),
88+
);
89+
expect(runtime.error).not.toHaveBeenCalledWith(
90+
expect.stringContaining("imessage: monitor failed"),
91+
);
92+
});
93+
94+
it("still fails after bounded startup retries are exhausted", async () => {
95+
const runtime = createRuntime();
96+
createIMessageRpcClientMock.mockImplementation(async () =>
97+
createRpcClient({
98+
request: async () => {
99+
throw new Error("imsg rpc timeout (watch.subscribe)");
100+
},
101+
}),
102+
);
103+
104+
const monitorErrorPromise = monitorIMessageProvider({
105+
config: { channels: { imessage: {} } } as never,
106+
runtime: runtime as never,
107+
}).catch((error) => error);
108+
109+
await vi.runAllTimersAsync();
110+
const monitorError = await monitorErrorPromise;
111+
112+
expect(monitorError).toBeInstanceOf(Error);
113+
expect((monitorError as Error).message).toContain("imsg rpc timeout (watch.subscribe)");
114+
expect(createIMessageRpcClientMock).toHaveBeenCalledTimes(3);
115+
expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("imessage: monitor failed"));
116+
});
117+
});

extensions/imessage/src/monitor/monitor-provider.ts

Lines changed: 111 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { danger, logVerbose, shouldLogVerbose, warn } from "openclaw/plugin-sdk/
3333
import { resolvePinnedMainDmOwnerFromAllowlist } from "openclaw/plugin-sdk/security-runtime";
3434
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-runtime";
3535
import { resolveIMessageAccount } from "../accounts.js";
36-
import { createIMessageRpcClient } from "../client.js";
36+
import { createIMessageRpcClient, type IMessageRpcClient } from "../client.js";
3737
import { DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS } from "../constants.js";
3838
import {
3939
resolveIMessageAttachmentRoots,
@@ -54,6 +54,10 @@ import { parseIMessageNotification } from "./parse-notification.js";
5454
import { normalizeAllowList, resolveRuntime } from "./runtime.js";
5555
import { createSelfChatCache } from "./self-chat-cache.js";
5656
import type { IMessagePayload, MonitorIMessageOpts } from "./types.js";
57+
import { sanitizeIMessageWatchErrorPayload } from "./watch-error-log.js";
58+
59+
const WATCH_SUBSCRIBE_MAX_ATTEMPTS = 3;
60+
const WATCH_SUBSCRIBE_RETRY_DELAY_MS = 1_000;
5761

5862
/**
5963
* Try to detect remote host from an SSH wrapper script like:
@@ -83,6 +87,33 @@ async function detectRemoteHostFromCliPath(cliPath: string): Promise<string | un
8387
}
8488
}
8589

90+
function isRetriableWatchSubscribeStartupError(error: unknown): boolean {
91+
return /imsg rpc timeout \(watch\.subscribe\)|imsg rpc (closed|exited|not running)/i.test(
92+
String(error),
93+
);
94+
}
95+
96+
async function waitForWatchSubscribeRetryDelay(params: {
97+
ms: number;
98+
abortSignal?: AbortSignal;
99+
}): Promise<void> {
100+
if (params.ms <= 0) {
101+
return;
102+
}
103+
await new Promise<void>((resolve) => {
104+
const timer = setTimeout(() => {
105+
params.abortSignal?.removeEventListener("abort", onAbort);
106+
resolve();
107+
}, params.ms);
108+
const onAbort = () => {
109+
clearTimeout(timer);
110+
params.abortSignal?.removeEventListener("abort", onAbort);
111+
resolve();
112+
};
113+
params.abortSignal?.addEventListener("abort", onAbort, { once: true });
114+
});
115+
}
116+
86117
export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): Promise<void> {
87118
const runtime = resolveRuntime(opts);
88119
const cfg = opts.config ?? loadConfig();
@@ -489,35 +520,90 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
489520
if (opts.abortSignal?.aborted) {
490521
return;
491522
}
523+
const abort = opts.abortSignal;
524+
const createWatchClient = async () =>
525+
await createIMessageRpcClient({
526+
cliPath,
527+
dbPath,
528+
runtime,
529+
onNotification: (msg) => {
530+
if (msg.method === "message") {
531+
void handleMessage(msg.params).catch((err) => {
532+
runtime.error?.(`imessage: handler failed: ${String(err)}`);
533+
});
534+
} else if (msg.method === "error") {
535+
runtime.error?.(
536+
`imessage: watch error ${JSON.stringify(sanitizeIMessageWatchErrorPayload(msg.params))}`,
537+
);
538+
}
539+
},
540+
});
492541

493-
const client = await createIMessageRpcClient({
494-
cliPath,
495-
dbPath,
496-
runtime,
497-
onNotification: (msg) => {
498-
if (msg.method === "message") {
499-
void handleMessage(msg.params).catch((err) => {
500-
runtime.error?.(`imessage: handler failed: ${String(err)}`);
501-
});
502-
} else if (msg.method === "error") {
503-
runtime.error?.(`imessage: watch error ${JSON.stringify(msg.params)}`);
542+
let client: IMessageRpcClient | null = null;
543+
let detachAbortHandler = () => {};
544+
545+
for (let attempt = 1; attempt <= WATCH_SUBSCRIBE_MAX_ATTEMPTS; attempt++) {
546+
if (abort?.aborted) {
547+
return;
548+
}
549+
let attemptClient: IMessageRpcClient | null = null;
550+
let attemptDetachAbortHandler = () => {};
551+
let keepAttemptClient = false;
552+
try {
553+
attemptClient = await createWatchClient();
554+
let attemptSubscriptionId: number | null = null;
555+
attemptDetachAbortHandler = attachIMessageMonitorAbortHandler({
556+
abortSignal: abort,
557+
client: attemptClient,
558+
getSubscriptionId: () => attemptSubscriptionId,
559+
});
560+
const result = await attemptClient.request<{ subscription?: number }>(
561+
"watch.subscribe",
562+
{
563+
attachments: includeAttachments,
564+
},
565+
{ timeoutMs: probeTimeoutMs },
566+
);
567+
attemptSubscriptionId = result?.subscription ?? null;
568+
client = attemptClient;
569+
detachAbortHandler = attemptDetachAbortHandler;
570+
keepAttemptClient = true;
571+
break;
572+
} catch (err) {
573+
if (abort?.aborted) {
574+
return;
504575
}
505-
},
506-
});
576+
const shouldRetry =
577+
attempt < WATCH_SUBSCRIBE_MAX_ATTEMPTS && isRetriableWatchSubscribeStartupError(err);
578+
if (!shouldRetry) {
579+
runtime.error?.(danger(`imessage: monitor failed: ${String(err)}`));
580+
throw err;
581+
}
582+
runtime.log?.(
583+
warn(
584+
`imessage: watch.subscribe startup failed (attempt ${attempt}/${WATCH_SUBSCRIBE_MAX_ATTEMPTS}): ${String(err)}; retrying`,
585+
),
586+
);
587+
await waitForWatchSubscribeRetryDelay({
588+
ms: WATCH_SUBSCRIBE_RETRY_DELAY_MS,
589+
abortSignal: abort,
590+
});
591+
if (abort?.aborted) {
592+
return;
593+
}
594+
} finally {
595+
if (!keepAttemptClient) {
596+
attemptDetachAbortHandler();
597+
await attemptClient?.stop();
598+
}
599+
}
600+
}
507601

508-
let subscriptionId: number | null = null;
509-
const abort = opts.abortSignal;
510-
const detachAbortHandler = attachIMessageMonitorAbortHandler({
511-
abortSignal: abort,
512-
client,
513-
getSubscriptionId: () => subscriptionId,
514-
});
602+
if (!client) {
603+
return;
604+
}
515605

516606
try {
517-
const result = await client.request<{ subscription?: number }>("watch.subscribe", {
518-
attachments: includeAttachments,
519-
});
520-
subscriptionId = result?.subscription ?? null;
521607
await client.waitForClose();
522608
} catch (err) {
523609
if (abort?.aborted) {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { describe, expect, it } from "vitest";
2+
import { sanitizeIMessageWatchErrorPayload } from "./watch-error-log.js";
3+
4+
describe("sanitizeIMessageWatchErrorPayload", () => {
5+
it("keeps only code and a sanitized truncated message", () => {
6+
expect(
7+
sanitizeIMessageWatchErrorPayload({
8+
code: 500,
9+
message: `boom\n\t\u001b[2K${"x".repeat(250)}`,
10+
chatId: "chat-123",
11+
participants: ["+15555550123"],
12+
path: "/Users/me/Library/Messages/chat.db",
13+
}),
14+
).toEqual({
15+
code: 500,
16+
message: `boom\\n\\t${"x".repeat(191)}…`,
17+
});
18+
});
19+
20+
it("drops non-object payloads and unsupported fields", () => {
21+
expect(sanitizeIMessageWatchErrorPayload("boom")).toEqual({});
22+
expect(
23+
sanitizeIMessageWatchErrorPayload({
24+
code: Number.POSITIVE_INFINITY,
25+
message: 123,
26+
data: { sender: "+15555550123" },
27+
}),
28+
).toEqual({});
29+
});
30+
});
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import {
2+
isRecord,
3+
sanitizeTerminalText,
4+
truncateUtf16Safe,
5+
} from "openclaw/plugin-sdk/text-runtime";
6+
7+
const MAX_WATCH_ERROR_MESSAGE_CHARS = 200;
8+
9+
export type SanitizedIMessageWatchErrorPayload = {
10+
code?: number;
11+
message?: string;
12+
};
13+
14+
export function sanitizeIMessageWatchErrorPayload(
15+
payload: unknown,
16+
): SanitizedIMessageWatchErrorPayload {
17+
if (!isRecord(payload)) {
18+
return {};
19+
}
20+
21+
const safe: SanitizedIMessageWatchErrorPayload = {};
22+
23+
if (typeof payload.code === "number" && Number.isFinite(payload.code)) {
24+
safe.code = payload.code;
25+
}
26+
27+
if (typeof payload.message === "string") {
28+
const sanitizedMessage = sanitizeTerminalText(payload.message);
29+
if (sanitizedMessage) {
30+
safe.message =
31+
sanitizedMessage.length > MAX_WATCH_ERROR_MESSAGE_CHARS
32+
? `${truncateUtf16Safe(sanitizedMessage, MAX_WATCH_ERROR_MESSAGE_CHARS - 1)}…`
33+
: sanitizedMessage;
34+
}
35+
}
36+
37+
return safe;
38+
}

0 commit comments

Comments
 (0)