Skip to content

Commit b836d0a

Browse files
wip
1 parent 46e6f58 commit b836d0a

7 files changed

Lines changed: 499 additions & 135 deletions

File tree

examples/slackdemo.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
from fastloop import FastLoop, LoopContext
55
from fastloop.integrations.slack import (
66
SlackAppMentionEvent,
7+
SlackConfig,
78
SlackFileSharedEvent,
89
SlackIntegration,
910
SlackMessageEvent,
11+
SlackSetupInput,
1012
)
11-
from fastloop.models import LoopEvent
1213

1314
app = FastLoop(name="slackdemo")
1415

@@ -17,18 +18,26 @@ class AppContext(LoopContext):
1718
client: Any
1819

1920

20-
async def resolve_bot_token(context: LoopContext, event: LoopEvent) -> str:
21+
async def resolve_slack_config(setup_input: SlackSetupInput) -> SlackConfig:
2122
"""
22-
Resolve the bot token for this workspace.
23+
Resolve the Slack config for this workspace.
2324
24-
In a real app, you would look this up from a database based on the team_id
25-
that was stored when the workspace installed your Slack app via OAuth.
25+
In a real app, you would look up the bot token and signing secret from a database
26+
based on the team_id that was stored when the workspace installed your Slack app via OAuth.
2627
2728
Example:
28-
team_id = getattr(event, 'team', None)
29-
return await db.get_bot_token(team_id)
29+
workspace = await db.get_workspace(setup_input.team_id)
30+
return SlackConfig(
31+
bot_token=workspace.bot_token,
32+
signing_secret=workspace.signing_secret,
33+
team_id=setup_input.team_id,
34+
)
3035
"""
31-
return os.getenv("SLACK_BOT_TOKEN") or ""
36+
return SlackConfig(
37+
bot_token=os.getenv("SLACK_BOT_TOKEN") or "",
38+
signing_secret=os.getenv("SLACK_SIGNING_SECRET") or "",
39+
team_id=setup_input.team_id,
40+
)
3241

3342

3443
async def analyze_file(context: AppContext):
@@ -45,12 +54,7 @@ async def analyze_file(context: AppContext):
4554

