Skip to content

Commit ffeb783

Browse files
zimegmwbrooksmcodik
authored
feat(web-api): add a chat stream method to the web client (#2379)
Co-authored-by: Michael Brooks <mbrooks@slack-corp.com> Co-authored-by: Maurice Codik <mcodik@slack-corp.com>
1 parent 520a2d6 commit ffeb783

File tree

3 files changed

+367
-2
lines changed

3 files changed

+367
-2
lines changed

packages/web-api/src/WebClient.spec.ts

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,6 +1214,159 @@ describe('WebClient', () => {
12141214
});
12151215
});
12161216

1217+
describe('chatStream', () => {
1218+
it('streams a short message', async () => {
1219+
const scope = nock('https://slack.com')
1220+
.post('/api/chat.startStream', {
1221+
channel: 'C0123456789',
1222+
thread_ts: '123.000',
1223+
recipient_team_id: 'T0123456789',
1224+
recipient_user_id: 'U0123456789',
1225+
})
1226+
.reply(200, {
1227+
ok: true,
1228+
ts: '123.123',
1229+
})
1230+
.post('/api/chat.stopStream', { channel: 'C0123456789', ts: '123.123', markdown_text: 'nice!' })
1231+
.reply(200, {
1232+
ok: true,
1233+
});
1234+
const streamer = client.chatStream({
1235+
channel: 'C0123456789',
1236+
thread_ts: '123.000',
1237+
recipient_team_id: 'T0123456789',
1238+
recipient_user_id: 'U0123456789',
1239+
});
1240+
await streamer.append({
1241+
markdown_text: 'nice!',
1242+
});
1243+
await streamer.stop();
1244+
scope.done();
1245+
});
1246+
1247+
it('streams a long message', async () => {
1248+
const scope = nock('https://slack.com')
1249+
.post('/api/chat.startStream', {
1250+
channel: 'C0123456789',
1251+
markdown_text: '**this messag',
1252+
recipient_team_id: 'T0123456789',
1253+
recipient_user_id: 'U0123456789',
1254+
thread_ts: '123.000',
1255+
})
1256+
.reply(200, {
1257+
ok: true,
1258+
ts: '123.123',
1259+
})
1260+
.post('/api/chat.appendStream', {
1261+
channel: 'C0123456789',
1262+
markdown_text: 'e is bold!',
1263+
token: 'xoxb-updated-1',
1264+
ts: '123.123',
1265+
})
1266+
.reply(200, {
1267+
ok: true,
1268+
})
1269+
.post('/api/chat.stopStream', {
1270+
channel: 'C0123456789',
1271+
markdown_text: '**',
1272+
token: 'xoxb-updated-2',
1273+
ts: '123.123',
1274+
})
1275+
.reply(200, {
1276+
ok: true,
1277+
});
1278+
const streamer = client.chatStream({
1279+
buffer_size: 5,
1280+
channel: 'C0123456789',
1281+
recipient_team_id: 'T0123456789',
1282+
recipient_user_id: 'U0123456789',
1283+
thread_ts: '123.000',
1284+
});
1285+
await streamer.append({
1286+
markdown_text: '**this messag',
1287+
});
1288+
await streamer.append({
1289+
markdown_text: 'e is',
1290+
token: 'xoxb-updated-1',
1291+
});
1292+
await streamer.append({
1293+
markdown_text: ' bold!',
1294+
});
1295+
await streamer.append({
1296+
markdown_text: '*',
1297+
});
1298+
await streamer.stop({
1299+
markdown_text: '*',
1300+
token: 'xoxb-updated-2',
1301+
});
1302+
scope.done();
1303+
});
1304+
1305+
it('errors when appending to an unstarted stream', async () => {
1306+
const scope = nock('https://slack.com')
1307+
.post('/api/chat.startStream', {
1308+
channel: 'U0123456789',
1309+
thread_ts: '123.000',
1310+
})
1311+
.reply(200, {
1312+
ok: true,
1313+
ts: undefined,
1314+
});
1315+
const streamer = client.chatStream({
1316+
channel: 'U0123456789',
1317+
thread_ts: '123.000',
1318+
});
1319+
try {
1320+
await streamer.stop();
1321+
assert.fail();
1322+
} catch (error) {
1323+
assert.equal((error as Error).message, 'failed to stop stream: stream not started');
1324+
}
1325+
scope.done();
1326+
});
1327+
1328+
it('errors when appending to a completed stream', async () => {
1329+
const scope = nock('https://slack.com')
1330+
.post('/api/chat.startStream', {
1331+
channel: 'C0123456789',
1332+
thread_ts: '123.000',
1333+
recipient_team_id: 'T0123456789',
1334+
recipient_user_id: 'U0123456789',
1335+
})
1336+
.reply(200, {
1337+
ok: true,
1338+
ts: '123.123',
1339+
})
1340+
.post('/api/chat.stopStream', { channel: 'C0123456789', ts: '123.123', markdown_text: 'nice!' })
1341+
.reply(200, {
1342+
ok: true,
1343+
});
1344+
const streamer = client.chatStream({
1345+
channel: 'C0123456789',
1346+
thread_ts: '123.000',
1347+
recipient_team_id: 'T0123456789',
1348+
recipient_user_id: 'U0123456789',
1349+
});
1350+
await streamer.append({
1351+
markdown_text: 'nice!',
1352+
});
1353+
await streamer.stop();
1354+
try {
1355+
await streamer.append({ markdown_text: 'more...' });
1356+
assert.fail();
1357+
} catch (error) {
1358+
assert.equal((error as Error).message, 'failed to append stream: stream state is completed');
1359+
}
1360+
try {
1361+
await streamer.stop();
1362+
assert.fail();
1363+
} catch (error) {
1364+
assert.equal((error as Error).message, 'failed to stop stream: stream state is completed');
1365+
}
1366+
scope.done();
1367+
});
1368+
});
1369+
12171370
describe('filesUploadV2', () => {
12181371
it('uploads a single file', async () => {
12191372
const scope = nock('https://slack.com')

packages/web-api/src/WebClient.ts

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import isElectron from 'is-electron';
1717
import isStream from 'is-stream';
1818
import pQueue from 'p-queue';
1919
import pRetry, { AbortError } from 'p-retry';
20-
20+
import { ChatStreamer, type ChatStreamerOptions } from './chat-stream';
2121
import {
2222
httpErrorFromResponse,
2323
platformErrorFromResult,
@@ -35,8 +35,8 @@ import { getUserAgent } from './instrument';
3535
import { getLogger, type Logger, LogLevel } from './logger';
3636
import { Methods } from './methods';
3737
import { type RetryOptions, tenRetriesInAboutThirtyMinutes } from './retry-policies';
38+
import type { ChatStartStreamArguments } from './types/request';
3839
import type { CursorPaginationEnabled } from './types/request/common';
39-
4040
import type {
4141
FilesCompleteUploadExternalArguments,
4242
FilesGetUploadURLExternalArguments,
@@ -510,6 +510,38 @@ export class WebClient extends Methods {
510510
})();
511511
}
512512

513+
/**
514+
* Stream markdown text into a conversation.
515+
*
516+
* @description The "chatStream" method starts a new chat stream in a coversation that can be appended to. After appending an entire message, the stream can be stopped with concluding arguments such as "blocks" for gathering feedback.
517+
*
518+
* @example
519+
* const streamer = client.chatStream({
520+
* channel: "C0123456789",
521+
* thread_ts: "1700000001.123456",
522+
* recipient_team_id: "T0123456789",
523+
* recipient_user_id: "U0123456789",
524+
* });
525+
* await streamer.append({
526+
* markdown_text: "**hello wo",
527+
* });
528+
* await streamer.append({
529+
* markdown_text: "rld!**",
530+
* });
531+
* await streamer.stop();
532+
*
533+
* @see {@link https://docs.slack.dev/reference/methods/chat.startStream}
534+
* @see {@link https://docs.slack.dev/reference/methods/chat.appendStream}
535+
* @see {@link https://docs.slack.dev/reference/methods/chat.stopStream}
536+
*/
537+
public chatStream(params: Omit<ChatStartStreamArguments & ChatStreamerOptions, 'markdown_text'>): ChatStreamer {
538+
const { buffer_size, ...args } = params;
539+
const options: ChatStreamerOptions = {
540+
buffer_size,
541+
};
542+
return new ChatStreamer(this, this.logger, args, options);
543+
}
544+
513545
/**
514546
* This wrapper method provides an easy way to upload files using the following endpoints:
515547
*
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import type { Logger } from '@slack/logger';
2+
import type { ChatAppendStreamArguments, ChatStartStreamArguments, ChatStopStreamArguments } from './types/request';
3+
import type { ChatAppendStreamResponse, ChatStartStreamResponse, ChatStopStreamResponse } from './types/response';
4+
import type WebClient from './WebClient';
5+
6+
export interface ChatStreamerOptions {
7+
/**
8+
* @description The length of markdown_text to buffer in-memory before calling a method. Increasing this value decreases the number of method calls made for the same amount of text, which is useful to avoid rate limits.
9+
* @default 256
10+
*/
11+
buffer_size?: number;
12+
}
13+
14+
export class ChatStreamer {
15+
private buffer = '';
16+
17+
private client: WebClient;
18+
19+
private logger: Logger;
20+
21+
private options: Required<ChatStreamerOptions>;
22+
23+
private state: 'starting' | 'in_progress' | 'completed';
24+
25+
private streamArgs: ChatStartStreamArguments;
26+
27+
private streamTs: string | undefined;
28+
29+
private token: string | undefined;
30+
31+
/**
32+
* Instantiate a new chat streamer.
33+
*
34+
* @description The "constructor" method creates a unique {@link ChatStreamer} instance that keeps track of one chat stream.
35+
* @example
36+
* const client = new WebClient(process.env.SLACK_BOT_TOKEN);
37+
* const logger = new ConsoleLogger();
38+
* const args = {
39+
* channel: "C0123456789",
40+
* thread_ts: "1700000001.123456",
41+
* recipient_team_id: "T0123456789",
42+
* recipient_user_id: "U0123456789",
43+
* };
44+
* const streamer = new ChatStreamer(client, logger, args, { buffer_size: 512 });
45+
* await streamer.append({
46+
* markdown_text: "**hello world!**",
47+
* });
48+
* await streamer.stop();
49+
* @see {@link https://docs.slack.dev/reference/methods/chat.startStream}
50+
* @see {@link https://docs.slack.dev/reference/methods/chat.appendStream}
51+
* @see {@link https://docs.slack.dev/reference/methods/chat.stopStream}
52+
*/
53+
constructor(client: WebClient, logger: Logger, args: ChatStartStreamArguments, options: ChatStreamerOptions) {
54+
this.client = client;
55+
this.logger = logger;
56+
this.options = {
57+
buffer_size: options.buffer_size ?? 256,
58+
};
59+
this.state = 'starting';
60+
this.streamArgs = args;
61+
}
62+
63+
/**
64+
* Append to a stream.
65+
*
66+
* @description The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream is stopped this method cannot be called.
67+
* @example
68+
* const streamer = client.chatStream({
69+
* channel: "C0123456789",
70+
* thread_ts: "1700000001.123456",
71+
* recipient_team_id: "T0123456789",
72+
* recipient_user_id: "U0123456789",
73+
* });
74+
* await streamer.append({
75+
* markdown_text: "**hello wo",
76+
* });
77+
* await streamer.append({
78+
* markdown_text: "rld!**",
79+
* });
80+
* await streamer.stop();
81+
* @see {@link https://docs.slack.dev/reference/methods/chat.appendStream}
82+
*/
83+
async append(
84+
args: Omit<ChatAppendStreamArguments, 'channel' | 'ts'>,
85+
): Promise<ChatStartStreamResponse | ChatAppendStreamResponse | null> {
86+
if (this.state === 'completed') {
87+
throw new Error(`failed to append stream: stream state is ${this.state}`);
88+
}
89+
if (args.token) {
90+
this.token = args.token;
91+
}
92+
this.buffer += args.markdown_text;
93+
if (this.buffer.length >= this.options.buffer_size) {
94+
return await this.flushBuffer(args);
95+
}
96+
const details = {
97+
bufferLength: this.buffer.length,
98+
bufferSize: this.options.buffer_size,
99+
channel: this.streamArgs.channel,
100+
recipientTeamId: this.streamArgs.recipient_team_id,
101+
recipientUserId: this.streamArgs.recipient_user_id,
102+
threadTs: this.streamArgs.thread_ts,
103+
};
104+
this.logger.debug(`ChatStreamer appended to buffer: ${JSON.stringify(details)}`);
105+
return null;
106+
}
107+
108+
/**
109+
* Stop a stream.
110+
*
111+
* @description The "stop" method stops the chat stream being used. This method can be called once to end the stream. Additional "blocks" and "metadata" can be provided.
112+
*
113+
* @example
114+
* const streamer = client.chatStream({
115+
* channel: "C0123456789",
116+
* thread_ts: "1700000001.123456",
117+
* recipient_team_id: "T0123456789",
118+
* recipient_user_id: "U0123456789",
119+
* });
120+
* await streamer.append({
121+
* markdown_text: "**hello world!**",
122+
* });
123+
* await streamer.stop();
124+
* @see {@link https://docs.slack.dev/reference/methods/chat.stopStream}
125+
*/
126+
async stop(args?: Omit<ChatStopStreamArguments, 'channel' | 'ts'>): Promise<ChatStopStreamResponse> {
127+
if (this.state === 'completed') {
128+
throw new Error(`failed to stop stream: stream state is ${this.state}`);
129+
}
130+
if (args?.token) {
131+
this.token = args.token;
132+
}
133+
if (!this.streamTs) {
134+
const response = await this.client.chat.startStream({
135+
...this.streamArgs,
136+
token: this.token,
137+
});
138+
if (!response.ts) {
139+
throw new Error('failed to stop stream: stream not started');
140+
}
141+
this.streamTs = response.ts;
142+
this.state = 'in_progress';
143+
}
144+
const response = await this.client.chat.stopStream({
145+
token: this.token,
146+
channel: this.streamArgs.channel,
147+
ts: this.streamTs,
148+
...args,
149+
markdown_text: this.buffer + (args?.markdown_text ?? ''),
150+
});
151+
this.state = 'completed';
152+
return response;
153+
}
154+
155+
private async flushBuffer(
156+
args: Omit<ChatStartStreamArguments | ChatAppendStreamArguments, 'channel' | 'ts'>,
157+
): Promise<ChatStartStreamResponse | ChatAppendStreamResponse> {
158+
if (!this.streamTs) {
159+
const response = await this.client.chat.startStream({
160+
...this.streamArgs,
161+
token: this.token,
162+
...args,
163+
markdown_text: this.buffer,
164+
});
165+
this.buffer = '';
166+
this.streamTs = response.ts;
167+
this.state = 'in_progress';
168+
return response;
169+
}
170+
const response = await this.client.chat.appendStream({
171+
token: this.token,
172+
channel: this.streamArgs.channel,
173+
ts: this.streamTs,
174+
...args,
175+
markdown_text: this.buffer,
176+
});
177+
this.buffer = '';
178+
return response;
179+
}
180+
}

0 commit comments

Comments
 (0)