Skip to content

Commit ef95975

Browse files
authored
Gateway: add pending node work primitives (#41409)
Merged via squash. Prepared head SHA: a6d7ca9 Co-authored-by: mbelinky <[email protected]> Co-authored-by: mbelinky <[email protected]> Reviewed-by: @mbelinky
1 parent 5f90883 commit ef95975

15 files changed

+678
-8
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ Docs: https://docs.openclaw.ai
66

77
### Changes
88

9+
- Gateway/node pending work: add narrow in-memory pending-work queue primitives (`node.pending.enqueue` / `node.pending.drain`) and wake-helper reuse as a foundation for dormant-node work delivery. (#41409) Thanks @mbelinky.
10+
911
### Breaking
1012

1113
- Cron/doctor: tighten isolated cron delivery so cron jobs can no longer notify through ad hoc agent sends or fallback main-session summaries, and add `openclaw doctor --fix` migration for legacy cron storage and legacy notify/webhook delivery metadata. (#40998) Thanks @mbelinky.

src/gateway/method-scopes.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ describe("method scope resolution", () => {
1818
expect(resolveLeastPrivilegeOperatorScopesForMethod("poll")).toEqual(["operator.write"]);
1919
});
2020

21+
it("leaves node-only pending drain outside operator scopes", () => {
22+
expect(resolveLeastPrivilegeOperatorScopesForMethod("node.pending.drain")).toEqual([]);
23+
});
24+
2125
it("returns empty scopes for unknown methods", () => {
2226
expect(resolveLeastPrivilegeOperatorScopesForMethod("totally.unknown.method")).toEqual([]);
2327
});

src/gateway/method-scopes.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export const CLI_DEFAULT_OPERATOR_SCOPES: OperatorScope[] = [
2222
const NODE_ROLE_METHODS = new Set([
2323
"node.invoke.result",
2424
"node.event",
25+
"node.pending.drain",
2526
"node.canvas.capability.refresh",
2627
"node.pending.pull",
2728
"node.pending.ack",
@@ -102,6 +103,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
102103
"chat.abort",
103104
"browser.request",
104105
"push.test",
106+
"node.pending.enqueue",
105107
],
106108
[ADMIN_SCOPE]: [
107109
"channels.logout",
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { describe, expect, it, beforeEach } from "vitest";
2+
import {
3+
acknowledgeNodePendingWork,
4+
drainNodePendingWork,
5+
enqueueNodePendingWork,
6+
resetNodePendingWorkForTests,
7+
} from "./node-pending-work.js";
8+
9+
describe("node pending work", () => {
10+
beforeEach(() => {
11+
resetNodePendingWorkForTests();
12+
});
13+
14+
it("returns a baseline status request even when no explicit work is queued", () => {
15+
const drained = drainNodePendingWork("node-1");
16+
expect(drained.items).toEqual([
17+
expect.objectContaining({
18+
id: "baseline-status",
19+
type: "status.request",
20+
priority: "default",
21+
}),
22+
]);
23+
expect(drained.hasMore).toBe(false);
24+
});
25+
26+
it("dedupes explicit work by type and removes acknowledged items", () => {
27+
const first = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" });
28+
const second = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" });
29+
30+
expect(first.deduped).toBe(false);
31+
expect(second.deduped).toBe(true);
32+
expect(second.item.id).toBe(first.item.id);
33+
34+
const drained = drainNodePendingWork("node-2");
35+
expect(drained.items.map((item) => item.type)).toEqual(["location.request", "status.request"]);
36+
37+
const acked = acknowledgeNodePendingWork({
38+
nodeId: "node-2",
39+
itemIds: [first.item.id, "baseline-status"],
40+
});
41+
expect(acked.removedItemIds).toEqual([first.item.id]);
42+
43+
const afterAck = drainNodePendingWork("node-2");
44+
expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]);
45+
});
46+
});

src/gateway/node-pending-work.ts

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
import { randomUUID } from "node:crypto";
2+
3+
export const NODE_PENDING_WORK_TYPES = ["status.request", "location.request"] as const;
4+
export type NodePendingWorkType = (typeof NODE_PENDING_WORK_TYPES)[number];
5+
6+
export const NODE_PENDING_WORK_PRIORITIES = ["default", "normal", "high"] as const;
7+
export type NodePendingWorkPriority = (typeof NODE_PENDING_WORK_PRIORITIES)[number];
8+
9+
export type NodePendingWorkItem = {
10+
id: string;
11+
type: NodePendingWorkType;
12+
priority: NodePendingWorkPriority;
13+
createdAtMs: number;
14+
expiresAtMs: number | null;
15+
payload?: Record<string, unknown>;
16+
};
17+
18+
type NodePendingWorkState = {
19+
revision: number;
20+
itemsById: Map<string, NodePendingWorkItem>;
21+
};
22+
23+
type DrainOptions = {
24+
maxItems?: number;
25+
includeDefaultStatus?: boolean;
26+
nowMs?: number;
27+
};
28+
29+
type DrainResult = {
30+
revision: number;
31+
items: NodePendingWorkItem[];
32+
hasMore: boolean;
33+
};
34+
35+
const DEFAULT_STATUS_ITEM_ID = "baseline-status";
36+
const DEFAULT_STATUS_PRIORITY: NodePendingWorkPriority = "default";
37+
const DEFAULT_PRIORITY: NodePendingWorkPriority = "normal";
38+
const DEFAULT_MAX_ITEMS = 4;
39+
const MAX_ITEMS = 10;
40+
const PRIORITY_RANK: Record<NodePendingWorkPriority, number> = {
41+
high: 3,
42+
normal: 2,
43+
default: 1,
44+
};
45+
46+
const stateByNodeId = new Map<string, NodePendingWorkState>();
47+
48+
function getState(nodeId: string): NodePendingWorkState {
49+
let state = stateByNodeId.get(nodeId);
50+
if (!state) {
51+
state = {
52+
revision: 0,
53+
itemsById: new Map(),
54+
};
55+
stateByNodeId.set(nodeId, state);
56+
}
57+
return state;
58+
}
59+
60+
function pruneExpired(state: NodePendingWorkState, nowMs: number): boolean {
61+
let changed = false;
62+
for (const [id, item] of state.itemsById) {
63+
if (item.expiresAtMs !== null && item.expiresAtMs <= nowMs) {
64+
state.itemsById.delete(id);
65+
changed = true;
66+
}
67+
}
68+
if (changed) {
69+
state.revision += 1;
70+
}
71+
return changed;
72+
}
73+
74+
function sortedItems(state: NodePendingWorkState): NodePendingWorkItem[] {
75+
return [...state.itemsById.values()].toSorted((a, b) => {
76+
const priorityDelta = PRIORITY_RANK[b.priority] - PRIORITY_RANK[a.priority];
77+
if (priorityDelta !== 0) {
78+
return priorityDelta;
79+
}
80+
if (a.createdAtMs !== b.createdAtMs) {
81+
return a.createdAtMs - b.createdAtMs;
82+
}
83+
return a.id.localeCompare(b.id);
84+
});
85+
}
86+
87+
function makeBaselineStatusItem(nowMs: number): NodePendingWorkItem {
88+
return {
89+
id: DEFAULT_STATUS_ITEM_ID,
90+
type: "status.request",
91+
priority: DEFAULT_STATUS_PRIORITY,
92+
createdAtMs: nowMs,
93+
expiresAtMs: null,
94+
};
95+
}
96+
97+
export function enqueueNodePendingWork(params: {
98+
nodeId: string;
99+
type: NodePendingWorkType;
100+
priority?: NodePendingWorkPriority;
101+
expiresInMs?: number;
102+
payload?: Record<string, unknown>;
103+
}): { revision: number; item: NodePendingWorkItem; deduped: boolean } {
104+
const nodeId = params.nodeId.trim();
105+
if (!nodeId) {
106+
throw new Error("nodeId required");
107+
}
108+
const nowMs = Date.now();
109+
const state = getState(nodeId);
110+
pruneExpired(state, nowMs);
111+
const existing = [...state.itemsById.values()].find((item) => item.type === params.type);
112+
if (existing) {
113+
return { revision: state.revision, item: existing, deduped: true };
114+
}
115+
const item: NodePendingWorkItem = {
116+
id: randomUUID(),
117+
type: params.type,
118+
priority: params.priority ?? DEFAULT_PRIORITY,
119+
createdAtMs: nowMs,
120+
expiresAtMs:
121+
typeof params.expiresInMs === "number" && Number.isFinite(params.expiresInMs)
122+
? nowMs + Math.max(1_000, Math.trunc(params.expiresInMs))
123+
: null,
124+
...(params.payload ? { payload: params.payload } : {}),
125+
};
126+
state.itemsById.set(item.id, item);
127+
state.revision += 1;
128+
return { revision: state.revision, item, deduped: false };
129+
}
130+
131+
export function drainNodePendingWork(nodeId: string, opts: DrainOptions = {}): DrainResult {
132+
const normalizedNodeId = nodeId.trim();
133+
if (!normalizedNodeId) {
134+
return { revision: 0, items: [], hasMore: false };
135+
}
136+
const nowMs = opts.nowMs ?? Date.now();
137+
const state = getState(normalizedNodeId);
138+
pruneExpired(state, nowMs);
139+
const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS)));
140+
const explicitItems = sortedItems(state);
141+
const items = explicitItems.slice(0, maxItems);
142+
const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request");
143+
const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus;
144+
if (includeBaseline && items.length < maxItems) {
145+
items.push(makeBaselineStatusItem(nowMs));
146+
}
147+
return {
148+
revision: state.revision,
149+
items,
150+
hasMore:
151+
explicitItems.length > items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length,
152+
};
153+
}
154+
155+
export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: string[] }): {
156+
revision: number;
157+
removedItemIds: string[];
158+
} {
159+
const nodeId = params.nodeId.trim();
160+
if (!nodeId) {
161+
return { revision: 0, removedItemIds: [] };
162+
}
163+
const state = getState(nodeId);
164+
const removedItemIds: string[] = [];
165+
for (const itemId of params.itemIds) {
166+
const trimmedId = itemId.trim();
167+
if (!trimmedId || trimmedId === DEFAULT_STATUS_ITEM_ID) {
168+
continue;
169+
}
170+
if (state.itemsById.delete(trimmedId)) {
171+
removedItemIds.push(trimmedId);
172+
}
173+
}
174+
if (removedItemIds.length > 0) {
175+
state.revision += 1;
176+
}
177+
return { revision: state.revision, removedItemIds };
178+
}
179+
180+
export function resetNodePendingWorkForTests() {
181+
stateByNodeId.clear();
182+
}

