Skip to content

Commit 3bf6ed1

Browse files
rexl2018Takhoffman
andauthored
Feishu: harden streaming merge semantics and final reply dedupe (openclaw#33245)
* Feishu: close duplicate final gap and cover routing precedence * Feishu: resolve reviewer duplicate-final and routing feedback * Feishu: tighten streaming send-mode option typing * Feishu: fix reverse-overlap streaming merge ordering * Feishu: align streaming final dedupe test expectation * Feishu: allow distinct streaming finals while deduping repeats --------- Co-authored-by: Tak Hoffman <[email protected]>
1 parent 8b8167d commit 3bf6ed1

File tree

5 files changed

+95
-82
lines changed

5 files changed

+95
-82
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
1717

1818
### Fixes
1919

20+
- Feishu/streaming card delivery synthesis: unify snapshot and delta streaming merge semantics, apply overlap-aware final merge, suppress duplicate final text delivery (including text+media final packets), prefer topic-thread `message.reply` routing when a reply target exists, and tune card print cadence to avoid duplicate incremental rendering. (from #33245, #32896, #33840) Thanks @rexl2018, @kcinzgg, and @aerelune.
2021
- Security/dependency audit: patch transitive Hono vulnerabilities by pinning `hono` to `4.12.5` and `@hono/node-server` to `1.19.10` in production resolution paths. Thanks @shakkernerd.
2122
- Security/dependency audit: bump `tar` to `7.5.10` (from `7.5.9`) to address the high-severity hardlink path traversal advisory (`GHSA-qffp-2rhf-9h96`). Thanks @shakkernerd.
2223
- Cron/announce delivery robustness: bypass pending-descendant announce guards for cron completion sends, ensure named-agent announce routes have outbound session entries, and fall back to direct delivery only when an announce send was actually attempted and failed. (from #35185, #32443, #34987) Thanks @Sid-Qin, @scoootscooob, and @bmendonca3.

extensions/feishu/src/reply-dispatcher.test.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,6 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
300300
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
301301
expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
302302
});
303-
304303
it("suppresses duplicate final text while still sending media", async () => {
305304
resolveFeishuAccountMock.mockReturnValue({
306305
accountId: "main",
@@ -341,6 +340,40 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
341340
);
342341
});
343342

343+
it("keeps distinct non-streaming final payloads", async () => {
344+
resolveFeishuAccountMock.mockReturnValue({
345+
accountId: "main",
346+
appId: "app_id",
347+
appSecret: "app_secret",
348+
domain: "feishu",
349+
config: {
350+
renderMode: "auto",
351+
streaming: false,
352+
},
353+
});
354+
355+
createFeishuReplyDispatcher({
356+
cfg: {} as never,
357+
agentId: "agent",
358+
runtime: { log: vi.fn(), error: vi.fn() } as never,
359+
chatId: "oc_chat",
360+
});
361+
362+
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
363+
await options.deliver({ text: "notice header" }, { kind: "final" });
364+
await options.deliver({ text: "actual answer body" }, { kind: "final" });
365+
366+
expect(sendMessageFeishuMock).toHaveBeenCalledTimes(2);
367+
expect(sendMessageFeishuMock).toHaveBeenNthCalledWith(
368+
1,
369+
expect.objectContaining({ text: "notice header" }),
370+
);
371+
expect(sendMessageFeishuMock).toHaveBeenNthCalledWith(
372+
2,
373+
expect.objectContaining({ text: "actual answer body" }),
374+
);
375+
});
376+
344377
it("treats block updates as delta chunks", async () => {
345378
resolveFeishuAccountMock.mockReturnValue({
346379
accountId: "main",

extensions/feishu/src/reply-dispatcher.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
143143
let streaming: FeishuStreamingSession | null = null;
144144
let streamText = "";
145145
let lastPartial = "";
146-
let lastFinalText: string | null = null;
146+
const deliveredFinalTexts = new Set<string>();
147147
let partialUpdateQueue: Promise<void> = Promise.resolve();
148148
let streamingStartPromise: Promise<void> | null = null;
149149
type StreamTextUpdateMode = "snapshot" | "delta";
@@ -230,7 +230,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
230230
responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
231231
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, agentId),
232232
onReplyStart: () => {
233-
lastFinalText = null;
233+
deliveredFinalTexts.clear();
234234
if (streamingEnabled && renderMode === "card") {
235235
startStreaming();
236236
}
@@ -246,10 +246,8 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
246246
: [];
247247
const hasText = Boolean(text.trim());
248248
const hasMedia = mediaList.length > 0;
249-
// Suppress only exact duplicate final text payloads to avoid
250-
// dropping legitimate multi-part final replies.
251249
const skipTextForDuplicateFinal =
252-
info?.kind === "final" && hasText && lastFinalText === text;
250+
info?.kind === "final" && hasText && deliveredFinalTexts.has(text);
253251
const shouldDeliverText = hasText && !skipTextForDuplicateFinal;
254252

255253
if (!shouldDeliverText && !hasMedia) {
@@ -287,7 +285,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
287285
if (info?.kind === "final") {
288286
streamText = mergeStreamingText(streamText, text);
289287
await closeStreaming();
290-
lastFinalText = text;
288+
deliveredFinalTexts.add(text);
291289
}
292290
// Send media even when streaming handled the text
293291
if (hasMedia) {
@@ -324,7 +322,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
324322
first = false;
325323
}
326324
if (info?.kind === "final") {
327-
lastFinalText = text;
325+
deliveredFinalTexts.add(text);
328326
}
329327
} else {
330328
const converted = core.channel.text.convertMarkdownTables(text, tableMode);
@@ -345,7 +343,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
345343
first = false;
346344
}
347345
if (info?.kind === "final") {
348-
lastFinalText = text;
346+
deliveredFinalTexts.add(text);
349347
}
350348
}
351349
}
Lines changed: 24 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,5 @@
1-
import { beforeEach, describe, expect, it, vi } from "vitest";
2-
3-
const fetchWithSsrFGuardMock = vi.hoisted(() => vi.fn());
4-
5-
vi.mock("openclaw/plugin-sdk/feishu", () => ({
6-
fetchWithSsrFGuard: fetchWithSsrFGuardMock,
7-
}));
8-
9-
import { FeishuStreamingSession, mergeStreamingText } from "./streaming-card.js";
1+
import { describe, expect, it } from "vitest";
2+
import { mergeStreamingText, resolveStreamingCardSendMode } from "./streaming-card.js";
103

114
describe("mergeStreamingText", () => {
125
it("prefers the latest full text when it already includes prior text", () => {
@@ -28,59 +21,34 @@ describe("mergeStreamingText", () => {
2821
expect(mergeStreamingText("revision_id: 552", "2,一点变化都没有")).toBe(
2922
"revision_id: 552,一点变化都没有",
3023
);
24+
expect(mergeStreamingText("abc", "cabc")).toBe("cabc");
3125
});
3226
});
3327

34-
describe("FeishuStreamingSession routing", () => {
35-
beforeEach(() => {
36-
vi.clearAllMocks();
37-
fetchWithSsrFGuardMock.mockReset();
28+
describe("resolveStreamingCardSendMode", () => {
29+
it("prefers message.reply when reply target and root id both exist", () => {
30+
expect(
31+
resolveStreamingCardSendMode({
32+
replyToMessageId: "om_parent",
33+
rootId: "om_topic_root",
34+
}),
35+
).toBe("reply");
3836
});
3937

40-
it("prefers message.reply when reply target and root id both exist", async () => {
41-
fetchWithSsrFGuardMock
42-
.mockResolvedValueOnce({
43-
response: { json: async () => ({ code: 0, msg: "ok", tenant_access_token: "token" }) },
44-
release: async () => {},
45-
})
46-
.mockResolvedValueOnce({
47-
response: { json: async () => ({ code: 0, msg: "ok", data: { card_id: "card_1" } }) },
48-
release: async () => {},
49-
});
50-
51-
const replyMock = vi.fn(async () => ({ code: 0, data: { message_id: "msg_reply" } }));
52-
const createMock = vi.fn(async () => ({ code: 0, data: { message_id: "msg_create" } }));
53-
54-
const session = new FeishuStreamingSession(
55-
{
56-
im: {
57-
message: {
58-
reply: replyMock,
59-
create: createMock,
60-
},
61-
},
62-
} as never,
63-
{
64-
appId: "app",
65-
appSecret: "secret",
66-
domain: "feishu",
67-
},
68-
);
69-
70-
await session.start("oc_chat", "chat_id", {
71-
replyToMessageId: "om_parent",
72-
replyInThread: true,
73-
rootId: "om_topic_root",
74-
});
38+
it("falls back to root create when reply target is absent", () => {
39+
expect(
40+
resolveStreamingCardSendMode({
41+
rootId: "om_topic_root",
42+
}),
43+
).toBe("root_create");
44+
});
7545

76-
expect(replyMock).toHaveBeenCalledTimes(1);
77-
expect(replyMock).toHaveBeenCalledWith({
78-
path: { message_id: "om_parent" },
79-
data: expect.objectContaining({
80-
msg_type: "interactive",
81-
reply_in_thread: true,
46+
it("uses create mode when no reply routing fields are provided", () => {
47+
expect(resolveStreamingCardSendMode()).toBe("create");
48+
expect(
49+
resolveStreamingCardSendMode({
50+
replyInThread: true,
8251
}),
83-
});
84-
expect(createMock).not.toHaveBeenCalled();
52+
).toBe("create");
8553
});
8654
});

extensions/feishu/src/streaming-card.ts

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ export type StreamingCardHeader = {
1616
template?: string;
1717
};
1818

19+
type StreamingStartOptions = {
20+
replyToMessageId?: string;
21+
replyInThread?: boolean;
22+
rootId?: string;
23+
header?: StreamingCardHeader;
24+
};
25+
1926
// Token cache (keyed by domain + appId)
2027
const tokenCache = new Map<string, { token: string; expiresAt: number }>();
2128

@@ -103,6 +110,12 @@ export function mergeStreamingText(
103110
if (previous.startsWith(next)) {
104111
return previous;
105112
}
113+
if (next.includes(previous)) {
114+
return next;
115+
}
116+
if (previous.includes(next)) {
117+
return previous;
118+
}
106119

107120
// Merge partial overlaps, e.g. "这" + "这是" => "这是".
108121
const maxOverlap = Math.min(previous.length, next.length);
@@ -111,15 +124,18 @@ export function mergeStreamingText(
111124
return `${previous}${next.slice(overlap)}`;
112125
}
113126
}
127+
// Fallback for fragmented partial chunks: append as-is to avoid losing tokens.
128+
return `${previous}${next}`;
129+
}
114130

115-
if (next.includes(previous)) {
116-
return next;
131+
export function resolveStreamingCardSendMode(options?: StreamingStartOptions) {
132+
if (options?.replyToMessageId) {
133+
return "reply";
117134
}
118-
if (previous.includes(next)) {
119-
return previous;
135+
if (options?.rootId) {
136+
return "root_create";
120137
}
121-
// Fallback for fragmented partial chunks: append as-is to avoid losing tokens.
122-
return `${previous}${next}`;
138+
return "create";
123139
}
124140

125141
/** Streaming card session manager */
@@ -143,12 +159,7 @@ export class FeishuStreamingSession {
143159
async start(
144160
receiveId: string,
145161
receiveIdType: "open_id" | "user_id" | "union_id" | "email" | "chat_id" = "chat_id",
146-
options?: {
147-
replyToMessageId?: string;
148-
replyInThread?: boolean;
149-
rootId?: string;
150-
header?: StreamingCardHeader;
151-
},
162+
options?: StreamingStartOptions,
152163
): Promise<void> {
153164
if (this.state) {
154165
return;
@@ -204,22 +215,24 @@ export class FeishuStreamingSession {
204215
// message.create with root_id may silently ignore root_id for card
205216
// references (card_id format).
206217
let sendRes;
207-
if (options?.replyToMessageId) {
218+
const sendOptions = options ?? {};
219+
const sendMode = resolveStreamingCardSendMode(sendOptions);
220+
if (sendMode === "reply") {
208221
sendRes = await this.client.im.message.reply({
209-
path: { message_id: options.replyToMessageId },
222+
path: { message_id: sendOptions.replyToMessageId! },
210223
data: {
211224
msg_type: "interactive",
212225
content: cardContent,
213-
...(options.replyInThread ? { reply_in_thread: true } : {}),
226+
...(sendOptions.replyInThread ? { reply_in_thread: true } : {}),
214227
},
215228
});
216-
} else if (options?.rootId) {
229+
} else if (sendMode === "root_create") {
217230
// root_id is undeclared in the SDK types but accepted at runtime
218231
sendRes = await this.client.im.message.create({
219232
params: { receive_id_type: receiveIdType },
220233
data: Object.assign(
221234
{ receive_id: receiveId, msg_type: "interactive", content: cardContent },
222-
{ root_id: options.rootId },
235+
{ root_id: sendOptions.rootId },
223236
),
224237
});
225238
} else {

0 commit comments

Comments
 (0)