Skip to content

Commit 1bc59cc

Browse files
authored
Gateway: tighten node pending drain semantics (#41429)
Merged via squash. Prepared head SHA: 361c2eb Co-authored-by: mbelinky <[email protected]> Co-authored-by: mbelinky <[email protected]> Reviewed-by: @mbelinky
1 parent ef95975 commit 1bc59cc

File tree

3 files changed

+42
-9
lines changed

3 files changed

+42
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai
2626
- iOS/gateway foreground recovery: reconnect immediately on foreground return after stale background sockets are torn down, so the app no longer stays disconnected until a later wake path happens. (#41384) Thanks @mbelinky.
2727
- Cron/subagent followup: do not misclassify empty or `NO_REPLY` cron responses as interim acknowledgements that need a rerun, so deliberately silent cron jobs are no longer retried. (#41383) thanks @jackal092927.
2828
- Auth/cooldowns: reset expired auth-profile cooldown error counters before computing the next backoff so stale on-disk counters do not re-escalate into long cooldown loops after expiry. (#41028) thanks @zerone0x.
29+
- Gateway/node pending drain followup: keep `hasMore` true when the deferred baseline status item still needs delivery, and avoid allocating empty pending-work state for drain-only nodes with no queued work. (#41429) Thanks @mbelinky.
2930

3031
## 2026.3.8
3132

src/gateway/node-pending-work.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
acknowledgeNodePendingWork,
44
drainNodePendingWork,
55
enqueueNodePendingWork,
6+
getNodePendingWorkStateCountForTests,
67
resetNodePendingWorkForTests,
78
} from "./node-pending-work.js";
89

@@ -43,4 +44,24 @@ describe("node pending work", () => {
4344
const afterAck = drainNodePendingWork("node-2");
4445
expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]);
4546
});
47+
48+
it("keeps hasMore true when the baseline status item is deferred by maxItems", () => {
49+
enqueueNodePendingWork({ nodeId: "node-3", type: "location.request" });
50+
51+
const drained = drainNodePendingWork("node-3", { maxItems: 1 });
52+
53+
expect(drained.items.map((item) => item.type)).toEqual(["location.request"]);
54+
expect(drained.hasMore).toBe(true);
55+
});
56+
57+
it("does not allocate state for drain-only nodes with no queued work", () => {
58+
expect(getNodePendingWorkStateCountForTests()).toBe(0);
59+
60+
const drained = drainNodePendingWork("node-4");
61+
const acked = acknowledgeNodePendingWork({ nodeId: "node-4", itemIds: ["baseline-status"] });
62+
63+
expect(drained.items.map((item) => item.id)).toEqual(["baseline-status"]);
64+
expect(acked).toEqual({ revision: 0, removedItemIds: [] });
65+
expect(getNodePendingWorkStateCountForTests()).toBe(0);
66+
});
4667
});

src/gateway/node-pending-work.ts

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ const PRIORITY_RANK: Record<NodePendingWorkPriority, number> = {
4545

4646
const stateByNodeId = new Map<string, NodePendingWorkState>();
4747

48-
function getState(nodeId: string): NodePendingWorkState {
48+
function getOrCreateState(nodeId: string): NodePendingWorkState {
4949
let state = stateByNodeId.get(nodeId);
5050
if (!state) {
5151
state = {
@@ -106,7 +106,7 @@ export function enqueueNodePendingWork(params: {
106106
throw new Error("nodeId required");
107107
}
108108
const nowMs = Date.now();
109-
const state = getState(nodeId);
109+
const state = getOrCreateState(nodeId);
110110
pruneExpired(state, nowMs);
111111
const existing = [...state.itemsById.values()].find((item) => item.type === params.type);
112112
if (existing) {
@@ -134,21 +134,25 @@ export function drainNodePendingWork(nodeId: string, opts: DrainOptions = {}): D
134134
return { revision: 0, items: [], hasMore: false };
135135
}
136136
const nowMs = opts.nowMs ?? Date.now();
137-
const state = getState(normalizedNodeId);
138-
pruneExpired(state, nowMs);
137+
const state = stateByNodeId.get(normalizedNodeId);
138+
const revision = state?.revision ?? 0;
139+
if (state) {
140+
pruneExpired(state, nowMs);
141+
}
139142
const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS)));
140-
const explicitItems = sortedItems(state);
143+
const explicitItems = state ? sortedItems(state) : [];
141144
const items = explicitItems.slice(0, maxItems);
142145
const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request");
143146
const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus;
144147
if (includeBaseline && items.length < maxItems) {
145148
items.push(makeBaselineStatusItem(nowMs));
146149
}
150+
const explicitReturnedCount = items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length;
151+
const baselineIncluded = items.some((item) => item.id === DEFAULT_STATUS_ITEM_ID);
147152
return {
148-
revision: state.revision,
153+
revision,
149154
items,
150-
hasMore:
151-
explicitItems.length > items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length,
155+
hasMore: explicitItems.length > explicitReturnedCount || (includeBaseline && !baselineIncluded),
152156
};
153157
}
154158

@@ -160,7 +164,10 @@ export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: st
160164
if (!nodeId) {
161165
return { revision: 0, removedItemIds: [] };
162166
}
163-
const state = getState(nodeId);
167+
const state = stateByNodeId.get(nodeId);
168+
if (!state) {
169+
return { revision: 0, removedItemIds: [] };
170+
}
164171
const removedItemIds: string[] = [];
165172
for (const itemId of params.itemIds) {
166173
const trimmedId = itemId.trim();
@@ -180,3 +187,7 @@ export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: st
180187
export function resetNodePendingWorkForTests() {
181188
stateByNodeId.clear();
182189
}
190+
191+
export function getNodePendingWorkStateCountForTests(): number {
192+
return stateByNodeId.size;
193+
}

0 commit comments

Comments
 (0)