-
Notifications
You must be signed in to change notification settings - Fork 284
Expand file tree
/
Copy pathasync_get_thread_context.py
More file actions
44 lines (36 loc) · 1.68 KB
/
async_get_thread_context.py
File metadata and controls
44 lines (36 loc) · 1.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from typing import Optional
from slack_bolt.context.assistant.thread_context import AssistantThreadContext
from slack_bolt.context.assistant.thread_context_store.async_store import AsyncAssistantThreadContextStore
class AsyncGetThreadContext:
    """Async callable that resolves the assistant thread context for a payload.

    When the payload's ``assistant_thread`` carries a ``context`` dict with a
    ``channel_id``, that context is wrapped in ``AssistantThreadContext`` and
    remembered. Otherwise, when the payload has both ``channel`` and
    ``thread_ts``, the context is looked up in ``thread_context_store``.
    """

    thread_context_store: AsyncAssistantThreadContextStore
    payload: dict
    channel_id: str
    thread_ts: str

    # Most recently resolved context; None until something has been resolved.
    _thread_context: Optional[AssistantThreadContext]
    # True once the context was taken from the payload itself; later calls
    # then return the remembered value without any further lookup.
    thread_context_loaded: bool

    def __init__(
        self,
        thread_context_store: AsyncAssistantThreadContextStore,
        channel_id: str,
        thread_ts: str,
        payload: dict,
    ):
        """Store the lookup inputs; nothing is resolved until the instance is called.

        Args:
            thread_context_store: Store whose ``find()`` is awaited for payloads
                that do not embed a usable context.
            channel_id: Channel ID passed to the store lookup.
            thread_ts: Thread timestamp passed to the store lookup.
            payload: Payload dict inspected for ``assistant_thread``,
                ``channel`` and ``thread_ts``.
        """
        self.thread_context_store = thread_context_store
        self.payload = payload
        self.channel_id = channel_id
        self.thread_ts = thread_ts
        self._thread_context = None
        self.thread_context_loaded = False

    async def __call__(self) -> Optional[AssistantThreadContext]:
        """Return the thread context for the payload.

        Returns:
            The context embedded in the payload, the result of the store lookup,
            or the previously resolved value (initially ``None``) when the
            payload matches neither shape.
        """
        if self.thread_context_loaded is True:
            return self._thread_context

        thread = self.payload.get("assistant_thread")
        # "context" may be absent, null, or otherwise not a dict; only a dict can
        # be inspected safely. A ``.get("context", {})`` default does not help
        # when the key is present with a null value.
        context = thread.get("context") if isinstance(thread, dict) else None
        if isinstance(context, dict) and context.get("channel_id") is not None:
            # assistant_thread_started
            self._thread_context = AssistantThreadContext(context)
            # for this event, the context will never be changed
            self.thread_context_loaded = True
        elif self.payload.get("channel") is not None and self.payload.get("thread_ts") is not None:
            # message event
            self._thread_context = await self.thread_context_store.find(
                channel_id=self.channel_id,
                thread_ts=self.thread_ts,
            )

        return self._thread_context