Skip to content

Commit 259d68d

Browse files
קרמבוטקרמבוט
authored andcommitted
fix(routing): prevent cross-channel reply routing in same-provider multi-channel setups
Fixes #45514
1 parent 52a0aa0 commit 259d68d

File tree

2 files changed

+242
-7
lines changed

2 files changed

+242
-7
lines changed
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
import { afterAll, beforeAll, describe, expect, it } from "vitest";
2+
import type { OpenClawConfig } from "../../../config/config.js";
3+
import { defaultRuntime } from "../../../runtime.js";
4+
import { enqueueFollowupRun, scheduleFollowupDrain } from "../queue.js";
5+
import type { FollowupRun, QueueSettings } from "../queue.js";
6+
7+
function createDeferred<T>() {
8+
let resolve!: (value: T) => void;
9+
let reject!: (reason?: unknown) => void;
10+
const promise = new Promise<T>((res, rej) => {
11+
resolve = res;
12+
reject = rej;
13+
});
14+
return { promise, resolve, reject };
15+
}
16+
17+
function createRun(params: {
18+
prompt: string;
19+
originatingChannel?: FollowupRun["originatingChannel"];
20+
originatingTo?: string;
21+
originatingAccountId?: string;
22+
originatingThreadId?: string | number;
23+
}): FollowupRun {
24+
return {
25+
prompt: params.prompt,
26+
enqueuedAt: Date.now(),
27+
originatingChannel: params.originatingChannel,
28+
originatingTo: params.originatingTo,
29+
originatingAccountId: params.originatingAccountId,
30+
originatingThreadId: params.originatingThreadId,
31+
run: {
32+
agentId: "agent",
33+
agentDir: "/tmp",
34+
sessionId: "sess",
35+
sessionFile: "/tmp/session.json",
36+
workspaceDir: "/tmp",
37+
config: {} as OpenClawConfig,
38+
provider: "openai",
39+
model: "gpt-test",
40+
timeoutMs: 10_000,
41+
blockReplyBreak: "text_end",
42+
},
43+
};
44+
}
45+
46+
let previousRuntimeError: typeof defaultRuntime.error;
47+
48+
beforeAll(() => {
49+
previousRuntimeError = defaultRuntime.error;
50+
defaultRuntime.error = (() => {}) as typeof defaultRuntime.error;
51+
});
52+
53+
afterAll(() => {
54+
defaultRuntime.error = previousRuntimeError;
55+
});
56+
57+
describe("multi-channel Slack reply routing (regression #45514)", () => {
58+
it("routes items from two different Slack channels individually with correct originating metadata", async () => {
59+
// Regression: when two Slack channels share a queue, cross-channel detection
60+
// must route each item back to its own channel — not the other channel.
61+
const key = `test-slack-multichannel-${Date.now()}`;
62+
const calls: FollowupRun[] = [];
63+
const done = createDeferred<void>();
64+
const expectedCalls = 2;
65+
const runFollowup = async (run: FollowupRun) => {
66+
calls.push(run);
67+
if (calls.length >= expectedCalls) {
68+
done.resolve();
69+
}
70+
};
71+
const settings: QueueSettings = {
72+
mode: "collect",
73+
debounceMs: 0,
74+
cap: 50,
75+
dropPolicy: "summarize",
76+
};
77+
78+
enqueueFollowupRun(
79+
key,
80+
createRun({
81+
prompt: "msg-channel-A",
82+
originatingChannel: "slack",
83+
originatingTo: "channel:C_CHANNEL_A",
84+
}),
85+
settings,
86+
);
87+
enqueueFollowupRun(
88+
key,
89+
createRun({
90+
prompt: "msg-channel-B",
91+
originatingChannel: "slack",
92+
originatingTo: "channel:C_CHANNEL_B",
93+
}),
94+
settings,
95+
);
96+
97+
scheduleFollowupDrain(key, runFollowup);
98+
await done.promise;
99+
100+
// Both items must be delivered individually (cross-channel, not collected)
101+
expect(calls).toHaveLength(2);
102+
// First item must carry channel A's routing — not channel B's
103+
expect(calls[0]?.originatingChannel).toBe("slack");
104+
expect(calls[0]?.originatingTo).toBe("channel:C_CHANNEL_A");
105+
// Second item must carry channel B's routing — not channel A's
106+
expect(calls[1]?.originatingChannel).toBe("slack");
107+
expect(calls[1]?.originatingTo).toBe("channel:C_CHANNEL_B");
108+
});
109+
110+
it("collect batch routing comes from a single consistent source item, not mixed across items", async () => {
111+
// Regression: resolveOriginRoutingMetadata must pick all routing fields from
112+
// the same item. Picking each field independently allows channel from one item
113+
// to combine with accountId/threadId from another, routing the reply wrongly.
114+
const key = `test-collect-single-source-${Date.now()}`;
115+
const calls: FollowupRun[] = [];
116+
const done = createDeferred<void>();
117+
const runFollowup = async (run: FollowupRun) => {
118+
calls.push(run);
119+
done.resolve();
120+
};
121+
const settings: QueueSettings = {
122+
mode: "collect",
123+
debounceMs: 0,
124+
cap: 50,
125+
dropPolicy: "summarize",
126+
};
127+
128+
enqueueFollowupRun(
129+
key,
130+
createRun({
131+
prompt: "msg-1",
132+
originatingChannel: "slack",
133+
originatingTo: "channel:C_CHANNEL_A",
134+
originatingAccountId: "WS_ALPHA",
135+
originatingThreadId: "1706000000.000001",
136+
}),
137+
settings,
138+
);
139+
enqueueFollowupRun(
140+
key,
141+
createRun({
142+
prompt: "msg-2",
143+
originatingChannel: "slack",
144+
originatingTo: "channel:C_CHANNEL_A",
145+
originatingAccountId: "WS_ALPHA",
146+
originatingThreadId: "1706000000.000001",
147+
}),
148+
settings,
149+
);
150+
151+
scheduleFollowupDrain(key, runFollowup);
152+
await done.promise;
153+
154+
// Same channel + same thread: must be collected into one batch call
155+
expect(calls).toHaveLength(1);
156+
expect(calls[0]?.prompt).toContain("[Queued messages while agent was busy]");
157+
// All routing fields must be consistent — from the same source item
158+
expect(calls[0]?.originatingChannel).toBe("slack");
159+
expect(calls[0]?.originatingTo).toBe("channel:C_CHANNEL_A");
160+
expect(calls[0]?.originatingAccountId).toBe("WS_ALPHA");
161+
expect(calls[0]?.originatingThreadId).toBe("1706000000.000001");
162+
});
163+
164+
it("does not mix routing fields when first item has channel and later item has accountId", async () => {
165+
// Regression: with independent .find() calls, originatingChannel could come
166+
// from item[0] and originatingAccountId from item[1], producing a mixed
167+
// routing context. Items from different channels are detected as cross-channel
168+
// and processed individually — each with its own complete routing.
169+
const key = `test-no-field-mixing-${Date.now()}`;
170+
const calls: FollowupRun[] = [];
171+
const done = createDeferred<void>();
172+
const expectedCalls = 2;
173+
const runFollowup = async (run: FollowupRun) => {
174+
calls.push(run);
175+
if (calls.length >= expectedCalls) {
176+
done.resolve();
177+
}
178+
};
179+
const settings: QueueSettings = {
180+
mode: "collect",
181+
debounceMs: 0,
182+
cap: 50,
183+
dropPolicy: "summarize",
184+
};
185+
186+
// Item from channel A with accountId WS_A
187+
enqueueFollowupRun(
188+
key,
189+
createRun({
190+
prompt: "from-A",
191+
originatingChannel: "slack",
192+
originatingTo: "channel:C_A",
193+
originatingAccountId: "WS_A",
194+
}),
195+
settings,
196+
);
197+
// Item from channel B with accountId WS_B
198+
enqueueFollowupRun(
199+
key,
200+
createRun({
201+
prompt: "from-B",
202+
originatingChannel: "slack",
203+
originatingTo: "channel:C_B",
204+
originatingAccountId: "WS_B",
205+
}),
206+
settings,
207+
);
208+
209+
scheduleFollowupDrain(key, runFollowup);
210+
await done.promise;
211+
212+
// Items from different channels/accounts must be processed individually
213+
expect(calls).toHaveLength(2);
214+
// Each call must carry its own complete, unmixed routing
215+
expect(calls[0]?.originatingChannel).toBe("slack");
216+
expect(calls[0]?.originatingTo).toBe("channel:C_A");
217+
expect(calls[0]?.originatingAccountId).toBe("WS_A");
218+
expect(calls[1]?.originatingChannel).toBe("slack");
219+
expect(calls[1]?.originatingTo).toBe("channel:C_B");
220+
expect(calls[1]?.originatingAccountId).toBe("WS_B");
221+
});
222+
});

