Skip to content

Commit 95a07ea

Browse files
garinyanclaude
andcommitted
Fix: add cross-turn separator for block streaming (issue #35308)
When blockStreaming is enabled, the agent runner's deltaBuffer now inserts a cross-turn separator (\n\n) between successive API turns. This prevents text from separate tool-call cycles from being concatenated together in the streaming UI. Changes: - Add pendingCrossTurnSeparator flag to EmbeddedPiSubscribeState - Set flag in resetAssistantMessageState when block streaming is active and deltaBuffer has content - Prepend separator in handleMessageUpdate when flag is set - Add comprehensive test coverage for the fix Fixes #35308 Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
1 parent 68e68bf commit 95a07ea

File tree

4 files changed

+304
-0
lines changed

4 files changed

+304
-0
lines changed

src/agents/pi-embedded-subscribe.handlers.messages.test.ts

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import type { AgentEvent } from "@mariozechner/pi-agent-core";
12
import { describe, expect, it } from "vitest";
23
import { resolveSilentReplyFallbackText } from "./pi-embedded-subscribe.handlers.messages.js";
4+
import { handleMessageStart } from "./pi-embedded-subscribe.handlers.messages.js";
35

46
describe("resolveSilentReplyFallbackText", () => {
57
it("replaces NO_REPLY with latest messaging tool text when available", () => {
@@ -29,3 +31,289 @@ describe("resolveSilentReplyFallbackText", () => {
2931
).toBe("NO_REPLY");
3032
});
3133
});
34+
35+
describe("Cross-turn separator for block streaming (issue #35308)", () => {
36+
it("sets pendingCrossTurnSeparator when block streaming is active and deltaBuffer has content", () => {
37+
// Create a minimal context with the required state
38+
const state = {
39+
assistantTexts: [],
40+
toolMetas: [],
41+
toolMetaById: new Map(),
42+
toolSummaryById: new Set(),
43+
blockReplyBreak: "text_end" as const,
44+
reasoningMode: "off" as const,
45+
includeReasoning: false,
46+
shouldEmitPartialReplies: true,
47+
streamReasoning: false,
48+
deltaBuffer: "existing text content",
49+
blockBuffer: "",
50+
blockState: { thinking: false, final: false, inlineCode: {} },
51+
partialBlockState: { thinking: false, final: false, inlineCode: {} },
52+
lastStreamedAssistant: undefined,
53+
lastStreamedAssistantCleaned: undefined,
54+
emittedAssistantUpdate: false,
55+
lastStreamedReasoning: undefined,
56+
lastBlockReplyText: undefined,
57+
reasoningStreamOpen: false,
58+
assistantMessageIndex: 0,
59+
lastAssistantTextMessageIndex: -1,
60+
lastAssistantTextNormalized: undefined,
61+
lastAssistantTextTrimmed: undefined,
62+
assistantTextBaseline: 0,
63+
suppressBlockChunks: false,
64+
lastReasoningSent: undefined,
65+
compactionInFlight: false,
66+
pendingCompactionRetry: 0,
67+
compactionRetryPromise: null,
68+
unsubscribed: false,
69+
messagingToolSentTexts: [],
70+
messagingToolSentTextsNormalized: [],
71+
messagingToolSentTargets: [],
72+
messagingToolSentMediaUrls: [],
73+
pendingMessagingTexts: new Map(),
74+
pendingMessagingTargets: new Map(),
75+
successfulCronAdds: 0,
76+
pendingMessagingMediaUrls: new Map(),
77+
pendingCrossTurnSeparator: false,
78+
};
79+
80+
const ctx = {
81+
params: {
82+
runId: "test-run",
83+
sessionId: { id: "test-session" },
84+
onAssistantMessageStart: undefined,
85+
},
86+
state,
87+
log: { debug: () => {}, warn: () => {} },
88+
blockChunking: { minChars: 100, maxChars: 1000, breakPreference: "paragraph" as const },
89+
blockChunker: {
90+
hasBuffered: () => false,
91+
append: () => {},
92+
reset: () => {},
93+
drain: () => {},
94+
},
95+
hookRunner: undefined,
96+
noteLastAssistant: () => {},
97+
shouldEmitToolResult: () => false,
98+
shouldEmitToolOutput: () => false,
99+
emitToolSummary: () => {},
100+
emitToolOutput: () => {},
101+
stripBlockTags: (text: string) => text,
102+
emitBlockChunk: () => {},
103+
flushBlockReplyBuffer: () => {},
104+
emitReasoningStream: () => {},
105+
consumeReplyDirectives: () => ({ text: "", mediaUrls: undefined }),
106+
consumePartialReplyDirectives: () => ({ text: "", mediaUrls: undefined }),
107+
resetAssistantMessageState: (nextBaseline: number) => {
108+
// Simulate the logic from resetAssistantMessageState
109+
const shouldInsertCrossTurnSeparator =
110+
ctx.blockChunking && ctx.state.deltaBuffer.length > 0;
111+
ctx.state.deltaBuffer = "";
112+
ctx.state.blockBuffer = "";
113+
ctx.blockChunker?.reset();
114+
ctx.state.assistantMessageIndex += 1;
115+
ctx.state.lastAssistantTextMessageIndex = -1;
116+
ctx.state.assistantTextBaseline = nextBaseline;
117+
ctx.state.pendingCrossTurnSeparator = shouldInsertCrossTurnSeparator;
118+
},
119+
resetForCompactionRetry: () => {},
120+
};
121+
122+
const evt: AgentEvent = {
123+
type: "message_start",
124+
message: { role: "assistant" },
125+
} as AgentEvent & { message: { role: string } };
126+
127+
// Call handleMessageStart which triggers resetAssistantMessageState
128+
handleMessageStart(ctx, evt);
129+
130+
// Verify that pendingCrossTurnSeparator is set to true
131+
expect(state.pendingCrossTurnSeparator).toBe(true);
132+
// Verify that deltaBuffer is cleared
133+
expect(state.deltaBuffer).toBe("");
134+
});
135+
136+
it("does not set pendingCrossTurnSeparator when block streaming is not active", () => {
137+
const state = {
138+
assistantTexts: [],
139+
toolMetas: [],
140+
toolMetaById: new Map(),
141+
toolSummaryById: new Set(),
142+
blockReplyBreak: "text_end" as const,
143+
reasoningMode: "off" as const,
144+
includeReasoning: false,
145+
shouldEmitPartialReplies: true,
146+
streamReasoning: false,
147+
deltaBuffer: "existing text content",
148+
blockBuffer: "",
149+
blockState: { thinking: false, final: false, inlineCode: {} },
150+
partialBlockState: { thinking: false, final: false, inlineCode: {} },
151+
lastStreamedAssistant: undefined,
152+
lastStreamedAssistantCleaned: undefined,
153+
emittedAssistantUpdate: false,
154+
lastStreamedReasoning: undefined,
155+
lastBlockReplyText: undefined,
156+
reasoningStreamOpen: false,
157+
assistantMessageIndex: 0,
158+
lastAssistantTextMessageIndex: -1,
159+
lastAssistantTextNormalized: undefined,
160+
lastAssistantTextTrimmed: undefined,
161+
assistantTextBaseline: 0,
162+
suppressBlockChunks: false,
163+
lastReasoningSent: undefined,
164+
compactionInFlight: false,
165+
pendingCompactionRetry: 0,
166+
compactionRetryPromise: null,
167+
unsubscribed: false,
168+
messagingToolSentTexts: [],
169+
messagingToolSentTextsNormalized: [],
170+
messagingToolSentTargets: [],
171+
messagingToolSentMediaUrls: [],
172+
pendingMessagingTexts: new Map(),
173+
pendingMessagingTargets: new Map(),
174+
successfulCronAdds: 0,
175+
pendingMessagingMediaUrls: new Map(),
176+
pendingCrossTurnSeparator: false,
177+
};
178+
179+
const ctx = {
180+
params: {
181+
runId: "test-run",
182+
sessionId: { id: "test-session" },
183+
onAssistantMessageStart: undefined,
184+
},
185+
state,
186+
log: { debug: () => {}, warn: () => {} },
187+
blockChunking: undefined, // No block streaming
188+
blockChunker: null,
189+
hookRunner: undefined,
190+
noteLastAssistant: () => {},
191+
shouldEmitToolResult: () => false,
192+
shouldEmitToolOutput: () => false,
193+
emitToolSummary: () => {},
194+
emitToolOutput: () => {},
195+
stripBlockTags: (text: string) => text,
196+
emitBlockChunk: () => {},
197+
flushBlockReplyBuffer: () => {},
198+
emitReasoningStream: () => {},
199+
consumeReplyDirectives: () => ({ text: "", mediaUrls: undefined }),
200+
consumePartialReplyDirectives: () => ({ text: "", mediaUrls: undefined }),
201+
resetAssistantMessageState: (nextBaseline: number) => {
202+
const shouldInsertCrossTurnSeparator =
203+
ctx.blockChunking && ctx.state.deltaBuffer.length > 0;
204+
ctx.state.deltaBuffer = "";
205+
ctx.state.blockBuffer = "";
206+
ctx.state.assistantMessageIndex += 1;
207+
ctx.state.lastAssistantTextMessageIndex = -1;
208+
ctx.state.assistantTextBaseline = nextBaseline;
209+
ctx.state.pendingCrossTurnSeparator = shouldInsertCrossTurnSeparator;
210+
},
211+
resetForCompactionRetry: () => {},
212+
};
213+
214+
const evt: AgentEvent = {
215+
type: "message_start",
216+
message: { role: "assistant" },
217+
} as AgentEvent & { message: { role: string } };
218+
219+
handleMessageStart(ctx, evt);
220+
221+
// Verify that pendingCrossTurnSeparator is NOT set when block streaming is not active
222+
expect(state.pendingCrossTurnSeparator).toBeUndefined();
223+
});
224+
225+
it("does not set pendingCrossTurnSeparator when deltaBuffer is empty", () => {
226+
const state = {
227+
assistantTexts: [],
228+
toolMetas: [],
229+
toolMetaById: new Map(),
230+
toolSummaryById: new Set(),
231+
blockReplyBreak: "text_end" as const,
232+
reasoningMode: "off" as const,
233+
includeReasoning: false,
234+
shouldEmitPartialReplies: true,
235+
streamReasoning: false,
236+
deltaBuffer: "", // Empty deltaBuffer
237+
blockBuffer: "",
238+
blockState: { thinking: false, final: false, inlineCode: {} },
239+
partialBlockState: { thinking: false, final: false, inlineCode: {} },
240+
lastStreamedAssistant: undefined,
241+
lastStreamedAssistantCleaned: undefined,
242+
emittedAssistantUpdate: false,
243+
lastStreamedReasoning: undefined,
244+
lastBlockReplyText: undefined,
245+
reasoningStreamOpen: false,
246+
assistantMessageIndex: 0,
247+
lastAssistantTextMessageIndex: -1,
248+
lastAssistantTextNormalized: undefined,
249+
lastAssistantTextTrimmed: undefined,
250+
assistantTextBaseline: 0,
251+
suppressBlockChunks: false,
252+
lastReasoningSent: undefined,
253+
compactionInFlight: false,
254+
pendingCompactionRetry: 0,
255+
compactionRetryPromise: null,
256+
unsubscribed: false,
257+
messagingToolSentTexts: [],
258+
messagingToolSentTextsNormalized: [],
259+
messagingToolSentTargets: [],
260+
messagingToolSentMediaUrls: [],
261+
pendingMessagingTexts: new Map(),
262+
pendingMessagingTargets: new Map(),
263+
successfulCronAdds: 0,
264+
pendingMessagingMediaUrls: new Map(),
265+
pendingCrossTurnSeparator: false,
266+
};
267+
268+
const ctx = {
269+
params: {
270+
runId: "test-run",
271+
sessionId: { id: "test-session" },
272+
onAssistantMessageStart: undefined,
273+
},
274+
state,
275+
log: { debug: () => {}, warn: () => {} },
276+
blockChunking: { minChars: 100, maxChars: 1000, breakPreference: "paragraph" as const },
277+
blockChunker: {
278+
hasBuffered: () => false,
279+
append: () => {},
280+
reset: () => {},
281+
drain: () => {},
282+
},
283+
hookRunner: undefined,
284+
noteLastAssistant: () => {},
285+
shouldEmitToolResult: () => false,
286+
shouldEmitToolOutput: () => false,
287+
emitToolSummary: () => {},
288+
emitToolOutput: () => {},
289+
stripBlockTags: (text: string) => text,
290+
emitBlockChunk: () => {},
291+
flushBlockReplyBuffer: () => {},
292+
emitReasoningStream: () => {},
293+
consumeReplyDirectives: () => ({ text: "", mediaUrls: undefined }),
294+
consumePartialReplyDirectives: () => ({ text: "", mediaUrls: undefined }),
295+
resetAssistantMessageState: (nextBaseline: number) => {
296+
const shouldInsertCrossTurnSeparator =
297+
ctx.blockChunking && ctx.state.deltaBuffer.length > 0;
298+
ctx.state.deltaBuffer = "";
299+
ctx.state.blockBuffer = "";
300+
ctx.blockChunker?.reset();
301+
ctx.state.assistantMessageIndex += 1;
302+
ctx.state.lastAssistantTextMessageIndex = -1;
303+
ctx.state.assistantTextBaseline = nextBaseline;
304+
ctx.state.pendingCrossTurnSeparator = shouldInsertCrossTurnSeparator;
305+
},
306+
resetForCompactionRetry: () => {},
307+
};
308+
309+
const evt: AgentEvent = {
310+
type: "message_start",
311+
message: { role: "assistant" },
312+
} as AgentEvent & { message: { role: string } };
313+
314+
handleMessageStart(ctx, evt);
315+
316+
// Verify that pendingCrossTurnSeparator is NOT set when deltaBuffer is empty
317+
expect(state.pendingCrossTurnSeparator).toBe(false);
318+
});
319+
});

src/agents/pi-embedded-subscribe.handlers.messages.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,14 @@ export function handleMessageUpdate(
8686

8787
ctx.noteLastAssistant(msg);
8888

89+
// If a cross-turn separator is pending, prepend it to deltaBuffer before processing
90+
// the first chunk of the new turn. This ensures proper separation between
91+
// tool-call cycles when block streaming is enabled (issue #35308).
92+
if (ctx.state.pendingCrossTurnSeparator) {
93+
ctx.state.deltaBuffer = "\n\n" + ctx.state.deltaBuffer;
94+
ctx.state.pendingCrossTurnSeparator = false;
95+
}
96+
8997
const assistantEvent = evt.assistantMessageEvent;
9098
const assistantRecord =
9199
assistantEvent && typeof assistantEvent === "object"

src/agents/pi-embedded-subscribe.handlers.types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ export type EmbeddedPiSubscribeState = {
7676
pendingMessagingTargets: Map<string, MessagingToolSend>;
7777
successfulCronAdds: number;
7878
pendingMessagingMediaUrls: Map<string, string[]>;
79+
pendingCrossTurnSeparator: boolean;
7980
lastAssistant?: AgentMessage;
8081
};
8182

src/agents/pi-embedded-subscribe.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
7878
pendingMessagingTargets: new Map(),
7979
successfulCronAdds: 0,
8080
pendingMessagingMediaUrls: new Map(),
81+
pendingCrossTurnSeparator: false,
8182
};
8283
const usageTotals = {
8384
input: 0,
@@ -102,6 +103,11 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
102103
const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator();
103104

104105
const resetAssistantMessageState = (nextAssistantTextBaseline: number) => {
106+
// When block streaming is active and we have existing delta content,
107+
// mark that a cross-turn separator should be prepended to the next chunk.
108+
// This prevents text from separate tool-call cycles from being concatenated
109+
// together in the streaming UI (issue #35308).
110+
const shouldInsertCrossTurnSeparator = blockChunking && state.deltaBuffer.length > 0;
105111
state.deltaBuffer = "";
106112
state.blockBuffer = "";
107113
blockChunker?.reset();
@@ -126,6 +132,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
126132
state.lastAssistantTextNormalized = undefined;
127133
state.lastAssistantTextTrimmed = undefined;
128134
state.assistantTextBaseline = nextAssistantTextBaseline;
135+
state.pendingCrossTurnSeparator = shouldInsertCrossTurnSeparator;
129136
};
130137

131138
const rememberAssistantText = (text: string) => {

0 commit comments

Comments
 (0)