Skip to content

Commit b304ccd

Browse files
fix(gateway): reject RPCs from invalidated device-token clients during rotation/revoke race
device.token.rotate, device.token.revoke and device.pair.remove all respond 200 OK to the admin, then schedule disconnectClientsForDevice via queueMicrotask so the response can flush before the socket close. That microtask window plus the absence of a per-RPC re-check for device-token auth (unlike shared-auth, which gets checked at message-handler.ts:1444-1458) created a race: an attacker with RPCs already pipelined in the WS socket buffer could land a few more authenticated operations with the rotated/revoked token before the socket actually closed. Fix: add a cheap in-memory 'invalidated' flag on GatewayWsClient and mark it synchronously *before* responding in the three handlers. Add a mirror check at the start of the per-RPC dispatch that force-closes the client if the flag is set, regardless of whether socket.close() has taken effect yet. Disconnect still happens via queueMicrotask so the admin's rotate/revoke response flushes normally. Introduces context.invalidateClientsForDevice(deviceId, opts) as a sync companion to the existing disconnectClientsForDevice. Also defense-in-depth: disconnectClientsForDevice now sets the flag too, so any other caller of the hard-disconnect path gets the per-RPC gate for free.
1 parent fab76f3 commit b304ccd

7 files changed

Lines changed: 263 additions & 1 deletion

File tree

