Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions slack_sdk/models/messages/chunk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import logging
from typing import Any, Dict, Optional, Sequence, Set, Union

from slack_sdk.errors import SlackObjectFormationError
from slack_sdk.models import show_unknown_key_warning
from slack_sdk.models.basic_objects import JsonObject


class Chunk(JsonObject):
"""
Chunk for streaming messages.

https://docs.slack.dev/messaging/sending-and-scheduling-messages#text-streaming
"""

attributes = {"type"}
logger = logging.getLogger(__name__)

def __init__(
self,
*,
type: Optional[str] = None,
):
self.type = type

@classmethod
def parse(cls, chunk: Union[Dict, "Chunk"]) -> Optional["Chunk"]:
if chunk is None:
return None
elif isinstance(chunk, Chunk):
return chunk
else:
if "type" in chunk:
type = chunk["type"]
if type == MarkdownTextChunk.type:
return MarkdownTextChunk(**chunk)
elif type == TaskUpdateChunk.type:
return TaskUpdateChunk(**chunk)
else:
cls.logger.warning(f"Unknown chunk detected and skipped ({chunk})")
return None
else:
cls.logger.warning(f"Unknown chunk detected and skipped ({chunk})")
return None


class MarkdownTextChunk(Chunk):
type = "markdown_text"

@property
def attributes(self) -> Set[str]: # type: ignore[override]
return super().attributes.union({"text"})

def __init__(
self,
*,
text: str,
**others: Dict,
):
"""Used for streaming text content with markdown formatting support.

https://docs.slack.dev/messaging/sending-and-scheduling-messages#text-streaming
"""
super().__init__(type=self.type)
show_unknown_key_warning(self, others)

self.text = text


class URLSource(JsonObject):
type = "url"

@property
def attributes(self) -> Set[str]:
return super().attributes.union(
{
"url",
"text",
"icon_url",
}
)

def __init__(
self,
*,
url: str,
text: str,
icon_url: Optional[str] = None,
**others: Dict,
):
show_unknown_key_warning(self, others)
self._url = url
self._text = text
self._icon_url = icon_url

def to_dict(self) -> Dict[str, Any]:
self.validate_json()
json: Dict[str, Union[str, Dict]] = {
"type": self.type,
"url": self._url,
"text": self._text,
}
if self._icon_url:
json["icon_url"] = self._icon_url
return json


class TaskUpdateChunk(Chunk):
type = "task_update"

@property
def attributes(self) -> Set[str]: # type: ignore[override]
return super().attributes.union(
{
"id",
"title",
"status",
"details",
"output",
"sources",
}
)

def __init__(
self,
*,
id: str,
title: str,
status: str, # "pending", "in_progress", "complete", "error"
details: Optional[str] = None,
output: Optional[str] = None,
sources: Optional[Sequence[Union[Dict, URLSource]]] = None,
**others: Dict,
):
"""Used for displaying tool execution progress in a timeline-style UI.

https://docs.slack.dev/messaging/sending-and-scheduling-messages#text-streaming
"""
super().__init__(type=self.type)
show_unknown_key_warning(self, others)

self.id = id
self.title = title
self.status = status
self.details = details
self.output = output
if sources is not None:
self.sources = []
for src in sources:
if isinstance(src, Dict):
self.sources.append(src)
elif isinstance(src, URLSource):
self.sources.append(src.to_dict())
else:
raise SlackObjectFormationError(f"Unsupported type for source in task update chunk: {type(src)}")
40 changes: 30 additions & 10 deletions slack_sdk/web/async_chat_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@

import json
import logging
from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union

import slack_sdk.errors as e
from slack_sdk.models.blocks.blocks import Block
from slack_sdk.models.messages.chunk import Chunk, MarkdownTextChunk
from slack_sdk.models.metadata import Metadata
from slack_sdk.web.async_slack_response import AsyncSlackResponse

