Skip to content

Commit 70eea02

Browse files
committed
fix: unify telegram streaming answer delivery
1 parent f6d0712 commit 70eea02

File tree

7 files changed

+145
-759
lines changed

7 files changed

+145
-759
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
1111
### Fixes
1212

1313
- macOS/LaunchAgent install: tighten LaunchAgent directory and plist permissions during install so launchd bootstrap does not fail when the target home path or generated plist inherited group/world-writable modes.
14+
- Telegram/streaming: keep one answer preview lane per inbound turn and never send a replacement final text bubble when preview finalization edits fail, fixing duplicate split replies during streamed answers.
1415

1516
## 2026.3.8
1617

src/telegram/bot-message-dispatch.test.ts

Lines changed: 32 additions & 470 deletions
Large diffs are not rendered by default.

src/telegram/bot-message-dispatch.ts

Lines changed: 90 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import type { TelegramInlineButtons } from "./button-types.js";
3232
import { createTelegramDraftStream } from "./draft-stream.js";
3333
import { renderTelegramHtmlText } from "./format.js";
3434
import {
35-
type ArchivedPreview,
3635
createLaneDeliveryStateTracker,
3736
createLaneTextDeliverer,
3837
type DraftLaneState,
@@ -49,6 +48,24 @@ const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
4948

5049
/** Minimum chars before sending first streaming message (improves push notification UX) */
5150
const DRAFT_MIN_INITIAL_CHARS = 30;
51+
const ANSWER_SEGMENT_NO_SPACE_BEFORE_RE = /^[,.;:!?)}\]]/;
52+
53+
function appendAnswerSegment(prefix: string, segment: string): string {
54+
if (!prefix) {
55+
return segment;
56+
}
57+
if (!segment) {
58+
return prefix;
59+
}
60+
if (
61+
/\s$/.test(prefix) ||
62+
/^\s/.test(segment) ||
63+
ANSWER_SEGMENT_NO_SPACE_BEFORE_RE.test(segment)
64+
) {
65+
return `${prefix}${segment}`;
66+
}
67+
return `${prefix} ${segment}`;
68+
}
5269

