Skip to content

Commit 880aff8

Browse files
committed
fix: harden ACP gateway startup sequencing (#23390) (thanks @janckerchen)
1 parent ea9d658 commit 880aff8

File tree

3 files changed

+189
-12
lines changed

3 files changed

+189
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
2020

2121
- Gateway/Restart: fix restart-loop edge cases by keeping `openclaw.mjs -> dist/entry.js` bootstrap detection explicit, reacquiring the gateway lock for in-process restart fallback paths, and tightening restart-loop regression coverage. (#23416) Thanks @jeffwnli.
2222
- Channels/Dedupe: centralize plugin dedupe primitives in plugin SDK (memory + persistent), move Feishu inbound dedupe to a namespace-scoped persistent store, and reuse shared dedupe cache logic for Zalo webhook replay + Tlon processed-message tracking to reduce duplicate handling during reconnect/replay paths.
23+
- ACP/Gateway: wait for gateway hello before opening ACP requests, and fail fast on pre-hello connect failures to avoid startup hangs and early `gateway not connected` request races. (#23390) Thanks @janckerchen.
2324
- Security/Audit: add `openclaw security audit` detection for open group policies that expose runtime/filesystem tools without sandbox/workspace guards (`security.exposure.open_groups_with_runtime_or_fs`).
2425
- Security/Exec env: block request-scoped `HOME` and `ZDOTDIR` overrides in host exec env sanitizers (Node + macOS), preventing shell startup-file execution before allowlist-evaluated command bodies. This ships in the next npm release. Thanks @tdjackey for reporting.
2526
- Security/Gateway: emit a startup security warning when insecure/dangerous config flags are enabled (including `gateway.controlUi.dangerouslyDisableDeviceAuth=true`) and point operators to `openclaw security audit`.

src/acp/server.startup.test.ts

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
2+
3+
type GatewayClientCallbacks = {
4+
onHelloOk?: () => void;
5+
onConnectError?: (err: Error) => void;
6+
onClose?: (code: number, reason: string) => void;
7+
};
8+
9+
const mockState = {
10+
gateways: [] as MockGatewayClient[],
11+
agentSideConnectionCtor: vi.fn(),
12+
agentStart: vi.fn(),
13+
};
14+
15+
class MockGatewayClient {
16+
private callbacks: GatewayClientCallbacks;
17+
18+
constructor(opts: GatewayClientCallbacks) {
19+
this.callbacks = opts;
20+
mockState.gateways.push(this);
21+
}
22+
23+
start(): void {}
24+
25+
stop(): void {
26+
this.callbacks.onClose?.(1000, "gateway stopped");
27+
}
28+
29+
emitHello(): void {
30+
this.callbacks.onHelloOk?.();
31+
}
32+
33+
emitConnectError(message: string): void {
34+
this.callbacks.onConnectError?.(new Error(message));
35+
}
36+
}
37+
38+
vi.mock("@agentclientprotocol/sdk", () => ({
39+
AgentSideConnection: class {
40+
constructor(factory: (conn: unknown) => unknown, stream: unknown) {
41+
mockState.agentSideConnectionCtor(factory, stream);
42+
factory({});
43+
}
44+
},
45+
ndJsonStream: vi.fn(() => ({ type: "mock-stream" })),
46+
}));
47+
48+
vi.mock("../config/config.js", () => ({
49+
loadConfig: () => ({
50+
gateway: {
51+
mode: "local",
52+
},
53+
}),
54+
}));
55+
56+
vi.mock("../gateway/auth.js", () => ({
57+
resolveGatewayAuth: () => ({}),
58+
}));
59+
60+
vi.mock("../gateway/call.js", () => ({
61+
buildGatewayConnectionDetails: () => ({
62+
url: "ws://127.0.0.1:18789",
63+
}),
64+
}));
65+
66+
vi.mock("../gateway/client.js", () => ({
67+
GatewayClient: MockGatewayClient,
68+
}));
69+
70+
vi.mock("./translator.js", () => ({
71+
AcpGatewayAgent: class {
72+
start(): void {
73+
mockState.agentStart();
74+
}
75+
76+
handleGatewayReconnect(): void {}
77+
78+
handleGatewayDisconnect(): void {}
79+
80+
async handleGatewayEvent(): Promise<void> {}
81+
},
82+
}));
83+
84+
describe("serveAcpGateway startup", () => {
85+
let serveAcpGateway: typeof import("./server.js").serveAcpGateway;
86+
87+
beforeAll(async () => {
88+
({ serveAcpGateway } = await import("./server.js"));
89+
});
90+
91+
beforeEach(() => {
92+
mockState.gateways.length = 0;
93+
mockState.agentSideConnectionCtor.mockReset();
94+
mockState.agentStart.mockReset();
95+
});
96+
97+
it("waits for gateway hello before creating AgentSideConnection", async () => {
98+
const signalHandlers = new Map<NodeJS.Signals, () => void>();
99+
const onceSpy = vi.spyOn(process, "once").mockImplementation(((
100+
signal: NodeJS.Signals,
101+
handler: () => void,
102+
) => {
103+
signalHandlers.set(signal, handler);
104+
return process;
105+
}) as typeof process.once);
106+
107+
try {
108+
const servePromise = serveAcpGateway({});
109+
await Promise.resolve();
110+
111+
expect(mockState.agentSideConnectionCtor).not.toHaveBeenCalled();
112+
const gateway = mockState.gateways[0];
113+
if (!gateway) {
114+
throw new Error("Expected mocked gateway instance");
115+
}
116+
117+
gateway.emitHello();
118+
await vi.waitFor(() => {
119+
expect(mockState.agentSideConnectionCtor).toHaveBeenCalledTimes(1);
120+
});
121+
122+
signalHandlers.get("SIGINT")?.();
123+
await servePromise;
124+
} finally {
125+
onceSpy.mockRestore();
126+
}
127+
});
128+
129+
it("rejects startup when gateway connect fails before hello", async () => {
130+
const onceSpy = vi
131+
.spyOn(process, "once")
132+
.mockImplementation(
133+
((_signal: NodeJS.Signals, _handler: () => void) => process) as typeof process.once,
134+
);
135+
136+
try {
137+
const servePromise = serveAcpGateway({});
138+
await Promise.resolve();
139+
140+
const gateway = mockState.gateways[0];
141+
if (!gateway) {
142+
throw new Error("Expected mocked gateway instance");
143+
}
144+
145+
gateway.emitConnectError("connect failed");
146+
await expect(servePromise).rejects.toThrow("connect failed");
147+
expect(mockState.agentSideConnectionCtor).not.toHaveBeenCalled();
148+
} finally {
149+
onceSpy.mockRestore();
150+
}
151+
});
152+
});

src/acp/server.ts

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,27 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise<void
4040
onClosed = resolve;
4141
});
4242
let stopped = false;
43+
let onGatewayReadyResolve!: () => void;
44+
let onGatewayReadyReject!: (err: Error) => void;
45+
let gatewayReadySettled = false;
46+
const gatewayReady = new Promise<void>((resolve, reject) => {
47+
onGatewayReadyResolve = resolve;
48+
onGatewayReadyReject = reject;
49+
});
50+
const resolveGatewayReady = () => {
51+
if (gatewayReadySettled) {
52+
return;
53+
}
54+
gatewayReadySettled = true;
55+
onGatewayReadyResolve();
56+
};
57+
const rejectGatewayReady = (err: unknown) => {
58+
if (gatewayReadySettled) {
59+
return;
60+
}
61+
gatewayReadySettled = true;
62+
onGatewayReadyReject(err instanceof Error ? err : new Error(String(err)));
63+
};
4364

4465
const gateway = new GatewayClient({
4566
url: connection.url,
@@ -53,9 +74,16 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise<void
5374
void agent?.handleGatewayEvent(evt);
5475
},
5576
onHelloOk: () => {
77+
resolveGatewayReady();
5678
agent?.handleGatewayReconnect();
5779
},
80+
onConnectError: (err) => {
81+
rejectGatewayReady(err);
82+
},
5883
onClose: (code, reason) => {
84+
if (!stopped) {
85+
rejectGatewayReady(new Error(`gateway closed before ready (${code}): ${reason}`));
86+
}
5987
agent?.handleGatewayDisconnect(`${code}: ${reason}`);
6088
// Resolve only on intentional shutdown (gateway.stop() sets closed
6189
// which skips scheduleReconnect, then fires onClose). Transient
@@ -71,6 +99,7 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise<void
7199
return;
72100
}
73101
stopped = true;
102+
resolveGatewayReady();
74103
gateway.stop();
75104
// If no WebSocket is active (e.g. between reconnect attempts),
76105
// gateway.stop() won't trigger onClose, so resolve directly.
@@ -80,20 +109,15 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise<void
80109
process.once("SIGINT", shutdown);
81110
process.once("SIGTERM", shutdown);
82111

83-
// Start gateway first and wait for connection before processing ACP messages
112+
// Start gateway first and wait for hello before accepting ACP requests.
84113
gateway.start();
85-
86-
// Use a promise to wait for hello (connection established)
87-
const helloReceived = new Promise<void>((resolve) => {
88-
const originalOnHelloOk = gateway.opts.onHelloOk;
89-
gateway.opts.onHelloOk = (hello) => {
90-
originalOnHelloOk?.(hello);
91-
resolve();
92-
};
114+
await gatewayReady.catch((err) => {
115+
shutdown();
116+
throw err;
93117
});
94-
95-
// Wait for gateway connection before creating AgentSideConnection
96-
await helloReceived;
118+
if (stopped) {
119+
return closed;
120+
}
97121

98122
const input = Writable.toWeb(process.stdout);
99123
const output = Readable.toWeb(process.stdin) as unknown as ReadableStream<Uint8Array>;

0 commit comments

Comments
 (0)