Skip to content

Commit 7b28b33

Browse files
committed
Fix Slack native streaming thread replies missing recipient ids
1 parent 750276f commit 7b28b33

File tree

4 files changed

+130
-13
lines changed

4 files changed

+130
-13
lines changed

src/slack/monitor/message-handler/dispatch.streaming.test.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { describe, expect, it } from "vitest";
2-
import { isSlackStreamingEnabled, resolveSlackStreamingThreadHint } from "./dispatch.js";
2+
import {
3+
isSlackStreamingEnabled,
4+
resolveSlackStreamingThreadHint,
5+
shouldUseSlackNativeStreaming,
6+
} from "./dispatch.js";
37

48
describe("slack native streaming defaults", () => {
59
it("is enabled when config is undefined", () => {
@@ -43,3 +47,27 @@ describe("slack native streaming thread hint", () => {
4347
).toBe("2000.1");
4448
});
4549
});
50+
51+
describe("slack native streaming recipient gating", () => {
52+
it("is disabled when team id is missing", () => {
53+
expect(
54+
shouldUseSlackNativeStreaming({
55+
streamingEnabled: true,
56+
threadTs: "1234.56",
57+
recipientTeamId: undefined,
58+
recipientUserId: "U123",
59+
}),
60+
).toBe(false);
61+
});
62+
63+
it("is disabled when user id is missing", () => {
64+
expect(
65+
shouldUseSlackNativeStreaming({
66+
streamingEnabled: true,
67+
threadTs: "1234.56",
68+
recipientTeamId: "T123",
69+
recipientUserId: undefined,
70+
}),
71+
).toBe(false);
72+
});
73+
});

src/slack/monitor/message-handler/dispatch.ts

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,25 @@ export function resolveSlackStreamingThreadHint(params: {
4343
});
4444
}
4545