src/auto-reply/reply/queue/drain.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,27 @@ type OriginRoutingMetadata = Pick<
4141
>;
4242

4343
function resolveOriginRoutingMetadata(items: FollowupRun[]): OriginRoutingMetadata {
44+
// Resolve all routing fields from a single source item. Picking each field
45+
// independently with separate .find() calls can combine originatingChannel
46+
// from one item with originatingTo from another when items carry partial
47+
// metadata — silently routing a collect-batch reply to the wrong channel.
48+
// Using one consistent source prevents cross-item field mixing. Fixes #45514.
49+
const source = items.find(
50+
(item) =>
51+
item.originatingChannel ||
52+
item.originatingTo ||
53+
item.originatingAccountId ||
54+
// Support both number (Telegram topic) and string (Slack thread_ts) thread IDs.
55+
(item.originatingThreadId != null && item.originatingThreadId !== ""),
56+
);
57+
if (!source) {
58+
return {};
59+
}
4460
return {
45-
originatingChannel: items.find((item) => item.originatingChannel)?.originatingChannel,
46-
originatingTo: items.find((item) => item.originatingTo)?.originatingTo,
47-
originatingAccountId: items.find((item) => item.originatingAccountId)?.originatingAccountId,
48-
// Support both number (Telegram topic) and string (Slack thread_ts) thread IDs.
49-
originatingThreadId: items.find(
50-
(item) => item.originatingThreadId != null && item.originatingThreadId !== "",
51-
)?.originatingThreadId,
61+
originatingChannel: source.originatingChannel,
62+
originatingTo: source.originatingTo,
63+
originatingAccountId: source.originatingAccountId,
64+
originatingThreadId: source.originatingThreadId,
5265
};
5366
}
5467

0 commit comments

Comments
 (0)