Skip to content

Commit 5de794b

Browse files
committed
feat: support and flush chunks in the chat stream helper
1 parent 1fb7355 commit 5de794b

6 files changed

Lines changed: 119 additions & 25 deletions

File tree

slack_sdk/web/async_chat_stream.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import slack_sdk.errors as e
1616
from slack_sdk.models.blocks.blocks import Block
17+
from slack_sdk.models.messages.chunk import Chunk
1718
from slack_sdk.models.metadata import Metadata
1819
from slack_sdk.web.async_slack_response import AsyncSlackResponse
1920

@@ -75,7 +76,8 @@ def __init__(
7576
async def append(
7677
self,
7778
*,
78-
markdown_text: str,
79+
markdown_text: Optional[str] = None,
80+
chunks: Optional[Sequence[Chunk]] = None,
7981
**kwargs,
8082
) -> Optional[AsyncSlackResponse]:
8183
"""Append to the stream.
@@ -84,6 +86,7 @@ async def append(
8486
is stopped this method cannot be called.
8587
8688
Args:
89+
chunks: An array of streaming chunks that can contain either markdown text or task updates.
8790
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
8891
what will be appended to the message received so far.
8992
**kwargs: Additional arguments passed to the underlying API calls.
@@ -111,9 +114,10 @@ async def append(
111114
raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
112115
if kwargs.get("token"):
113116
self._token = kwargs.pop("token")
114-
self._buffer += markdown_text
115-
if len(self._buffer) >= self._buffer_size:
116-
return await self._flush_buffer(**kwargs)
117+
if markdown_text is not None:
118+
self._buffer += markdown_text
119+
if len(self._buffer) >= self._buffer_size or chunks is not None:
120+
return await self._flush_buffer(chunks=chunks, **kwargs)
117121
details = {
118122
"buffer_length": len(self._buffer),
119123
"buffer_size": self._buffer_size,
@@ -129,6 +133,7 @@ async def stop(
129133
self,
130134
*,
131135
markdown_text: Optional[str] = None,
136+
chunks: Optional[Sequence[Chunk]] = None,
132137
blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
133138
metadata: Optional[Union[Dict, Metadata]] = None,
134139
**kwargs,
@@ -137,6 +142,7 @@ async def stop(
137142
138143
Args:
139144
blocks: A list of blocks that will be rendered at the bottom of the finalized message.
145+
chunks: An array of streaming chunks that can contain either markdown text or task updates.
140146
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
141147
what will be appended to the message received so far.
142148
metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
@@ -177,26 +183,36 @@ async def stop(
177183
raise e.SlackRequestError("Failed to stop stream: stream not started")
178184
self._stream_ts = str(response["ts"])
179185
self._state = "in_progress"
186+
flushings = []
187+
if len(self._buffer) != 0:
188+
flushings.append({"type": "markdown_text", "text": self._buffer})
189+
if chunks is not None:
190+
flushings.extend(chunks)
180191
response = await self._client.chat_stopStream(
181192
token=self._token,
182193
channel=self._stream_args["channel"],
183194
ts=self._stream_ts,
184195
blocks=blocks,
185-
markdown_text=self._buffer,
196+
chunks=flushings,
186197
metadata=metadata,
187198
**kwargs,
188199
)
189200
self._state = "completed"
190201
return response
191202

192-
async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse:
193-
"""Flush the internal buffer by making appropriate API calls."""
203+
async def _flush_buffer(self, chunks: Optional[Sequence[Chunk]] = None, **kwargs) -> AsyncSlackResponse:
204+
"""Flush the internal buffer with chunks by making appropriate API calls."""
205+
flushings = []
206+
if len(self._buffer) != 0:
207+
flushings.append({"type": "markdown_text", "text": self._buffer})
208+
if chunks is not None:
209+
flushings.extend(chunks)
194210
if not self._stream_ts:
195211
response = await self._client.chat_startStream(
196212
**self._stream_args,
197213
token=self._token,
198214
**kwargs,
199-
markdown_text=self._buffer,
215+
chunks=flushings,
200216
)
201217
self._stream_ts = response.get("ts")
202218
self._state = "in_progress"
@@ -206,7 +222,7 @@ async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse:
206222
channel=self._stream_args["channel"],
207223
ts=self._stream_ts,
208224
**kwargs,
209-
markdown_text=self._buffer,
225+
chunks=flushings,
210226
)
211227
self._buffer = ""
212228
return response

slack_sdk/web/async_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2631,7 +2631,7 @@ async def chat_appendStream(
26312631
*,
26322632
channel: str,
26332633
ts: str,
2634-
markdown_text: str,
2634+
markdown_text: Optional[str] = None,
26352635
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
26362636
**kwargs,
26372637
) -> AsyncSlackResponse:

slack_sdk/web/chat_stream.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import slack_sdk.errors as e
66
from slack_sdk.models.blocks.blocks import Block
7+
from slack_sdk.models.messages.chunk import Chunk
78
from slack_sdk.models.metadata import Metadata
89
from slack_sdk.web.slack_response import SlackResponse
910

@@ -65,7 +66,8 @@ def __init__(
6566
def append(
6667
self,
6768
*,
68-
markdown_text: str,
69+
markdown_text: Optional[str] = None,
70+
chunks: Optional[Sequence[Chunk]] = None,
6971
**kwargs,
7072
) -> Optional[SlackResponse]:
7173
"""Append to the stream.
@@ -74,6 +76,7 @@ def append(
7476
is stopped this method cannot be called.
7577
7678
Args:
79+
chunks: An array of streaming chunks that can contain either markdown text or task updates.
7780
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
7881
what will be appended to the message received so far.
7982
**kwargs: Additional arguments passed to the underlying API calls.
@@ -101,9 +104,10 @@ def append(
101104
raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
102105
if kwargs.get("token"):
103106
self._token = kwargs.pop("token")
104-
self._buffer += markdown_text
105-
if len(self._buffer) >= self._buffer_size:
106-
return self._flush_buffer(**kwargs)
107+
if markdown_text is not None:
108+
self._buffer += markdown_text
109+
if len(self._buffer) >= self._buffer_size or chunks is not None:
110+
return self._flush_buffer(chunks=chunks, **kwargs)
107111
details = {
108112
"buffer_length": len(self._buffer),
109113
"buffer_size": self._buffer_size,
@@ -119,6 +123,7 @@ def stop(
119123
self,
120124
*,
121125
markdown_text: Optional[str] = None,
126+
chunks: Optional[Sequence[Chunk]] = None,
122127
blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
123128
metadata: Optional[Union[Dict, Metadata]] = None,
124129
**kwargs,
@@ -127,6 +132,7 @@ def stop(
127132
128133
Args:
129134
blocks: A list of blocks that will be rendered at the bottom of the finalized message.
135+
chunks: An array of streaming chunks that can contain either markdown text or task updates.
130136
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
131137
what will be appended to the message received so far.
132138
metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
@@ -167,26 +173,36 @@ def stop(
167173
raise e.SlackRequestError("Failed to stop stream: stream not started")
168174
self._stream_ts = str(response["ts"])
169175
self._state = "in_progress"
176+
flushings = []
177+
if len(self._buffer) != 0:
178+
flushings.append({"type": "markdown_text", "text": self._buffer})
179+
if chunks is not None:
180+
flushings.extend(chunks)
170181
response = self._client.chat_stopStream(
171182
token=self._token,
172183
channel=self._stream_args["channel"],
173184
ts=self._stream_ts,
174185
blocks=blocks,
175-
markdown_text=self._buffer,
186+
chunks=flushings,
176187
metadata=metadata,
177188
**kwargs,
178189
)
179190
self._state = "completed"
180191
return response
181192

182-
def _flush_buffer(self, **kwargs) -> SlackResponse:
183-
"""Flush the internal buffer by making appropriate API calls."""
193+
def _flush_buffer(self, chunks: Optional[Sequence[Chunk]] = None, **kwargs) -> SlackResponse:
194+
"""Flush the internal buffer with chunks by making appropriate API calls."""
195+
flushings = []
196+
if len(self._buffer) != 0:
197+
flushings.append({"type": "markdown_text", "text": self._buffer})
198+
if chunks is not None:
199+
flushings.extend(chunks)
184200
if not self._stream_ts:
185201
response = self._client.chat_startStream(
186202
**self._stream_args,
187203
token=self._token,
188204
**kwargs,
189-
markdown_text=self._buffer,
205+
chunks=flushings,
190206
)
191207
self._stream_ts = response.get("ts")
192208
self._state = "in_progress"
@@ -196,7 +212,7 @@ def _flush_buffer(self, **kwargs) -> SlackResponse:
196212
channel=self._stream_args["channel"],
197213
ts=self._stream_ts,
198214
**kwargs,
199-
markdown_text=self._buffer,
215+
chunks=flushings,
200216
)
201217
self._buffer = ""
202218
return response

slack_sdk/web/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2621,7 +2621,7 @@ def chat_appendStream(
26212621
*,
26222622
channel: str,
26232623
ts: str,
2624-
markdown_text: str,
2624+
markdown_text: Optional[str] = None,
26252625
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
26262626
**kwargs,
26272627
) -> SlackResponse:

slack_sdk/web/legacy_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2632,7 +2632,7 @@ def chat_appendStream(
26322632
*,
26332633
channel: str,
26342634
ts: str,
2635-
markdown_text: str,
2635+
markdown_text: Optional[str] = None,
26362636
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
26372637
**kwargs,
26382638
) -> Union[Future, SlackResponse]:

tests/slack_sdk/web/test_chat_stream.py

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from slack_sdk.models.blocks.basic_components import FeedbackButtonObject
88
from slack_sdk.models.blocks.block_elements import FeedbackButtonsElement, IconButtonElement
99
from slack_sdk.models.blocks.blocks import ContextActionsBlock
10+
from slack_sdk.models.messages.chunk import MarkdownTextChunk, TaskUpdateChunk
1011
from tests.mock_web_api_server import cleanup_mock_web_api_server, setup_mock_web_api_server
1112
from tests.slack_sdk.web.mock_web_api_handler import MockHandler
1213

@@ -105,7 +106,7 @@ def test_streams_a_short_message(self):
105106
stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {})
106107
self.assertEqual(stop_request.get("channel"), "C0123456789")
107108
self.assertEqual(stop_request.get("ts"), "123.123")
108-
self.assertEqual(stop_request.get("markdown_text"), "nice!")
109+
self.assertEqual(stop_request.get("chunks"), [{"type": "markdown_text", "text": "nice!"}])
109110

110111
def test_streams_a_long_message(self):
111112
streamer = self.client.chat_stream(
@@ -146,13 +147,13 @@ def test_streams_a_long_message(self):
146147
start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {})
147148
self.assertEqual(start_request.get("channel"), "C0123456789")
148149
self.assertEqual(start_request.get("thread_ts"), "123.000")
149-
self.assertEqual(start_request.get("markdown_text"), "**this messag")
150+
self.assertEqual(start_request.get("chunks"), [{"type": "markdown_text", "text": "**this messag"}])
150151
self.assertEqual(start_request.get("recipient_team_id"), "T0123456789")
151152
self.assertEqual(start_request.get("recipient_user_id"), "U0123456789")
152153

153154
append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {})
154155
self.assertEqual(append_request.get("channel"), "C0123456789")
155-
self.assertEqual(append_request.get("markdown_text"), "e is bold!")
156+
self.assertEqual(append_request.get("chunks"), [{"type": "markdown_text", "text": "e is bold!"}])
156157
self.assertEqual(append_request.get("token"), "xoxb-chat_stream_test_token1")
157158
self.assertEqual(append_request.get("ts"), "123.123")
158159

@@ -162,10 +163,71 @@ def test_streams_a_long_message(self):
162163
'[{"elements": [{"negative_button": {"text": {"emoji": true, "text": "bad", "type": "plain_text"}, "value": "-1"}, "positive_button": {"text": {"emoji": true, "text": "good", "type": "plain_text"}, "value": "+1"}, "type": "feedback_buttons"}, {"icon": "trash", "text": {"emoji": true, "text": "delete", "type": "plain_text"}, "type": "icon_button"}], "type": "context_actions"}]',
163164
)
164165
self.assertEqual(stop_request.get("channel"), "C0123456789")
165-
self.assertEqual(stop_request.get("markdown_text"), "**")
166+
self.assertEqual(stop_request.get("chunks"), [{"type": "markdown_text", "text": "**"}])
166167
self.assertEqual(stop_request.get("token"), "xoxb-chat_stream_test_token2")
167168
self.assertEqual(stop_request.get("ts"), "123.123")
168169

170+
def test_streams_a_chunk_message(self):
171+
streamer = self.client.chat_stream(
172+
channel="C0123456789",
173+
recipient_team_id="T0123456789",
174+
recipient_user_id="U0123456789",
175+
thread_ts="123.000",
176+
)
177+
streamer.append(markdown_text="**this is ")
178+
streamer.append(markdown_text="buffered**")
179+
streamer.append(
180+
chunks=[
181+
TaskUpdateChunk(
182+
id="001",
183+
title="Counting...",
184+
status="pending",
185+
),
186+
],
187+
)
188+
streamer.append(
189+
chunks=[
190+
MarkdownTextChunk(text="**this is unbuffered**"),
191+
],
192+
)
193+
streamer.append(markdown_text="\n")
194+
streamer.stop(
195+
chunks=[
196+
MarkdownTextChunk(text=":space_invader:"),
197+
],
198+
)
199+
200+
self.assertEqual(self.received_requests.get("/chat.startStream", 0), 1)
201+
self.assertEqual(self.received_requests.get("/chat.appendStream", 0), 1)
202+
self.assertEqual(self.received_requests.get("/chat.stopStream", 0), 1)
203+
204+
if hasattr(self.thread.server, "chat_stream_requests"):
205+
start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {})
206+
self.assertEqual(start_request.get("channel"), "C0123456789")
207+
self.assertEqual(start_request.get("thread_ts"), "123.000")
208+
self.assertEqual(
209+
json.dumps(start_request.get("chunks")),
210+
'[{"type": "markdown_text", "text": "**this is buffered**"}, {"id": "001", "status": "pending", "title": "Counting...", "type": "task_update"}]',
211+
)
212+
self.assertEqual(start_request.get("recipient_team_id"), "T0123456789")
213+
self.assertEqual(start_request.get("recipient_user_id"), "U0123456789")
214+
215+
append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {})
216+
self.assertEqual(append_request.get("channel"), "C0123456789")
217+
self.assertEqual(append_request.get("ts"), "123.123")
218+
self.assertEqual(
219+
json.dumps(append_request.get("chunks")),
220+
'[{"text": "**this is unbuffered**", "type": "markdown_text"}]',
221+
)
222+
223+
stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {})
224+
self.assertEqual(stop_request.get("channel"), "C0123456789")
225+
self.assertEqual(stop_request.get("ts"), "123.123")
226+
self.assertEqual(
227+
json.dumps(stop_request.get("chunks")),
228+
'[{"type": "markdown_text", "text": "\\n"}, {"text": ":space_invader:", "type": "markdown_text"}]',
229+
)
230+
169231
def test_streams_errors_when_appending_to_an_unstarted_stream(self):
170232
streamer = self.client.chat_stream(
171233
channel="C0123456789",

0 commit comments

Comments
 (0)