4655
@app.loop(
4756
"filebot",
48-
integrations=[
49-
SlackIntegration(
50-
signing_secret=os.getenv("SLACK_SIGNING_SECRET"),
51-
setup=resolve_bot_token,
52-
)
53-
],
57+
integrations=[SlackIntegration(setup=resolve_slack_config)],
5458
)
5559
async def test_slack_bot(context: AppContext):
5660
mention: SlackAppMentionEvent | None = await context.wait_for(

fastloop/context.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,13 @@ async def sleep_until(self, timestamp: float) -> None:
115115

116116
async def wait_for(
117117
self,
118-
event: type[E],
118+
*event_types: type[E],
119119
timeout: float | int = 10.0,
120120
raise_on_timeout: bool = True,
121121
) -> E | None:
122+
if not event_types:
123+
raise ValueError("At least one event type must be provided")
124+
122125
wait_for_start = time.monotonic()
123126
start = asyncio.get_event_loop().time()
124127
pubsub = await self.state_manager.subscribe_to_events(self.loop_id)
@@ -139,14 +142,15 @@ async def wait_for(
139142
if self.should_stop:
140143
raise LoopStoppedError()
141144

142-
event_result = await self.state_manager.pop_event(
143-
self.loop_id,
144-
event, # type: ignore
145-
sender=LoopEventSender.CLIENT,
146-
)
147-
if event_result is not None:
148-
self.event_this_cycle = True
149-
return cast(E, event_result) # noqa
145+
for event_type in event_types:
146+
event_result = await self.state_manager.pop_event(
147+
self.loop_id,
148+
event_type, # type: ignore
149+
sender=LoopEventSender.CLIENT,
150+
)
151+
if event_result is not None:
152+
self.event_this_cycle = True
153+
return cast(E, event_result) # noqa
150154

151155
remaining_timeout = timeout - (asyncio.get_event_loop().time() - start)
152156
if remaining_timeout <= 0:
@@ -166,7 +170,8 @@ async def wait_for(
166170
await pubsub.close() # type: ignore
167171

168172
if raise_on_timeout:
169-
raise EventTimeoutError(f"Timeout waiting for event {event.type}")
173+
type_names = ", ".join(et.__name__ for et in event_types)
174+
raise EventTimeoutError(f"Timeout waiting for event(s): {type_names}")
170175
else:
171176
return None
172177

fastloop/integrations/slack.py

Lines changed: 127 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import re
22
from collections.abc import Awaitable, Callable
3+
from dataclasses import dataclass, field
34
from http import HTTPStatus
45
from typing import TYPE_CHECKING, Any, cast
56

@@ -16,7 +17,26 @@
1617
from ..context import LoopContext
1718
from ..fastloop import FastLoop
1819

19-
SlackSetupCallback = Callable[["LoopContext", "LoopEvent"], Awaitable[str]]
20+
21+
@dataclass
22+
class SlackConfig:
23+
bot_token: str
24+
signing_secret: str
25+
team_id: str | None = None
26+
27+
28+
@dataclass
29+
class SlackSetupInput:
30+
loop_name: str
31+
team_id: str
32+
channel: str
33+
root_ts: str
34+
event_type: str
35+
payload: dict[str, Any] = field(default_factory=dict)
36+
event: dict[str, Any] = field(default_factory=dict)
37+
38+
39+
SlackSetupCallback = Callable[["SlackSetupInput"], Awaitable["SlackConfig"]]
2040

2141
IGNORED_MESSAGE_SUBTYPES = frozenset(
2242
[
@@ -224,21 +244,44 @@ class SlackFileUploadEvent(LoopEvent):
224244

225245

226246
class SlackIntegration(Integration):
227-
def __init__(self, *, signing_secret: str, setup: SlackSetupCallback):
247+
def __init__(self, *, setup: SlackSetupCallback):
228248
super().__init__()
229249
self._setup_callback = setup
230-
self._signing_secret = signing_secret
231-
self.verifier = SignatureVerifier(signing_secret)
250+
self._config_cache: dict[str, SlackConfig] = {}
232251

233252
def type(self) -> IntegrationType:
234253
return IntegrationType.SLACK
235254

255+
async def _resolve_config(self, setup_input: SlackSetupInput) -> SlackConfig:
256+
team_id = setup_input.team_id
257+
if team_id in self._config_cache:
258+
return self._config_cache[team_id]
259+
config = await self._setup_callback(setup_input)
260+
self._config_cache[team_id] = config
261+
return config
262+
236263
async def setup_for_context(
237264
self, context: "LoopContext", event: "LoopEvent"
238265
) -> AsyncWebClient:
239-
bot_token = await self._setup_callback(context, event)
240-
client = AsyncWebClient(token=bot_token)
266+
team_id = getattr(event, "team", "")
267+
channel = getattr(event, "channel", "")
268+
root_ts = getattr(event, "root_ts", None) or getattr(event, "ts", "")
269+
event_type = getattr(event, "type", "")
270+
event_dict = getattr(event, "raw_event", None) or {}
271+
272+
setup_input = SlackSetupInput(
273+
loop_name=self.loop_name,
274+
team_id=team_id,
275+
channel=channel,
276+
root_ts=root_ts,
277+
event_type=event_type,
278+
payload={},
279+
event=event_dict,
280+
)
281+
config = await self._resolve_config(setup_input)
282+
client = AsyncWebClient(token=config.bot_token)
241283
context.set_integration_client(self.type(), client)
284+
context.set_integration_client(f"{self.type()}_config", config)
242285
return client
243286

244287
def get_client_for_context(self, context: "LoopContext") -> AsyncWebClient:
@@ -250,6 +293,12 @@ def get_client_for_context(self, context: "LoopContext") -> AsyncWebClient:
250293
)
251294
return cast("AsyncWebClient", client)
252295

296+
def get_config_for_context(self, context: "LoopContext") -> SlackConfig:
297+
config = context.get_integration_client(f"{self.type()}_config")
298+
if config is None:
299+
raise ValueError("Slack config not initialized for this context.")
300+
return cast("SlackConfig", config)
301+
253302
def register(self, fastloop: "FastLoop", loop_name: str) -> None:
254303
fastloop.register_events(SLACK_EVENT_TYPES)
255304
self._fastloop: FastLoop = fastloop
@@ -265,18 +314,69 @@ def events(self) -> list[Any]:
265314
return list(SLACK_EVENT_TYPES)
266315

267316
async def _handle_slack_event(self, request: Request):
317+
from ..logging import setup_logger
318+
319+
logger = setup_logger(__name__)
320+
268321
body = await request.body()
269-
if not self.verifier.is_valid_request(body, dict(request.headers)):
270-
raise HTTPException(
271-
status_code=HTTPStatus.FORBIDDEN, detail="Invalid signature"
272-
)
273322

274-
payload = await request.json()
323+
logger.debug(
324+
"Slack webhook received",
325+
extra={"body_length": len(body), "path": str(request.url)},
326+
)
327+
328+
try:
329+
payload = await request.json()
330+
except Exception as e:
331+
logger.error("Failed to parse Slack payload", extra={"error": str(e)})
332+
raise
333+
334+
logger.debug(
335+
"Parsed Slack payload",
336+
extra={
337+
"type": payload.get("type"),
338+
"event_type": payload.get("event", {}).get("type"),
339+
},
340+
)
341+
275342
if payload.get("type") == "url_verification":
276343
return {"challenge": payload["challenge"]}
277344

345+
team_id = payload.get("team_id", "")
278346
event: dict[str, Any] = payload.get("event", {})
279-
event_type = event.get("type")
347+
event_type = event.get("type", "")
348+
channel = event.get("channel", "")
349+
root_ts = event.get("thread_ts") or event.get("ts", "")
350+
351+
logger.info(
352+
"Received Slack event",
353+
extra={
354+
"event_type": event_type,
355+
"channel": channel,
356+
"root_ts": root_ts,
357+
"thread_ts": event.get("thread_ts"),
358+
"ts": event.get("ts"),
359+
"subtype": event.get("subtype"),
360+
"has_bot_id": bool(event.get("bot_id")),
361+
},
362+
)
363+
364+
setup_input = SlackSetupInput(
365+
loop_name=self.loop_name,
366+
team_id=team_id,
367+
channel=channel,
368+
root_ts=root_ts,
369+
event_type=event_type,
370+
payload=payload,
371+
event=event,
372+
)
373+
config = await self._resolve_config(setup_input)
374+
verifier = SignatureVerifier(config.signing_secret)
375+
376+
if not verifier.is_valid_request(body, dict(request.headers)):
377+
raise HTTPException(
378+
status_code=HTTPStatus.FORBIDDEN, detail="Invalid signature"
379+
)
280380

281381
if event_type not in SUPPORTED_SLACK_EVENTS:
282382
return {"ok": True}
@@ -288,18 +388,29 @@ async def _handle_slack_event(self, request: Request):
288388

289389
handler = self._fastloop.loop_event_handlers.get(self.loop_name)
290390
if not handler:
391+
logger.warning("No handler found", extra={"loop_name": self.loop_name})
291392
return {"ok": True}
292393

293-
channel = event.get("channel", "")
294-
root_ts = event.get("thread_ts") or event.get("ts", "")
295-
loop_id = await self._fastloop.state_manager.get_loop_mapping(
296-
f"slack_thread:{channel}:{root_ts}"
394+
mapping_key = f"slack_thread:{channel}:{root_ts}"
395+
loop_id = await self._fastloop.state_manager.get_loop_mapping(mapping_key)
396+
397+
logger.info(
398+
"Loop mapping lookup",
399+
extra={"mapping_key": mapping_key, "loop_id": loop_id},
297400
)
298401

299402
loop_event = self._map_event(event, event_type, payload, loop_id)
300403
if loop_event is None:
404+
logger.warning(
405+
"Event could not be mapped", extra={"event_type": event_type}
406+
)
301407
return {"ok": True}
302408

409+
logger.info(
410+
"Dispatching event to handler",
411+
extra={"event_type": loop_event.type, "loop_id": loop_id},
412+
)
413+
303414
loop: LoopState = await handler(loop_event.to_dict())
304415
if loop.loop_id:
305416
await self._fastloop.state_manager.set_loop_mapping(

fastloop/state/state_redis.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -604,12 +604,13 @@ async def push_event(self, loop_id: str, event: "LoopEvent"):
604604
pipe.lpush(queue_key, event_str)
605605
pipe.lpush(history_key, event_str)
606606
pipe.ltrim(history_key, 0, MAX_EVENT_HISTORY - 1)
607-
608-
if event.sender == LoopEventSender.SERVER:
609-
pipe.publish(channel_key, "new_event") # type: ignore
607+
pipe.publish(channel_key, "new_event") # type: ignore
610608

611609
await pipe.execute()
612610

611+
if event.sender == LoopEventSender.CLIENT:
612+
self.wake_queue.put_nowait(loop_id)
613+
613614
async def get_context_value(self, loop_id: str, key: str) -> Any:
614615
value_str = await self.rdb.get(
615616
RedisKeys.LOOP_CONTEXT.format(

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.107"
3+
version = "0.1.113"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

0 commit comments

Comments
 (0)