Skip to content

Commit da4fec6

Browse files
hougangdevobviyus
andauthored
fix(telegram): prevent duplicate messages when preview edit times out (#41662)
Merged via squash. Prepared head SHA: 2780e62 Co-authored-by: hougangdev <[email protected]> Co-authored-by: obviyus <[email protected]> Reviewed-by: @obviyus
1 parent 96e4975 commit da4fec6

File tree

6 files changed

+492
-53
lines changed

6 files changed

+492
-53
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ Docs: https://docs.openclaw.ai
4646
- Tools/web search: treat Brave `llm-context` grounding snippets as plain strings so `web_search` no longer returns empty snippet arrays in LLM Context mode. (#41387) thanks @zheliu2.
4747
- Telegram/exec approvals: reject `/approve` commands aimed at other bots, keep deterministic approval prompts visible when tool-result delivery fails, and stop resolved exact IDs from matching other pending approvals by prefix. (#37233) Thanks @huntharo.
4848
- Control UI/Sessions: restore single-column session table collapse on narrow viewport or container widths by moving the responsive table override next to the base grid rule and enabling inline-size container queries. (#12175) Thanks @benjipeng.
49+
- Telegram/final preview delivery: split active preview lifecycle from cleanup retention so missing archived preview edits avoid duplicate fallback sends without clearing the live preview or blocking later in-place finalization. (#41662) thanks @hougangdev.
4950

5051
## 2026.3.8
5152

src/telegram/bot-message-dispatch.test.ts

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,131 @@ describe("dispatchTelegramMessage draft streaming", () => {
906906
expect(deliverReplies).not.toHaveBeenCalled();
907907
});
908908

909+
it("keeps the active preview when an archived final edit target is missing", async () => {
910+
let answerMessageId: number | undefined;
911+
let answerDraftParams:
912+
| {
913+
onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void;
914+
}
915+
| undefined;
916+
const answerDraftStream = {
917+
update: vi.fn().mockImplementation((text: string) => {
918+
if (text.includes("Message B")) {
919+
answerMessageId = 1002;
920+
}
921+
}),
922+
flush: vi.fn().mockResolvedValue(undefined),
923+
messageId: vi.fn().mockImplementation(() => answerMessageId),
924+
clear: vi.fn().mockResolvedValue(undefined),
925+
stop: vi.fn().mockResolvedValue(undefined),
926+
forceNewMessage: vi.fn().mockImplementation(() => {
927+
answerMessageId = undefined;
928+
}),
929+
};
930+
const reasoningDraftStream = createDraftStream();
931+
createTelegramDraftStream
932+
.mockImplementationOnce((params) => {
933+
answerDraftParams = params as typeof answerDraftParams;
934+
return answerDraftStream;
935+
})
936+
.mockImplementationOnce(() => reasoningDraftStream);
937+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
938+
async ({ dispatcherOptions, replyOptions }) => {
939+
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
940+
await replyOptions?.onAssistantMessageStart?.();
941+
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
942+
answerDraftParams?.onSupersededPreview?.({
943+
messageId: 1001,
944+
textSnapshot: "Message A partial",
945+
});
946+
947+
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
948+
return { queuedFinal: true };
949+
},
950+
);
951+
deliverReplies.mockResolvedValue({ delivered: true });
952+
editMessageTelegram.mockRejectedValue(new Error("400: Bad Request: message to edit not found"));
953+
954+
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
955+
956+
expect(editMessageTelegram).toHaveBeenCalledWith(
957+
123,
958+
1001,
959+
"Message A final",
960+
expect.any(Object),
961+
);
962+
expect(answerDraftStream.clear).not.toHaveBeenCalled();
963+
expect(deliverReplies).not.toHaveBeenCalled();
964+
});
965+
966+
it("still finalizes the active preview after an archived final edit is retained", async () => {
967+
let answerMessageId: number | undefined;
968+
let answerDraftParams:
969+
| {
970+
onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void;
971+
}
972+
| undefined;
973+
const answerDraftStream = {
974+
update: vi.fn().mockImplementation((text: string) => {
975+
if (text.includes("Message B")) {
976+
answerMessageId = 1002;
977+
}
978+
}),
979+
flush: vi.fn().mockResolvedValue(undefined),
980+
messageId: vi.fn().mockImplementation(() => answerMessageId),
981+
clear: vi.fn().mockResolvedValue(undefined),
982+
stop: vi.fn().mockResolvedValue(undefined),
983+
forceNewMessage: vi.fn().mockImplementation(() => {
984+
answerMessageId = undefined;
985+
}),
986+
};
987+
const reasoningDraftStream = createDraftStream();
988+
createTelegramDraftStream
989+
.mockImplementationOnce((params) => {
990+
answerDraftParams = params as typeof answerDraftParams;
991+
return answerDraftStream;
992+
})
993+
.mockImplementationOnce(() => reasoningDraftStream);
994+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
995+
async ({ dispatcherOptions, replyOptions }) => {
996+
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
997+
await replyOptions?.onAssistantMessageStart?.();
998+
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
999+
answerDraftParams?.onSupersededPreview?.({
1000+
messageId: 1001,
1001+
textSnapshot: "Message A partial",
1002+
});
1003+
1004+
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
1005+
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
1006+
return { queuedFinal: true };
1007+
},
1008+
);
1009+
deliverReplies.mockResolvedValue({ delivered: true });
1010+
editMessageTelegram
1011+
.mockRejectedValueOnce(new Error("400: Bad Request: message to edit not found"))
1012+
.mockResolvedValueOnce({ ok: true, chatId: "123", messageId: "1002" });
1013+
1014+
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
1015+
1016+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1017+
1,
1018+
123,
1019+
1001,
1020+
"Message A final",
1021+
expect.any(Object),
1022+
);
1023+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1024+
2,
1025+
123,
1026+
1002,
1027+
"Message B final",
1028+
expect.any(Object),
1029+
);
1030+
expect(answerDraftStream.clear).not.toHaveBeenCalled();
1031+
expect(deliverReplies).not.toHaveBeenCalled();
1032+
});
1033+
9091034
it.each(["partial", "block"] as const)(
9101035
"keeps finalized text preview when the next assistant message is media-only (%s mode)",
9111036
async (streamMode) => {
@@ -1903,4 +2028,83 @@ describe("dispatchTelegramMessage draft streaming", () => {
19032028
expect(draftA.clear).toHaveBeenCalledTimes(1);
19042029
expect(draftB.clear).toHaveBeenCalledTimes(1);
19052030
});
2031+
2032+
it("swallows post-connect network timeout on preview edit to prevent duplicate messages", async () => {
2033+
const draftStream = createDraftStream(999);
2034+
createTelegramDraftStream.mockReturnValue(draftStream);
2035+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
2036+
async ({ dispatcherOptions, replyOptions }) => {
2037+
await replyOptions?.onPartialReply?.({ text: "Streaming..." });
2038+
await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" });
2039+
return { queuedFinal: true };
2040+
},
2041+
);
2042+
deliverReplies.mockResolvedValue({ delivered: true });
2043+
// Simulate a post-connect timeout: editMessageTelegram throws a network
2044+
// error even though Telegram's server already processed the edit.
2045+
editMessageTelegram.mockRejectedValue(new Error("timeout: request timed out after 30000ms"));
2046+
2047+
await dispatchWithContext({ context: createContext() });
2048+
2049+
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
2050+
const deliverCalls = deliverReplies.mock.calls;
2051+
const finalTextSentViaDeliverReplies = deliverCalls.some((call: unknown[]) =>
2052+
(call[0] as { replies?: Array<{ text?: string }> })?.replies?.some(
2053+
(r: { text?: string }) => r.text === "Final answer",
2054+
),
2055+
);
2056+
expect(finalTextSentViaDeliverReplies).toBe(false);
2057+
});
2058+
2059+
it("falls back to sendPayload on pre-connect error during final edit", async () => {
2060+
const draftStream = createDraftStream(999);
2061+
createTelegramDraftStream.mockReturnValue(draftStream);
2062+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
2063+
async ({ dispatcherOptions, replyOptions }) => {
2064+
await replyOptions?.onPartialReply?.({ text: "Streaming..." });
2065+
await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" });
2066+
return { queuedFinal: true };
2067+
},
2068+
);
2069+
deliverReplies.mockResolvedValue({ delivered: true });
2070+
const preConnectErr = new Error("connect ECONNREFUSED 149.154.167.220:443");
2071+
(preConnectErr as NodeJS.ErrnoException).code = "ECONNREFUSED";
2072+
editMessageTelegram.mockRejectedValue(preConnectErr);
2073+
2074+
await dispatchWithContext({ context: createContext() });
2075+
2076+
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
2077+
const deliverCalls = deliverReplies.mock.calls;
2078+
const finalTextSentViaDeliverReplies = deliverCalls.some((call: unknown[]) =>
2079+
(call[0] as { replies?: Array<{ text?: string }> })?.replies?.some(
2080+
(r: { text?: string }) => r.text === "Final answer",
2081+
),
2082+
);
2083+
expect(finalTextSentViaDeliverReplies).toBe(true);
2084+
});
2085+
2086+
it("falls back when Telegram reports the current final edit target missing", async () => {
2087+
const draftStream = createDraftStream(999);
2088+
createTelegramDraftStream.mockReturnValue(draftStream);
2089+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
2090+
async ({ dispatcherOptions, replyOptions }) => {
2091+
await replyOptions?.onPartialReply?.({ text: "Streaming..." });
2092+
await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" });
2093+
return { queuedFinal: true };
2094+
},
2095+
);
2096+
deliverReplies.mockResolvedValue({ delivered: true });
2097+
editMessageTelegram.mockRejectedValue(new Error("400: Bad Request: message to edit not found"));
2098+
2099+
await dispatchWithContext({ context: createContext() });
2100+
2101+
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
2102+
const deliverCalls = deliverReplies.mock.calls;
2103+
const finalTextSentViaDeliverReplies = deliverCalls.some((call: unknown[]) =>
2104+
(call[0] as { replies?: Array<{ text?: string }> })?.replies?.some(
2105+
(r: { text?: string }) => r.text === "Final answer",
2106+
),
2107+
);
2108+
expect(finalTextSentViaDeliverReplies).toBe(true);
2109+
});
19062110
});

