Skip to content

Commit 0ce23dc

Browse files
authored
refactor: move iMessage channel to extensions/imessage (#45539)
1 parent 4540c6b commit 0ce23dc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+2699
-2656
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { createAccountListHelpers } from "../../../src/channels/plugins/account-helpers.js";
2+
import type { OpenClawConfig } from "../../../src/config/config.js";
3+
import type { IMessageAccountConfig } from "../../../src/config/types.js";
4+
import { resolveAccountEntry } from "../../../src/routing/account-lookup.js";
5+
import { normalizeAccountId } from "../../../src/routing/session-key.js";
6+
7+
export type ResolvedIMessageAccount = {
8+
accountId: string;
9+
enabled: boolean;
10+
name?: string;
11+
config: IMessageAccountConfig;
12+
configured: boolean;
13+
};
14+
15+
const { listAccountIds, resolveDefaultAccountId } = createAccountListHelpers("imessage");
16+
export const listIMessageAccountIds = listAccountIds;
17+
export const resolveDefaultIMessageAccountId = resolveDefaultAccountId;
18+
19+
function resolveAccountConfig(
20+
cfg: OpenClawConfig,
21+
accountId: string,
22+
): IMessageAccountConfig | undefined {
23+
return resolveAccountEntry(cfg.channels?.imessage?.accounts, accountId);
24+
}
25+
26+
function mergeIMessageAccountConfig(cfg: OpenClawConfig, accountId: string): IMessageAccountConfig {
27+
const { accounts: _ignored, ...base } = (cfg.channels?.imessage ??
28+
{}) as IMessageAccountConfig & { accounts?: unknown };
29+
const account = resolveAccountConfig(cfg, accountId) ?? {};
30+
return { ...base, ...account };
31+
}
32+
33+
export function resolveIMessageAccount(params: {
34+
cfg: OpenClawConfig;
35+
accountId?: string | null;
36+
}): ResolvedIMessageAccount {
37+
const accountId = normalizeAccountId(params.accountId);
38+
const baseEnabled = params.cfg.channels?.imessage?.enabled !== false;
39+
const merged = mergeIMessageAccountConfig(params.cfg, accountId);
40+
const accountEnabled = merged.enabled !== false;
41+
const configured = Boolean(
42+
merged.cliPath?.trim() ||
43+
merged.dbPath?.trim() ||
44+
merged.service ||
45+
merged.region?.trim() ||
46+
(merged.allowFrom && merged.allowFrom.length > 0) ||
47+
(merged.groupAllowFrom && merged.groupAllowFrom.length > 0) ||
48+
merged.dmPolicy ||
49+
merged.groupPolicy ||
50+
typeof merged.includeAttachments === "boolean" ||
51+
(merged.attachmentRoots && merged.attachmentRoots.length > 0) ||
52+
(merged.remoteAttachmentRoots && merged.remoteAttachmentRoots.length > 0) ||
53+
typeof merged.mediaMaxMb === "number" ||
54+
typeof merged.textChunkLimit === "number" ||
55+
(merged.groups && Object.keys(merged.groups).length > 0),
56+
);
57+
return {
58+
accountId,
59+
enabled: baseEnabled && accountEnabled,
60+
name: merged.name?.trim() || undefined,
61+
config: merged,
62+
configured,
63+
};
64+
}
65+
66+
export function listEnabledIMessageAccounts(cfg: OpenClawConfig): ResolvedIMessageAccount[] {
67+
return listIMessageAccountIds(cfg)
68+
.map((accountId) => resolveIMessageAccount({ cfg, accountId }))
69+
.filter((account) => account.enabled);
70+
}

extensions/imessage/src/client.ts

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
2+
import { createInterface, type Interface } from "node:readline";
3+
import type { RuntimeEnv } from "../../../src/runtime.js";
4+
import { resolveUserPath } from "../../../src/utils.js";
5+
import { DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS } from "./constants.js";
6+
7+
export type IMessageRpcError = {
8+
code?: number;
9+
message?: string;
10+
data?: unknown;
11+
};
12+
13+
export type IMessageRpcResponse<T> = {
14+
jsonrpc?: string;
15+
id?: string | number | null;
16+
result?: T;
17+
error?: IMessageRpcError;
18+
method?: string;
19+
params?: unknown;
20+
};
21+
22+
export type IMessageRpcNotification = {
23+
method: string;
24+
params?: unknown;
25+
};
26+
27+
export type IMessageRpcClientOptions = {
28+
cliPath?: string;
29+
dbPath?: string;
30+
runtime?: RuntimeEnv;
31+
onNotification?: (msg: IMessageRpcNotification) => void;
32+
};
33+
34+
type PendingRequest = {
35+
resolve: (value: unknown) => void;
36+
reject: (error: Error) => void;
37+
timer?: NodeJS.Timeout;
38+
};
39+
40+
function isTestEnv(): boolean {
41+
if (process.env.NODE_ENV === "test") {
42+
return true;
43+
}
44+
const vitest = process.env.VITEST?.trim().toLowerCase();
45+
return Boolean(vitest);
46+
}
47+
48+
export class IMessageRpcClient {
49+
private readonly cliPath: string;
50+
private readonly dbPath?: string;
51+
private readonly runtime?: RuntimeEnv;
52+
private readonly onNotification?: (msg: IMessageRpcNotification) => void;
53+
private readonly pending = new Map<string, PendingRequest>();
54+
private readonly closed: Promise<void>;
55+
private closedResolve: (() => void) | null = null;
56+
private child: ChildProcessWithoutNullStreams | null = null;
57+
private reader: Interface | null = null;
58+
private nextId = 1;
59+
60+
constructor(opts: IMessageRpcClientOptions = {}) {
61+
this.cliPath = opts.cliPath?.trim() || "imsg";
62+
this.dbPath = opts.dbPath?.trim() ? resolveUserPath(opts.dbPath) : undefined;
63+
this.runtime = opts.runtime;
64+
this.onNotification = opts.onNotification;
65+
this.closed = new Promise((resolve) => {
66+
this.closedResolve = resolve;
67+
});
68+
}
69+
70+
async start(): Promise<void> {
71+
if (this.child) {
72+
return;
73+
}
74+
if (isTestEnv()) {
75+
throw new Error("Refusing to start imsg rpc in test environment; mock iMessage RPC client");
76+
}
77+
const args = ["rpc"];
78+
if (this.dbPath) {
79+
args.push("--db", this.dbPath);
80+
}
81+
const child = spawn(this.cliPath, args, {
82+
stdio: ["pipe", "pipe", "pipe"],
83+
});
84+
this.child = child;
85+
this.reader = createInterface({ input: child.stdout });
86+
87+
this.reader.on("line", (line) => {
88+
const trimmed = line.trim();
89+
if (!trimmed) {
90+
return;
91+
}
92+
this.handleLine(trimmed);
93+
});
94+
95+
child.stderr?.on("data", (chunk) => {
96+
const lines = chunk.toString().split(/\r?\n/);
97+
for (const line of lines) {
98+
if (!line.trim()) {
99+
continue;
100+
}
101+
this.runtime?.error?.(`imsg rpc: ${line.trim()}`);
102+
}
103+
});
104+
105+
child.on("error", (err) => {
106+
this.failAll(err instanceof Error ? err : new Error(String(err)));
107+
this.closedResolve?.();
108+
});
109+
110+
child.on("close", (code, signal) => {
111+
if (code !== 0 && code !== null) {
112+
const reason = signal ? `signal ${signal}` : `code ${code}`;
113+
this.failAll(new Error(`imsg rpc exited (${reason})`));
114+
} else {
115+
this.failAll(new Error("imsg rpc closed"));
116+
}
117+
this.closedResolve?.();
118+
});
119+
}
120+
121+
async stop(): Promise<void> {
122+
if (!this.child) {
123+
return;
124+
}
125+
this.reader?.close();
126+
this.reader = null;
127+
this.child.stdin?.end();
128+
const child = this.child;
129+
this.child = null;
130+
131+
await Promise.race([
132+
this.closed,
133+
new Promise<void>((resolve) => {
134+
setTimeout(() => {
135+
if (!child.killed) {
136+
child.kill("SIGTERM");
137+
}
138+
resolve();
139+
}, 500);
140+
}),
141+
]);
142+
}
143+
144+
async waitForClose(): Promise<void> {
145+
await this.closed;
146+
}
147+
148+
async request<T = unknown>(
149+
method: string,
150+
params?: Record<string, unknown>,
151+
opts?: { timeoutMs?: number },
152+
): Promise<T> {
153+
if (!this.child || !this.child.stdin) {
154+
throw new Error("imsg rpc not running");
155+
}
156+
const id = this.nextId++;
157+
const payload = {
158+
jsonrpc: "2.0",
159+
id,
160+
method,
161+
params: params ?? {},
162+
};
163+
const line = `${JSON.stringify(payload)}\n`;
164+
const timeoutMs = opts?.timeoutMs ?? DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS;
165+
166+
const response = new Promise<T>((resolve, reject) => {
167+
const key = String(id);
168+
const timer =
169+
timeoutMs > 0
170+
? setTimeout(() => {
171+
this.pending.delete(key);
172+
reject(new Error(`imsg rpc timeout (${method})`));
173+
}, timeoutMs)
174+
: undefined;
175+
this.pending.set(key, {
176+
resolve: (value) => resolve(value as T),
177+
reject,
178+
timer,
179+
});
180+
});
181+
182+
this.child.stdin.write(line);
183+
return await response;
184+
}
185+
186+
private handleLine(line: string) {
187+
let parsed: IMessageRpcResponse<unknown>;
188+
try {
189+
parsed = JSON.parse(line) as IMessageRpcResponse<unknown>;
190+
} catch (err) {
191+
const detail = err instanceof Error ? err.message : String(err);
192+
this.runtime?.error?.(`imsg rpc: failed to parse ${line}: ${detail}`);
193+
return;
194+
}
195+
196+
if (parsed.id !== undefined && parsed.id !== null) {
197+
const key = String(parsed.id);
198+
const pending = this.pending.get(key);
199+
if (!pending) {
200+
return;
201+
}
202+
if (pending.timer) {
203+
clearTimeout(pending.timer);
204+
}
205+
this.pending.delete(key);
206+
207+
if (parsed.error) {
208+
const baseMessage = parsed.error.message ?? "imsg rpc error";
209+
const details = parsed.error.data;
210+
const code = parsed.error.code;
211+
const suffixes = [] as string[];
212+
if (typeof code === "number") {
213+
suffixes.push(`code=${code}`);
214+
}
215+
if (details !== undefined) {
216+
const detailText =
217+
typeof details === "string" ? details : JSON.stringify(details, null, 2);
218+
if (detailText) {
219+
suffixes.push(detailText);
220+
}
221+
}
222+
const msg = suffixes.length > 0 ? `${baseMessage}: ${suffixes.join(" ")}` : baseMessage;
223+
pending.reject(new Error(msg));
224+
return;
225+
}
226+
pending.resolve(parsed.result);
227+
return;
228+
}
229+
230+
if (parsed.method) {
231+
this.onNotification?.({
232+
method: parsed.method,
233+
params: parsed.params,
234+
});
235+
}
236+
}
237+
238+
private failAll(err: Error) {
239+
for (const [key, pending] of this.pending.entries()) {
240+
if (pending.timer) {
241+
clearTimeout(pending.timer);
242+
}
243+
pending.reject(err);
244+
this.pending.delete(key);
245+
}
246+
}
247+
}
248+
249+
export async function createIMessageRpcClient(
250+
opts: IMessageRpcClientOptions = {},
251+
): Promise<IMessageRpcClient> {
252+
const client = new IMessageRpcClient(opts);
253+
await client.start();
254+
return client;
255+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/** Default timeout for iMessage probe/RPC operations (10 seconds). */
2+
export const DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS = 10_000;

src/imessage/monitor.gating.test.ts renamed to extensions/imessage/src/monitor.gating.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { describe, expect, it } from "vitest";
2-
import type { OpenClawConfig } from "../config/config.js";
2+
import type { OpenClawConfig } from "../../../src/config/config.js";
33
import {
44
buildIMessageInboundContext,
55
resolveIMessageInboundDecision,

src/imessage/monitor.shutdown.unhandled-rejection.test.ts renamed to extensions/imessage/src/monitor.shutdown.unhandled-rejection.test.ts

File renamed without changes.

extensions/imessage/src/monitor.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export { monitorIMessageProvider } from "./monitor/monitor-provider.js";
2+
export type { MonitorIMessageOpts } from "./monitor/types.js";
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
export type IMessageMonitorClient = {
2+
request: (method: string, params?: Record<string, unknown>) => Promise<unknown>;
3+
stop: () => Promise<void>;
4+
};
5+
6+
export function attachIMessageMonitorAbortHandler(params: {
7+
abortSignal?: AbortSignal;
8+
client: IMessageMonitorClient;
9+
getSubscriptionId: () => number | null;
10+
}): () => void {
11+
const abort = params.abortSignal;
12+
if (!abort) {
13+
return () => {};
14+
}
15+
16+
const onAbort = () => {
17+
const subscriptionId = params.getSubscriptionId();
18+
if (subscriptionId) {
19+
void params.client
20+
.request("watch.unsubscribe", {
21+
subscription: subscriptionId,
22+
})
23+
.catch(() => {
24+
// Ignore disconnect errors during shutdown.
25+
});
26+
}
27+
void params.client.stop().catch(() => {
28+
// Ignore disconnect errors during shutdown.
29+
});
30+
};
31+
32+
abort.addEventListener("abort", onAbort, { once: true });
33+
return () => abort.removeEventListener("abort", onAbort);
34+
}

0 commit comments

Comments
 (0)