src/gateway/protocol/index.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,14 @@ import {
140140
NodeDescribeParamsSchema,
141141
type NodeEventParams,
142142
NodeEventParamsSchema,
143+
type NodePendingDrainParams,
144+
NodePendingDrainParamsSchema,
145+
type NodePendingDrainResult,
146+
NodePendingDrainResultSchema,
147+
type NodePendingEnqueueParams,
148+
NodePendingEnqueueParamsSchema,
149+
type NodePendingEnqueueResult,
150+
NodePendingEnqueueResultSchema,
143151
type NodeInvokeParams,
144152
NodeInvokeParamsSchema,
145153
type NodeInvokeResultParams,
@@ -296,6 +304,12 @@ export const validateNodeInvokeResultParams = ajv.compile<NodeInvokeResultParams
296304
NodeInvokeResultParamsSchema,
297305
);
298306
export const validateNodeEventParams = ajv.compile<NodeEventParams>(NodeEventParamsSchema);
307+
export const validateNodePendingDrainParams = ajv.compile<NodePendingDrainParams>(
308+
NodePendingDrainParamsSchema,
309+
);
310+
export const validateNodePendingEnqueueParams = ajv.compile<NodePendingEnqueueParams>(
311+
NodePendingEnqueueParamsSchema,
312+
);
299313
export const validatePushTestParams = ajv.compile<PushTestParams>(PushTestParamsSchema);
300314
export const validateSecretsResolveParams = ajv.compile<SecretsResolveParams>(
301315
SecretsResolveParamsSchema,
@@ -472,6 +486,10 @@ export {
472486
NodeListParamsSchema,
473487
NodePendingAckParamsSchema,
474488
NodeInvokeParamsSchema,
489+
NodePendingDrainParamsSchema,
490+
NodePendingDrainResultSchema,
491+
NodePendingEnqueueParamsSchema,
492+
NodePendingEnqueueResultSchema,
475493
SessionsListParamsSchema,
476494
SessionsPreviewParamsSchema,
477495
SessionsPatchParamsSchema,
@@ -621,6 +639,10 @@ export type {
621639
NodeInvokeParams,
622640
NodeInvokeResultParams,
623641
NodeEventParams,
642+
NodePendingDrainParams,
643+
NodePendingDrainResult,
644+
NodePendingEnqueueParams,
645+
NodePendingEnqueueResult,
624646
SessionsListParams,
625647
SessionsPreviewParams,
626648
SessionsResolveParams,

src/gateway/protocol/schema/nodes.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
import { Type } from "@sinclair/typebox";
22
import { NonEmptyString } from "./primitives.js";
33

4+
const NodePendingWorkTypeSchema = Type.String({
5+
enum: ["status.request", "location.request"],
6+
});
7+
8+
const NodePendingWorkPrioritySchema = Type.String({
9+
enum: ["normal", "high"],
10+
});
11+
412
export const NodePairRequestParamsSchema = Type.Object(
513
{
614
nodeId: NonEmptyString,
@@ -95,6 +103,56 @@ export const NodeEventParamsSchema = Type.Object(
95103
{ additionalProperties: false },
96104
);
97105

106+
export const NodePendingDrainParamsSchema = Type.Object(
107+
{
108+
maxItems: Type.Optional(Type.Integer({ minimum: 1, maximum: 10 })),
109+
},
110+
{ additionalProperties: false },
111+
);
112+
113+
export const NodePendingDrainItemSchema = Type.Object(
114+
{
115+
id: NonEmptyString,
116+
type: NodePendingWorkTypeSchema,
117+
priority: Type.String({ enum: ["default", "normal", "high"] }),
118+
createdAtMs: Type.Integer({ minimum: 0 }),
119+
expiresAtMs: Type.Optional(Type.Union([Type.Integer({ minimum: 0 }), Type.Null()])),
120+
payload: Type.Optional(Type.Record(Type.String(), Type.Unknown())),
121+
},
122+
{ additionalProperties: false },
123+
);
124+
125+
export const NodePendingDrainResultSchema = Type.Object(
126+
{
127+
nodeId: NonEmptyString,
128+
revision: Type.Integer({ minimum: 0 }),
129+
items: Type.Array(NodePendingDrainItemSchema),
130+
hasMore: Type.Boolean(),
131+
},
132+
{ additionalProperties: false },
133+
);
134+
135+
export const NodePendingEnqueueParamsSchema = Type.Object(
136+
{
137+
nodeId: NonEmptyString,
138+
type: NodePendingWorkTypeSchema,
139+
priority: Type.Optional(NodePendingWorkPrioritySchema),
140+
expiresInMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 86_400_000 })),
141+
wake: Type.Optional(Type.Boolean()),
142+
},
143+
{ additionalProperties: false },
144+
);
145+
146+
export const NodePendingEnqueueResultSchema = Type.Object(
147+
{
148+
nodeId: NonEmptyString,
149+
revision: Type.Integer({ minimum: 0 }),
150+
queued: NodePendingDrainItemSchema,
151+
wakeTriggered: Type.Boolean(),
152+
},
153+
{ additionalProperties: false },
154+
);
155+
98156
export const NodeInvokeRequestEventSchema = Type.Object(
99157
{
100158
id: NonEmptyString,

0 commit comments

Comments
 (0)