Skip to content

Commit 1c72a06

Browse files
bolster slack integration
1 parent 8d6f3c4 commit 1c72a06

16 files changed

Lines changed: 1527 additions & 406 deletions

examples/slackdemo.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
SlackIntegration,
99
SlackMessageEvent,
1010
)
11+
from fastloop.models import LoopEvent
1112

1213
app = FastLoop(name="slackdemo")
1314

@@ -16,27 +17,38 @@ class AppContext(LoopContext):
1617
client: Any
1718

1819

20+
async def resolve_bot_token(context: LoopContext, event: LoopEvent) -> str:
21+
"""
22+
Resolve the bot token for this workspace.
23+
24+
In a real app, you would look this up from a database based on the team_id
25+
that was stored when the workspace installed your Slack app via OAuth.
26+
27+
Example:
28+
team_id = getattr(event, 'team', None)
29+
return await db.get_bot_token(team_id)
30+
"""
31+
return os.getenv("SLACK_BOT_TOKEN") or ""
32+
33+
1934
async def analyze_file(context: AppContext):
2035
file_shared: SlackFileSharedEvent | None = await context.wait_for(
2136
SlackFileSharedEvent, timeout=1
2237
)
2338
if not file_shared:
2439
return
2540

26-
file_bytes = await file_shared.download_file()
41+
file_bytes = await file_shared.download_file(context)
2742
with open("something.png", "wb") as f:
2843
f.write(file_bytes)
2944

3045

3146
@app.loop(
3247
"filebot",
33-
# start_event=SlackAppMentionEvent,
3448
integrations=[
3549
SlackIntegration(
36-
app_id=os.getenv("SLACK_APP_ID") or "",
37-
bot_token=os.getenv("SLACK_BOT_TOKEN") or "",
38-
signing_secret=os.getenv("SLACK_SIGNING_SECRET") or "",
39-
client_id=os.getenv("SLACK_CLIENT_ID") or "",
50+
signing_secret=os.getenv("SLACK_SIGNING_SECRET"),
51+
setup=resolve_bot_token,
4052
)
4153
],
4254
)

fastloop/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@
66
CLAIM_LOCK_BLOCKING_TIMEOUT_S = 5
77
DEFAULT_SYSTEM_CONFIG_DIR = "/etc/fastloop.d/"
88
LEASE_TTL_S = 30
9-
LEASE_HEARTBEAT_INTERVAL_S = 10
9+
LEASE_HEARTBEAT_INTERVAL_S = 5
1010
MAX_EVENT_HISTORY = 1000
1111
MEANINGFUL_WORK_THRESHOLD_S = 0.01

fastloop/context.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def __init__(
6060
self.integration_events: dict[str, list[Any]] = {
6161
i.type(): i.events() for i in integrations
6262
}
63+
self._integration_clients: dict[str, Any] = {}
6364

6465
def _reset_cycle_tracking(self) -> None:
6566
self.event_this_cycle = False
@@ -199,7 +200,7 @@ async def _emit_to_integrations(self, event: LoopEvent) -> None:
199200
},
200201
)
201202

202-
await self.integrations[integration_type].emit(event)
203+
await self.integrations[integration_type].emit(event, self)
203204

204205
async def set(self, key: str, value: Any, local: bool = False) -> None:
205206
if not local:
@@ -231,6 +232,28 @@ async def delete(self, key: str, local: bool = False) -> None:
231232
async def get_event_history(self) -> list[dict[str, Any]]:
232233
return await self.state_manager.get_event_history(self.loop_id)
233234

235+
def set_integration_client(self, integration_type: str, client: Any) -> None:
236+
self._integration_clients[integration_type] = client
237+
238+
def get_integration_client(self, integration_type: str) -> Any | None:
239+
return self._integration_clients.get(integration_type)
240+
241+
async def setup_integrations(self, event: LoopEvent | None = None) -> None:
242+
if not self.integrations:
243+
return
244+
245+
event_to_use = event or self.initial_event
246+
if not event_to_use:
247+
logger.warning(
248+
"Cannot setup integrations: no event available",
249+
extra={"loop_id": self.loop_id},
250+
)
251+
return
252+
253+
for integration in self.integrations.values():
254+
if hasattr(integration, "setup_for_context"):
255+
await integration.setup_for_context(self, event_to_use)
256+
234257
@property
235258
def should_stop(self) -> bool:
236259
"""Check if the loop should stop."""

fastloop/executor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@
1212

1313
def _get_pool(executor_type: ExecutorType) -> ThreadPoolExecutor | ProcessPoolExecutor:
1414
global _thread_pool, _process_pool
15+
1516
if executor_type == ExecutorType.THREAD:
1617
if _thread_pool is None:
1718
_thread_pool = ThreadPoolExecutor(max_workers=4)
1819
return _thread_pool
20+
1921
if _process_pool is None:
2022
_process_pool = ProcessPoolExecutor(max_workers=4)
23+
2124
return _process_pool
2225

2326

fastloop/fastloop.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ async def _event_handler(request: dict[str, Any], func: Any = func):
351351
integrations=self._loop_metadata[name].get("integrations", []),
352352
)
353353

354+
await context.setup_integrations(event)
354355
await self.state_manager.push_event(loop.loop_id, event)
355356

356357
if loop_instance:
@@ -697,6 +698,8 @@ async def restart_loop(self, loop_id: str) -> bool:
697698
integrations=metadata.get("integrations", []),
698699
)
699700

701+
await context.setup_integrations()
702+
700703
loop_instance: Loop | None = metadata.get("loop_instance")
701704
if loop_instance:
702705
loop_instance.ctx = context
@@ -718,7 +721,7 @@ async def restart_loop(self, loop_id: str) -> bool:
718721
return True
719722
else:
720723
logger.warning(
721-
"Failed to restart loop",
724+
"Failed to restart loop - task already exists in loop_manager",
722725
extra={
723726
"loop_id": loop.loop_id,
724727
},

fastloop/integrations/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
from ..types import IntegrationType
55

66
if TYPE_CHECKING:
7-
from ..app import FastLoop
7+
from ..context import LoopContext
8+
from ..fastloop import FastLoop
89

910

1011
class Integration(ABC):
@@ -17,7 +18,7 @@ def register(self, fastloop: "FastLoop", loop_name: str) -> None:
1718
pass
1819

1920
@abstractmethod
20-
async def emit(self, event: Any) -> None:
21+
async def emit(self, event: Any, context: "LoopContext | None" = None) -> None:
2122
pass
2223

2324
@abstractmethod

0 commit comments

Comments
 (0)