src/gateway/server-methods/devices.test.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ function createOptions(
6969
context: {
7070
broadcast: vi.fn(),
7171
disconnectClientsForDevice: vi.fn(),
72+
invalidateClientsForDevice: vi.fn(),
7273
logGateway: {
7374
debug: vi.fn(),
7475
error: vi.fn(),
@@ -131,6 +132,28 @@ describe("deviceHandlers", () => {
131132
);
132133
});
133134

135+
it("invalidates affected clients synchronously before responding to device.pair.remove", async () => {
136+
removePairedDeviceMock.mockResolvedValue({ deviceId: "device-1", removedAtMs: 123 });
137+
const opts = createOptions("device.pair.remove", { deviceId: "device-1" });
138+
const respond = opts.respond as ReturnType<typeof vi.fn>;
139+
const invalidate = opts.context.invalidateClientsForDevice as ReturnType<typeof vi.fn>;
140+
const disconnect = opts.context.disconnectClientsForDevice as ReturnType<typeof vi.fn>;
141+
142+
respond.mockImplementation(() => {
143+
// At the moment the response is emitted, the client flag must already
144+
// be set so any RPCs pipelined in the WS buffer are rejected.
145+
expect(invalidate).toHaveBeenCalledWith("device-1", { reason: "device-pair-removed" });
146+
// The hard close is deferred to the microtask to let the response flush.
147+
expect(disconnect).not.toHaveBeenCalled();
148+
});
149+
150+
await deviceHandlers["device.pair.remove"](opts);
151+
await Promise.resolve();
152+
153+
expect(respond).toHaveBeenCalled();
154+
expect(disconnect).toHaveBeenCalledWith("device-1");
155+
});
156+
134157
it("does not disconnect clients when device removal fails", async () => {
135158
removePairedDeviceMock.mockResolvedValue(null);
136159
const opts = createOptions("device.pair.remove", { deviceId: "device-1" });
@@ -289,6 +312,59 @@ describe("deviceHandlers", () => {
289312
);
290313
});
291314

315+
it("invalidates affected clients synchronously before responding to device.token.rotate", async () => {
316+
mockPairedOperatorDevice();
317+
mockRotateOperatorTokenSuccess();
318+
const opts = createOptions(
319+
"device.token.rotate",
320+
{ deviceId: "device-1", role: "operator", scopes: ["operator.pairing"] },
321+
{ client: createClient(["operator.pairing"], "device-1", { isDeviceTokenAuth: true }) },
322+
);
323+
const respond = opts.respond as ReturnType<typeof vi.fn>;
324+
const invalidate = opts.context.invalidateClientsForDevice as ReturnType<typeof vi.fn>;
325+
const disconnect = opts.context.disconnectClientsForDevice as ReturnType<typeof vi.fn>;
326+
327+
respond.mockImplementation(() => {
328+
expect(invalidate).toHaveBeenCalledWith("device-1", {
329+
role: "operator",
330+
reason: "device-token-rotated",
331+
});
332+
expect(disconnect).not.toHaveBeenCalled();
333+
});
334+
335+
await deviceHandlers["device.token.rotate"](opts);
336+
await Promise.resolve();
337+
338+
expect(respond).toHaveBeenCalled();
339+
expect(disconnect).toHaveBeenCalledWith("device-1", { role: "operator" });
340+
});
341+
342+
it("invalidates affected clients synchronously before responding to device.token.revoke", async () => {
343+
revokeDeviceTokenMock.mockResolvedValue({ role: "operator", revokedAtMs: 456 });
344+
const opts = createOptions(
345+
"device.token.revoke",
346+
{ deviceId: "device-1", role: "operator" },
347+
{ client: createClient(["operator.pairing"], "device-1", { isDeviceTokenAuth: true }) },
348+
);
349+
const respond = opts.respond as ReturnType<typeof vi.fn>;
350+
const invalidate = opts.context.invalidateClientsForDevice as ReturnType<typeof vi.fn>;
351+
const disconnect = opts.context.disconnectClientsForDevice as ReturnType<typeof vi.fn>;
352+
353+
respond.mockImplementation(() => {
354+
expect(invalidate).toHaveBeenCalledWith("device-1", {
355+
role: "operator",
356+
reason: "device-token-revoked",
357+
});
358+
expect(disconnect).not.toHaveBeenCalled();
359+
});
360+
361+
await deviceHandlers["device.token.revoke"](opts);
362+
await Promise.resolve();
363+
364+
expect(respond).toHaveBeenCalled();
365+
expect(disconnect).toHaveBeenCalledWith("device-1", { role: "operator" });
366+
});
367+
292368
it("treats normalized device ids as self-owned for token rotation", async () => {
293369
mockPairedOperatorDevice();
294370
mockRotateOperatorTokenSuccess();

src/gateway/server-methods/devices.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,13 @@ export const deviceHandlers: GatewayRequestHandlers = {
315315
return;
316316
}
317317
context.logGateway.info(`device pairing removed device=${removed.deviceId}`);
318+
// Mark affected clients invalid *before* responding so any RPCs already
319+
// pipelined into their WS socket buffer are rejected at the per-request
320+
// dispatch check, closing the race between queueMicrotask-scheduled
321+
// disconnect and inflight frames.
322+
context.invalidateClientsForDevice?.(removed.deviceId, {
323+
reason: "device-pair-removed",
324+
});
318325
respond(true, removed, undefined);
319326
queueMicrotask(() => {
320327
context.disconnectClientsForDevice?.(removed.deviceId);
@@ -410,6 +417,14 @@ export const deviceHandlers: GatewayRequestHandlers = {
410417
context.logGateway.info(
411418
`device token rotated device=${deviceId} role=${entry.role} scopes=${entry.scopes.join(",")}`,
412419
);
420+
// Mark affected clients invalid *before* responding so any RPCs already
421+
// pipelined into their WS socket buffer are rejected at the per-request
422+
// dispatch check, closing the race between queueMicrotask-scheduled
423+
// disconnect and inflight frames.
424+
context.invalidateClientsForDevice?.(deviceId.trim(), {
425+
role: entry.role,
426+
reason: "device-token-rotated",
427+
});
413428
respond(
414429
true,
415430
{
@@ -459,6 +474,14 @@ export const deviceHandlers: GatewayRequestHandlers = {
459474
}
460475
const normalizedDeviceId = deviceId.trim();
461476
context.logGateway.info(`device token revoked device=${normalizedDeviceId} role=${entry.role}`);
477+
// Mark affected clients invalid *before* responding so any RPCs already
478+
// pipelined into their WS socket buffer are rejected at the per-request
479+
// dispatch check, closing the race between queueMicrotask-scheduled
480+
// disconnect and inflight frames.
481+
context.invalidateClientsForDevice?.(normalizedDeviceId, {
482+
role: entry.role,
483+
reason: "device-token-revoked",
484+
});
462485
respond(
463486
true,
464487
{

src/gateway/server-methods/shared-types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ export type GatewayRequestContext = {
5959
hasConnectedMobileNode: () => boolean;
6060
hasExecApprovalClients?: (excludeConnId?: string) => boolean;
6161
disconnectClientsForDevice?: (deviceId: string, opts?: { role?: string }) => void;
62+
invalidateClientsForDevice?: (
63+
deviceId: string,
64+
opts?: { role?: string; reason?: string },
65+
) => void;
6266
disconnectClientsUsingSharedGatewayAuth?: () => void;
6367
enforceSharedGatewayAuthGenerationForConfigWrite?: (nextConfig: OpenClawConfig) => void;
6468
nodeRegistry: NodeRegistry;

src/gateway/server-request-context.test.ts

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,72 @@
11
import { describe, expect, it, vi } from "vitest";
22
import type { GatewayServerLiveState } from "./server-live-state.js";
3-
import { createGatewayRequestContext } from "./server-request-context.js";
3+
import {
4+
createGatewayRequestContext,
5+
type GatewayRequestContextParams,
6+
} from "./server-request-context.js";
7+
8+
function makeContextParams(
9+
overrides: Partial<GatewayRequestContextParams> = {},
10+
): GatewayRequestContextParams {
11+
const runtimeState: Pick<GatewayServerLiveState, "cronState"> = {
12+
cronState: {
13+
cron: { start: vi.fn(), stop: vi.fn() } as never,
14+
storePath: "/tmp/cron",
15+
cronEnabled: true,
16+
},
17+
};
18+
return {
19+
deps: {} as never,
20+
runtimeState,
21+
execApprovalManager: undefined,
22+
pluginApprovalManager: undefined,
23+
loadGatewayModelCatalog: vi.fn(async () => []),
24+
getHealthCache: vi.fn(() => null),
25+
refreshHealthSnapshot: vi.fn(async () => ({}) as never),
26+
logHealth: { error: vi.fn() },
27+
logGateway: { warn: vi.fn(), info: vi.fn(), error: vi.fn() } as never,
28+
incrementPresenceVersion: vi.fn(() => 1),
29+
getHealthVersion: vi.fn(() => 1),
30+
broadcast: vi.fn(),
31+
broadcastToConnIds: vi.fn(),
32+
nodeSendToSession: vi.fn(),
33+
nodeSendToAllSubscribed: vi.fn(),
34+
nodeSubscribe: vi.fn(),
35+
nodeUnsubscribe: vi.fn(),
36+
nodeUnsubscribeAll: vi.fn(),
37+
hasConnectedMobileNode: vi.fn(() => false),
38+
clients: new Set(),
39+
enforceSharedGatewayAuthGenerationForConfigWrite: vi.fn(),
40+
nodeRegistry: {} as never,
41+
agentRunSeq: new Map(),
42+
chatAbortControllers: new Map(),
43+
chatAbortedRuns: new Map(),
44+
chatRunBuffers: new Map(),
45+
chatDeltaSentAt: new Map(),
46+
chatDeltaLastBroadcastLen: new Map(),
47+
addChatRun: vi.fn(),
48+
removeChatRun: vi.fn(),
49+
subscribeSessionEvents: vi.fn(),
50+
unsubscribeSessionEvents: vi.fn(),
51+
subscribeSessionMessageEvents: vi.fn(),
52+
unsubscribeSessionMessageEvents: vi.fn(),
53+
unsubscribeAllSessionEvents: vi.fn(),
54+
getSessionEventSubscriberConnIds: vi.fn(() => new Set<string>()),
55+
registerToolEventRecipient: vi.fn(),
56+
dedupe: new Map(),
57+
wizardSessions: new Map(),
58+
findRunningWizard: vi.fn(() => null),
59+
purgeWizardSession: vi.fn(),
60+
getRuntimeSnapshot: vi.fn(() => ({}) as never),
61+
startChannel: vi.fn(async () => undefined),
62+
stopChannel: vi.fn(async () => undefined),
63+
markChannelLoggedOut: vi.fn(),
64+
wizardRunner: vi.fn(async () => undefined),
65+
broadcastVoiceWakeChanged: vi.fn(),
66+
unavailableGatewayMethods: new Set(),
67+
...overrides,
68+
};
69+
}
470

571
describe("createGatewayRequestContext", () => {
672
it("reads cron state live from runtime state", () => {
@@ -77,4 +143,66 @@ describe("createGatewayRequestContext", () => {
77143
expect(context.cron).toBe(cronB);
78144
expect(context.cronStorePath).toBe("/tmp/cron-b");
79145
});
146+
147+
it("invalidateClientsForDevice sets the flag on matching clients without closing the socket", () => {
148+
const target = {
149+
connId: "conn-target",
150+
connect: { device: { id: "device-1" }, role: "primary" },
151+
socket: { close: vi.fn() },
152+
};
153+
const unrelated = {
154+
connId: "conn-unrelated",
155+
connect: { device: { id: "device-2" }, role: "primary" },
156+
socket: { close: vi.fn() },
157+
};
158+
const clients = new Set([target, unrelated] as never);
159+
160+
const context = createGatewayRequestContext(makeContextParams({ clients }));
161+
context.invalidateClientsForDevice?.("device-1", { reason: "device-token-rotated" });
162+
163+
expect((target as { invalidated?: boolean }).invalidated).toBe(true);
164+
expect((target as { invalidatedReason?: string }).invalidatedReason).toBe(
165+
"device-token-rotated",
166+
);
167+
expect(target.socket.close).not.toHaveBeenCalled();
168+
169+
expect((unrelated as { invalidated?: boolean }).invalidated).toBeUndefined();
170+
expect(unrelated.socket.close).not.toHaveBeenCalled();
171+
});
172+
173+
it("disconnectClientsForDevice also marks the invalidated flag before closing", () => {
174+
const target = {
175+
connId: "conn-target",
176+
connect: { device: { id: "device-1" }, role: "primary" },
177+
socket: { close: vi.fn() },
178+
};
179+
const clients = new Set([target] as never);
180+
181+
const context = createGatewayRequestContext(makeContextParams({ clients }));
182+
context.disconnectClientsForDevice?.("device-1");
183+
184+
expect((target as { invalidated?: boolean }).invalidated).toBe(true);
185+
expect((target as { invalidatedReason?: string }).invalidatedReason).toBe("device-removed");
186+
expect(target.socket.close).toHaveBeenCalledWith(4001, "device removed");
187+
});
188+
189+
it("invalidateClientsForDevice filters by role when provided", () => {
190+
const primary = {
191+
connId: "conn-primary",
192+
connect: { device: { id: "device-1" }, role: "primary" },
193+
socket: { close: vi.fn() },
194+
};
195+
const secondary = {
196+
connId: "conn-secondary",
197+
connect: { device: { id: "device-1" }, role: "secondary" },
198+
socket: { close: vi.fn() },
199+
};
200+
const clients = new Set([primary, secondary] as never);
201+
202+
const context = createGatewayRequestContext(makeContextParams({ clients }));
203+
context.invalidateClientsForDevice?.("device-1", { role: "primary" });
204+
205+
expect((primary as { invalidated?: boolean }).invalidated).toBe(true);
206+
expect((secondary as { invalidated?: boolean }).invalidated).toBeUndefined();
207+
});
80208
});

src/gateway/server-request-context.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import { disconnectAllSharedGatewayAuthClients } from "./server-shared-auth-gene
66
type GatewayRequestContextClient = GatewayClient & {
77
socket: { close: (code: number, reason: string) => void };
88
usesSharedGatewayAuth?: boolean;
9+
invalidated?: boolean;
10+
invalidatedReason?: string;
911
};
1012

1113
export type GatewayRequestContextParams = {
@@ -103,6 +105,19 @@ export function createGatewayRequestContext(
103105
}
104106
return false;
105107
},
108+
invalidateClientsForDevice: (deviceId: string, opts?: { role?: string; reason?: string }) => {
109+
const reason = opts?.reason ?? "device-invalidated";
110+
for (const gatewayClient of params.clients) {
111+
if (gatewayClient.connect.device?.id !== deviceId) {
112+
continue;
113+
}
114+
if (opts?.role && gatewayClient.connect.role !== opts.role) {
115+
continue;
116+
}
117+
gatewayClient.invalidated = true;
118+
gatewayClient.invalidatedReason = reason;
119+
}
120+
},
106121
disconnectClientsForDevice: (deviceId: string, opts?: { role?: string }) => {
107122
for (const gatewayClient of params.clients) {
108123
if (gatewayClient.connect.device?.id !== deviceId) {
@@ -111,6 +126,11 @@ export function createGatewayRequestContext(
111126
if (opts?.role && gatewayClient.connect.role !== opts.role) {
112127
continue;
113128
}
129+
// Mark before closing so any RPCs already pipelined in the WS buffer
130+
// are rejected at the per-request dispatch check, regardless of
131+
// whether socket.close() takes effect synchronously.
132+
gatewayClient.invalidated = true;
133+
gatewayClient.invalidatedReason ??= "device-removed";
114134
try {
115135
gatewayClient.socket.close(4001, "device removed");
116136
} catch {

src/gateway/server/ws-connection/message-handler.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1441,6 +1441,15 @@ export function attachGatewayWsMessageHandler(params: {
14411441
}
14421442
const req = parsed;
14431443
logWs("in", "req", { connId, id: req.id, method: req.method });
1444+
if (client.invalidated) {
1445+
const reason = client.invalidatedReason ?? "invalidated";
1446+
setCloseCause("client-invalidated", {
1447+
reason,
1448+
method: req.method,
1449+
});
1450+
close(4001, `client invalidated: ${reason}`);
1451+
return;
1452+
}
14441453
if (client.usesSharedGatewayAuth) {
14451454
const requiredSharedGatewaySessionGeneration =
14461455
getRequiredSharedGatewaySessionGeneration?.();

src/gateway/server/ws-types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@ export type GatewayWsClient = {
1313
canvasHostUrl?: string;
1414
canvasCapability?: string;
1515
canvasCapabilityExpiresAtMs?: number;
16+
invalidated?: boolean;
17+
invalidatedReason?: string;
1618
};

0 commit comments

Comments
 (0)