Skip to content

Commit 3d99882

Browse files
authored
fix: stabilize Telegram draft boundaries and suppress NO_REPLY lead leaks (openclaw#33169)
* fix: stabilize telegram draft stream message boundaries * fix: suppress NO_REPLY lead-fragment leaks * fix: keep underscore guard for non-NO_REPLY prefixes * fix: skip assistant-start rotation only after real lane rotation * fix: preserve finalized state when pre-rotation does not force * fix: reset finalized preview state on message-start boundary * fix: document Telegram draft boundary + NO_REPLY reliability updates (openclaw#33169) (thanks @obviyus)
1 parent a7a9a3d commit 3d99882

File tree

8 files changed

+212
-137
lines changed

8 files changed

+212
-137
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
1717
- Discord/typing cleanup: stop typing indicators after silent/NO_REPLY runs by marking the run complete before dispatch idle cleanup. Thanks @thewilloftheshadow.
1818
- Discord/voice messages: request upload slots with JSON fetch calls so voice message uploads no longer fail with content-type errors. Thanks @thewilloftheshadow.
1919
- Telegram/DM draft finalization reliability: require verified final-text draft emission before treating preview finalization as delivered, and fall back to normal payload send when final draft delivery is not confirmed (preventing missing final responses and preserving media/button delivery). (#32118) Thanks @OpenCils.
20+
- Telegram/draft preview boundary + silent-token reliability: stabilize answer-lane message boundaries across late-partial/message-start races, preserve/reset finalized preview state at the correct boundaries, and suppress `NO_REPLY` lead-fragment leaks without broad heartbeat-prefix false positives. (#33169) Thanks @obviyus.
2021
- Discord/audit wildcard warnings: ignore "\*" wildcard keys when counting unresolved guild channels so doctor/status no longer warns on allow-all configs. (#33125) Thanks @thewilloftheshadow.
2122
- Discord/channel resolution: default bare numeric recipients to channels, harden allowlist numeric ID handling with safe fallbacks, and avoid inbound WS heartbeat stalls. (#33142) Thanks @thewilloftheshadow.
2223
- Discord/chunk delivery reliability: preserve chunk ordering when using a REST client and retry chunk sends on 429/5xx using account retry settings. (#33226) Thanks @thewilloftheshadow.

src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ describe("runReplyAgent typing (heartbeat)", () => {
410410
shouldType: false,
411411
},
412412
{
413-
partials: ["NO_", "NO_RE", "NO_REPLY"],
413+
partials: ["NO", "NO_", "NO_RE", "NO_REPLY"],
414414
finalText: "NO_REPLY",
415415
expectedForwarded: [] as string[],
416416
shouldType: false,

src/auto-reply/tokens.test.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ describe("stripSilentToken", () => {
7474
});
7575

7676
describe("isSilentReplyPrefixText", () => {
77-
it("matches uppercase underscore prefixes", () => {
77+
it("matches uppercase token lead fragments", () => {
78+
expect(isSilentReplyPrefixText("NO")).toBe(true);
7879
expect(isSilentReplyPrefixText("NO_")).toBe(true);
7980
expect(isSilentReplyPrefixText("NO_RE")).toBe(true);
8081
expect(isSilentReplyPrefixText("NO_REPLY")).toBe(true);
@@ -84,9 +85,17 @@ describe("isSilentReplyPrefixText", () => {
8485
it("rejects ambiguous natural-language prefixes", () => {
8586
expect(isSilentReplyPrefixText("N")).toBe(false);
8687
expect(isSilentReplyPrefixText("No")).toBe(false);
88+
expect(isSilentReplyPrefixText("no")).toBe(false);
8789
expect(isSilentReplyPrefixText("Hello")).toBe(false);
8890
});
8991

92+
it("keeps underscore guard for non-NO_REPLY tokens", () => {
93+
expect(isSilentReplyPrefixText("HE", "HEARTBEAT_OK")).toBe(false);
94+
expect(isSilentReplyPrefixText("HEART", "HEARTBEAT_OK")).toBe(false);
95+
expect(isSilentReplyPrefixText("HEARTBEAT", "HEARTBEAT_OK")).toBe(false);
96+
expect(isSilentReplyPrefixText("HEARTBEAT_", "HEARTBEAT_OK")).toBe(true);
97+
});
98+
9099
it("rejects non-prefixes and mixed characters", () => {
91100
expect(isSilentReplyPrefixText("NO_X")).toBe(false);
92101
expect(isSilentReplyPrefixText("NO_REPLY more")).toBe(false);

src/auto-reply/tokens.ts

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,34 @@ export function isSilentReplyPrefixText(
5656
if (!text) {
5757
return false;
5858
}
59-
const normalized = text.trimStart().toUpperCase();
59+
const trimmed = text.trimStart();
60+
if (!trimmed) {
61+
return false;
62+
}
63+
// Guard against suppressing natural-language "No..." text while still
64+
// catching uppercase lead fragments like "NO" from streamed NO_REPLY.
65+
if (trimmed !== trimmed.toUpperCase()) {
66+
return false;
67+
}
68+
const normalized = trimmed.toUpperCase();
6069
if (!normalized) {
6170
return false;
6271
}
63-
if (!normalized.includes("_")) {
72+
if (normalized.length < 2) {
6473
return false;
6574
}
6675
if (/[^A-Z_]/.test(normalized)) {
6776
return false;
6877
}
69-
return token.toUpperCase().startsWith(normalized);
78+
const tokenUpper = token.toUpperCase();
79+
if (!tokenUpper.startsWith(normalized)) {
80+
return false;
81+
}
82+
if (normalized.includes("_")) {
83+
return true;
84+
}
85+
// Keep underscore guard for generic tokens to avoid suppressing unrelated
86+
// uppercase words (e.g. HEART/HE with HEARTBEAT_OK). Only allow bare "NO"
87+
// because NO_REPLY streaming can transiently emit that fragment.
88+
return tokenUpper === SILENT_REPLY_TOKEN && normalized === "NO";
7089
}

src/telegram/bot-message-dispatch.test.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,133 @@ describe("dispatchTelegramMessage draft streaming", () => {
444444
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
445445
});
446446

447+
it("rotates before a late second-message partial so finalized preview is not overwritten", async () => {
448+
const answerDraftStream = createSequencedDraftStream(1001);
449+
const reasoningDraftStream = createDraftStream();
450+
createTelegramDraftStream
451+
.mockImplementationOnce(() => answerDraftStream)
452+
.mockImplementationOnce(() => reasoningDraftStream);
453+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
454+
async ({ dispatcherOptions, replyOptions }) => {
455+
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
456+
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
457+
// Simulate provider ordering bug: first chunk arrives before message-start callback.
458+
await replyOptions?.onPartialReply?.({ text: "Message B early" });
459+
await replyOptions?.onAssistantMessageStart?.();
460+
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
461+
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
462+
return { queuedFinal: true };
463+
},
464+
);
465+
deliverReplies.mockResolvedValue({ delivered: true });
466+
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
467+
468+
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
469+
470+
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
471+
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B early");
472+
const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
473+
const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
474+
expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder);
475+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
476+
1,
477+
123,
478+
1001,
479+
"Message A final",
480+
expect.any(Object),
481+
);
482+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
483+
2,
484+
123,
485+
1002,
486+
"Message B final",
487+
expect.any(Object),
488+
);
489+
});
490+
491+
it("does not skip message-start rotation when pre-rotation did not force a new message", async () => {
492+
const answerDraftStream = createSequencedDraftStream(1002);
493+
answerDraftStream.setMessageId(1001);
494+
const reasoningDraftStream = createDraftStream();
495+
createTelegramDraftStream
496+
.mockImplementationOnce(() => answerDraftStream)
497+
.mockImplementationOnce(() => reasoningDraftStream);
498+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
499+
async ({ dispatcherOptions, replyOptions }) => {
500+
// First message has only final text (no streamed partials), so answer lane
501+
// reaches finalized state with hasStreamedMessage still false.
502+
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
503+
// Provider ordering bug: next message partial arrives before message-start.
504+
await replyOptions?.onPartialReply?.({ text: "Message B early" });
505+
await replyOptions?.onAssistantMessageStart?.();
506+
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
507+
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
508+
return { queuedFinal: true };
509+
},
510+
);
511+
deliverReplies.mockResolvedValue({ delivered: true });
512+
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
513+
const bot = createBot();
514+
515+
await dispatchWithContext({ context: createContext(), streamMode: "partial", bot });
516+
517+
// Early pre-rotation could not force (no streamed partials yet), so the
518+
// real assistant message_start must still rotate once.
519+
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
520+
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message B early");
521+
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B partial");
522+
const earlyUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[0];
523+
const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
524+
const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
525+
expect(earlyUpdateOrder).toBeLessThan(boundaryRotationOrder);
526+
expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder);
527+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
528+
1,
529+
123,
530+
1001,
531+
"Message A final",
532+
expect.any(Object),
533+
);
534+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
535+
2,
536+
123,
537+
1002,
538+
"Message B final",
539+
expect.any(Object),
540+
);
541+
expect((bot.api.deleteMessage as ReturnType<typeof vi.fn>).mock.calls).toHaveLength(0);
542+
});
543+
544+
it("does not trigger late pre-rotation mid-message after an explicit assistant message start", async () => {
545+
const answerDraftStream = createDraftStream(1001);
546+
const reasoningDraftStream = createDraftStream();
547+
createTelegramDraftStream
548+
.mockImplementationOnce(() => answerDraftStream)
549+
.mockImplementationOnce(() => reasoningDraftStream);
550+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
551+
async ({ dispatcherOptions, replyOptions }) => {
552+
// Message A finalizes without streamed partials.
553+
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
554+
// Message B starts normally before partials.
555+
await replyOptions?.onAssistantMessageStart?.();
556+
await replyOptions?.onPartialReply?.({ text: "Message B first chunk" });
557+
await replyOptions?.onPartialReply?.({ text: "Message B second chunk" });
558+
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
559+
return { queuedFinal: true };
560+
},
561+
);
562+
deliverReplies.mockResolvedValue({ delivered: true });
563+
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
564+
565+
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
566+
567+
// The explicit message_start boundary must clear finalized state so
568+
// same-message partials do not force a new preview mid-stream.
569+
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
570+
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message B first chunk");
571+
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B second chunk");
572+
});
573+
447574
it("finalizes multi-message assistant stream to matching preview messages in order", async () => {
448575
const answerDraftStream = createSequencedDraftStream(1001);
449576
const reasoningDraftStream = createDraftStream();

src/telegram/bot-message-dispatch.ts

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -225,16 +225,20 @@ export const dispatchTelegramMessage = async ({
225225
stream,
226226
lastPartialText: "",
227227
hasStreamedMessage: false,
228-
previewRevisionBaseline: stream?.previewRevision?.() ?? 0,
229228
};
230229
};
231230
const lanes: Record<LaneName, DraftLaneState> = {
232231
answer: createDraftLane("answer", canStreamAnswerDraft),
233232
reasoning: createDraftLane("reasoning", canStreamReasoningDraft),
234233
};
234+
const finalizedPreviewByLane: Record<LaneName, boolean> = {
235+
answer: false,
236+
reasoning: false,
237+
};
235238
const answerLane = lanes.answer;
236239
const reasoningLane = lanes.reasoning;
237240
let splitReasoningOnNextStream = false;
241+
let skipNextAnswerMessageStartRotation = false;
238242
const reasoningStepState = createTelegramReasoningStepState();
239243
type SplitLaneSegment = { lane: LaneName; text: string };
240244
type SplitLaneSegmentsResult = {
@@ -260,7 +264,29 @@ export const dispatchTelegramMessage = async ({
260264
const resetDraftLaneState = (lane: DraftLaneState) => {
261265
lane.lastPartialText = "";
262266
lane.hasStreamedMessage = false;
263-
lane.previewRevisionBaseline = lane.stream?.previewRevision?.() ?? lane.previewRevisionBaseline;
267+
};
268+
const rotateAnswerLaneForNewAssistantMessage = () => {
269+
let didForceNewMessage = false;
270+
if (answerLane.hasStreamedMessage) {
271+
const previewMessageId = answerLane.stream?.messageId();
272+
// Only archive previews that still need a matching final text update.
273+
// Once a preview has already been finalized, archiving it here causes
274+
// cleanup to delete a user-visible final message on later media-only turns.
275+
if (typeof previewMessageId === "number" && !finalizedPreviewByLane.answer) {
276+
archivedAnswerPreviews.push({
277+
messageId: previewMessageId,
278+
textSnapshot: answerLane.lastPartialText,
279+
});
280+
}
281+
answerLane.stream?.forceNewMessage();
282+
didForceNewMessage = true;
283+
}
284+
resetDraftLaneState(answerLane);
285+
if (didForceNewMessage) {
286+
// New assistant message boundary: this lane now tracks a fresh preview lifecycle.
287+
finalizedPreviewByLane.answer = false;
288+
}
289+
return didForceNewMessage;
264290
};
265291
const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => {
266292
const laneStream = lane.stream;
@@ -287,6 +313,13 @@ export const dispatchTelegramMessage = async ({
287313
};
288314
const ingestDraftLaneSegments = (text: string | undefined) => {
289315
const split = splitTextIntoLaneSegments(text);
316+
const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer");
317+
if (hasAnswerSegment && finalizedPreviewByLane.answer) {
318+
// Some providers can emit the first partial of a new assistant message before
319+
// onAssistantMessageStart() arrives. Rotate preemptively so we do not edit
320+
// the previously finalized preview message with the next message's text.
321+
skipNextAnswerMessageStartRotation = rotateAnswerLaneForNewAssistantMessage();
322+
}
290323
for (const segment of split.segments) {
291324
if (segment.lane === "reasoning") {
292325
reasoningStepState.noteReasoningHint();
@@ -376,10 +409,6 @@ export const dispatchTelegramMessage = async ({
376409
? ctxPayload.ReplyToBody.trim() || undefined
377410
: undefined;
378411
const deliveryState = createLaneDeliveryStateTracker();
379-
const finalizedPreviewByLane: Record<LaneName, boolean> = {
380-
answer: false,
381-
reasoning: false,
382-
};
383412
const clearGroupHistory = () => {
384413
if (isGroup && historyKey) {
385414
clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit });
@@ -599,21 +628,16 @@ export const dispatchTelegramMessage = async ({
599628
onAssistantMessageStart: answerLane.stream
600629
? async () => {
601630
reasoningStepState.resetForNextStep();
602-
if (answerLane.hasStreamedMessage) {
603-
const previewMessageId = answerLane.stream?.messageId();
604-
// Only archive previews that still need a matching final text update.
605-
// Once a preview has already been finalized, archiving it here causes
606-
// cleanup to delete a user-visible final message on later media-only turns.
607-
if (typeof previewMessageId === "number" && !finalizedPreviewByLane.answer) {
608-
archivedAnswerPreviews.push({
609-
messageId: previewMessageId,
610-
textSnapshot: answerLane.lastPartialText,
611-
});
612-
}
613-
answerLane.stream?.forceNewMessage();
631+
if (skipNextAnswerMessageStartRotation) {
632+
skipNextAnswerMessageStartRotation = false;
633+
finalizedPreviewByLane.answer = false;
634+
return;
614635
}
615-
resetDraftLaneState(answerLane);
616-
// New assistant message boundary: this lane now tracks a fresh preview lifecycle.
636+
rotateAnswerLaneForNewAssistantMessage();
637+
// Message-start is an explicit assistant-message boundary.
638+
// Even when no forceNewMessage happened (e.g. prior answer had no
639+
// streamed partials), the next partial belongs to a fresh lifecycle
640+
// and must not trigger late pre-rotation mid-message.
617641
finalizedPreviewByLane.answer = false;
618642
}
619643
: undefined,

0 commit comments

Comments
 (0)