Skip to content

Commit e8a162d

Browse files
mathiasnaglerclaudevincentkoc
authored
fix(mattermost): prevent duplicate messages when block streaming + threading are active (openclaw#41362)
* fix(mattermost): prevent duplicate messages when block streaming + threading are active Remove replyToId from createBlockReplyPayloadKey so identical content is deduplicated regardless of threading target. Add explicit threading dock to the Mattermost plugin with resolveReplyToMode reading from config (default "all"), and add replyToMode to the Mattermost config schema. Fixes openclaw#41219 Co-Authored-By: Claude Opus 4.6 <[email protected]> * fix(mattermost): address PR review — per-account replyToMode and test clarity Read replyToMode from the merged per-account config via resolveMattermostAccount so account-level overrides are honored in multi-account setups. Add replyToMode to MattermostAccountConfig type. Rename misleading test to clarify it exercises shouldDropFinalPayloads short-circuit, not payload key dedup. Co-Authored-By: Claude Opus 4.6 <[email protected]> * Replies: keep block-pipeline reply targets distinct * Tests: cover block reply target-aware dedupe * Update CHANGELOG.md --------- Co-authored-by: Claude Opus 4.6 <[email protected]> Co-authored-by: Vincent Koc <[email protected]>
1 parent 241e8cc commit e8a162d

File tree

10 files changed

+182
-6
lines changed

10 files changed

+182
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
2020
- TUI/chat log: reuse the active assistant message component for the same streaming run so `openclaw tui` no longer renders duplicate assistant replies. (#35364) Thanks @lisitan.
2121
- macOS/Reminders: add the missing `NSRemindersUsageDescription` to the bundled app so `apple-reminders` can trigger the system permission prompt from OpenClaw.app. (#8559) Thanks @dinakars777.
2222
- iMessage/self-chat echo dedupe: drop reflected duplicate copies only when a matching `is_from_me` event was just seen for the same chat, text, and `created_at`, preventing self-chat loops without broad text-only suppression. Related to #32166. (#38440) Thanks @vincentkoc.
23+
- Mattermost/block streaming: fix duplicate message delivery (one threaded, one top-level) when block streaming is active by excluding `replyToId` from the block reply dedup key and adding an explicit `threading` dock to the Mattermost plugin. (#41362) Thanks @mathiasnagler and @vincentkoc.
2324
- BlueBubbles/self-chat echo dedupe: drop reflected duplicate webhook copies only when a matching `fromMe` event was just seen for the same chat, body, and timestamp, preventing self-chat loops without broad webhook suppression. Related to #32166. (#38442) Thanks @vincentkoc.
2425

2526
## 2026.3.11

extensions/mattermost/src/channel.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,16 @@ export const mattermostPlugin: ChannelPlugin<ResolvedMattermostAccount> = {
270270
streaming: {
271271
blockStreamingCoalesceDefaults: { minChars: 1500, idleMs: 1000 },
272272
},
273+
threading: {
274+
resolveReplyToMode: ({ cfg, accountId }) => {
275+
const account = resolveMattermostAccount({ cfg, accountId: accountId ?? "default" });
276+
const mode = account.config.replyToMode;
277+
if (mode === "off" || mode === "first") {
278+
return mode;
279+
}
280+
return "all";
281+
},
282+
},
273283
reload: { configPrefixes: ["channels.mattermost"] },
274284
configSchema: buildChannelConfigSchema(MattermostConfigSchema),
275285
config: {

extensions/mattermost/src/config-schema.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ const MattermostAccountSchemaBase = z
4343
chunkMode: z.enum(["length", "newline"]).optional(),
4444
blockStreaming: z.boolean().optional(),
4545
blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(),
46+
replyToMode: z.enum(["off", "first", "all"]).optional(),
4647
responsePrefix: z.string().optional(),
4748
actions: z
4849
.object({

extensions/mattermost/src/mattermost/monitor.test.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,29 @@ describe("mattermost mention gating", () => {
109109
});
110110
});
111111

112+
describe("resolveMattermostReplyRootId with block streaming payloads", () => {
113+
it("uses threadRootId for block-streamed payloads with replyToId", () => {
114+
// When block streaming sends a payload with replyToId from the threading
115+
// mode, the deliver callback should still use the existing threadRootId.
116+
expect(
117+
resolveMattermostReplyRootId({
118+
threadRootId: "thread-root-1",
119+
replyToId: "streamed-reply-id",
120+
}),
121+
).toBe("thread-root-1");
122+
});
123+
124+
it("falls back to payload replyToId when no threadRootId in block streaming", () => {
125+
// Top-level channel message: no threadRootId, payload carries the
126+
// inbound post id as replyToId from the "all" threading mode.
127+
expect(
128+
resolveMattermostReplyRootId({
129+
replyToId: "inbound-post-for-threading",
130+
}),
131+
).toBe("inbound-post-for-threading");
132+
});
133+
});
134+
112135
describe("resolveMattermostReplyRootId", () => {
113136
it("uses replyToId for top-level replies", () => {
114137
expect(

extensions/mattermost/src/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ export type MattermostAccountConfig = {
5252
blockStreaming?: boolean;
5353
/** Merge streamed block replies before sending. */
5454
blockStreamingCoalesce?: BlockStreamingCoalesceConfig;
55+
/** Control reply threading (off|first|all). Default: "all". */
56+
replyToMode?: "off" | "first" | "all";
5557
/** Outbound response prefix override for this channel/account. */
5658
responsePrefix?: string;
5759
/** Action toggles for this account. */

src/auto-reply/reply/agent-runner-payloads.test.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,50 @@ describe("buildReplyPayloads media filter integration", () => {
169169
expect(replyPayloads).toHaveLength(0);
170170
});
171171

172+
it("drops all final payloads when block pipeline streamed successfully", async () => {
173+
const pipeline: Parameters<typeof buildReplyPayloads>[0]["blockReplyPipeline"] = {
174+
didStream: () => true,
175+
isAborted: () => false,
176+
hasSentPayload: () => false,
177+
enqueue: () => {},
178+
flush: async () => {},
179+
stop: () => {},
180+
hasBuffered: () => false,
181+
};
182+
// shouldDropFinalPayloads short-circuits to [] when the pipeline streamed
183+
// without aborting, so hasSentPayload is never reached.
184+
const { replyPayloads } = await buildReplyPayloads({
185+
...baseParams,
186+
blockStreamingEnabled: true,
187+
blockReplyPipeline: pipeline,
188+
replyToMode: "all",
189+
payloads: [{ text: "response", replyToId: "post-123" }],
190+
});
191+
192+
expect(replyPayloads).toHaveLength(0);
193+
});
194+
195+
it("deduplicates final payloads against directly sent block keys regardless of replyToId", async () => {
196+
// When block streaming is not active but directlySentBlockKeys has entries
197+
// (e.g. from pre-tool flush), the key should match even if replyToId differs.
198+
const { createBlockReplyContentKey } = await import("./block-reply-pipeline.js");
199+
const directlySentBlockKeys = new Set<string>();
200+
directlySentBlockKeys.add(
201+
createBlockReplyContentKey({ text: "response", replyToId: "post-1" }),
202+
);
203+
204+
const { replyPayloads } = await buildReplyPayloads({
205+
...baseParams,
206+
blockStreamingEnabled: false,
207+
blockReplyPipeline: null,
208+
directlySentBlockKeys,
209+
replyToMode: "off",
210+
payloads: [{ text: "response" }],
211+
});
212+
213+
expect(replyPayloads).toHaveLength(0);
214+
});
215+
172216
it("does not suppress same-target replies when accountId differs", async () => {
173217
const { replyPayloads } = await buildReplyPayloads({
174218
...baseParams,

src/auto-reply/reply/agent-runner-payloads.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type { OriginatingChannelType } from "../templating.js";
55
import { SILENT_REPLY_TOKEN } from "../tokens.js";
66
import type { ReplyPayload } from "../types.js";
77
import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js";
8-
import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
8+
import { createBlockReplyContentKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
99
import {
1010
resolveOriginAccountId,
1111
resolveOriginMessageProvider,
@@ -213,7 +213,7 @@ export async function buildReplyPayloads(params: {
213213
)
214214
: params.directlySentBlockKeys?.size
215215
? mediaFilteredPayloads.filter(
216-
(payload) => !params.directlySentBlockKeys!.has(createBlockReplyPayloadKey(payload)),
216+
(payload) => !params.directlySentBlockKeys!.has(createBlockReplyContentKey(payload)),
217217
)
218218
: mediaFilteredPayloads;
219219
const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads;
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import { describe, expect, it } from "vitest";
2+
import {
3+
createBlockReplyContentKey,
4+
createBlockReplyPayloadKey,
5+
createBlockReplyPipeline,
6+
} from "./block-reply-pipeline.js";
7+
8+
describe("createBlockReplyPayloadKey", () => {
9+
it("produces different keys for payloads differing only by replyToId", () => {
10+
const a = createBlockReplyPayloadKey({ text: "hello world", replyToId: "post-1" });
11+
const b = createBlockReplyPayloadKey({ text: "hello world", replyToId: "post-2" });
12+
const c = createBlockReplyPayloadKey({ text: "hello world" });
13+
expect(a).not.toBe(b);
14+
expect(a).not.toBe(c);
15+
});
16+
17+
it("produces different keys for payloads with different text", () => {
18+
const a = createBlockReplyPayloadKey({ text: "hello" });
19+
const b = createBlockReplyPayloadKey({ text: "world" });
20+
expect(a).not.toBe(b);
21+
});
22+
23+
it("produces different keys for payloads with different media", () => {
24+
const a = createBlockReplyPayloadKey({ text: "hello", mediaUrl: "file:///a.png" });
25+
const b = createBlockReplyPayloadKey({ text: "hello", mediaUrl: "file:///b.png" });
26+
expect(a).not.toBe(b);
27+
});
28+
29+
it("trims whitespace from text for key comparison", () => {
30+
const a = createBlockReplyPayloadKey({ text: " hello " });
31+
const b = createBlockReplyPayloadKey({ text: "hello" });
32+
expect(a).toBe(b);
33+
});
34+
});
35+
36+
describe("createBlockReplyContentKey", () => {
37+
it("produces the same key for payloads differing only by replyToId", () => {
38+
const a = createBlockReplyContentKey({ text: "hello world", replyToId: "post-1" });
39+
const b = createBlockReplyContentKey({ text: "hello world", replyToId: "post-2" });
40+
const c = createBlockReplyContentKey({ text: "hello world" });
41+
expect(a).toBe(b);
42+
expect(a).toBe(c);
43+
});
44+
});
45+
46+
describe("createBlockReplyPipeline dedup with threading", () => {
47+
it("keeps separate deliveries for same text with different replyToId", async () => {
48+
const sent: Array<{ text?: string; replyToId?: string }> = [];
49+
const pipeline = createBlockReplyPipeline({
50+
onBlockReply: async (payload) => {
51+
sent.push({ text: payload.text, replyToId: payload.replyToId });
52+
},
53+
timeoutMs: 5000,
54+
});
55+
56+
pipeline.enqueue({ text: "response text", replyToId: "thread-root-1" });
57+
pipeline.enqueue({ text: "response text", replyToId: undefined });
58+
await pipeline.flush();
59+
60+
expect(sent).toEqual([
61+
{ text: "response text", replyToId: "thread-root-1" },
62+
{ text: "response text", replyToId: undefined },
63+
]);
64+
});
65+
66+
it("hasSentPayload matches regardless of replyToId", async () => {
67+
const pipeline = createBlockReplyPipeline({
68+
onBlockReply: async () => {},
69+
timeoutMs: 5000,
70+
});
71+
72+
pipeline.enqueue({ text: "response text", replyToId: "thread-root-1" });
73+
await pipeline.flush();
74+
75+
// Final payload with no replyToId should be recognized as already sent
76+
expect(pipeline.hasSentPayload({ text: "response text" })).toBe(true);
77+
expect(pipeline.hasSentPayload({ text: "response text", replyToId: "other-id" })).toBe(true);
78+
});
79+
});

src/auto-reply/reply/block-reply-pipeline.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,19 @@ export function createBlockReplyPayloadKey(payload: ReplyPayload): string {
4848
});
4949
}
5050

51+
export function createBlockReplyContentKey(payload: ReplyPayload): string {
52+
const text = payload.text?.trim() ?? "";
53+
const mediaList = payload.mediaUrls?.length
54+
? payload.mediaUrls
55+
: payload.mediaUrl
56+
? [payload.mediaUrl]
57+
: [];
58+
// Content-only key used for final-payload suppression after block streaming.
59+
// This intentionally ignores replyToId so a streamed threaded payload and the
60+
// later final payload still collapse when they carry the same content.
61+
return JSON.stringify({ text, mediaList });
62+
}
63+
5164
const withTimeout = async <T>(
5265
promise: Promise<T>,
5366
timeoutMs: number,
@@ -80,6 +93,7 @@ export function createBlockReplyPipeline(params: {
8093
}): BlockReplyPipeline {
8194
const { onBlockReply, timeoutMs, coalescing, buffer } = params;
8295
const sentKeys = new Set<string>();
96+
const sentContentKeys = new Set<string>();
8397
const pendingKeys = new Set<string>();
8498
const seenKeys = new Set<string>();
8599
const bufferedKeys = new Set<string>();
@@ -95,6 +109,7 @@ export function createBlockReplyPipeline(params: {
95109
return;
96110
}
97111
const payloadKey = createBlockReplyPayloadKey(payload);
112+
const contentKey = createBlockReplyContentKey(payload);
98113
if (!bypassSeenCheck) {
99114
if (seenKeys.has(payloadKey)) {
100115
return;
@@ -130,6 +145,7 @@ export function createBlockReplyPipeline(params: {
130145
return;
131146
}
132147
sentKeys.add(payloadKey);
148+
sentContentKeys.add(contentKey);
133149
didStream = true;
134150
})
135151
.catch((err) => {
@@ -238,8 +254,8 @@ export function createBlockReplyPipeline(params: {
238254
didStream: () => didStream,
239255
isAborted: () => aborted,
240256
hasSentPayload: (payload) => {
241-
const payloadKey = createBlockReplyPayloadKey(payload);
242-
return sentKeys.has(payloadKey);
257+
const payloadKey = createBlockReplyContentKey(payload);
258+
return sentContentKeys.has(payloadKey);
243259
},
244260
};
245261
}

src/auto-reply/reply/reply-delivery.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { logVerbose } from "../../globals.js";
22
import { SILENT_REPLY_TOKEN } from "../tokens.js";
33
import type { BlockReplyContext, ReplyPayload } from "../types.js";
44
import type { BlockReplyPipeline } from "./block-reply-pipeline.js";
5-
import { createBlockReplyPayloadKey } from "./block-reply-pipeline.js";
5+
import { createBlockReplyContentKey } from "./block-reply-pipeline.js";
66
import { parseReplyDirectives } from "./reply-directives.js";
77
import { applyReplyTagsToPayload, isRenderablePayload } from "./reply-payloads.js";
88
import type { TypingSignaler } from "./typing-mode.js";
@@ -128,7 +128,7 @@ export function createBlockReplyDeliveryHandler(params: {
128128
} else if (params.blockStreamingEnabled) {
129129
// Send directly when flushing before tool execution (no pipeline but streaming enabled).
130130
// Track sent key to avoid duplicate in final payloads.
131-
params.directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload));
131+
params.directlySentBlockKeys.add(createBlockReplyContentKey(blockPayload));
132132
await params.onBlockReply(blockPayload);
133133
}
134134
// When streaming is disabled entirely, blocks are accumulated in final text instead.

0 commit comments

Comments
 (0)