Skip to content

Commit 8b1fe0d

Browse files
authored
fix(telegram): split streaming preview per assistant block (#22613)
Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 26f35f4 Co-authored-by: obviyus <[email protected]> Reviewed-by: @obviyus
1 parent 36a0df4 commit 8b1fe0d

14 files changed

+277
-19
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ Docs: https://docs.openclaw.ai
8585
- Memory: return empty snippets when `memory_get`/QMD read files that have not been created yet, and harden memory indexing/session helpers against ENOENT races so missing Markdown no longer crashes tools. (#20680) Thanks @pahdo.
8686
- Telegram/Streaming: always clean up draft previews even when dispatch throws before fallback handling, preventing orphaned preview messages during failed runs. (#19041) Thanks @mudrii.
8787
- Telegram/Streaming: split reasoning and answer draft preview lanes to prevent cross-lane overwrites, and ignore literal `<think>` tags inside inline/fenced code snippets so sample markup is not misrouted as reasoning. (#20774) Thanks @obviyus.
88+
- Telegram/Streaming: restore 30-char first-preview debounce and scope `NO_REPLY` prefix suppression to partial sentinel fragments so normal `No...` text is not filtered. (#22613) Thanks @obviyus.
8889
- Telegram/Status reactions: refresh stall timers on repeated phase updates and honor ack-reaction scope when lifecycle reactions are enabled, preventing false stall emojis and unwanted group reactions. Thanks @wolly-tundracube and @thewilloftheshadow.
8990
- Telegram/Status reactions: keep lifecycle reactions active when available-reactions lookup fails by falling back to unrestricted variant selection instead of suppressing reaction updates. (#22380) Thanks @obviyus.
9091
- Discord/Streaming: apply `replyToMode: first` only to the first Discord chunk so block-streamed replies do not spam mention pings. (#20726) Thanks @thewilloftheshadow for the report.

src/auto-reply/reply/agent-runner-execution.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import {
2828
import { stripHeartbeatToken } from "../heartbeat.js";
2929
import type { TemplateContext } from "../templating.js";
3030
import type { VerboseLevel } from "../thinking.js";
31-
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
31+
import { isSilentReplyPrefixText, isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
3232
import type { GetReplyOptions, ReplyPayload } from "../types.js";
3333
import {
3434
buildEmbeddedRunBaseParams,
@@ -157,6 +157,9 @@ export async function runAgentTurnWithFallback(params: {
157157
return { text: sanitized, skip: false };
158158
};
159159
const handlePartialForTyping = async (payload: ReplyPayload): Promise<string | undefined> => {
160+
if (isSilentReplyPrefixText(payload.text, SILENT_REPLY_TOKEN)) {
161+
return undefined;
162+
}
160163
const { text, skip } = normalizeStreamingText(payload);
161164
if (skip || !text) {
162165
return undefined;

src/auto-reply/reply/agent-runner.runreplyagent.test.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,50 @@ describe("runReplyAgent typing (heartbeat)", () => {
383383
expect(typing.startTypingLoop).not.toHaveBeenCalled();
384384
});
385385

386+
it("suppresses partial streaming for NO_REPLY prefixes", async () => {
387+
const onPartialReply = vi.fn();
388+
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
389+
await params.onPartialReply?.({ text: "NO_" });
390+
await params.onPartialReply?.({ text: "NO_RE" });
391+
await params.onPartialReply?.({ text: "NO_REPLY" });
392+
return { payloads: [{ text: "NO_REPLY" }], meta: {} };
393+
});
394+
395+
const { run, typing } = createMinimalRun({
396+
opts: { isHeartbeat: false, onPartialReply },
397+
typingMode: "message",
398+
});
399+
await run();
400+
401+
expect(onPartialReply).not.toHaveBeenCalled();
402+
expect(typing.startTypingOnText).not.toHaveBeenCalled();
403+
expect(typing.startTypingLoop).not.toHaveBeenCalled();
404+
});
405+
406+
it("does not suppress partial streaming for normal 'No' prefixes", async () => {
407+
const onPartialReply = vi.fn();
408+
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
409+
await params.onPartialReply?.({ text: "No" });
410+
await params.onPartialReply?.({ text: "No, that is valid" });
411+
return { payloads: [{ text: "No, that is valid" }], meta: {} };
412+
});
413+
414+
const { run, typing } = createMinimalRun({
415+
opts: { isHeartbeat: false, onPartialReply },
416+
typingMode: "message",
417+
});
418+
await run();
419+
420+
expect(onPartialReply).toHaveBeenCalledTimes(2);
421+
expect(onPartialReply).toHaveBeenNthCalledWith(1, { text: "No", mediaUrls: undefined });
422+
expect(onPartialReply).toHaveBeenNthCalledWith(2, {
423+
text: "No, that is valid",
424+
mediaUrls: undefined,
425+
});
426+
expect(typing.startTypingOnText).toHaveBeenCalled();
427+
expect(typing.startTypingLoop).not.toHaveBeenCalled();
428+
});
429+
386430
it("does not start typing on assistant message start without prior text in message mode", async () => {
387431
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
388432
await params.onAssistantMessageStart?.();

src/auto-reply/tokens.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,23 @@ export function isSilentReplyText(
1818
const suffix = new RegExp(`\\b${escaped}\\b\\W*$`);
1919
return suffix.test(text);
2020
}
21+
22+
export function isSilentReplyPrefixText(
23+
text: string | undefined,
24+
token: string = SILENT_REPLY_TOKEN,
25+
): boolean {
26+
if (!text) {
27+
return false;
28+
}
29+
const normalized = text.trimStart().toUpperCase();
30+
if (!normalized) {
31+
return false;
32+
}
33+
if (!normalized.includes("_")) {
34+
return false;
35+
}
36+
if (/[^A-Z_]/.test(normalized)) {
37+
return false;
38+
}
39+
return token.toUpperCase().startsWith(normalized);
40+
}

src/channels/draft-stream-loop.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export type DraftStreamLoop = {
33
flush: () => Promise<void>;
44
stop: () => void;
55
resetPending: () => void;
6+
resetThrottleWindow: () => void;
67
waitForInFlight: () => Promise<void>;
78
};
89

@@ -87,6 +88,13 @@ export function createDraftStreamLoop(params: {
8788
resetPending: () => {
8889
pendingText = "";
8990
},
91+
resetThrottleWindow: () => {
92+
lastSentAt = 0;
93+
if (timer) {
94+
clearTimeout(timer);
95+
timer = undefined;
96+
}
97+
},
9098
waitForInFlight: async () => {
9199
if (inFlightPromise) {
92100
await inFlightPromise;

src/config/config.legacy-config-detection.rejects-routing-allowfrom.e2e.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,11 +378,11 @@ describe("legacy config detection", () => {
378378
expect(res.config.channels?.telegram?.groupPolicy).toBe("allowlist");
379379
}
380380
});
381-
it("defaults telegram.streaming to true when telegram section exists", async () => {
381+
it("defaults telegram.streaming to false when telegram section exists", async () => {
382382
const res = validateConfigObject({ channels: { telegram: {} } });
383383
expect(res.ok).toBe(true);
384384
if (res.ok) {
385-
expect(res.config.channels?.telegram?.streaming).toBe(true);
385+
expect(res.config.channels?.telegram?.streaming).toBe(false);
386386
expect(res.config.channels?.telegram?.streamMode).toBeUndefined();
387387
}
388388
});

src/config/schema.help.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ export const FIELD_HELP: Record<string, string> = {
392392
"channels.telegram.dmPolicy":
393393
'Direct message access control ("pairing" recommended). "open" requires channels.telegram.allowFrom=["*"].',
394394
"channels.telegram.streaming":
395-
"Enable Telegram live stream preview via message edits (default: true; legacy streamMode auto-maps here).",
395+
"Enable Telegram live stream preview via message edits (default: false; legacy streamMode auto-maps here).",
396396
"channels.discord.streamMode":
397397
"Live stream preview mode for Discord replies (off | partial | block). Separate from block streaming; uses sendMessage + editMessage.",
398398
"channels.discord.draftChunk.minChars":

src/config/zod-schema.providers-core.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ function normalizeTelegramStreamingConfig(value: {
117117
delete value.streamMode;
118118
return;
119119
}
120-
value.streaming = true;
120+
value.streaming = false;
121121
}
122122

123123
export const TelegramAccountSchemaBase = z

src/telegram/bot-message-dispatch.test.ts

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,25 @@ describe("dispatchTelegramMessage draft streaming", () => {
6363
};
6464
}
6565

66+
function createSequencedDraftStream(startMessageId = 1001) {
67+
let activeMessageId: number | undefined;
68+
let nextMessageId = startMessageId;
69+
return {
70+
update: vi.fn().mockImplementation(() => {
71+
if (activeMessageId == null) {
72+
activeMessageId = nextMessageId++;
73+
}
74+
}),
75+
flush: vi.fn().mockResolvedValue(undefined),
76+
messageId: vi.fn().mockImplementation(() => activeMessageId),
77+
clear: vi.fn().mockResolvedValue(undefined),
78+
stop: vi.fn().mockResolvedValue(undefined),
79+
forceNewMessage: vi.fn().mockImplementation(() => {
80+
activeMessageId = undefined;
81+
}),
82+
};
83+
}
84+
6685
function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) {
6786
const answerDraftStream = createDraftStream(params?.answerMessageId);
6887
const reasoningDraftStream = createDraftStream(params?.reasoningMessageId);
@@ -172,7 +191,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
172191
expect.objectContaining({
173192
chatId: 123,
174193
thread: { id: 777, scope: "dm" },
175-
minInitialChars: 1,
194+
minInitialChars: 30,
176195
}),
177196
);
178197
expect(draftStream.update).toHaveBeenCalledWith("Hello");
@@ -193,7 +212,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
193212
expect(draftStream.clear).toHaveBeenCalledTimes(1);
194213
});
195214

196-
it("uses immediate preview updates for legacy block stream mode", async () => {
215+
it("uses 30-char preview debounce for legacy block stream mode", async () => {
197216
const draftStream = createDraftStream();
198217
createTelegramDraftStream.mockReturnValue(draftStream);
199218
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
@@ -209,7 +228,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
209228

210229
expect(createTelegramDraftStream).toHaveBeenCalledWith(
211230
expect.objectContaining({
212-
minInitialChars: 1,
231+
minInitialChars: 30,
213232
}),
214233
);
215234
});
@@ -445,7 +464,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
445464
);
446465
});
447466

448-
it("does not force new message for legacy block stream mode", async () => {
467+
it("forces new message for next assistant block in legacy block stream mode", async () => {
449468
const draftStream = createDraftStream(999);
450469
createTelegramDraftStream.mockReturnValue(draftStream);
451470
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
@@ -464,10 +483,10 @@ describe("dispatchTelegramMessage draft streaming", () => {
464483

465484
await dispatchWithContext({ context: createContext(), streamMode: "block" });
466485

467-
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
486+
expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
468487
});
469488

470-
it("does not force new message in partial mode when assistant message restarts", async () => {
489+
it("forces new message in partial mode when assistant message restarts", async () => {
471490
const draftStream = createDraftStream(999);
472491
createTelegramDraftStream.mockReturnValue(draftStream);
473492
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
@@ -483,7 +502,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
483502

484503
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
485504

486-
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
505+
expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
487506
});
488507

489508
it("does not force new message on first assistant message start", async () => {
@@ -508,6 +527,56 @@ describe("dispatchTelegramMessage draft streaming", () => {
508527
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
509528
});
510529

530+
it("finalizes multi-message assistant stream to matching preview messages in order", async () => {
531+
const answerDraftStream = createSequencedDraftStream(1001);
532+
const reasoningDraftStream = createDraftStream();
533+
createTelegramDraftStream
534+
.mockImplementationOnce(() => answerDraftStream)
535+
.mockImplementationOnce(() => reasoningDraftStream);
536+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
537+
async ({ dispatcherOptions, replyOptions }) => {
538+
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
539+
await replyOptions?.onAssistantMessageStart?.();
540+
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
541+
await replyOptions?.onAssistantMessageStart?.();
542+
await replyOptions?.onPartialReply?.({ text: "Message C partial" });
543+
544+
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
545+
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
546+
await dispatcherOptions.deliver({ text: "Message C final" }, { kind: "final" });
547+
return { queuedFinal: true };
548+
},
549+
);
550+
deliverReplies.mockResolvedValue({ delivered: true });
551+
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
552+
553+
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
554+
555+
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(2);
556+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
557+
1,
558+
123,
559+
1001,
560+
"Message A final",
561+
expect.any(Object),
562+
);
563+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
564+
2,
565+
123,
566+
1002,
567+
"Message B final",
568+
expect.any(Object),
569+
);
570+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
571+
3,
572+
123,
573+
1003,
574+
"Message C final",
575+
expect.any(Object),
576+
);
577+
expect(deliverReplies).not.toHaveBeenCalled();
578+
});
579+
511580
it.each(["block", "partial"] as const)(
512581
"splits reasoning lane only when a later reasoning block starts (%s mode)",
513582
async (streamMode) => {

0 commit comments

Comments
 (0)