-
Notifications
You must be signed in to change notification settings - Fork 855
Expand file tree
/
Copy pathasync_chat_stream.py
More file actions
232 lines (211 loc) · 9.41 KB
/
async_chat_stream.py
File metadata and controls
232 lines (211 loc) · 9.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#
# *** DO NOT EDIT THIS FILE ***
#
# 1) Modify slack_sdk/web/client.py
# 2) Run `python scripts/codegen.py`
# 3) Run `black slack_sdk/`
#
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
import json
import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
import slack_sdk.errors as e
from slack_sdk.models.blocks.blocks import Block
from slack_sdk.models.messages.chunk import Chunk, MarkdownTextChunk
from slack_sdk.models.metadata import Metadata
from slack_sdk.web.async_slack_response import AsyncSlackResponse
if TYPE_CHECKING:
from slack_sdk.web.async_client import AsyncWebClient
class AsyncChatStream:
    """A helper class for streaming markdown text into a conversation using the chat streaming APIs.

    This class provides a convenient interface for the chat.startStream, chat.appendStream, and chat.stopStream API
    methods, with automatic buffering and state management.

    The stream moves through three states: "starting" (nothing sent yet), "in_progress" (chat.startStream returned a
    ts), and "completed" (chat.stopStream was called). Once completed, the instance cannot be reused.
    """

    def __init__(
        self,
        client: "AsyncWebClient",
        *,
        channel: str,
        logger: logging.Logger,
        thread_ts: str,
        buffer_size: int,
        recipient_team_id: Optional[str] = None,
        recipient_user_id: Optional[str] = None,
        task_display_mode: Optional[str] = None,
        **kwargs,
    ):
        """Initialize a new AsyncChatStream instance.

        The __init__ method creates a unique AsyncChatStream instance that keeps track of one chat stream.

        Args:
            client: The AsyncWebClient instance to use for API calls.
            channel: An encoded ID that represents a channel, private group, or DM.
            logger: A logging channel for outputs.
            thread_ts: Provide another message's ts value to reply to. Streamed messages should always be replies to a
              user request.
            buffer_size: The length of markdown_text to buffer in-memory before calling a method. Increasing this value
              decreases the number of method calls made for the same amount of text, which is useful to avoid rate
              limits.
            recipient_team_id: The encoded ID of the team the user receiving the streaming text belongs to. Required
              when streaming to channels.
            recipient_user_id: The encoded ID of the user to receive the streaming text. Required when streaming to
              channels.
            task_display_mode: Specifies how tasks are displayed in the message. A "timeline" displays individual tasks
              interleaved with text and "plan" displays all tasks together.
            **kwargs: Additional arguments passed to the underlying chat.startStream call. A "token" entry is removed
              from these arguments and used as the token for every API call made by this stream.
        """
        self._client = client
        self._logger = logger
        # The token is kept apart from the start arguments because it is sent on append/stop calls too.
        self._token: Optional[str] = kwargs.pop("token", None)
        self._stream_args = {
            "channel": channel,
            "thread_ts": thread_ts,
            "recipient_team_id": recipient_team_id,
            "recipient_user_id": recipient_user_id,
            "task_display_mode": task_display_mode,
            **kwargs,
        }
        self._buffer = ""
        self._state = "starting"
        self._stream_ts: Optional[str] = None
        self._buffer_size = buffer_size

    async def append(
        self,
        *,
        markdown_text: Optional[str] = None,
        chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
        **kwargs,
    ) -> Optional[AsyncSlackResponse]:
        """Append to the stream.

        The "append" method appends to the chat stream being used. This method can be called multiple times. After the
        stream is stopped this method cannot be called.

        Text is accumulated in memory and only sent once the buffer reaches buffer_size characters. Passing chunks
        (even an empty sequence) always sends immediately, together with any buffered text.

        Args:
            chunks: An array of streaming chunks that can contain either markdown text or task updates.
            markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text
              is what will be appended to the message received so far.
            **kwargs: Additional arguments passed to the underlying API calls. A truthy "token" replaces the token
              used by this stream from now on; a falsy "token" is ignored.

        Returns:
            AsyncSlackResponse if the buffer was flushed, None if buffering.

        Raises:
            SlackRequestError: If the stream is already completed, or if the stream had to be started and the
              response did not include a ts.

        Example:
            ```python
            streamer = await client.chat_stream(
                channel="C0123456789",
                thread_ts="1700000001.123456",
                recipient_team_id="T0123456789",
                recipient_user_id="U0123456789",
            )
            await streamer.append(markdown_text="**hello wo")
            await streamer.append(markdown_text="rld!**")
            await streamer.stop()
            ```
        """
        if self._state == "completed":
            raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
        self._adopt_token(kwargs)
        if markdown_text is not None:
            self._buffer += markdown_text
        if len(self._buffer) >= self._buffer_size or chunks is not None:
            return await self._flush_buffer(chunks=chunks, **kwargs)
        # append() may be called once per generated token, so skip building and serializing the
        # details when nobody is listening at DEBUG level.
        if self._logger.isEnabledFor(logging.DEBUG):
            details = {
                "buffer_length": len(self._buffer),
                "buffer_size": self._buffer_size,
                "channel": self._stream_args.get("channel"),
                "recipient_team_id": self._stream_args.get("recipient_team_id"),
                "recipient_user_id": self._stream_args.get("recipient_user_id"),
                "thread_ts": self._stream_args.get("thread_ts"),
            }
            self._logger.debug(f"ChatStream appended to buffer: {json.dumps(details)}")
        return None

    async def stop(
        self,
        *,
        markdown_text: Optional[str] = None,
        chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
        blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
        metadata: Optional[Union[Dict, Metadata]] = None,
        **kwargs,
    ) -> AsyncSlackResponse:
        """Stop the stream and finalize the message.

        If nothing has been sent yet, the stream is started first so that there is a message to finalize. Any text
        still held in the buffer is sent along with the stop call.

        Args:
            blocks: A list of blocks that will be rendered at the bottom of the finalized message.
            chunks: An array of streaming chunks that can contain either markdown text or task updates.
            markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text
              is what will be appended to the message received so far.
            metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata
              you post to Slack is accessible to any app or user who is a member of that workspace.
            **kwargs: Additional arguments passed to the underlying chat.stopStream call. A truthy "token" replaces
              the token used by this stream; a falsy "token" is ignored.

        Returns:
            AsyncSlackResponse from the chat.stopStream API call.

        Raises:
            SlackRequestError: If the stream is already completed, or if the stream had to be started and the
              response did not include a ts.

        Example:
            ```python
            streamer = await client.chat_stream(
                channel="C0123456789",
                thread_ts="1700000001.123456",
                recipient_team_id="T0123456789",
                recipient_user_id="U0123456789",
            )
            await streamer.append(markdown_text="**hello wo")
            await streamer.append(markdown_text="rld!**")
            await streamer.stop()
            ```
        """
        if self._state == "completed":
            raise e.SlackRequestError(f"Cannot stop stream: stream state is {self._state}")
        self._adopt_token(kwargs)
        if markdown_text:
            self._buffer += markdown_text
        if not self._stream_ts:
            # Nothing was flushed before stop(): open the stream now; the buffered text goes out with the stop call.
            response = await self._client.chat_startStream(
                **self._stream_args,
                token=self._token,
            )
            if not response.get("ts"):
                raise e.SlackRequestError("Failed to stop stream: stream not started")
            self._stream_ts = str(response["ts"])
            self._state = "in_progress"
        response = await self._client.chat_stopStream(
            token=self._token,
            channel=self._stream_args["channel"],
            ts=self._stream_ts,
            blocks=blocks,
            chunks=self._pending_chunks(chunks),
            metadata=metadata,
            **kwargs,
        )
        self._state = "completed"
        return response

    def _adopt_token(self, kwargs: Dict) -> None:
        """Remove "token" from per-call kwargs, remembering it when it is truthy.

        The key is always removed, even when its value is None or empty. Otherwise it would be forwarded next to the
        explicit token argument of the API call and fail with "got multiple values for keyword argument 'token'".
        """
        token = kwargs.pop("token", None)
        if token:
            self._token = token

    def _pending_chunks(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None) -> List[Union[Dict, Chunk]]:
        """Return the buffered text (as one markdown chunk, if any) followed by the caller-supplied chunks."""
        pending: List[Union[Dict, Chunk]] = []
        if self._buffer:
            pending.append(MarkdownTextChunk(text=self._buffer))
        if chunks is not None:
            pending.extend(chunks)
        return pending

    async def _flush_buffer(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, **kwargs) -> AsyncSlackResponse:
        """Flush the internal buffer with chunks by making appropriate API calls.

        Calls chat.startStream for the first flush and chat.appendStream afterwards. The buffer is cleared only after
        the API call has returned, so a failed call leaves the buffered text in place.

        Raises:
            SlackRequestError: If the stream had to be started and the response did not include a ts.
        """
        pending = self._pending_chunks(chunks)
        if not self._stream_ts:
            response = await self._client.chat_startStream(
                **self._stream_args,
                token=self._token,
                **kwargs,
                chunks=pending,
            )
            ts = response.get("ts")
            if not ts:
                # Without a ts every later flush would start yet another stream; fail loudly, as stop() does.
                raise e.SlackRequestError("Failed to start stream: response did not include ts")
            self._stream_ts = str(ts)
            self._state = "in_progress"
        else:
            response = await self._client.chat_appendStream(
                token=self._token,
                channel=self._stream_args["channel"],
                ts=self._stream_ts,
                **kwargs,
                chunks=pending,
            )
        self._buffer = ""
        return response