@@ -32,7 +32,6 @@ import type { TelegramInlineButtons } from "./button-types.js";
3232import { createTelegramDraftStream } from "./draft-stream.js" ;
3333import { renderTelegramHtmlText } from "./format.js" ;
3434import {
35- type ArchivedPreview ,
3635 createLaneDeliveryStateTracker ,
3736 createLaneTextDeliverer ,
3837 type DraftLaneState ,
@@ -49,6 +48,24 @@ const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
4948
5049/** Minimum chars before sending first streaming message (improves push notification UX) */
5150const DRAFT_MIN_INITIAL_CHARS = 30 ;
51+ const ANSWER_SEGMENT_NO_SPACE_BEFORE_RE = / ^ [ , . ; : ! ? ) } \] ] / ;
52+
53+ function appendAnswerSegment ( prefix : string , segment : string ) : string {
54+ if ( ! prefix ) {
55+ return segment ;
56+ }
57+ if ( ! segment ) {
58+ return prefix ;
59+ }
60+ if (
61+ / \s $ / . test ( prefix ) ||
62+ / ^ \s / . test ( segment ) ||
63+ ANSWER_SEGMENT_NO_SPACE_BEFORE_RE . test ( segment )
64+ ) {
65+ return `${ prefix } ${ segment } ` ;
66+ }
67+ return `${ prefix } ${ segment } ` ;
68+ }
5269
5370async function resolveStickerVisionSupport ( cfg : OpenClawConfig , agentId : string ) {
5471 try {
@@ -195,7 +212,6 @@ export const dispatchTelegramMessage = async ({
195212 // a visible duplicate flash at finalize time.
196213 const useMessagePreviewTransportForDm = threadSpec ?. scope === "dm" && canStreamAnswerDraft ;
197214 const mediaLocalRoots = getAgentScopedMediaLocalRoots ( cfg , route . agentId ) ;
198- const archivedAnswerPreviews : ArchivedPreview [ ] = [ ] ;
199215 const archivedReasoningPreviewIds : number [ ] = [ ] ;
200216 const createDraftLane = ( laneName : LaneName , enabled : boolean ) : DraftLaneState => {
201217 const stream = enabled
@@ -209,19 +225,11 @@ export const dispatchTelegramMessage = async ({
209225 minInitialChars : draftMinInitialChars ,
210226 renderText : renderDraftPreview ,
211227 onSupersededPreview :
212- laneName === "answer" || laneName === " reasoning"
228+ laneName === "reasoning"
213229 ? ( preview ) => {
214- if ( laneName === "reasoning" ) {
215- if ( ! archivedReasoningPreviewIds . includes ( preview . messageId ) ) {
216- archivedReasoningPreviewIds . push ( preview . messageId ) ;
217- }
218- return ;
230+ if ( ! archivedReasoningPreviewIds . includes ( preview . messageId ) ) {
231+ archivedReasoningPreviewIds . push ( preview . messageId ) ;
219232 }
220- archivedAnswerPreviews . push ( {
221- messageId : preview . messageId ,
222- textSnapshot : preview . textSnapshot ,
223- deleteIfUnused : true ,
224- } ) ;
225233 }
226234 : undefined ,
227235 log : logVerbose ,
@@ -245,7 +253,14 @@ export const dispatchTelegramMessage = async ({
245253 const answerLane = lanes . answer ;
246254 const reasoningLane = lanes . reasoning ;
247255 let splitReasoningOnNextStream = false ;
248- let skipNextAnswerMessageStartRotation = false ;
256+ let answerSegmentPrefixText = "" ;
257+ let pendingAnswerFinalSlots = 1 ;
258+ let bufferedAnswerFinal :
259+ | {
260+ payload : ReplyPayload ;
261+ text : string ;
262+ }
263+ | undefined ;
249264 let draftLaneEventQueue = Promise . resolve ( ) ;
250265 const reasoningStepState = createTelegramReasoningStepState ( ) ;
251266 const enqueueDraftLaneEvent = ( task : ( ) => Promise < void > ) : Promise < void > => {
@@ -276,34 +291,20 @@ export const dispatchTelegramMessage = async ({
276291 Boolean ( split . reasoningText ) && suppressReasoning && ! split . answerText ,
277292 } ;
278293 } ;
294+ const getCurrentAnswerText = ( ) => bufferedAnswerFinal ?. text ?? answerLane . lastPartialText ;
295+ const composeAnswerSegmentText = ( text : string ) =>
296+ appendAnswerSegment ( answerSegmentPrefixText , text ) ;
297+ const rememberAnswerBoundary = ( ) => {
298+ answerSegmentPrefixText = getCurrentAnswerText ( ) ;
299+ } ;
300+ const bufferAnswerFinal = ( payload : ReplyPayload , text : string ) => {
301+ bufferedAnswerFinal = { payload, text } ;
302+ answerSegmentPrefixText = text ;
303+ } ;
279304 const resetDraftLaneState = ( lane : DraftLaneState ) => {
280305 lane . lastPartialText = "" ;
281306 lane . hasStreamedMessage = false ;
282307 } ;
283- const rotateAnswerLaneForNewAssistantMessage = async ( ) => {
284- let didForceNewMessage = false ;
285- if ( answerLane . hasStreamedMessage ) {
286- // Materialize the current streamed draft into a permanent message
287- // so it remains visible across tool boundaries.
288- const materializedId = await answerLane . stream ?. materialize ?.( ) ;
289- const previewMessageId = materializedId ?? answerLane . stream ?. messageId ( ) ;
290- if ( typeof previewMessageId === "number" && ! finalizedPreviewByLane . answer ) {
291- archivedAnswerPreviews . push ( {
292- messageId : previewMessageId ,
293- textSnapshot : answerLane . lastPartialText ,
294- deleteIfUnused : false ,
295- } ) ;
296- }
297- answerLane . stream ?. forceNewMessage ( ) ;
298- didForceNewMessage = true ;
299- }
300- resetDraftLaneState ( answerLane ) ;
301- if ( didForceNewMessage ) {
302- // New assistant message boundary: this lane now tracks a fresh preview lifecycle.
303- finalizedPreviewByLane . answer = false ;
304- }
305- return didForceNewMessage ;
306- } ;
307308 const updateDraftFromPartial = ( lane : DraftLaneState , text : string | undefined ) => {
308309 const laneStream = lane . stream ;
309310 if ( ! laneStream || ! text ) {
@@ -329,19 +330,14 @@ export const dispatchTelegramMessage = async ({
329330 } ;
330331 const ingestDraftLaneSegments = async ( text : string | undefined ) => {
331332 const split = splitTextIntoLaneSegments ( text ) ;
332- const hasAnswerSegment = split . segments . some ( ( segment ) => segment . lane === "answer" ) ;
333- if ( hasAnswerSegment && finalizedPreviewByLane . answer ) {
334- // Some providers can emit the first partial of a new assistant message before
335- // onAssistantMessageStart() arrives. Rotate preemptively so we do not edit
336- // the previously finalized preview message with the next message's text.
337- skipNextAnswerMessageStartRotation = await rotateAnswerLaneForNewAssistantMessage ( ) ;
338- }
339333 for ( const segment of split . segments ) {
340334 if ( segment . lane === "reasoning" ) {
341335 reasoningStepState . noteReasoningHint ( ) ;
342336 reasoningStepState . noteReasoningDelivered ( ) ;
337+ updateDraftFromPartial ( lanes . reasoning , segment . text ) ;
338+ continue ;
343339 }
344- updateDraftFromPartial ( lanes [ segment . lane ] , segment . text ) ;
340+ updateDraftFromPartial ( lanes . answer , composeAnswerSegmentText ( segment . text ) ) ;
345341 }
346342 } ;
347343 const flushDraftLane = async ( lane : DraftLaneState ) => {
@@ -464,7 +460,6 @@ export const dispatchTelegramMessage = async ({
464460 } ;
465461 const deliverLaneText = createLaneTextDeliverer ( {
466462 lanes,
467- archivedAnswerPreviews,
468463 finalizedPreviewByLane,
469464 draftMaxChars,
470465 applyTextToPayload,
@@ -482,14 +477,29 @@ export const dispatchTelegramMessage = async ({
482477 buttons : previewButtons ,
483478 } ) ;
484479 } ,
485- deletePreviewMessage : async ( messageId ) => {
486- await bot . api . deleteMessage ( chatId , messageId ) ;
487- } ,
488480 log : logVerbose ,
489481 markDelivered : ( ) => {
490482 deliveryState . markDelivered ( ) ;
491483 } ,
492484 } ) ;
485+ const flushBufferedAnswerFinal = async ( ) => {
486+ if ( ! bufferedAnswerFinal ) {
487+ return ;
488+ }
489+ const { payload, text } = bufferedAnswerFinal ;
490+ bufferedAnswerFinal = undefined ;
491+ const previewButtons = (
492+ payload . channelData ?. telegram as { buttons ?: TelegramInlineButtons } | undefined
493+ ) ?. buttons ;
494+ await deliverLaneText ( {
495+ laneName : "answer" ,
496+ text,
497+ payload,
498+ infoKind : "final" ,
499+ previewButtons,
500+ } ) ;
501+ reasoningStepState . resetForNextStep ( ) ;
502+ } ;
493503
494504 let queuedFinal = false ;
495505
@@ -530,59 +540,39 @@ export const dispatchTelegramMessage = async ({
530540 const segments = split . segments ;
531541 const hasMedia = Boolean ( payload . mediaUrl ) || ( payload . mediaUrls ?. length ?? 0 ) > 0 ;
532542
533- const flushBufferedFinalAnswer = async ( ) => {
534- const buffered = reasoningStepState . takeBufferedFinalAnswer ( ) ;
535- if ( ! buffered ) {
536- return ;
537- }
538- const bufferedButtons = (
539- buffered . payload . channelData ?. telegram as
540- | { buttons ?: TelegramInlineButtons }
541- | undefined
542- ) ?. buttons ;
543- await deliverLaneText ( {
544- laneName : "answer" ,
545- text : buffered . text ,
546- payload : buffered . payload ,
547- infoKind : "final" ,
548- previewButtons : bufferedButtons ,
549- } ) ;
550- reasoningStepState . resetForNextStep ( ) ;
551- } ;
552-
553543 for ( const segment of segments ) {
554- if (
555- segment . lane === "answer" &&
556- info . kind === "final" &&
557- reasoningStepState . shouldBufferFinalAnswer ( )
558- ) {
559- reasoningStepState . bufferFinalAnswer ( { payload, text : segment . text } ) ;
560- continue ;
561- }
562544 if ( segment . lane === "reasoning" ) {
563545 reasoningStepState . noteReasoningHint ( ) ;
564- }
565- const result = await deliverLaneText ( {
566- laneName : segment . lane ,
567- text : segment . text ,
568- payload,
569- infoKind : info . kind ,
570- previewButtons,
571- allowPreviewUpdateForNonFinal : segment . lane === "reasoning" ,
572- } ) ;
573- if ( segment . lane === "reasoning" ) {
546+ const result = await deliverLaneText ( {
547+ laneName : "reasoning" ,
548+ text : segment . text ,
549+ payload,
550+ infoKind : info . kind ,
551+ previewButtons,
552+ allowPreviewUpdateForNonFinal : true ,
553+ } ) ;
574554 if ( result !== "skipped" ) {
575555 reasoningStepState . noteReasoningDelivered ( ) ;
576- await flushBufferedFinalAnswer ( ) ;
577556 }
578557 continue ;
579558 }
559+ const answerText = composeAnswerSegmentText ( segment . text ) ;
580560 if ( info . kind === "final" ) {
581- if ( reasoningLane . hasStreamedMessage ) {
582- finalizedPreviewByLane . reasoning = true ;
561+ if ( pendingAnswerFinalSlots <= 0 ) {
562+ await sendPayload ( payload ) ;
563+ continue ;
583564 }
584- reasoningStepState . resetForNextStep ( ) ;
565+ pendingAnswerFinalSlots -= 1 ;
566+ bufferAnswerFinal ( payload , answerText ) ;
567+ continue ;
585568 }
569+ await deliverLaneText ( {
570+ laneName : "answer" ,
571+ text : answerText ,
572+ payload,
573+ infoKind : info . kind ,
574+ previewButtons,
575+ } ) ;
586576 }
587577 if ( segments . length > 0 ) {
588578 return ;
@@ -593,9 +583,6 @@ export const dispatchTelegramMessage = async ({
593583 typeof payload . text === "string" ? { ...payload , text : "" } : payload ;
594584 await sendPayload ( payloadWithoutSuppressedReasoning ) ;
595585 }
596- if ( info . kind === "final" ) {
597- await flushBufferedFinalAnswer ( ) ;
598- }
599586 return ;
600587 }
601588
@@ -607,15 +594,9 @@ export const dispatchTelegramMessage = async ({
607594 const canSendAsIs =
608595 hasMedia || ( typeof payload . text === "string" && payload . text . length > 0 ) ;
609596 if ( ! canSendAsIs ) {
610- if ( info . kind === "final" ) {
611- await flushBufferedFinalAnswer ( ) ;
612- }
613597 return ;
614598 }
615599 await sendPayload ( payload ) ;
616- if ( info . kind === "final" ) {
617- await flushBufferedFinalAnswer ( ) ;
618- }
619600 } ,
620601 onSkip : ( _payload , info ) => {
621602 if ( info . reason !== "silent" ) {
@@ -655,17 +636,10 @@ export const dispatchTelegramMessage = async ({
655636 ? ( ) =>
656637 enqueueDraftLaneEvent ( async ( ) => {
657638 reasoningStepState . resetForNextStep ( ) ;
658- if ( skipNextAnswerMessageStartRotation ) {
659- skipNextAnswerMessageStartRotation = false ;
660- finalizedPreviewByLane . answer = false ;
661- return ;
639+ if ( getCurrentAnswerText ( ) ) {
640+ pendingAnswerFinalSlots += 1 ;
641+ rememberAnswerBoundary ( ) ;
662642 }
663- await rotateAnswerLaneForNewAssistantMessage ( ) ;
664- // Message-start is an explicit assistant-message boundary.
665- // Even when no forceNewMessage happened (e.g. prior answer had no
666- // streamed partials), the next partial belongs to a fresh lifecycle
667- // and must not trigger late pre-rotation mid-message.
668- finalizedPreviewByLane . answer = false ;
669643 } )
670644 : undefined ,
671645 onReasoningEnd : reasoningLane . stream
@@ -683,6 +657,10 @@ export const dispatchTelegramMessage = async ({
683657 onModelSelected,
684658 } ,
685659 } ) ) ;
660+ await flushBufferedAnswerFinal ( ) ;
661+ if ( reasoningLane . hasStreamedMessage ) {
662+ finalizedPreviewByLane . reasoning = true ;
663+ }
686664 } catch ( err ) {
687665 dispatchError = err ;
688666 runtime . error ?.( danger ( `telegram dispatch failed: ${ String ( err ) } ` ) ) ;
@@ -704,17 +682,7 @@ export const dispatchTelegramMessage = async ({
704682 if ( ! stream ) {
705683 continue ;
706684 }
707- // Don't clear (delete) the stream if: (a) it was finalized, or
708- // (b) the active stream message is itself a boundary-finalized archive.
709- const activePreviewMessageId = stream . messageId ( ) ;
710- const hasBoundaryFinalizedActivePreview =
711- laneState . laneName === "answer" &&
712- typeof activePreviewMessageId === "number" &&
713- archivedAnswerPreviews . some (
714- ( p ) => p . deleteIfUnused === false && p . messageId === activePreviewMessageId ,
715- ) ;
716- const shouldClear =
717- ! finalizedPreviewByLane [ laneState . laneName ] && ! hasBoundaryFinalizedActivePreview ;
685+ const shouldClear = ! finalizedPreviewByLane [ laneState . laneName ] ;
718686 const existing = streamCleanupStates . get ( stream ) ;
719687 if ( ! existing ) {
720688 streamCleanupStates . set ( stream , { shouldClear } ) ;
@@ -728,18 +696,6 @@ export const dispatchTelegramMessage = async ({
728696 await stream . clear ( ) ;
729697 }
730698 }
731- for ( const archivedPreview of archivedAnswerPreviews ) {
732- if ( archivedPreview . deleteIfUnused === false ) {
733- continue ;
734- }
735- try {
736- await bot . api . deleteMessage ( chatId , archivedPreview . messageId ) ;
737- } catch ( err ) {
738- logVerbose (
739- `telegram: archived answer preview cleanup failed (${ archivedPreview . messageId } ): ${ String ( err ) } ` ,
740- ) ;
741- }
742- }
743699 for ( const messageId of archivedReasoningPreviewIds ) {
744700 try {
745701 await bot . api . deleteMessage ( chatId , messageId ) ;
0 commit comments