Skip to content

Commit 0f2b99f

Browse files
committed
Telegram: preserve inbound debounce order
1 parent 8ed33c2 commit 0f2b99f

File tree

4 files changed

+277
-14
lines changed

4 files changed

+277
-14
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ Docs: https://docs.openclaw.ai
325325
- Memory/core tools: register `memory_search` and `memory_get` independently so one unavailable memory tool no longer suppresses the other in new sessions. (#50198) Thanks @artwalker.
326326
- Telegram/Mattermost message tool: keep plugin button schemas optional in isolated and cron sessions so plain sends do not fail validation when no current channel is active. (#52589) Thanks @tylerliu612.
327327
- Release/npm publish: fail the npm release check when `dist/control-ui/index.html` is missing from the packed tarball, so broken Control UI asset releases are blocked before publish. Fixes #52808. (#52852) Thanks @kevinheinrichs.
328+
- Channels/inbound debounce: reserve same-conversation timer-backed flush order so a buffered run cannot be overtaken by a newer message for the same key, preventing Telegram reply desync after long debounced turns. Fixes #52982.
328329

329330
## 2026.3.13
330331

extensions/telegram/src/bot.create-telegram-bot.test.ts

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,162 @@ describe("createTelegramBot", () => {
165165
expect(middlewareUseSpy).toHaveBeenCalledWith(sequentializeSpy.mock.results[0]?.value);
166166
expect(harness.sequentializeKey).toBe(getTelegramSequentialKey);
167167
});
168+
169+
it("preserves same-chat reply order when a debounced run is still active", async () => {
170+
const DEBOUNCE_MS = 4321;
171+
loadConfig.mockReturnValue({
172+
agents: {
173+
defaults: {
174+
envelopeTimezone: "utc",
175+
},
176+
},
177+
messages: {
178+
inbound: {
179+
debounceMs: DEBOUNCE_MS,
180+
},
181+
},
182+
channels: {
183+
telegram: { dmPolicy: "open", allowFrom: ["*"] },
184+
},
185+
});
186+
187+
sequentializeSpy.mockImplementationOnce(() => {
188+
const lanes = new Map<string, Promise<void>>();
189+
return async (ctx: Record<string, unknown>, next: () => Promise<void>) => {
190+
const key = harness.sequentializeKey?.(ctx) ?? "default";
191+
const previous = lanes.get(key) ?? Promise.resolve();
192+
const current = previous.then(async () => {
193+
await next();
194+
});
195+
lanes.set(
196+
key,
197+
current.catch(() => undefined),
198+
);
199+
try {
200+
await current;
201+
} finally {
202+
if (lanes.get(key) === current) {
203+
lanes.delete(key);
204+
}
205+
}
206+
};
207+
});
208+
209+
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
210+
const startedBodies: string[] = [];
211+
let releaseFirstRun!: () => void;
212+
const firstRunGate = new Promise<void>((resolve) => {
213+
releaseFirstRun = resolve;
214+
});
215+
216+
replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => {
217+
await opts?.onReplyStart?.();
218+
const body = String(ctx.Body ?? "");
219+
startedBodies.push(body);
220+
if (body.includes("first")) {
221+
await firstRunGate;
222+
}
223+
return { text: `reply:${body}` };
224+
});
225+
226+
const runMiddlewareChain = async (ctx: Record<string, unknown>) => {
227+
const middlewares = middlewareUseSpy.mock.calls
228+
.map((call) => call[0])
229+
.filter(
230+
(fn): fn is (ctx: Record<string, unknown>, next: () => Promise<void>) => Promise<void> =>
231+
typeof fn === "function",
232+
);
233+
const handler = getOnHandler("message") as (ctx: Record<string, unknown>) => Promise<void>;
234+
let idx = -1;
235+
const dispatch = async (i: number): Promise<void> => {
236+
if (i <= idx) {
237+
throw new Error("middleware dispatch called multiple times");
238+
}
239+
idx = i;
240+
const fn = middlewares[i];
241+
if (!fn) {
242+
await handler(ctx);
243+
return;
244+
}
245+
await fn(ctx, async () => dispatch(i + 1));
246+
};
247+
await dispatch(0);
248+
};
249+
250+
const extractLatestDebounceFlush = () => {
251+
const debounceCallIndex = setTimeoutSpy.mock.calls.findLastIndex(
252+
(call) => call[1] === DEBOUNCE_MS,
253+
);
254+
expect(debounceCallIndex).toBeGreaterThanOrEqual(0);
255+
clearTimeout(
256+
setTimeoutSpy.mock.results[debounceCallIndex]?.value as ReturnType<typeof setTimeout>,
257+
);
258+
return setTimeoutSpy.mock.calls[debounceCallIndex]?.[0] as (() => Promise<void>) | undefined;
259+
};
260+
261+
try {
262+
createTelegramBot({ token: "tok" });
263+
264+
await runMiddlewareChain({
265+
update: { update_id: 101 },
266+
message: {
267+
chat: { id: 7, type: "private" },
268+
text: "first",
269+
date: 1736380800,
270+
message_id: 101,
271+
from: { id: 42, first_name: "Ada" },
272+
},
273+
me: { username: "openclaw_bot" },
274+
getFile: async () => ({}),
275+
});
276+
277+
const flushFirst = extractLatestDebounceFlush();
278+
const firstFlush = flushFirst?.();
279+
280+
await vi.waitFor(() => {
281+
expect(startedBodies).toHaveLength(1);
282+
expect(startedBodies[0]).toContain("first");
283+
});
284+
285+
await runMiddlewareChain({
286+
update: { update_id: 102 },
287+
message: {
288+
chat: { id: 7, type: "private" },
289+
text: "second",
290+
date: 1736380801,
291+
message_id: 102,
292+
from: { id: 42, first_name: "Ada" },
293+
},
294+
me: { username: "openclaw_bot" },
295+
getFile: async () => ({}),
296+
});
297+
298+
const flushSecond = extractLatestDebounceFlush();
299+
const secondFlush = flushSecond?.();
300+
await Promise.resolve();
301+
302+
expect(startedBodies).toHaveLength(1);
303+
expect(sendMessageSpy).not.toHaveBeenCalled();
304+
305+
releaseFirstRun();
306+
await Promise.all([firstFlush, secondFlush]);
307+
308+
await vi.waitFor(() => {
309+
expect(startedBodies).toHaveLength(2);
310+
expect(sendMessageSpy).toHaveBeenCalledTimes(2);
311+
});
312+
313+
expect(startedBodies[0]).toContain("first");
314+
expect(startedBodies[1]).toContain("second");
315+
expect(sendMessageSpy.mock.calls.map((call) => call[1])).toEqual([
316+
expect.stringContaining("first"),
317+
expect.stringContaining("second"),
318+
]);
319+
} finally {
320+
setTimeoutSpy.mockRestore();
321+
}
322+
});
323+
168324
it("routes callback_query payloads as messages and answers callbacks", async () => {
169325
createTelegramBot({ token: "tok" });
170326
const callbackHandler = onSpy.mock.calls.find((call) => call[0] === "callback_query")?.[1] as (

src/auto-reply/inbound-debounce.ts

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ type DebounceBuffer<T> = {
3737
items: T[];
3838
timeout: ReturnType<typeof setTimeout> | null;
3939
debounceMs: number;
40+
ready: Promise<void>;
41+
releaseReady: () => void;
42+
readyReleased: boolean;
43+
task: Promise<void>;
4044
};
4145

4246
export type InboundDebounceCreateParams<T> = {
@@ -50,6 +54,7 @@ export type InboundDebounceCreateParams<T> = {
5054

5155
export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>) {
5256
const buffers = new Map<string, DebounceBuffer<T>>();
57+
const keyChains = new Map<string, Promise<void>>();
5358
const defaultDebounceMs = Math.max(0, Math.trunc(params.debounceMs));
5459

5560
const resolveDebounceMs = (item: T) => {
@@ -60,20 +65,47 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
6065
return Math.max(0, Math.trunc(resolved));
6166
};
6267

68+
const runFlush = async (items: T[]) => {
69+
try {
70+
await params.onFlush(items);
71+
} catch (err) {
72+
params.onError?.(err, items);
73+
}
74+
};
75+
76+
const enqueueKeyTask = (key: string, task: () => Promise<void>) => {
77+
const previous = keyChains.get(key) ?? Promise.resolve();
78+
const next = previous.catch(() => undefined).then(task);
79+
const settled = next.catch(() => undefined);
80+
keyChains.set(key, settled);
81+
void settled.finally(() => {
82+
if (keyChains.get(key) === settled) {
83+
keyChains.delete(key);
84+
}
85+
});
86+
return next;
87+
};
88+
89+
const releaseBuffer = (buffer: DebounceBuffer<T>) => {
90+
if (buffer.readyReleased) {
91+
return;
92+
}
93+
buffer.readyReleased = true;
94+
buffer.releaseReady();
95+
};
96+
6397
const flushBuffer = async (key: string, buffer: DebounceBuffer<T>) => {
64-
buffers.delete(key);
98+
if (buffers.get(key) === buffer) {
99+
buffers.delete(key);
100+
}
65101
if (buffer.timeout) {
66102
clearTimeout(buffer.timeout);
67103
buffer.timeout = null;
68104
}
69-
if (buffer.items.length === 0) {
70-
return;
71-
}
72-
try {
73-
await params.onFlush(buffer.items);
74-
} catch (err) {
75-
params.onError?.(err, buffer.items);
76-
}
105+
// Reserve each key's execution slot as soon as the first buffered item
106+
// arrives, so later same-key work cannot overtake a timer-backed flush.
107+
releaseBuffer(buffer);
108+
await buffer.task;
77109
};
78110

79111
const flushKey = async (key: string) => {
@@ -103,10 +135,12 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
103135
if (key && buffers.has(key)) {
104136
await flushKey(key);
105137
}
106-
try {
107-
await params.onFlush([item]);
108-
} catch (err) {
109-
params.onError?.(err, [item]);
138+
if (key) {
139+
await enqueueKeyTask(key, async () => {
140+
await runFlush([item]);
141+
});
142+
} else {
143+
await runFlush([item]);
110144
}
111145
return;
112146
}
@@ -119,7 +153,25 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
119153
return;
120154
}
121155

122-
const buffer: DebounceBuffer<T> = { items: [item], timeout: null, debounceMs };
156+
let releaseReady!: () => void;
157+
const buffer: DebounceBuffer<T> = {
158+
items: [item],
159+
timeout: null,
160+
debounceMs,
161+
ready: new Promise<void>((resolve) => {
162+
releaseReady = resolve;
163+
}),
164+
releaseReady,
165+
readyReleased: false,
166+
task: Promise.resolve(),
167+
};
168+
buffer.task = enqueueKeyTask(key, async () => {
169+
await buffer.ready;
170+
if (buffer.items.length === 0) {
171+
return;
172+
}
173+
await runFlush(buffer.items);
174+
});
123175
buffers.set(key, buffer);
124176
scheduleFlush(key, buffer);
125177
};

src/auto-reply/inbound.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,60 @@ describe("createInboundDebouncer", () => {
347347

348348
vi.useRealTimers();
349349
});
350+
351+
it("keeps later same-key work behind a timer-backed flush that already started", async () => {
352+
const started: string[] = [];
353+
const finished: string[] = [];
354+
let releaseFirst!: () => void;
355+
const firstGate = new Promise<void>((resolve) => {
356+
releaseFirst = resolve;
357+
});
358+
359+
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
360+
const debouncer = createInboundDebouncer<{ key: string; id: string; debounce: boolean }>({
361+
debounceMs: 50,
362+
buildKey: (item) => item.key,
363+
shouldDebounce: (item) => item.debounce,
364+
onFlush: async (items) => {
365+
const ids = items.map((entry) => entry.id).join(",");
366+
started.push(ids);
367+
if (ids === "1") {
368+
await firstGate;
369+
}
370+
finished.push(ids);
371+
},
372+
});
373+
374+
try {
375+
await debouncer.enqueue({ key: "a", id: "1", debounce: true });
376+
377+
const timerIndex = setTimeoutSpy.mock.calls.findLastIndex((call) => call[1] === 50);
378+
expect(timerIndex).toBeGreaterThanOrEqual(0);
379+
clearTimeout(setTimeoutSpy.mock.results[timerIndex]?.value as ReturnType<typeof setTimeout>);
380+
const flushTimer = setTimeoutSpy.mock.calls[timerIndex]?.[0] as
381+
| (() => Promise<void>)
382+
| undefined;
383+
const firstFlush = flushTimer?.();
384+
385+
await vi.waitFor(() => {
386+
expect(started).toEqual(["1"]);
387+
});
388+
389+
const secondEnqueue = debouncer.enqueue({ key: "a", id: "2", debounce: false });
390+
await Promise.resolve();
391+
392+
expect(started).toEqual(["1"]);
393+
expect(finished).toEqual([]);
394+
395+
releaseFirst();
396+
await Promise.all([firstFlush, secondEnqueue]);
397+
398+
expect(started).toEqual(["1", "2"]);
399+
expect(finished).toEqual(["1", "2"]);
400+
} finally {
401+
setTimeoutSpy.mockRestore();
402+
}
403+
});
350404
});
351405

352406
describe("initSessionState BodyStripped", () => {

0 commit comments

Comments
 (0)