Skip to content

Commit e33d7fc

Browse files
frankeknobviyus
andauthored
fix(telegram): prevent update offset skipping queued updates (#23284)
Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 92efaf9 Co-authored-by: frankekn <[email protected]> Co-authored-by: obviyus <[email protected]> Reviewed-by: @obviyus
1 parent 98a03c4 commit e33d7fc

File tree

3 files changed

+129
-13
lines changed

3 files changed

+129
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai
3131
- Security/Hooks transforms: enforce symlink-safe containment for webhook transform module paths (including `hooks.transformsDir` and `hooks.mappings[].transform.module`) by resolving existing-path ancestors via realpath before import, while preserving in-root symlink support; add regression coverage for both escape and allow cases. This ships in the next npm release. Thanks @aether-ai-agent for reporting.
3232
- Telegram/WSL2: disable `autoSelectFamily` by default on WSL2 and memoize WSL2 detection in Telegram network decision logic to avoid repeated sync `/proc/version` probes on fetch/send paths. (#21916) Thanks @MizukiMachine.
3333
- Telegram/Streaming: preserve archived draft preview mapping after flush and clean superseded reasoning preview bubbles so multi-message preview finals no longer cross-edit or orphan stale messages under send/rotation races. (#23202) Thanks @obviyus.
34+
- Telegram/Polling: persist a safe update-offset watermark bounded by pending updates so crash/restart cannot skip queued lower `update_id` updates after out-of-order completion. (#23284) thanks @frankekn.
3435
- Slack/Slash commands: preserve the Bolt app receiver when registering external select options handlers so monitor startup does not crash on runtimes that require bound `app.options` calls. (#23209) Thanks @0xgaia.
3536
- Slack/Telegram slash sessions: await session metadata persistence before dispatch so first-turn native slash runs do not race session-origin metadata updates. (#23065) thanks @hydro13.
3637
- Agents/Ollama: preserve unsafe integer tool-call arguments as exact strings during NDJSON parsing, preventing large numeric IDs from being rounded before tool execution. (#23170) Thanks @BestJoester.

src/telegram/bot.create-telegram-bot.test.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,83 @@ describe("createTelegramBot", () => {
445445
});
446446
expect(replySpy).toHaveBeenCalledTimes(1);
447447
});
448+
449+
it("does not persist update offset past pending updates", async () => {
450+
// For this test we need sequentialize(...) to behave like a normal middleware and call next().
451+
sequentializeSpy.mockImplementationOnce(
452+
() => async (_ctx: unknown, next: () => Promise<void>) => {
453+
await next();
454+
},
455+
);
456+
457+
const onUpdateId = vi.fn();
458+
loadConfig.mockReturnValue({
459+
channels: { telegram: { dmPolicy: "open", allowFrom: ["*"] } },
460+
});
461+
462+
createTelegramBot({
463+
token: "tok",
464+
updateOffset: {
465+
lastUpdateId: 100,
466+
onUpdateId,
467+
},
468+
});
469+
470+
type Middleware = (
471+
ctx: Record<string, unknown>,
472+
next: () => Promise<void>,
473+
) => Promise<void> | void;
474+
475+
const middlewares = middlewareUseSpy.mock.calls
476+
.map((call) => call[0])
477+
.filter((fn): fn is Middleware => typeof fn === "function");
478+
479+
const runMiddlewareChain = async (
480+
ctx: Record<string, unknown>,
481+
finalNext: () => Promise<void>,
482+
) => {
483+
let idx = -1;
484+
const dispatch = async (i: number): Promise<void> => {
485+
if (i <= idx) {
486+
throw new Error("middleware dispatch called multiple times");
487+
}
488+
idx = i;
489+
const fn = middlewares[i];
490+
if (!fn) {
491+
await finalNext();
492+
return;
493+
}
494+
await fn(ctx, async () => dispatch(i + 1));
495+
};
496+
await dispatch(0);
497+
};
498+
499+
let releaseUpdate101: (() => void) | undefined;
500+
const update101Gate = new Promise<void>((resolve) => {
501+
releaseUpdate101 = resolve;
502+
});
503+
504+
// Start processing update 101 but keep it pending (simulates an update queued behind sequentialize()).
505+
const p101 = runMiddlewareChain({ update: { update_id: 101 } }, async () => update101Gate);
506+
// Let update 101 enter the chain and mark itself pending before 102 completes.
507+
await Promise.resolve();
508+
509+
// Complete update 102 while 101 is still pending. The persisted watermark must not jump to 102.
510+
await runMiddlewareChain({ update: { update_id: 102 } }, async () => {});
511+
512+
const persistedValues = onUpdateId.mock.calls.map((call) => Number(call[0]));
513+
const maxPersisted = persistedValues.length > 0 ? Math.max(...persistedValues) : -Infinity;
514+
expect(maxPersisted).toBeLessThan(101);
515+
516+
releaseUpdate101?.();
517+
await p101;
518+
519+
// Once the pending update finishes, the watermark can safely catch up.
520+
const persistedAfterDrain = onUpdateId.mock.calls.map((call) => Number(call[0]));
521+
const maxPersistedAfterDrain =
522+
persistedAfterDrain.length > 0 ? Math.max(...persistedAfterDrain) : -Infinity;
523+
expect(maxPersistedAfterDrain).toBe(102);
524+
});
448525
it("allows distinct callback_query ids without update_id", async () => {
449526
loadConfig.mockReturnValue({
450527
channels: {

src/telegram/bot.ts

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -148,34 +148,53 @@ export function createTelegramBot(opts: TelegramBotOptions) {
148148

149149
const bot = new Bot(opts.token, client ? { client } : undefined);
150150
bot.api.config.use(apiThrottler());
151-
bot.use(sequentialize(getTelegramSequentialKey));
152151
// Catch all errors from bot middleware to prevent unhandled rejections
153152
bot.catch((err) => {
154153
runtime.error?.(danger(`telegram bot error: ${formatUncaughtError(err)}`));
155154
});
156155

157156
const recentUpdates = createTelegramUpdateDedupe();
158-
let lastUpdateId =
157+
const initialUpdateId =
159158
typeof opts.updateOffset?.lastUpdateId === "number" ? opts.updateOffset.lastUpdateId : null;
160159

161-
const recordUpdateId = (ctx: TelegramUpdateKeyContext) => {
162-
const updateId = resolveTelegramUpdateId(ctx);
163-
if (typeof updateId !== "number") {
160+
// Track update_ids that have entered the middleware pipeline but have not completed yet.
161+
// This includes updates that are "queued" behind sequentialize(...) for a chat/topic key.
162+
// We only persist a watermark that is strictly less than the smallest pending update_id,
163+
// so we never write an offset that would skip an update still waiting to run.
164+
const pendingUpdateIds = new Set<number>();
165+
let highestCompletedUpdateId: number | null = initialUpdateId;
166+
let highestPersistedUpdateId: number | null = initialUpdateId;
167+
const maybePersistSafeWatermark = () => {
168+
if (typeof opts.updateOffset?.onUpdateId !== "function") {
164169
return;
165170
}
166-
if (lastUpdateId !== null && updateId <= lastUpdateId) {
171+
if (highestCompletedUpdateId === null) {
167172
return;
168173
}
169-
lastUpdateId = updateId;
170-
void opts.updateOffset?.onUpdateId?.(updateId);
174+
let safe = highestCompletedUpdateId;
175+
if (pendingUpdateIds.size > 0) {
176+
let minPending: number | null = null;
177+
for (const id of pendingUpdateIds) {
178+
if (minPending === null || id < minPending) {
179+
minPending = id;
180+
}
181+
}
182+
if (minPending !== null) {
183+
safe = Math.min(safe, minPending - 1);
184+
}
185+
}
186+
if (highestPersistedUpdateId !== null && safe <= highestPersistedUpdateId) {
187+
return;
188+
}
189+
highestPersistedUpdateId = safe;
190+
void opts.updateOffset.onUpdateId(safe);
171191
};
172192

173193
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {
174194
const updateId = resolveTelegramUpdateId(ctx);
175-
if (typeof updateId === "number" && lastUpdateId !== null) {
176-
if (updateId <= lastUpdateId) {
177-
return true;
178-
}
195+
const skipCutoff = highestPersistedUpdateId ?? initialUpdateId;
196+
if (typeof updateId === "number" && skipCutoff !== null && updateId <= skipCutoff) {
197+
return true;
179198
}
180199
const key = buildTelegramUpdateKey(ctx);
181200
const skipped = recentUpdates.check(key);
@@ -185,6 +204,26 @@ export function createTelegramBot(opts: TelegramBotOptions) {
185204
return skipped;
186205
};
187206

207+
bot.use(async (ctx, next) => {
208+
const updateId = resolveTelegramUpdateId(ctx);
209+
if (typeof updateId === "number") {
210+
pendingUpdateIds.add(updateId);
211+
}
212+
try {
213+
await next();
214+
} finally {
215+
if (typeof updateId === "number") {
216+
pendingUpdateIds.delete(updateId);
217+
if (highestCompletedUpdateId === null || updateId > highestCompletedUpdateId) {
218+
highestCompletedUpdateId = updateId;
219+
}
220+
maybePersistSafeWatermark();
221+
}
222+
}
223+
});
224+
225+
bot.use(sequentialize(getTelegramSequentialKey));
226+
188227
const rawUpdateLogger = createSubsystemLogger("gateway/channels/telegram/raw-update");
189228
const MAX_RAW_UPDATE_CHARS = 8000;
190229
const MAX_RAW_UPDATE_STRING = 500;
@@ -223,7 +262,6 @@ export function createTelegramBot(opts: TelegramBotOptions) {
223262
}
224263
}
225264
await next();
226-
recordUpdateId(ctx);
227265
});
228266

229267
const historyLimit = Math.max(

0 commit comments

Comments
 (0)