5370
async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) {
5471
try {
@@ -195,7 +212,6 @@ export const dispatchTelegramMessage = async ({
195212
// a visible duplicate flash at finalize time.
196213
const useMessagePreviewTransportForDm = threadSpec?.scope === "dm" && canStreamAnswerDraft;
197214
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
198-
const archivedAnswerPreviews: ArchivedPreview[] = [];
199215
const archivedReasoningPreviewIds: number[] = [];
200216
const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => {
201217
const stream = enabled
@@ -209,19 +225,11 @@ export const dispatchTelegramMessage = async ({
209225
minInitialChars: draftMinInitialChars,
210226
renderText: renderDraftPreview,
211227
onSupersededPreview:
212-
laneName === "answer" || laneName === "reasoning"
228+
laneName === "reasoning"
213229
? (preview) => {
214-
if (laneName === "reasoning") {
215-
if (!archivedReasoningPreviewIds.includes(preview.messageId)) {
216-
archivedReasoningPreviewIds.push(preview.messageId);
217-
}
218-
return;
230+
if (!archivedReasoningPreviewIds.includes(preview.messageId)) {
231+
archivedReasoningPreviewIds.push(preview.messageId);
219232
}
220-
archivedAnswerPreviews.push({
221-
messageId: preview.messageId,
222-
textSnapshot: preview.textSnapshot,
223-
deleteIfUnused: true,
224-
});
225233
}
226234
: undefined,
227235
log: logVerbose,
@@ -245,7 +253,14 @@ export const dispatchTelegramMessage = async ({
245253
const answerLane = lanes.answer;
246254
const reasoningLane = lanes.reasoning;
247255
let splitReasoningOnNextStream = false;
248-
let skipNextAnswerMessageStartRotation = false;
256+
let answerSegmentPrefixText = "";
257+
let pendingAnswerFinalSlots = 1;
258+
let bufferedAnswerFinal:
259+
| {
260+
payload: ReplyPayload;
261+
text: string;
262+
}
263+
| undefined;
249264
let draftLaneEventQueue = Promise.resolve();
250265
const reasoningStepState = createTelegramReasoningStepState();
251266
const enqueueDraftLaneEvent = (task: () => Promise<void>): Promise<void> => {
@@ -276,34 +291,20 @@ export const dispatchTelegramMessage = async ({
276291
Boolean(split.reasoningText) && suppressReasoning && !split.answerText,
277292
};
278293
};
294+
const getCurrentAnswerText = () => bufferedAnswerFinal?.text ?? answerLane.lastPartialText;
295+
const composeAnswerSegmentText = (text: string) =>
296+
appendAnswerSegment(answerSegmentPrefixText, text);
297+
const rememberAnswerBoundary = () => {
298+
answerSegmentPrefixText = getCurrentAnswerText();
299+
};
300+
const bufferAnswerFinal = (payload: ReplyPayload, text: string) => {
301+
bufferedAnswerFinal = { payload, text };
302+
answerSegmentPrefixText = text;
303+
};
279304
const resetDraftLaneState = (lane: DraftLaneState) => {
280305
lane.lastPartialText = "";
281306
lane.hasStreamedMessage = false;
282307
};
283-
const rotateAnswerLaneForNewAssistantMessage = async () => {
284-
let didForceNewMessage = false;
285-
if (answerLane.hasStreamedMessage) {
286-
// Materialize the current streamed draft into a permanent message
287-
// so it remains visible across tool boundaries.
288-
const materializedId = await answerLane.stream?.materialize?.();
289-
const previewMessageId = materializedId ?? answerLane.stream?.messageId();
290-
if (typeof previewMessageId === "number" && !finalizedPreviewByLane.answer) {
291-
archivedAnswerPreviews.push({
292-
messageId: previewMessageId,
293-
textSnapshot: answerLane.lastPartialText,
294-
deleteIfUnused: false,
295-
});
296-
}
297-
answerLane.stream?.forceNewMessage();
298-
didForceNewMessage = true;
299-
}
300-
resetDraftLaneState(answerLane);
301-
if (didForceNewMessage) {
302-
// New assistant message boundary: this lane now tracks a fresh preview lifecycle.
303-
finalizedPreviewByLane.answer = false;
304-
}
305-
return didForceNewMessage;
306-
};
307308
const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => {
308309
const laneStream = lane.stream;
309310
if (!laneStream || !text) {
@@ -329,19 +330,14 @@ export const dispatchTelegramMessage = async ({
329330
};
330331
const ingestDraftLaneSegments = async (text: string | undefined) => {
331332
const split = splitTextIntoLaneSegments(text);
332-
const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer");
333-
if (hasAnswerSegment && finalizedPreviewByLane.answer) {
334-
// Some providers can emit the first partial of a new assistant message before
335-
// onAssistantMessageStart() arrives. Rotate preemptively so we do not edit
336-
// the previously finalized preview message with the next message's text.
337-
skipNextAnswerMessageStartRotation = await rotateAnswerLaneForNewAssistantMessage();
338-
}
339333
for (const segment of split.segments) {
340334
if (segment.lane === "reasoning") {
341335
reasoningStepState.noteReasoningHint();
342336
reasoningStepState.noteReasoningDelivered();
337+
updateDraftFromPartial(lanes.reasoning, segment.text);
338+
continue;
343339
}
344-
updateDraftFromPartial(lanes[segment.lane], segment.text);
340+
updateDraftFromPartial(lanes.answer, composeAnswerSegmentText(segment.text));
345341
}
346342
};
347343
const flushDraftLane = async (lane: DraftLaneState) => {
@@ -464,7 +460,6 @@ export const dispatchTelegramMessage = async ({
464460
};
465461
const deliverLaneText = createLaneTextDeliverer({
466462
lanes,
467-
archivedAnswerPreviews,
468463
finalizedPreviewByLane,
469464
draftMaxChars,
470465
applyTextToPayload,
@@ -482,14 +477,29 @@ export const dispatchTelegramMessage = async ({
482477
buttons: previewButtons,
483478
});
484479
},
485-
deletePreviewMessage: async (messageId) => {
486-
await bot.api.deleteMessage(chatId, messageId);
487-
},
488480
log: logVerbose,
489481
markDelivered: () => {
490482
deliveryState.markDelivered();
491483
},
492484
});
485+
const flushBufferedAnswerFinal = async () => {
486+
if (!bufferedAnswerFinal) {
487+
return;
488+
}
489+
const { payload, text } = bufferedAnswerFinal;
490+
bufferedAnswerFinal = undefined;
491+
const previewButtons = (
492+
payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined
493+
)?.buttons;
494+
await deliverLaneText({
495+
laneName: "answer",
496+
text,
497+
payload,
498+
infoKind: "final",
499+
previewButtons,
500+
});
501+
reasoningStepState.resetForNextStep();
502+
};
493503

494504
let queuedFinal = false;
495505

@@ -530,59 +540,39 @@ export const dispatchTelegramMessage = async ({
530540
const segments = split.segments;
531541
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
532542

533-
const flushBufferedFinalAnswer = async () => {
534-
const buffered = reasoningStepState.takeBufferedFinalAnswer();
535-
if (!buffered) {
536-
return;
537-
}
538-
const bufferedButtons = (
539-
buffered.payload.channelData?.telegram as
540-
| { buttons?: TelegramInlineButtons }
541-
| undefined
542-
)?.buttons;
543-
await deliverLaneText({
544-
laneName: "answer",
545-
text: buffered.text,
546-
payload: buffered.payload,
547-
infoKind: "final",
548-
previewButtons: bufferedButtons,
549-
});
550-
reasoningStepState.resetForNextStep();
551-
};
552-
553543
for (const segment of segments) {
554-
if (
555-
segment.lane === "answer" &&
556-
info.kind === "final" &&
557-
reasoningStepState.shouldBufferFinalAnswer()
558-
) {
559-
reasoningStepState.bufferFinalAnswer({ payload, text: segment.text });
560-
continue;
561-
}
562544
if (segment.lane === "reasoning") {
563545
reasoningStepState.noteReasoningHint();
564-
}
565-
const result = await deliverLaneText({
566-
laneName: segment.lane,
567-
text: segment.text,
568-
payload,
569-
infoKind: info.kind,
570-
previewButtons,
571-
allowPreviewUpdateForNonFinal: segment.lane === "reasoning",
572-
});
573-
if (segment.lane === "reasoning") {
546+
const result = await deliverLaneText({
547+
laneName: "reasoning",
548+
text: segment.text,
549+
payload,
550+
infoKind: info.kind,
551+
previewButtons,
552+
allowPreviewUpdateForNonFinal: true,
553+
});
574554
if (result !== "skipped") {
575555
reasoningStepState.noteReasoningDelivered();
576-
await flushBufferedFinalAnswer();
577556
}
578557
continue;
579558
}
559+
const answerText = composeAnswerSegmentText(segment.text);
580560
if (info.kind === "final") {
581-
if (reasoningLane.hasStreamedMessage) {
582-
finalizedPreviewByLane.reasoning = true;
561+
if (pendingAnswerFinalSlots <= 0) {
562+
await sendPayload(payload);
563+
continue;
583564
}
584-
reasoningStepState.resetForNextStep();
565+
pendingAnswerFinalSlots -= 1;
566+
bufferAnswerFinal(payload, answerText);
567+
continue;
585568
}
569+
await deliverLaneText({
570+
laneName: "answer",
571+
text: answerText,
572+
payload,
573+
infoKind: info.kind,
574+
previewButtons,
575+
});
586576
}
587577
if (segments.length > 0) {
588578
return;
@@ -593,9 +583,6 @@ export const dispatchTelegramMessage = async ({
593583
typeof payload.text === "string" ? { ...payload, text: "" } : payload;
594584
await sendPayload(payloadWithoutSuppressedReasoning);
595585
}
596-
if (info.kind === "final") {
597-
await flushBufferedFinalAnswer();
598-
}
599586
return;
600587
}
601588

@@ -607,15 +594,9 @@ export const dispatchTelegramMessage = async ({
607594
const canSendAsIs =
608595
hasMedia || (typeof payload.text === "string" && payload.text.length > 0);
609596
if (!canSendAsIs) {
610-
if (info.kind === "final") {
611-
await flushBufferedFinalAnswer();
612-
}
613597
return;
614598
}
615599
await sendPayload(payload);
616-
if (info.kind === "final") {
617-
await flushBufferedFinalAnswer();
618-
}
619600
},
620601
onSkip: (_payload, info) => {
621602
if (info.reason !== "silent") {
@@ -655,17 +636,10 @@ export const dispatchTelegramMessage = async ({
655636
? () =>
656637
enqueueDraftLaneEvent(async () => {
657638
reasoningStepState.resetForNextStep();
658-
if (skipNextAnswerMessageStartRotation) {
659-
skipNextAnswerMessageStartRotation = false;
660-
finalizedPreviewByLane.answer = false;
661-
return;
639+
if (getCurrentAnswerText()) {
640+
pendingAnswerFinalSlots += 1;
641+
rememberAnswerBoundary();
662642
}
663-
await rotateAnswerLaneForNewAssistantMessage();
664-
// Message-start is an explicit assistant-message boundary.
665-
// Even when no forceNewMessage happened (e.g. prior answer had no
666-
// streamed partials), the next partial belongs to a fresh lifecycle
667-
// and must not trigger late pre-rotation mid-message.
668-
finalizedPreviewByLane.answer = false;
669643
})
670644
: undefined,
671645
onReasoningEnd: reasoningLane.stream
@@ -683,6 +657,10 @@ export const dispatchTelegramMessage = async ({
683657
onModelSelected,
684658
},
685659
}));
660+
await flushBufferedAnswerFinal();
661+
if (reasoningLane.hasStreamedMessage) {
662+
finalizedPreviewByLane.reasoning = true;
663+
}
686664
} catch (err) {
687665
dispatchError = err;
688666
runtime.error?.(danger(`telegram dispatch failed: ${String(err)}`));
@@ -704,17 +682,7 @@ export const dispatchTelegramMessage = async ({
704682
if (!stream) {
705683
continue;
706684
}
707-
// Don't clear (delete) the stream if: (a) it was finalized, or
708-
// (b) the active stream message is itself a boundary-finalized archive.
709-
const activePreviewMessageId = stream.messageId();
710-
const hasBoundaryFinalizedActivePreview =
711-
laneState.laneName === "answer" &&
712-
typeof activePreviewMessageId === "number" &&
713-
archivedAnswerPreviews.some(
714-
(p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId,
715-
);
716-
const shouldClear =
717-
!finalizedPreviewByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview;
685+
const shouldClear = !finalizedPreviewByLane[laneState.laneName];
718686
const existing = streamCleanupStates.get(stream);
719687
if (!existing) {
720688
streamCleanupStates.set(stream, { shouldClear });
@@ -728,18 +696,6 @@ export const dispatchTelegramMessage = async ({
728696
await stream.clear();
729697
}
730698
}
731-
for (const archivedPreview of archivedAnswerPreviews) {
732-
if (archivedPreview.deleteIfUnused === false) {
733-
continue;
734-
}
735-
try {
736-
await bot.api.deleteMessage(chatId, archivedPreview.messageId);
737-
} catch (err) {
738-
logVerbose(
739-
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
740-
);
741-
}
742-
}
743699
for (const messageId of archivedReasoningPreviewIds) {
744700
try {
745701
await bot.api.deleteMessage(chatId, messageId);

0 commit comments

Comments
 (0)