Expand All @@ -38,6 +39,7 @@ def __init__(
buffer_size: int,
recipient_team_id: Optional[str] = None,
recipient_user_id: Optional[str] = None,
task_display_mode: Optional[str] = None,
**kwargs,
):
"""Initialize a new ChatStream instance.
Expand All @@ -53,6 +55,8 @@ def __init__(
recipient_team_id: The encoded ID of the team the user receiving the streaming text belongs to. Required when
streaming to channels.
recipient_user_id: The encoded ID of the user to receive the streaming text. Required when streaming to channels.
task_display_mode: Specifies how tasks are displayed in the message. A "timeline" displays individual tasks
interleaved with text and "plan" displays all tasks together.
buffer_size: The length of markdown_text to buffer in-memory before calling a method. Increasing this value
decreases the number of method calls made for the same amount of text, which is useful to avoid rate limits.
**kwargs: Additional arguments passed to the underlying API calls.
Expand All @@ -65,6 +69,7 @@ def __init__(
"thread_ts": thread_ts,
"recipient_team_id": recipient_team_id,
"recipient_user_id": recipient_user_id,
"task_display_mode": task_display_mode,
**kwargs,
}
self._buffer = ""
Expand All @@ -75,7 +80,8 @@ def __init__(
async def append(
self,
*,
markdown_text: str,
markdown_text: Optional[str] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
**kwargs,
) -> Optional[AsyncSlackResponse]:
"""Append to the stream.
Expand All @@ -84,6 +90,7 @@ async def append(
is stopped this method cannot be called.

Args:
chunks: An array of streaming chunks that can contain either markdown text or task updates.
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
what will be appended to the message received so far.
**kwargs: Additional arguments passed to the underlying API calls.
Expand Down Expand Up @@ -111,9 +118,10 @@ async def append(
raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
if kwargs.get("token"):
self._token = kwargs.pop("token")
self._buffer += markdown_text
if len(self._buffer) >= self._buffer_size:
return await self._flush_buffer(**kwargs)
if markdown_text is not None:
self._buffer += markdown_text
if len(self._buffer) >= self._buffer_size or chunks is not None:
return await self._flush_buffer(chunks=chunks, **kwargs)
details = {
"buffer_length": len(self._buffer),
"buffer_size": self._buffer_size,
Expand All @@ -129,6 +137,7 @@ async def stop(
self,
*,
markdown_text: Optional[str] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
metadata: Optional[Union[Dict, Metadata]] = None,
**kwargs,
Expand All @@ -137,6 +146,7 @@ async def stop(

Args:
blocks: A list of blocks that will be rendered at the bottom of the finalized message.
chunks: An array of streaming chunks that can contain either markdown text or task updates.
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
what will be appended to the message received so far.
metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
Expand Down Expand Up @@ -177,26 +187,36 @@ async def stop(
raise e.SlackRequestError("Failed to stop stream: stream not started")
self._stream_ts = str(response["ts"])
self._state = "in_progress"
flushings: List[Union[Dict, Chunk]] = []
if len(self._buffer) != 0:
flushings.append(MarkdownTextChunk(text=self._buffer))
if chunks is not None:
flushings.extend(chunks)
response = await self._client.chat_stopStream(
token=self._token,
channel=self._stream_args["channel"],
ts=self._stream_ts,
blocks=blocks,
markdown_text=self._buffer,
chunks=flushings,
metadata=metadata,
**kwargs,
)
self._state = "completed"
return response

async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse:
"""Flush the internal buffer by making appropriate API calls."""
async def _flush_buffer(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, **kwargs) -> AsyncSlackResponse:
"""Flush the internal buffer with chunks by making appropriate API calls."""
flushings: List[Union[Dict, Chunk]] = []
if len(self._buffer) != 0:
flushings.append(MarkdownTextChunk(text=self._buffer))
if chunks is not None:
flushings.extend(chunks)
if not self._stream_ts:
response = await self._client.chat_startStream(
**self._stream_args,
token=self._token,
**kwargs,
markdown_text=self._buffer,
chunks=flushings,
)
self._stream_ts = response.get("ts")
self._state = "in_progress"
Expand All @@ -206,7 +226,7 @@ async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse:
channel=self._stream_args["channel"],
ts=self._stream_ts,
**kwargs,
markdown_text=self._buffer,
chunks=flushings,
)
self._buffer = ""
return response
19 changes: 17 additions & 2 deletions slack_sdk/web/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
from typing import Any, Dict, List, Optional, Sequence, Union

import slack_sdk.errors as e
from slack_sdk.models.messages.chunk import Chunk
from slack_sdk.models.views import View
from slack_sdk.web.async_chat_stream import AsyncChatStream

from ..models.attachments import Attachment
from ..models.blocks import Block, RichTextBlock
from ..models.metadata import Metadata, EntityMetadata, EventAndEntityMetadata
from ..models.metadata import EntityMetadata, EventAndEntityMetadata, Metadata
from .async_base_client import AsyncBaseClient, AsyncSlackResponse
from .internal_utils import (
_parse_web_class_objects,
Expand Down Expand Up @@ -2630,7 +2631,8 @@ async def chat_appendStream(
*,
channel: str,
ts: str,
markdown_text: str,
markdown_text: Optional[str] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
**kwargs,
) -> AsyncSlackResponse:
"""Appends text to an existing streaming conversation.
Expand All @@ -2641,8 +2643,10 @@ async def chat_appendStream(
"channel": channel,
"ts": ts,
"markdown_text": markdown_text,
"chunks": chunks,
}
)
_parse_web_class_objects(kwargs)
kwargs = _remove_none_values(kwargs)
return await self.api_call("chat.appendStream", json=kwargs)

Expand Down Expand Up @@ -2884,6 +2888,8 @@ async def chat_startStream(
markdown_text: Optional[str] = None,
recipient_team_id: Optional[str] = None,
recipient_user_id: Optional[str] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
task_display_mode: Optional[str] = None, # timeline, plan
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(take it or leave it) we could potentially use enums here, but if it does not fall in line with existing patterns we shouldn't do it

Edit: actually maybe it would be worth having the web API return a proper error message with the possible options if an invalid value is passed

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I think it's good to expect the API return an error message with the the available options when an invalid option value is passed.

The display modes are aiming to support what's common in LLM UIs, so I imagine there will be more in the future that Slack will want to support. It would be nice for developers to have access to the new modes without upgrading the SDK version (I'm assuming an enum would require an SDK upgrade to support the new options?)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WilliamBergamin @mwbrooks Oh dang, the tradeoffs are fascinating! 😳

I think I agree letting the API error for invalid values might be best for ongoing maintenance and I hope for now this comment is quick enough to jump to for fast reference. I'll update some notes to avoid using enums in adjacent changes too 📚

Prior art for me was found just above:

parse: Optional[str] = None, # none, full

**kwargs,
) -> AsyncSlackResponse:
"""Starts a new streaming conversation.
Expand All @@ -2896,8 +2902,11 @@ async def chat_startStream(
"markdown_text": markdown_text,
"recipient_team_id": recipient_team_id,
"recipient_user_id": recipient_user_id,
"chunks": chunks,
"task_display_mode": task_display_mode,
}
)
_parse_web_class_objects(kwargs)
kwargs = _remove_none_values(kwargs)
return await self.api_call("chat.startStream", json=kwargs)

Expand All @@ -2909,6 +2918,7 @@ async def chat_stopStream(
markdown_text: Optional[str] = None,
blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
metadata: Optional[Union[Dict, Metadata]] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
**kwargs,
) -> AsyncSlackResponse:
"""Stops a streaming conversation.
Expand All @@ -2921,6 +2931,7 @@ async def chat_stopStream(
"markdown_text": markdown_text,
"blocks": blocks,
"metadata": metadata,
"chunks": chunks,
}
)
_parse_web_class_objects(kwargs)
Expand All @@ -2935,6 +2946,7 @@ async def chat_stream(
thread_ts: str,
recipient_team_id: Optional[str] = None,
recipient_user_id: Optional[str] = None,
task_display_mode: Optional[str] = None,
**kwargs,
) -> AsyncChatStream:
"""Stream markdown text into a conversation.
Expand All @@ -2961,6 +2973,8 @@ async def chat_stream(
recipient_team_id: The encoded ID of the team the user receiving the streaming text belongs to. Required when
streaming to channels.
recipient_user_id: The encoded ID of the user to receive the streaming text. Required when streaming to channels.
task_display_mode: Specifies how tasks are displayed in the message. A "timeline" displays individual tasks
interleaved with text and "plan" displays all tasks together.
**kwargs: Additional arguments passed to the underlying API calls.

Returns:
Expand All @@ -2986,6 +3000,7 @@ async def chat_stream(
thread_ts=thread_ts,
recipient_team_id=recipient_team_id,
recipient_user_id=recipient_user_id,
task_display_mode=task_display_mode,
buffer_size=buffer_size,
**kwargs,
)
Expand Down
Loading