Skip to content

Commit 826e46c

Browse files
committed
feat: buffer text according to unsent character counts
1 parent 6a7e7ba commit 826e46c

2 files changed

Lines changed: 92 additions & 87 deletions

File tree

packages/web-api/src/WebClient.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import isElectron from 'is-electron';
1717
import isStream from 'is-stream';
1818
import pQueue from 'p-queue';
1919
import pRetry, { AbortError } from 'p-retry';
20-
import ChatStreamer from './chat-stream';
20+
import { ChatStreamer, type ChatStreamerOptions } from './chat-stream';
2121
import {
2222
httpErrorFromResponse,
2323
platformErrorFromResult,
@@ -516,7 +516,7 @@ export class WebClient extends Methods {
516516
* @description The "chatStream" method starts a new chat stream in a coversation that can be appended to. After appending an entire message, the stream can be stopped with concluding arguments such as "blocks" for gathering feedback.
517517
*
518518
* @example
519-
* const streamer = await client.chatStream({
519+
* const streamer = client.chatStream({
520520
* channel: "C0123456789",
521521
* thread_ts: "1700000001.123456",
522522
* recipient_team_id: "T0123456789",
@@ -534,10 +534,12 @@ export class WebClient extends Methods {
534534
* @see {@link https://docs.slack.dev/reference/methods/chat.appendStream}
535535
* @see {@link https://docs.slack.dev/reference/methods/chat.stopStream}
536536
*/
537-
public async chatStream(params: ChatStartStreamArguments): Promise<ChatStreamer> {
538-
const streamer = new ChatStreamer(this);
539-
await streamer.start(params);
540-
return streamer;
537+
public chatStream(params: Omit<ChatStartStreamArguments & ChatStreamerOptions, 'markdown_text'>): ChatStreamer {
538+
const { buffer_size, ...args } = params;
539+
const options: ChatStreamerOptions = {
540+
buffer_size,
541+
};
542+
return new ChatStreamer(this, this.logger, args, options);
541543
}
542544

543545
/**
Lines changed: 84 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,27 @@
1-
import type { ExcludeFromUnion } from './types/helpers';
1+
import type { Logger } from '@slack/logger';
22
import type { ChatAppendStreamArguments, ChatStartStreamArguments, ChatStopStreamArguments } from './types/request';
33
import type { ChatAppendStreamResponse, ChatStartStreamResponse, ChatStopStreamResponse } from './types/response';
44
import type WebClient from './WebClient';
55

6-
export default class ChatStreamer {
7-
private channel: string | undefined;
6+
export interface ChatStreamerOptions {
7+
/**
8+
* @description The length of markdown_text to buffer in-memory before calling a method. Increasing this value decreases the number of method calls made for the same amount of text, which is useful to avoid rate limits.
9+
* @default 250
10+
*/
11+
buffer_size?: number;
12+
}
13+
14+
export class ChatStreamer {
15+
private args: ChatStartStreamArguments;
16+
17+
private buffer = '';
818

919
private client: WebClient;
1020

21+
private logger: Logger;
22+
23+
private options: Required<ChatStreamerOptions>;
24+
1125
private state: 'starting' | 'in_progress' | 'completed';
1226

1327
private streamTs: string | undefined;
@@ -20,13 +34,14 @@ export default class ChatStreamer {
2034
* @description The "constructor" method creates a unique {@link ChatStreamer} instance that keeps track of one chat stream. The stream must be started.
2135
* @example
2236
* const client = new WebClient(process.env.SLACK_BOT_TOKEN);
23-
* const streamer = new ChatStreamer(client);
24-
* const _response = await streamer.start({
37+
* const logger = new ConsoleLogger();
38+
* const args = {
2539
* channel: "C0123456789",
2640
* thread_ts: "1700000001.123456",
2741
* recipient_team_id: "T0123456789",
2842
* recipient_user_id: "U0123456789",
29-
* });
43+
* };
44+
* const streamer = new ChatStreamer(client, logger, args, { buffer_size: 500 });
3045
* await streamer.append({
3146
* markdown_text: "**hello world!**",
3247
* });
@@ -35,67 +50,22 @@ export default class ChatStreamer {
3550
* @see {@link https://docs.slack.dev/reference/methods/chat.appendStream}
3651
* @see {@link https://docs.slack.dev/reference/methods/chat.stopStream}
3752
*/
38-
constructor(client: WebClient) {
53+
constructor(client: WebClient, logger: Logger, args: ChatStartStreamArguments, options: ChatStreamerOptions) {
54+
this.args = args;
3955
this.client = client;
56+
this.logger = logger;
57+
this.options = {
58+
buffer_size: options.buffer_size ?? 250,
59+
};
4060
this.state = 'starting';
4161
}
4262

43-
/**
44-
* Start a new stream.
45-
*
46-
* @description The "start" method starts a new chat stream unique to a {@link ChatStreamer} being used. This method can be called once. If the chat stream was created with the {@link WebClient#chatStream} method this method should not be called because the stream is started already.
47-
* @example
48-
* const streamer = await client.chatStream({
49-
* channel: "C0123456789",
50-
* thread_ts: "1700000001.123456",
51-
* recipient_team_id: "T0123456789",
52-
* recipient_user_id: "U0123456789",
53-
* });
54-
* await streamer.append({
55-
* markdown_text: "**hello world!**",
56-
* });
57-
* await streamer.stop();
58-
* @example
59-
* const client = new WebClient(process.env.SLACK_BOT_TOKEN);
60-
* const streamer = new ChatStreamer(client);
61-
* const _response = await streamer.start({
62-
* channel: "C0123456789",
63-
* thread_ts: "1700000001.123456",
64-
* recipient_team_id: "T0123456789",
65-
* recipient_user_id: "U0123456789",
66-
* });
67-
* await streamer.append({
68-
* markdown_text: "**hello world!**",
69-
* });
70-
* await streamer.stop();
71-
* @see {@link https://docs.slack.dev/reference/methods/chat.startStream}
72-
*/
73-
async start(params: ChatStartStreamArguments): Promise<ChatStartStreamResponse> {
74-
if (this.state !== 'starting') {
75-
throw new Error(`failed to start stream: stream state is ${this.state}`);
76-
}
77-
if (params.token) {
78-
this.token = params.token;
79-
}
80-
const response = await this.client.chat.startStream({
81-
token: this.token,
82-
...params,
83-
});
84-
if (!response.ts) {
85-
throw new Error(`failed to start stream: ${response.error}`);
86-
}
87-
this.channel = params.channel;
88-
this.state = 'in_progress';
89-
this.streamTs = response.ts;
90-
return response;
91-
}
92-
9363
/**
9464
* Append to a stream.
9565
*
9666
* @description The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream is stopped this method cannot be called.
9767
* @example
98-
* const streamer = await client.stream({
68+
* const streamer = client.startStream({
9969
* channel: "C0123456789",
10070
* thread_ts: "1700000001.123456",
10171
* recipient_team_id: "T0123456789",
@@ -111,27 +81,28 @@ export default class ChatStreamer {
11181
* @see {@link https://docs.slack.dev/reference/methods/chat.appendStream}
11282
*/
11383
async append(
114-
params: ExcludeFromUnion<ChatAppendStreamArguments, 'channel' | 'ts'>,
115-
): Promise<ChatAppendStreamResponse> {
116-
if (this.state !== 'in_progress') {
84+
params: Omit<ChatAppendStreamArguments, 'channel' | 'ts'>,
85+
): Promise<ChatStartStreamResponse | ChatAppendStreamResponse | null> {
86+
if (this.state === 'completed') {
11787
throw new Error(`failed to append stream: stream state is ${this.state}`);
11888
}
11989
if (params.token) {
12090
this.token = params.token;
12191
}
122-
if (!this.channel) {
123-
throw new Error('failed to append stream: channel not found');
92+
this.buffer += params.markdown_text;
93+
if (this.buffer.length >= this.options.buffer_size) {
94+
return await this.flushBuffer(params);
12495
}
125-
if (!this.streamTs) {
126-
throw new Error('failed to append stream: ts not found');
127-
}
128-
const response = await this.client.chat.appendStream({
129-
token: this.token,
130-
channel: this.channel,
131-
ts: this.streamTs,
132-
...params,
133-
});
134-
return response;
96+
const details = {
97+
bufferLength: this.buffer.length,
98+
bufferSize: this.options.buffer_size,
99+
channel: this.args.channel,
100+
recipientTeamId: this.args.recipient_team_id,
101+
recipientUserId: this.args.recipient_user_id,
102+
threadTs: this.args.thread_ts,
103+
};
104+
this.logger.debug(`ChatStreamer appended to buffer: ${JSON.stringify(details)}`);
105+
return null;
135106
}
136107

137108
/**
@@ -140,7 +111,7 @@ export default class ChatStreamer {
140111
* @description The "stop" method stops the chat stream being used. This method can be called once to end the stream. Additional "blocks" and "metadata" can be provided.
141112
*
142113
* @example
143-
* const streamer = await client.stream({
114+
* const streamer = client.startStream({
144115
* channel: "C0123456789",
145116
* thread_ts: "1700000001.123456",
146117
* recipient_team_id: "T0123456789",
@@ -152,26 +123,58 @@ export default class ChatStreamer {
152123
* await streamer.stop();
153124
* @see {@link https://docs.slack.dev/reference/methods/chat.stopStream}
154125
*/
155-
async stop(params?: ExcludeFromUnion<ChatStopStreamArguments, 'channel' | 'ts'>): Promise<ChatStopStreamResponse> {
156-
if (this.state !== 'in_progress') {
126+
async stop(params?: Omit<ChatStopStreamArguments, 'channel' | 'ts'>): Promise<ChatStopStreamResponse> {
127+
if (this.state === 'completed') {
157128
throw new Error(`failed to stop stream: stream state is ${this.state}`);
158129
}
159130
if (params?.token) {
160131
this.token = params.token;
161132
}
162-
if (!this.channel) {
163-
throw new Error('failed to stop stream: channel not found');
164-
}
165133
if (!this.streamTs) {
166-
throw new Error('failed to stop stream: ts not found');
134+
const response = await this.client.chat.startStream({
135+
...this.args,
136+
token: this.token,
137+
});
138+
if (!response.ts) {
139+
throw new Error('failed to stop stream: stream not started');
140+
}
141+
this.streamTs = response.ts;
142+
this.state = 'in_progress';
167143
}
168144
const response = await this.client.chat.stopStream({
169145
token: this.token,
170-
channel: this.channel,
146+
channel: this.args.channel,
171147
ts: this.streamTs,
172148
...params,
149+
markdown_text: this.buffer + (params?.markdown_text ?? ''),
173150
});
174151
this.state = 'completed';
175152
return response;
176153
}
154+
155+
private async flushBuffer(
156+
params: Omit<ChatStartStreamArguments | ChatAppendStreamArguments, 'channel' | 'ts'>,
157+
): Promise<ChatStartStreamResponse | ChatAppendStreamResponse> {
158+
if (!this.streamTs) {
159+
const response = await this.client.chat.startStream({
160+
...this.args,
161+
token: this.token,
162+
...params,
163+
markdown_text: this.buffer,
164+
});
165+
this.buffer = '';
166+
this.streamTs = response.ts;
167+
this.state = 'in_progress';
168+
return response;
169+
}
170+
const response = await this.client.chat.appendStream({
171+
token: this.token,
172+
channel: this.args.channel,
173+
ts: this.streamTs,
174+
...params,
175+
markdown_text: this.buffer,
176+
});
177+
this.buffer = '';
178+
return response;
179+
}
177180
}

0 commit comments

Comments
 (0)