Skip to content

Commit 7f2708a

Browse files
authored
fix(routing): unify session delivery invariants for duplicate suppression (openclaw#33786)
* Routing: unify session delivery invariants * Routing: address PR review feedback * Routing: tighten topic and session-scope suppression * fix(chat): inherit routes for per-account channel-peer sessions
1 parent 1be39d4 commit 7f2708a

File tree

12 files changed

+436
-28
lines changed

12 files changed

+436
-28
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
2020
- Build/lazy runtime boundaries: replace ineffective dynamic import sites with dedicated lazy runtime boundaries across Slack slash handling, Telegram audit, CLI send deps, memory fallback, and outbound delivery paths while preserving behavior. (#33690) thanks @gumadeiras.
2121
- Config/heartbeat legacy-path handling: auto-migrate top-level `heartbeat` into `agents.defaults.heartbeat` (with merge semantics that preserve explicit defaults), and keep startup failures on non-migratable legacy entries in the detailed invalid-config path instead of generic migration-failed errors. (#32706) thanks @xiwan.
2222
- Plugins/SDK subpath parity: add channel-specific plugin SDK subpaths for Discord, Slack, Signal, iMessage, WhatsApp, and LINE; migrate bundled plugin entrypoints to scoped subpaths/core with CI guardrails; and keep `openclaw/plugin-sdk` root import compatibility for existing external plugins. (#33737) thanks @gumadeiras.
23+
- Routing/session duplicate suppression synthesis: align shared session delivery-context inheritance, channel-paired route-field merges, and reply-surface target matching so dmScope=main turns avoid cross-surface duplicate replies while thread-aware forwarding keeps intended routing semantics. (from #33629, #26889, #17337, #33250) Thanks @Yuandiaodiaodiao, @kevinwildenradt, @Glucksberg, and @bmendonca3.
2324
- Security/auth labels: remove token and API-key snippets from user-facing auth status labels so `/status` and `/models` do not expose credential fragments. (#33262) thanks @cu1ch3n.
2425
- Auth/credential semantics: align profile eligibility + probe diagnostics with SecretRef/expiry rules and harden browser download atomic writes. (#33733) thanks @joshavant.
2526
- Security/audit denyCommands guidance: suggest likely exact node command IDs for unknown `gateway.nodes.denyCommands` entries so ineffective denylist entries are easier to correct. (#29713) thanks @liquidhorizon88-bot.

src/agents/pi-embedded-messaging.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export type MessagingToolSend = {
55
provider: string;
66
accountId?: string;
77
to?: string;
8+
threadId?: string;
89
};
910

1011
const CORE_MESSAGING_TOOLS = new Set(["sessions_send", "message"]);

src/auto-reply/reply/reply-payloads.test.ts

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import { describe, expect, it } from "vitest";
2-
import { filterMessagingToolMediaDuplicates } from "./reply-payloads.js";
2+
import {
3+
filterMessagingToolMediaDuplicates,
4+
shouldSuppressMessagingToolReplies,
5+
} from "./reply-payloads.js";
36

47
describe("filterMessagingToolMediaDuplicates", () => {
58
it("strips mediaUrl when it matches sentMediaUrls", () => {
@@ -75,3 +78,79 @@ describe("filterMessagingToolMediaDuplicates", () => {
7578
expect(result).toEqual([{ text: "hello", mediaUrl: undefined, mediaUrls: undefined }]);
7679
});
7780
});
81+
82+
describe("shouldSuppressMessagingToolReplies", () => {
83+
it("suppresses when target provider is missing but target matches current provider route", () => {
84+
expect(
85+
shouldSuppressMessagingToolReplies({
86+
messageProvider: "telegram",
87+
originatingTo: "123",
88+
messagingToolSentTargets: [{ tool: "message", provider: "", to: "123" }],
89+
}),
90+
).toBe(true);
91+
});
92+
93+
it('suppresses when target provider uses "message" placeholder and target matches', () => {
94+
expect(
95+
shouldSuppressMessagingToolReplies({
96+
messageProvider: "telegram",
97+
originatingTo: "123",
98+
messagingToolSentTargets: [{ tool: "message", provider: "message", to: "123" }],
99+
}),
100+
).toBe(true);
101+
});
102+
103+
it("does not suppress when providerless target does not match origin route", () => {
104+
expect(
105+
shouldSuppressMessagingToolReplies({
106+
messageProvider: "telegram",
107+
originatingTo: "123",
108+
messagingToolSentTargets: [{ tool: "message", provider: "", to: "456" }],
109+
}),
110+
).toBe(false);
111+
});
112+
113+
it("suppresses telegram topic-origin replies when explicit threadId matches", () => {
114+
expect(
115+
shouldSuppressMessagingToolReplies({
116+
messageProvider: "telegram",
117+
originatingTo: "telegram:group:-100123:topic:77",
118+
messagingToolSentTargets: [
119+
{ tool: "message", provider: "telegram", to: "-100123", threadId: "77" },
120+
],
121+
}),
122+
).toBe(true);
123+
});
124+
125+
it("does not suppress telegram topic-origin replies when explicit threadId differs", () => {
126+
expect(
127+
shouldSuppressMessagingToolReplies({
128+
messageProvider: "telegram",
129+
originatingTo: "telegram:group:-100123:topic:77",
130+
messagingToolSentTargets: [
131+
{ tool: "message", provider: "telegram", to: "-100123", threadId: "88" },
132+
],
133+
}),
134+
).toBe(false);
135+
});
136+
137+
it("does not suppress telegram topic-origin replies when target omits topic metadata", () => {
138+
expect(
139+
shouldSuppressMessagingToolReplies({
140+
messageProvider: "telegram",
141+
originatingTo: "telegram:group:-100123:topic:77",
142+
messagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "-100123" }],
143+
}),
144+
).toBe(false);
145+
});
146+
147+
it("suppresses telegram replies when chatId matches but target forms differ", () => {
148+
expect(
149+
shouldSuppressMessagingToolReplies({
150+
messageProvider: "telegram",
151+
originatingTo: "telegram:group:-100123",
152+
messagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "-100123" }],
153+
}),
154+
).toBe(true);
155+
});
156+
});

src/auto-reply/reply/reply-payloads.ts

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { normalizeChannelId } from "../../channels/plugins/index.js";
44
import type { ReplyToMode } from "../../config/types.js";
55
import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js";
66
import { normalizeOptionalAccountId } from "../../routing/account-id.js";
7+
import { parseTelegramTarget } from "../../telegram/targets.js";
78
import type { OriginatingChannelType } from "../templating.js";
89
import type { ReplyPayload } from "../types.js";
910
import { extractReplyToTag } from "./reply-tags.js";
@@ -162,6 +163,62 @@ function normalizeProviderForComparison(value?: string): string | undefined {
162163
return PROVIDER_ALIAS_MAP[lowered] ?? lowered;
163164
}
164165

166+
function normalizeThreadIdForComparison(value?: string): string | undefined {
167+
const trimmed = value?.trim();
168+
if (!trimmed) {
169+
return undefined;
170+
}
171+
if (/^-?\d+$/.test(trimmed)) {
172+
return String(Number.parseInt(trimmed, 10));
173+
}
174+
return trimmed.toLowerCase();
175+
}
176+
177+
function resolveTargetProviderForComparison(params: {
178+
currentProvider: string;
179+
targetProvider?: string;
180+
}): string {
181+
const targetProvider = normalizeProviderForComparison(params.targetProvider);
182+
if (!targetProvider || targetProvider === "message") {
183+
return params.currentProvider;
184+
}
185+
return targetProvider;
186+
}
187+
188+
function targetsMatchForSuppression(params: {
189+
provider: string;
190+
originTarget: string;
191+
targetKey: string;
192+
targetThreadId?: string;
193+
}): boolean {
194+
if (params.provider !== "telegram") {
195+
return params.targetKey === params.originTarget;
196+
}
197+
198+
const origin = parseTelegramTarget(params.originTarget);
199+
const target = parseTelegramTarget(params.targetKey);
200+
const explicitTargetThreadId = normalizeThreadIdForComparison(params.targetThreadId);
201+
const targetThreadId =
202+
explicitTargetThreadId ??
203+
(target.messageThreadId != null ? String(target.messageThreadId) : undefined);
204+
const originThreadId =
205+
origin.messageThreadId != null ? String(origin.messageThreadId) : undefined;
206+
if (origin.chatId.trim().toLowerCase() !== target.chatId.trim().toLowerCase()) {
207+
return false;
208+
}
209+
if (originThreadId && targetThreadId != null) {
210+
return originThreadId === targetThreadId;
211+
}
212+
if (originThreadId && targetThreadId == null) {
213+
return false;
214+
}
215+
if (!originThreadId && targetThreadId != null) {
216+
return false;
217+
}
218+
// chatId already matched and neither side carries thread context.
219+
return true;
220+
}
221+
165222
export function shouldSuppressMessagingToolReplies(params: {
166223
messageProvider?: string;
167224
messagingToolSentTargets?: MessagingToolSend[];
@@ -182,23 +239,26 @@ export function shouldSuppressMessagingToolReplies(params: {
182239
return false;
183240
}
184241
return sentTargets.some((target) => {
185-
const targetProvider = normalizeProviderForComparison(target?.provider);
186-
if (!targetProvider) {
187-
return false;
188-
}
189-
const isGenericMessageProvider = targetProvider === "message";
190-
if (!isGenericMessageProvider && targetProvider !== provider) {
242+
const targetProvider = resolveTargetProviderForComparison({
243+
currentProvider: provider,
244+
targetProvider: target?.provider,
245+
});
246+
if (targetProvider !== provider) {
191247
return false;
192248
}
193-
const targetNormalizationProvider = isGenericMessageProvider ? provider : targetProvider;
194-
const targetKey = normalizeTargetForProvider(targetNormalizationProvider, target.to);
249+
const targetKey = normalizeTargetForProvider(targetProvider, target.to);
195250
if (!targetKey) {
196251
return false;
197252
}
198253
const targetAccount = normalizeOptionalAccountId(target.accountId);
199254
if (originAccount && targetAccount && originAccount !== targetAccount) {
200255
return false;
201256
}
202-
return targetKey === originTarget;
257+
return targetsMatchForSuppression({
258+
provider,
259+
originTarget,
260+
targetKey,
261+
targetThreadId: target.threadId,
262+
});
203263
});
204264
}

src/auto-reply/reply/session-delivery.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ function resolveSessionKeyChannelHint(sessionKey?: string): string | undefined {
3030
return normalizeMessageChannel(head);
3131
}
3232

33+
function isMainSessionKey(sessionKey?: string): boolean {
34+
const parsed = parseAgentSessionKey(sessionKey);
35+
if (!parsed) {
36+
return (sessionKey ?? "").trim().toLowerCase() === "main";
37+
}
38+
return parsed.rest.trim().toLowerCase() === "main";
39+
}
40+
3341
function isExternalRoutingChannel(channel?: string): channel is string {
3442
return Boolean(
3543
channel && channel !== INTERNAL_MESSAGE_CHANNEL && isDeliverableMessageChannel(channel),
@@ -42,6 +50,9 @@ export function resolveLastChannelRaw(params: {
4250
sessionKey?: string;
4351
}): string | undefined {
4452
const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw);
53+
if (originatingChannel === INTERNAL_MESSAGE_CHANNEL && isMainSessionKey(params.sessionKey)) {
54+
return params.originatingChannelRaw;
55+
}
4556
const persistedChannel = normalizeMessageChannel(params.persistedLastChannel);
4657
const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey);
4758
let resolved = params.originatingChannelRaw || params.persistedLastChannel;
@@ -66,6 +77,9 @@ export function resolveLastToRaw(params: {
6677
sessionKey?: string;
6778
}): string | undefined {
6879
const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw);
80+
if (originatingChannel === INTERNAL_MESSAGE_CHANNEL && isMainSessionKey(params.sessionKey)) {
81+
return params.originatingToRaw || params.toRaw;
82+
}
6983
const persistedChannel = normalizeMessageChannel(params.persistedLastChannel);
7084
const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey);
7185

src/auto-reply/reply/session.test.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1609,4 +1609,69 @@ describe("initSessionState internal channel routing preservation", () => {
16091609

16101610
expect(result.sessionEntry.lastChannel).toBe("webchat");
16111611
});
1612+
1613+
it("does not reuse stale external lastTo for webchat/main turns without destination", async () => {
1614+
const storePath = await createStorePath("webchat-main-no-stale-lastto-");
1615+
const sessionKey = "agent:main:main";
1616+
await writeSessionStoreFast(storePath, {
1617+
[sessionKey]: {
1618+
sessionId: "sess-webchat-main-1",
1619+
updatedAt: Date.now(),
1620+
lastChannel: "whatsapp",
1621+
lastTo: "+15555550123",
1622+
deliveryContext: {
1623+
channel: "whatsapp",
1624+
to: "+15555550123",
1625+
},
1626+
},
1627+
});
1628+
const cfg = { session: { store: storePath } } as OpenClawConfig;
1629+
1630+
const result = await initSessionState({
1631+
ctx: {
1632+
Body: "webchat follow-up",
1633+
SessionKey: sessionKey,
1634+
OriginatingChannel: "webchat",
1635+
},
1636+
cfg,
1637+
commandAuthorized: true,
1638+
});
1639+
1640+
expect(result.sessionEntry.lastChannel).toBe("webchat");
1641+
expect(result.sessionEntry.lastTo).toBeUndefined();
1642+
});
1643+
1644+
it("prefers webchat route over persisted external route for main session turns", async () => {
1645+
const storePath = await createStorePath("prefer-webchat-main-route-");
1646+
const sessionKey = "agent:main:main";
1647+
await writeSessionStoreFast(storePath, {
1648+
[sessionKey]: {
1649+
sessionId: "sess-webchat-main-2",
1650+
updatedAt: Date.now(),
1651+
lastChannel: "whatsapp",
1652+
lastTo: "+15555550123",
1653+
deliveryContext: {
1654+
channel: "whatsapp",
1655+
to: "+15555550123",
1656+
},
1657+
},
1658+
});
1659+
const cfg = { session: { store: storePath } } as OpenClawConfig;
1660+
1661+
const result = await initSessionState({
1662+
ctx: {
1663+
Body: "reply only here",
1664+
SessionKey: sessionKey,
1665+
OriginatingChannel: "webchat",
1666+
OriginatingTo: "session:webchat-main",
1667+
},
1668+
cfg,
1669+
commandAuthorized: true,
1670+
});
1671+
1672+
expect(result.sessionEntry.lastChannel).toBe("webchat");
1673+
expect(result.sessionEntry.lastTo).toBe("session:webchat-main");
1674+
expect(result.sessionEntry.deliveryContext?.channel).toBe("webchat");
1675+
expect(result.sessionEntry.deliveryContext?.to).toBe("session:webchat-main");
1676+
});
16121677
});

src/channels/plugins/types.core.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ export type ChannelMessageActionContext = {
332332
export type ChannelToolSend = {
333333
to: string;
334334
accountId?: string | null;
335+
threadId?: string | null;
335336
};
336337

337338
export type ChannelMessageActionAdapter = {

0 commit comments

Comments
 (0)