Skip to content

Commit 57f1cf6

Browse files
fix(gateway): skip seq-gap broadcast for stale post-lifecycle events (#43751)
* fix: stop stale gateway seq-gap errors (#43751) (thanks @caesargattuso)
* fix: keep agent.request run ids session-scoped

---------

Co-authored-by: Ayaan Zaidi <[email protected]>
1 parent 192f859 commit 57f1cf6

File tree

5 files changed

+56
-4
lines changed

5 files changed

+56
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ Docs: https://docs.openclaw.ai
118118
- Gateway/config validation: stop treating the implicit default memory slot as a required explicit plugin config, so startup no longer fails with `plugins.slots.memory: plugin not found: memory-core` when `memory-core` was only inferred. (#47494) Thanks @ngutman.
119119
- Tlon: honor explicit empty allowlists and defer cite expansion. (#46788) Thanks @zpbrent and @vincentkoc.
120120
- Tlon/DM auth: defer cited-message expansion until after DM authorization and owner command handling, so unauthorized DMs and owner approval/admin commands no longer trigger cross-channel cite fetches before the deny or command path.
121+
- Gateway/agent events: stop broadcasting false end-of-run `seq gap` errors to clients, and isolate node-driven voice-transcript turns with per-turn run IDs (`agent.request` run IDs stay session-scoped) so stale tail events cannot leak into later session runs. (#43751) Thanks @caesargattuso.
121122
- Docs/security audit: spell out that `gateway.controlUi.allowedOrigins: ["*"]` is an explicit allow-all browser-origin policy and should be avoided outside tightly controlled local testing.
122123
- Gateway/auth: clear self-declared scopes for device-less trusted-proxy Control UI sessions so proxy-authenticated connects cannot claim admin or secrets scopes without a bound device identity.
123124
- Nodes/pending actions: re-check queued foreground actions against the current node command policy before returning them to the node. (#46815) Thanks @zpbrent and @vincentkoc.

src/gateway/server-chat.agent-events.test.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,46 @@ describe("agent event handler", () => {
487487
nowSpy?.mockRestore();
488488
});
489489

490+
it("drops stale events that arrive after lifecycle completion", () => {
491+
const { broadcast, nodeSendToSession, chatRunState, handler, nowSpy } = createHarness({
492+
now: 2_500,
493+
});
494+
chatRunState.registry.add("run-stale-tail", {
495+
sessionKey: "session-stale-tail",
496+
clientRunId: "client-stale-tail",
497+
});
498+
499+
handler({
500+
runId: "run-stale-tail",
501+
seq: 1,
502+
stream: "assistant",
503+
ts: Date.now(),
504+
data: { text: "done" },
505+
});
506+
emitLifecycleEnd(handler, "run-stale-tail");
507+
const errorCallsBeforeStaleEvent = broadcast.mock.calls.filter(
508+
([event, payload]) =>
509+
event === "agent" && (payload as { stream?: string }).stream === "error",
510+
).length;
511+
const sessionChatCallsBeforeStaleEvent = sessionChatCalls(nodeSendToSession).length;
512+
513+
handler({
514+
runId: "run-stale-tail",
515+
seq: 3,
516+
stream: "assistant",
517+
ts: Date.now(),
518+
data: { text: "late tail" },
519+
});
520+
521+
const errorCalls = broadcast.mock.calls.filter(
522+
([event, payload]) =>
523+
event === "agent" && (payload as { stream?: string }).stream === "error",
524+
);
525+
expect(errorCalls).toHaveLength(errorCallsBeforeStaleEvent);
526+
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(sessionChatCallsBeforeStaleEvent);
527+
nowSpy?.mockRestore();
528+
});
529+
490530
it("flushes buffered chat delta before tool start events", () => {
491531
let now = 12_000;
492532
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);

src/gateway/server-chat.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,7 @@ export function createAgentEventHandler({
710710
: { ...eventForClients, data };
711711
})()
712712
: agentPayload;
713-
if (evt.seq !== last + 1) {
713+
if (last > 0 && evt.seq !== last + 1) {
714714
broadcast("agent", {
715715
runId: eventRunId,
716716
stream: "error",

src/gateway/server-node-events.test.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,9 @@ describe("voice transcript events", () => {
410410
});
411411

412412
it("forwards transcript with voice provenance", async () => {
413+
const addChatRun = vi.fn();
413414
const ctx = buildCtx();
415+
ctx.addChatRun = addChatRun;
414416

415417
await handleNodeEvent(ctx, "node-v2", {
416418
event: "voice.transcript",
@@ -432,6 +434,12 @@ describe("voice transcript events", () => {
432434
sourceTool: "gateway.voice.transcript",
433435
},
434436
});
437+
expect(typeof opts.runId).toBe("string");
438+
expect(opts.runId).not.toBe(opts.sessionId);
439+
expect(addChatRun).toHaveBeenCalledWith(
440+
opts.runId,
441+
expect.objectContaining({ clientRunId: expect.stringMatching(/^voice-/) }),
442+
);
435443
});
436444

437445
it("does not block agent dispatch when session-store touch fails", async () => {
@@ -674,5 +682,6 @@ describe("agent request events", () => {
674682
channel: "telegram",
675683
to: "123",
676684
});
685+
expect(opts.runId).toBe(opts.sessionId);
677686
});
678687
});

src/gateway/server-node-events.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,16 +288,18 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
288288
sessionId,
289289
now,
290290
});
291+
const runId = randomUUID();
291292

292293
// Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send).
293-
// This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId).
294-
ctx.addChatRun(sessionId, {
294+
// This maps agent bus events (keyed by per-turn runId) to chat events (keyed by clientRunId).
295+
ctx.addChatRun(runId, {
295296
sessionKey: canonicalKey,
296297
clientRunId: `voice-${randomUUID()}`,
297298
});
298299

299300
void agentCommandFromIngress(
300301
{
302+
runId,
301303
message: text,
302304
sessionId,
303305
sessionKey: canonicalKey,
@@ -404,7 +406,6 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
404406
const deliver = deliverRequested && Boolean(channel && to);
405407
const deliveryChannel = deliver ? channel : undefined;
406408
const deliveryTo = deliver ? to : undefined;
407-
408409
if (deliverRequested && !deliver) {
409410
ctx.logGateway.warn(
410411
`agent delivery disabled node=${nodeId}: missing session delivery route (channel=${channel ?? "-"} to=${to ?? "-"})`,
@@ -430,6 +431,7 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
430431

431432
void agentCommandFromIngress(
432433
{
434+
runId: sessionId,
433435
message,
434436
images,
435437
sessionId,

0 commit comments

Comments
 (0)