46-
function shouldUseStreaming(params: {
46+
export function shouldUseSlackNativeStreaming(params: {
4747
streamingEnabled: boolean;
4848
threadTs: string | undefined;
49+
recipientTeamId: string | undefined;
50+
recipientUserId: string | undefined;
4951
}): boolean {
5052
if (!params.streamingEnabled) {
5153
return false;
5254
}
5355
if (!params.threadTs) {
54-
logVerbose("slack-stream: streaming disabled — no reply thread target available");
56+
logVerbose("slack-stream: streaming disabled - no reply thread target available");
57+
return false;
58+
}
59+
if (!params.recipientTeamId) {
60+
logVerbose("slack-stream: streaming disabled - missing recipient team id");
61+
return false;
62+
}
63+
if (!params.recipientUserId) {
64+
logVerbose("slack-stream: streaming disabled - missing recipient user id");
5565
return false;
5666
}
5767
return true;
@@ -152,9 +162,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
152162
incomingThreadTs,
153163
messageTs,
154164
});
155-
const useStreaming = shouldUseStreaming({
165+
const recipientTeamId = ctx.teamId.trim() || undefined;
166+
const recipientUserId = message.user?.trim() || undefined;
167+
const useStreaming = shouldUseSlackNativeStreaming({
156168
streamingEnabled,
157169
threadTs: streamThreadHint,
170+
recipientTeamId,
171+
recipientUserId,
158172
});
159173
let streamSession: SlackStreamSession | null = null;
160174
let streamFailed = false;
@@ -194,10 +208,21 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
194208
return;
195209
}
196210

211+
if (!recipientTeamId || !recipientUserId) {
212+
logVerbose(
213+
"slack-stream: missing recipient ids for stream start, falling back to normal delivery",
214+
);
215+
streamFailed = true;
216+
await deliverNormally(payload, streamThreadTs);
217+
return;
218+
}
219+
197220
streamSession = await startSlackStream({
198221
client: ctx.app.client,
199-
channel: message.channel,
222+
channelId: message.channel,
200223
threadTs: streamThreadTs,
224+
recipientTeamId,
225+
recipientUserId,
201226
text,
202227
});
203228
replyPlan.markSent();

src/slack/streaming.test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import type { WebClient } from "@slack/web-api";
2+
import { describe, expect, it, vi } from "vitest";
3+
import { startSlackStream, stopSlackStream } from "./streaming.js";
4+
5+
describe("startSlackStream", () => {
6+
it("passes recipient ids to chatStream", async () => {
7+
const streamer = {
8+
append: vi.fn(),
9+
stop: vi.fn(),
10+
};
11+
const chatStream = vi.fn().mockReturnValue(streamer);
12+
const client = {
13+
chatStream,
14+
} as unknown as WebClient;
15+
16+
await startSlackStream({
17+
client,
18+
channelId: "C123",
19+
threadTs: "1730.1",
20+
recipientTeamId: "T123",
21+
recipientUserId: "U123",
22+
});
23+
24+
expect(chatStream).toHaveBeenCalledWith(
25+
expect.objectContaining({
26+
channel: "C123",
27+
thread_ts: "1730.1",
28+
recipient_team_id: "T123",
29+
recipient_user_id: "U123",
30+
}),
31+
);
32+
});
33+
});
34+
35+
describe("stopSlackStream", () => {
36+
it("does not throw when Slack rejects stop", async () => {
37+
const streamer = {
38+
append: vi.fn(),
39+
stop: vi.fn().mockRejectedValue(new Error("stop failed")),
40+
};
41+
const chatStream = vi.fn().mockReturnValue(streamer);
42+
const client = {
43+
chatStream,
44+
} as unknown as WebClient;
45+
46+
const session = await startSlackStream({
47+
client,
48+
channelId: "C123",
49+
threadTs: "1730.2",
50+
recipientTeamId: "T123",
51+
recipientUserId: "U123",
52+
});
53+
54+
await expect(stopSlackStream({ session })).resolves.toBeUndefined();
55+
expect(streamer.stop).toHaveBeenCalledTimes(1);
56+
});
57+
});

src/slack/streaming.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ export type SlackStreamSession = {
3232

3333
export type StartSlackStreamParams = {
3434
client: WebClient;
35-
channel: string;
35+
channelId: string;
3636
threadTs: string;
37+
recipientTeamId: string;
38+
recipientUserId: string;
3739
/** Optional initial markdown text to include in the stream start. */
3840
text?: string;
3941
};
@@ -64,18 +66,20 @@ export type StopSlackStreamParams = {
6466
export async function startSlackStream(
6567
params: StartSlackStreamParams,
6668
): Promise<SlackStreamSession> {
67-
const { client, channel, threadTs, text } = params;
69+
const { client, channelId, threadTs, recipientTeamId, recipientUserId, text } = params;
6870

69-
logVerbose(`slack-stream: starting stream in ${channel} thread=${threadTs}`);
71+
logVerbose(`slack-stream: starting stream in ${channelId} thread=${threadTs}`);
7072

7173
const streamer = client.chatStream({
72-
channel,
74+
channel: channelId,
7375
thread_ts: threadTs,
76+
recipient_team_id: recipientTeamId,
77+
recipient_user_id: recipientUserId,
7478
});
7579

7680
const session: SlackStreamSession = {
7781
streamer,
78-
channel,
82+
channel: channelId,
7983
threadTs,
8084
stopped: false,
8185
};
@@ -131,7 +135,10 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise<vo
131135
}`,
132136
);
133137

134-
await session.streamer.stop(text ? { markdown_text: text } : undefined);
135-
136-
logVerbose("slack-stream: stream stopped");
138+
try {
139+
await session.streamer.stop(text ? { markdown_text: text } : undefined);
140+
logVerbose("slack-stream: stream stopped");
141+
} catch (err) {
142+
logVerbose(`slack-stream: failed to stop stream (${String(err)})`);
143+
}
137144
}

0 commit comments

Comments
 (0)