Skip to content

Commit af20938

Browse files
committed
fix: preserve debounce and followup ordering (#52998) (thanks @osolmaz)
1 parent 1168c3b commit af20938

File tree

6 files changed

+96
-1
lines changed

6 files changed

+96
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
2727
- Voice-call/Plivo: stabilize Plivo v2 replay keys so webhook retries and replay protection stop colliding on valid follow-up deliveries.
2828
- Release/install: keep previously released bundled plugins and Control UI assets in published openclaw npm installs, and fail release checks when those shipped artifacts are missing. Thanks @vincentkoc.
2929
- Mistral/models: lower bundled Mistral max-token defaults to safe output budgets and teach `openclaw doctor --fix` to repair old persisted Mistral provider configs that still carry context-sized output limits, avoiding deterministic Mistral 422 rejects on fresh and existing setups. Fixes #52599. Thanks @vincentkoc.
30+
- Telegram/auto-reply: preserve same-chat inbound debounce order without stranding stale busy-session followups, and keep same-key overflow turns ordered when tracked debounce keys are saturated. (#52998) Thanks @osolmaz.
3031

3132
## 2026.3.22
3233

src/auto-reply/inbound-debounce.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,11 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
203203
return;
204204
}
205205
if (!canTrackKey(key)) {
206-
await runFlush([item]);
206+
// When the debounce map is saturated, fall back to immediate keyed work
207+
// instead of buffering, but still preserve same-key ordering.
208+
await enqueueKeyTask(key, async () => {
209+
await runFlush([item]);
210+
});
207211
return;
208212
}
209213

src/auto-reply/inbound.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,69 @@ describe("createInboundDebouncer", () => {
543543

544544
vi.useRealTimers();
545545
});
546+
547+
it("keeps same-key overflow work ordered after falling back to immediate flushes", async () => {
548+
const started: string[] = [];
549+
const finished: string[] = [];
550+
let releaseOverflow!: () => void;
551+
const overflowGate = new Promise<void>((resolve) => {
552+
releaseOverflow = resolve;
553+
});
554+
555+
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
556+
const debouncer = createInboundDebouncer<{ key: string; id: string }>({
557+
debounceMs: 50,
558+
maxTrackedKeys: 1,
559+
buildKey: (item) => item.key,
560+
onFlush: async (items) => {
561+
const ids = items.map((entry) => entry.id).join(",");
562+
started.push(ids);
563+
if (ids === "2") {
564+
await overflowGate;
565+
}
566+
finished.push(ids);
567+
},
568+
});
569+
570+
try {
571+
await debouncer.enqueue({ key: "a", id: "1" });
572+
const callCountBeforeOverflow = setTimeoutSpy.mock.calls.length;
573+
clearTimeout(
574+
setTimeoutSpy.mock.results[callCountBeforeOverflow - 1]?.value as ReturnType<
575+
typeof setTimeout
576+
>,
577+
);
578+
579+
const overflowEnqueue = debouncer.enqueue({ key: "b", id: "2" });
580+
await vi.waitFor(() => {
581+
expect(started).toEqual(["2"]);
582+
});
583+
584+
const bufferedEnqueue = debouncer.enqueue({ key: "b", id: "3" });
585+
const bufferedTimerIndex = setTimeoutSpy.mock.calls.findLastIndex(
586+
(call, index) => index >= callCountBeforeOverflow && call[1] === 50,
587+
);
588+
expect(bufferedTimerIndex).toBeGreaterThanOrEqual(callCountBeforeOverflow);
589+
clearTimeout(
590+
setTimeoutSpy.mock.results[bufferedTimerIndex]?.value as ReturnType<typeof setTimeout>,
591+
);
592+
const bufferedFlush = (
593+
setTimeoutSpy.mock.calls[bufferedTimerIndex]?.[0] as (() => Promise<void>) | undefined
594+
)?.();
595+
596+
await Promise.resolve();
597+
expect(started).toEqual(["2"]);
598+
expect(finished).toEqual([]);
599+
600+
releaseOverflow();
601+
await Promise.all([overflowEnqueue, bufferedEnqueue, bufferedFlush]);
602+
603+
expect(started).toEqual(["2", "3"]);
604+
expect(finished).toEqual(["2", "3"]);
605+
} finally {
606+
setTimeoutSpy.mockRestore();
607+
}
608+
});
546609
});
547610

548611
describe("initSessionState BodyStripped", () => {

src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ function createMinimalRun(params?: {
114114
typingMode?: TypingMode;
115115
blockStreamingEnabled?: boolean;
116116
isActive?: boolean;
117+
isRunActive?: () => boolean;
117118
shouldFollowup?: boolean;
118119
resolvedQueueMode?: string;
119120
runOverrides?: Partial<FollowupRun["run"]>;
@@ -169,6 +170,7 @@ function createMinimalRun(params?: {
169170
shouldSteer: false,
170171
shouldFollowup: params?.shouldFollowup ?? false,
171172
isActive: params?.isActive ?? false,
173+
isRunActive: params?.isRunActive,
172174
isStreaming: false,
173175
opts,
174176
typing,
@@ -326,6 +328,23 @@ describe("runReplyAgent heartbeat followup guard", () => {
326328
expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled();
327329
});
328330

331+
it("starts draining immediately when the active snapshot is already stale", async () => {
332+
const { run } = createMinimalRun({
333+
opts: { isHeartbeat: false },
334+
isActive: true,
335+
isRunActive: () => false,
336+
shouldFollowup: true,
337+
resolvedQueueMode: "collect",
338+
});
339+
340+
const result = await run();
341+
342+
expect(result).toBeUndefined();
343+
expect(vi.mocked(enqueueFollowupRun)).toHaveBeenCalledTimes(1);
344+
expect(vi.mocked(scheduleFollowupDrain)).toHaveBeenCalledTimes(1);
345+
expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled();
346+
});
347+
329348
it("drains followup queue when an unexpected exception escapes the run path", async () => {
330349
const accounting = await import("./session-run-accounting.js");
331350
const persistSpy = vi

src/auto-reply/reply/agent-runner.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ export async function runReplyAgent(params: {
7373
shouldSteer: boolean;
7474
shouldFollowup: boolean;
7575
isActive: boolean;
76+
isRunActive?: () => boolean;
7677
isStreaming: boolean;
7778
opts?: GetReplyOptions;
7879
typing: TypingController;
@@ -104,6 +105,7 @@ export async function runReplyAgent(params: {
104105
shouldSteer,
105106
shouldFollowup,
106107
isActive,
108+
isRunActive,
107109
isStreaming,
108110
opts,
109111
typing,
@@ -241,6 +243,11 @@ export async function runReplyAgent(params: {
241243
queuedRunFollowupTurn,
242244
false,
243245
);
246+
// Re-check liveness after enqueue so a stale active snapshot cannot leave
247+
// the followup queue idle if the original run already finished.
248+
if (!isRunActive?.()) {
249+
finalizeWithFollowup(undefined, queueKey, queuedRunFollowupTurn);
250+
}
244251
await touchActiveSessionEntry();
245252
typing.cleanup();
246253
return undefined;

src/auto-reply/reply/get-reply-run.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,7 @@ export async function runPreparedReply(
585585
shouldSteer,
586586
shouldFollowup,
587587
isActive,
588+
isRunActive: () => isEmbeddedPiRunActive(sessionIdFinal),
588589
isStreaming,
589590
opts,
590591
typing,

0 commit comments

Comments
 (0)