src/telegram/bot-message-dispatch.ts

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import {
3838
createLaneTextDeliverer,
3939
type DraftLaneState,
4040
type LaneName,
41+
type LanePreviewLifecycle,
4142
} from "./lane-delivery.js";
4243
import {
4344
createTelegramReasoningStepState,
@@ -239,7 +240,14 @@ export const dispatchTelegramMessage = async ({
239240
answer: createDraftLane("answer", canStreamAnswerDraft),
240241
reasoning: createDraftLane("reasoning", canStreamReasoningDraft),
241242
};
242-
const finalizedPreviewByLane: Record<LaneName, boolean> = {
243+
// Active preview lifecycle answers "can this current preview still be
244+
// finalized?" Cleanup retention is separate so archived-preview decisions do
245+
// not poison the active lane.
246+
const activePreviewLifecycleByLane: Record<LaneName, LanePreviewLifecycle> = {
247+
answer: "transient",
248+
reasoning: "transient",
249+
};
250+
const retainPreviewOnCleanupByLane: Record<LaneName, boolean> = {
243251
answer: false,
244252
reasoning: false,
245253
};
@@ -288,7 +296,10 @@ export const dispatchTelegramMessage = async ({
288296
// so it remains visible across tool boundaries.
289297
const materializedId = await answerLane.stream?.materialize?.();
290298
const previewMessageId = materializedId ?? answerLane.stream?.messageId();
291-
if (typeof previewMessageId === "number" && !finalizedPreviewByLane.answer) {
299+
if (
300+
typeof previewMessageId === "number" &&
301+
activePreviewLifecycleByLane.answer === "transient"
302+
) {
292303
archivedAnswerPreviews.push({
293304
messageId: previewMessageId,
294305
textSnapshot: answerLane.lastPartialText,
@@ -301,7 +312,8 @@ export const dispatchTelegramMessage = async ({
301312
resetDraftLaneState(answerLane);
302313
if (didForceNewMessage) {
303314
// New assistant message boundary: this lane now tracks a fresh preview lifecycle.
304-
finalizedPreviewByLane.answer = false;
315+
activePreviewLifecycleByLane.answer = "transient";
316+
retainPreviewOnCleanupByLane.answer = false;
305317
}
306318
return didForceNewMessage;
307319
};
@@ -331,7 +343,7 @@ export const dispatchTelegramMessage = async ({
331343
const ingestDraftLaneSegments = async (text: string | undefined) => {
332344
const split = splitTextIntoLaneSegments(text);
333345
const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer");
334-
if (hasAnswerSegment && finalizedPreviewByLane.answer) {
346+
if (hasAnswerSegment && activePreviewLifecycleByLane.answer !== "transient") {
335347
// Some providers can emit the first partial of a new assistant message before
336348
// onAssistantMessageStart() arrives. Rotate preemptively so we do not edit
337349
// the previously finalized preview message with the next message's text.
@@ -469,7 +481,8 @@ export const dispatchTelegramMessage = async ({
469481
const deliverLaneText = createLaneTextDeliverer({
470482
lanes,
471483
archivedAnswerPreviews,
472-
finalizedPreviewByLane,
484+
activePreviewLifecycleByLane,
485+
retainPreviewOnCleanupByLane,
473486
draftMaxChars,
474487
applyTextToPayload,
475488
sendPayload,
@@ -596,7 +609,8 @@ export const dispatchTelegramMessage = async ({
596609
}
597610
if (info.kind === "final") {
598611
if (reasoningLane.hasStreamedMessage) {
599-
finalizedPreviewByLane.reasoning = true;
612+
activePreviewLifecycleByLane.reasoning = "complete";
613+
retainPreviewOnCleanupByLane.reasoning = true;
600614
}
601615
reasoningStepState.resetForNextStep();
602616
}
@@ -674,15 +688,17 @@ export const dispatchTelegramMessage = async ({
674688
reasoningStepState.resetForNextStep();
675689
if (skipNextAnswerMessageStartRotation) {
676690
skipNextAnswerMessageStartRotation = false;
677-
finalizedPreviewByLane.answer = false;
691+
activePreviewLifecycleByLane.answer = "transient";
692+
retainPreviewOnCleanupByLane.answer = false;
678693
return;
679694
}
680695
await rotateAnswerLaneForNewAssistantMessage();
681696
// Message-start is an explicit assistant-message boundary.
682697
// Even when no forceNewMessage happened (e.g. prior answer had no
683698
// streamed partials), the next partial belongs to a fresh lifecycle
684699
// and must not trigger late pre-rotation mid-message.
685-
finalizedPreviewByLane.answer = false;
700+
activePreviewLifecycleByLane.answer = "transient";
701+
retainPreviewOnCleanupByLane.answer = false;
686702
})
687703
: undefined,
688704
onReasoningEnd: reasoningLane.stream
@@ -731,7 +747,7 @@ export const dispatchTelegramMessage = async ({
731747
(p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId,
732748
);
733749
const shouldClear =
734-
!finalizedPreviewByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview;
750+
!retainPreviewOnCleanupByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview;
735751
const existing = streamCleanupStates.get(stream);
736752
if (!existing) {
737753
streamCleanupStates.set(stream, { shouldClear });

0 commit comments

Comments
 (0)