Skip to content

Commit 692438c

Browse files
fix(stream): tighten voice stream ingress guards (#66027)
* fix(stream): tighten voice stream ingress guards * fix(stream): address review follow-ups * fix(stream): normalize trusted proxy ip matching * changelog: note voice-call media-stream ingress guard tightening (#66027) * fix(stream): require non-empty trusted proxy list before honoring forwarding headers Without an explicit trusted proxy list, the prior gate treated every remote as 'from a trusted proxy', so enabling trustForwardingHeaders let any direct caller spoof X-Forwarded-For / X-Real-IP and rotate the resolved IP per request to evade maxPendingConnectionsPerIp. Require trustedProxyIPs to be non-empty AND match the remote before trusting forwarding headers. --------- Co-authored-by: Devin Robison <[email protected]>
1 parent 955270f commit 692438c

5 files changed

Lines changed: 426 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai
3636
- Memory/active-memory: move recalled memory onto the hidden untrusted prompt-prefix path instead of system prompt injection, label the visible Active Memory status line fields, and include the resolved recall provider/model in gateway debug logs so trace/debug output matches what the model actually saw.
3737
- Memory/QMD: stop treating legacy lowercase `memory.md` as a second default root collection, so QMD recall no longer searches phantom `memory-alt-*` collections and builtin/QMD root-memory fallback stays aligned. (#66141) Thanks @mbelinky.
3838
- Agents/OpenAI: map `minimal` thinking to OpenAI's supported `low` reasoning effort for GPT-5.4 requests, so embedded runs stop failing request validation.
39+
- Voice-call/media-stream: resolve the source IP from trusted forwarding headers for per-IP pending-connection limits when `webhookSecurity.trustForwardingHeaders` and `trustedProxyIPs` are configured, and reserve `maxConnections` capacity for in-flight WebSocket upgrades so concurrent handshakes can no longer momentarily exceed the operator-set cap. (#66027) Thanks @eleqtrizit.
3940

4041
## 2026.4.12
4142

extensions/voice-call/src/media-stream.test.ts

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import type { IncomingMessage } from "node:http";
2+
import net from "node:net";
13
import type {
24
RealtimeTranscriptionProviderPlugin,
35
RealtimeTranscriptionSession,
@@ -257,6 +259,42 @@ describe("MediaStreamHandler security hardening", () => {
257259
}
258260
});
259261

262+
it("uses resolved client IPs for per-IP pending limits", async () => {
263+
const handler = new MediaStreamHandler({
264+
transcriptionProvider: createStubSttProvider(),
265+
providerConfig: {},
266+
preStartTimeoutMs: 5_000,
267+
maxPendingConnections: 10,
268+
maxPendingConnectionsPerIp: 1,
269+
resolveClientIp: (request) => String(request.headers["x-forwarded-for"] ?? ""),
270+
});
271+
const server = await startWsServer(handler);
272+
273+
try {
274+
const first = new WebSocket(server.url, {
275+
headers: { "x-forwarded-for": "198.51.100.10" },
276+
});
277+
await withTimeout(new Promise((resolve) => first.once("open", resolve)));
278+
279+
const second = new WebSocket(server.url, {
280+
headers: { "x-forwarded-for": "203.0.113.20" },
281+
});
282+
await withTimeout(new Promise((resolve) => second.once("open", resolve)));
283+
284+
expect(first.readyState).toBe(WebSocket.OPEN);
285+
expect(second.readyState).toBe(WebSocket.OPEN);
286+
287+
const firstClosed = waitForClose(first);
288+
const secondClosed = waitForClose(second);
289+
first.close();
290+
second.close();
291+
await firstClosed;
292+
await secondClosed;
293+
} finally {
294+
await server.close();
295+
}
296+
});
297+
260298
it("rejects upgrades when max connection cap is reached", async () => {
261299
const handler = new MediaStreamHandler({
262300
transcriptionProvider: createStubSttProvider(),
@@ -286,6 +324,128 @@ describe("MediaStreamHandler security hardening", () => {
286324
}
287325
});
288326

327+
it("counts in-flight upgrades against the max connection cap", () => {
328+
const handler = new MediaStreamHandler({
329+
transcriptionProvider: createStubSttProvider(),
330+
providerConfig: {},
331+
maxConnections: 2,
332+
maxPendingConnections: 10,
333+
maxPendingConnectionsPerIp: 10,
334+
});
335+
336+
const fakeWss = {
337+
clients: new Set([{}]),
338+
handleUpgrade: vi.fn(),
339+
emit: vi.fn(),
340+
on: vi.fn(),
341+
};
342+
let upgradeCallback: ((ws: WebSocket) => void) | null = null;
343+
fakeWss.handleUpgrade.mockImplementation(
344+
(
345+
_request: IncomingMessage,
346+
_socket: unknown,
347+
_head: Buffer,
348+
callback: (ws: WebSocket) => void,
349+
) => {
350+
upgradeCallback = callback;
351+
},
352+
);
353+
354+
(
355+
handler as unknown as {
356+
wss: typeof fakeWss;
357+
}
358+
).wss = fakeWss;
359+
360+
const firstSocket = {
361+
once: vi.fn(),
362+
removeListener: vi.fn(),
363+
write: vi.fn(),
364+
destroy: vi.fn(),
365+
};
366+
handler.handleUpgrade(
367+
{ socket: { remoteAddress: "127.0.0.1" } } as IncomingMessage,
368+
firstSocket as never,
369+
Buffer.alloc(0),
370+
);
371+
372+
const secondSocket = {
373+
once: vi.fn(),
374+
removeListener: vi.fn(),
375+
write: vi.fn(),
376+
destroy: vi.fn(),
377+
};
378+
handler.handleUpgrade(
379+
{ socket: { remoteAddress: "127.0.0.1" } } as IncomingMessage,
380+
secondSocket as never,
381+
Buffer.alloc(0),
382+
);
383+
384+
expect(fakeWss.handleUpgrade).toHaveBeenCalledTimes(1);
385+
expect(secondSocket.write).toHaveBeenCalledOnce();
386+
expect(secondSocket.destroy).toHaveBeenCalledOnce();
387+
388+
expect(upgradeCallback).not.toBeNull();
389+
const completeUpgrade = upgradeCallback as ((ws: WebSocket) => void) | null;
390+
if (!completeUpgrade) {
391+
throw new Error("Expected upgrade callback to be registered");
392+
}
393+
completeUpgrade({} as WebSocket);
394+
expect(fakeWss.emit).toHaveBeenCalledWith(
395+
"connection",
396+
expect.anything(),
397+
expect.objectContaining({ socket: { remoteAddress: "127.0.0.1" } }),
398+
);
399+
});
400+
401+
it("releases in-flight reservations when ws rejects a malformed upgrade before the callback", async () => {
402+
const handler = new MediaStreamHandler({
403+
transcriptionProvider: createStubSttProvider(),
404+
providerConfig: {},
405+
preStartTimeoutMs: 5_000,
406+
maxConnections: 1,
407+
maxPendingConnections: 10,
408+
maxPendingConnectionsPerIp: 10,
409+
});
410+
const server = await startWsServer(handler);
411+
const serverUrl = new URL(server.url);
412+
413+
try {
414+
await withTimeout(
415+
new Promise<void>((resolve, reject) => {
416+
const socket = net.createConnection(
417+
{ host: serverUrl.hostname, port: Number(serverUrl.port) },
418+
() => {
419+
socket.write(
420+
[
421+
"GET /voice/stream HTTP/1.1",
422+
`Host: ${serverUrl.host}`,
423+
"Upgrade: websocket",
424+
"Connection: Upgrade",
425+
"Sec-WebSocket-Version: 13",
426+
"",
427+
"",
428+
].join("\r\n"),
429+
);
430+
},
431+
);
432+
socket.once("error", reject);
433+
socket.once("data", () => {
434+
socket.end();
435+
});
436+
socket.once("close", () => resolve());
437+
}),
438+
);
439+
440+
const ws = await connectWs(server.url);
441+
expect(ws.readyState).toBe(WebSocket.OPEN);
442+
ws.close();
443+
await waitForClose(ws);
444+
} finally {
445+
await server.close();
446+
}
447+
});
448+
289449
it("clears pending state after valid start", async () => {
290450
const handler = new MediaStreamHandler({
291451
transcriptionProvider: createStubSttProvider(),

extensions/voice-call/src/media-stream.ts

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ export interface MediaStreamConfig {
3232
maxPendingConnectionsPerIp?: number;
3333
/** Max total open sockets (pending + active sessions). */
3434
maxConnections?: number;
35+
/** Optional trusted resolver for the source IP used by pending-connection guards. */
36+
resolveClientIp?: (request: IncomingMessage) => string | undefined;
3537
/** Validate whether to accept a media stream for the given call ID */
3638
shouldAcceptStream?: (params: { callId: string; streamSid: string; token?: string }) => boolean;
3739
/** Callback when transcript is received */
@@ -119,6 +121,7 @@ export class MediaStreamHandler {
119121
private maxPendingConnections: number;
120122
private maxPendingConnectionsPerIp: number;
121123
private maxConnections: number;
124+
private inflightUpgrades = 0;
122125
/** TTS playback queues per stream (serialize audio to prevent overlap) */
123126
private ttsQueues = new Map<string, TtsQueueEntry[]>();
124127
/** Whether TTS is currently playing per stream */
@@ -148,15 +151,42 @@ export class MediaStreamHandler {
148151
this.wss.on("connection", (ws, req) => this.handleConnection(ws, req));
149152
}
150153

151-
const currentConnections = this.wss.clients.size;
154+
const currentConnections = this.getCurrentConnectionCount();
152155
if (currentConnections >= this.maxConnections) {
153156
this.rejectUpgrade(socket, 503, "Too many media stream connections");
154157
return;
155158
}
156159

157-
this.wss.handleUpgrade(request, socket, head, (ws) => {
158-
this.wss?.emit("connection", ws, request);
159-
});
160+
this.inflightUpgrades += 1;
161+
let released = false;
162+
const releaseUpgradeReservation = () => {
163+
if (released) {
164+
return;
165+
}
166+
released = true;
167+
this.inflightUpgrades = Math.max(0, this.inflightUpgrades - 1);
168+
};
169+
const handleUpgradeAbort = () => {
170+
socket.removeListener("error", handleUpgradeAbort);
171+
socket.removeListener("close", handleUpgradeAbort);
172+
releaseUpgradeReservation();
173+
};
174+
socket.once("error", handleUpgradeAbort);
175+
socket.once("close", handleUpgradeAbort);
176+
177+
try {
178+
this.wss.handleUpgrade(request, socket, head, (ws) => {
179+
socket.removeListener("error", handleUpgradeAbort);
180+
socket.removeListener("close", handleUpgradeAbort);
181+
releaseUpgradeReservation();
182+
this.wss?.emit("connection", ws, request);
183+
});
184+
} catch (error) {
185+
socket.removeListener("error", handleUpgradeAbort);
186+
socket.removeListener("close", handleUpgradeAbort);
187+
releaseUpgradeReservation();
188+
throw error;
189+
}
160190
}
161191

162192
/**
@@ -318,9 +348,17 @@ export class MediaStreamHandler {
318348
}
319349

320350
private getClientIp(request: IncomingMessage): string {
351+
const resolvedIp = this.config.resolveClientIp?.(request)?.trim();
352+
if (resolvedIp) {
353+
return resolvedIp;
354+
}
321355
return request.socket.remoteAddress || "unknown";
322356
}
323357

358+
private getCurrentConnectionCount(): number {
359+
return this.wss ? this.wss.clients.size + this.inflightUpgrades : this.inflightUpgrades;
360+
}
361+
324362
private registerPendingConnection(ws: WebSocket, ip: string): boolean {
325363
if (this.pendingConnections.size >= this.maxPendingConnections) {
326364
console.warn("[MediaStream] Rejecting connection: pending connection limit reached");

0 commit comments

Comments
 (0)