-
Notifications
You must be signed in to change notification settings - Fork 856
Expand file tree
/
Copy pathchunk.py
More file actions
155 lines (131 loc) · 4.2 KB
/
chunk.py
File metadata and controls
155 lines (131 loc) · 4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import logging
from typing import Any, Dict, Optional, Sequence, Set, Union
from slack_sdk.errors import SlackObjectFormationError
from slack_sdk.models import show_unknown_key_warning
from slack_sdk.models.basic_objects import JsonObject
class Chunk(JsonObject):
    """
    Base model for one chunk of a streaming message.

    Concrete chunk kinds in this module subclass it and are told apart by
    their ``type`` value.
    https://docs.slack.dev/messaging/sending-and-scheduling-messages#text-streaming
    """

    attributes = {"type"}
    logger = logging.getLogger(__name__)

    def __init__(
        self,
        *,
        type: Optional[str] = None,
    ):
        """Remember the chunk's ``type`` discriminator.

        Args:
            type: Identifier of the chunk kind, or ``None`` when not given.
        """
        self.type = type

    @classmethod
    def parse(cls, chunk: Union[Dict, "Chunk"]) -> Optional["Chunk"]:
        """Turn a raw mapping into the matching ``Chunk`` subclass.

        Args:
            chunk: A ``Chunk`` instance, a dict carrying a ``"type"`` key,
                or ``None``.

        Returns:
            ``None`` for ``None`` input, the same object for an existing
            ``Chunk``, a ``MarkdownTextChunk`` / ``TaskUpdateChunk`` built
            from the dict's items when its ``"type"`` matches, and ``None``
            (after logging a warning) for a missing or unrecognised type.
        """
        if chunk is None:
            return None
        if isinstance(chunk, Chunk):
            return chunk

        if "type" in chunk:
            chunk_type = chunk["type"]
            if chunk_type == MarkdownTextChunk.type:
                return MarkdownTextChunk(**chunk)
            if chunk_type == TaskUpdateChunk.type:
                return TaskUpdateChunk(**chunk)

        # Reached when the "type" key is absent or names no known chunk kind.
        cls.logger.warning(f"Unknown chunk detected and skipped ({chunk})")
        return None
class MarkdownTextChunk(Chunk):
    """Chunk whose payload is a piece of markdown text."""

    type = "markdown_text"

    @property
    def attributes(self) -> Set[str]:  # type: ignore[override]
        """Attribute names of the base chunk plus ``text``."""
        return super().attributes | {"text"}

    def __init__(
        self,
        *,
        text: str,
        **others: Dict,
    ):
        """Used for streaming text content with markdown formatting support.

        https://docs.slack.dev/messaging/sending-and-scheduling-messages#text-streaming

        Args:
            text: The markdown text carried by this chunk.
            **others: Any additional keyword arguments; they are handed to
                ``show_unknown_key_warning`` and otherwise not stored.
        """
        super().__init__(type=self.type)
        show_unknown_key_warning(self, others)
        self.text = text
class URLSource(JsonObject):
    """A URL reference (link, label and optional icon) with ``type`` ``"url"``."""

    type = "url"

    @property
    def attributes(self) -> Set[str]:
        """Parent attribute names extended with ``url``, ``text`` and ``icon_url``."""
        own_keys = {"url", "text", "icon_url"}
        return super().attributes.union(own_keys)

    def __init__(
        self,
        *,
        url: str,
        text: str,
        icon_url: Optional[str] = None,
        **others: Dict,
    ):
        """Create a URL source.

        Args:
            url: The URL being referenced.
            text: Text associated with the URL.
            icon_url: Optional icon URL; left out of ``to_dict()`` when falsy.
            **others: Any additional keyword arguments; they are handed to
                ``show_unknown_key_warning`` and otherwise not stored.
        """
        show_unknown_key_warning(self, others)
        # Values live on underscore-prefixed attributes and are read back by to_dict().
        self._url, self._text, self._icon_url = url, text, icon_url

    def to_dict(self) -> Dict[str, Any]:
        """Run ``validate_json()`` and return this source as a plain dict.

        Returns:
            A dict with ``type``, ``url`` and ``text``; ``icon_url`` is added
            only when a truthy value was supplied.
        """
        self.validate_json()
        payload: Dict[str, Union[str, Dict]] = {
            "type": self.type,
            "url": self._url,
            "text": self._text,
        }
        if not self._icon_url:
            return payload
        payload["icon_url"] = self._icon_url
        return payload
class TaskUpdateChunk(Chunk):
    """Chunk describing the state of a task, with ``type`` ``"task_update"``."""

    type = "task_update"

    @property
    def attributes(self) -> Set[str]:  # type: ignore[override]
        """Attribute names of the base chunk plus the task-specific ones."""
        return super().attributes.union(
            {
                "id",
                "title",
                "status",
                "details",
                "output",
                "sources",
            }
        )

    def __init__(
        self,
        *,
        id: str,
        title: str,
        status: str,  # "pending", "in_progress", "complete", "error"
        details: Optional[str] = None,
        output: Optional[str] = None,
        sources: Optional[Sequence[Union[Dict, URLSource]]] = None,
        **others: Dict,
    ):
        """Used for displaying tool execution progress in a timeline-style UI.

        https://docs.slack.dev/messaging/sending-and-scheduling-messages#text-streaming

        Args:
            id: Identifier of the task.
            title: Title of the task.
            status: One of "pending", "in_progress", "complete", "error".
            details: Optional details text.
            output: Optional output text.
            sources: Optional sequence of sources; dicts are kept as given and
                ``URLSource`` objects are converted with ``to_dict()``.
            **others: Any additional keyword arguments; they are handed to
                ``show_unknown_key_warning`` and otherwise not stored.

        Raises:
            SlackObjectFormationError: If an element of ``sources`` is neither
                a dict nor a ``URLSource``.
        """
        super().__init__(type=self.type)
        show_unknown_key_warning(self, others)

        self.id = id
        self.title = title
        self.status = status
        self.details = details
        self.output = output
        # Always define the attribute, like ``details``/``output`` above, so that
        # reading ``chunk.sources`` never raises AttributeError when none were given.
        self.sources: Optional[list] = None

        if sources is not None:
            normalized = []
            for src in sources:
                if isinstance(src, dict):
                    normalized.append(src)
                elif isinstance(src, URLSource):
                    normalized.append(src.to_dict())
                else:
                    # ``type`` here is the builtin; the class-level ``type`` string
                    # is not visible as a bare name inside a method body.
                    raise SlackObjectFormationError(f"Unsupported type for source in task update chunk: {type(src)}")
            self.sources = normalized