-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathchannels.py
More file actions
266 lines (229 loc) · 8.64 KB
/
channels.py
File metadata and controls
266 lines (229 loc) · 8.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
"""
Slack channel operations: list channels, join channel, run join-check (with allow/block list).
"""
from __future__ import annotations
import logging
import os
import threading
from typing import Any, Optional
from core.operations.slack_ops.client import SlackAPIClient
from core.operations.slack_ops.tokens import get_slack_client
logger = logging.getLogger(__name__)
DEFAULT_JOIN_INTERVAL_MINUTES = 15
class _ChannelJoinCoordinator:
"""Background channel-join scheduler state.
``_check_lock`` is try-acquire only (overlap prevention, not long-held
mutual exclusion). ``_stop_event`` signals the daemon thread to exit.
Standalone — no ordering constraints with other locks.
"""
def __init__(self) -> None:
self._check_lock = threading.Lock()
self._stop_event = threading.Event()
def try_run_join_check(self, bot_token: str | None) -> None:
if self._check_lock.acquire(blocking=False):
try:
run_channel_join_check(bot_token)
finally:
self._check_lock.release()
def wait(self, timeout: float) -> bool:
return self._stop_event.wait(timeout)
def is_stopped(self) -> bool:
return self._stop_event.is_set()
def signal_stop(self) -> None:
self._stop_event.set()
def clear_stop(self) -> None:
self._stop_event.clear()
_coordinator = _ChannelJoinCoordinator()
def _parse_list_env(value: Optional[str]) -> set:
"""Parse comma-separated env value into a set of stripped, lowercased strings."""
if not value or not str(value).strip():
return set()
return {s.strip().lower() for s in str(value).split(",") if s.strip()}
def _get_channel_join_config() -> dict:
"""Load channel-join config from env (CHANNEL_JOIN_*, CHANNEL_ALLOWLIST, CHANNEL_BLOCKLIST)."""
raw_interval = os.environ.get("CHANNEL_JOIN_INTERVAL_MINUTES", "") or str(
DEFAULT_JOIN_INTERVAL_MINUTES
)
try:
interval_min = int(raw_interval)
except (ValueError, TypeError):
logger.warning(
"CHANNEL_JOIN_INTERVAL_MINUTES env value %r is invalid; using default %s",
raw_interval,
DEFAULT_JOIN_INTERVAL_MINUTES,
)
interval_min = DEFAULT_JOIN_INTERVAL_MINUTES
if interval_min < 1:
interval_min = DEFAULT_JOIN_INTERVAL_MINUTES
public_only_str = (
(os.environ.get("CHANNEL_JOIN_PUBLIC_ONLY", "true") or "true").strip().lower()
)
public_only = public_only_str in ("true", "1", "yes")
allowlist = _parse_list_env(os.environ.get("CHANNEL_ALLOWLIST"))
blocklist = _parse_list_env(os.environ.get("CHANNEL_BLOCKLIST"))
return {
"interval_minutes": interval_min,
"public_only": public_only,
"allowlist": allowlist,
"blocklist": blocklist,
}
def _channel_matches_policy(
channel_id: str,
channel_name: str,
allowlist: set,
blocklist: set,
) -> bool:
"""Return True if channel is allowed by allowlist/blocklist policy."""
name_lower = (channel_name or "").lower()
id_lower = (channel_id or "").lower()
if blocklist and (name_lower in blocklist or id_lower in blocklist):
return False
if allowlist:
return name_lower in allowlist or id_lower in allowlist
return True
def channel_list(
client: Optional[SlackAPIClient] = None,
types: str = "public_channel",
exclude_archived: bool = True,
) -> list[dict]:
"""
List channels. Returns list of {"id", "name", "is_member", ...} for each channel.
If client is None, uses get_slack_client().
"""
if client is None:
client = get_slack_client()
all_channels = []
cursor = None
while True:
data = client.conversations_list(
types=types,
exclude_archived=exclude_archived,
limit=200,
cursor=cursor,
)
if not data.get("ok"):
logger.warning(
"conversations.list failed: %s", data.get("error", "unknown")
)
break
channels = data.get("channels", [])
all_channels.extend(channels)
cursor = (data.get("response_metadata") or {}).get("next_cursor")
if not cursor:
break
return all_channels
def channel_join(channel_id: str, client: Optional[SlackAPIClient] = None) -> dict:
"""Join a channel by ID. Returns Slack API response."""
if client is None:
client = get_slack_client()
return client.conversations_join(channel_id)
def run_channel_join_check(
bot_token: Optional[str] = None,
*,
client: Optional[SlackAPIClient] = None,
) -> dict:
"""
Find public channels the bot is not in, apply allow/block list policy, and join them.
Returns dict with keys: joined, failed, skipped_policy, error (optional).
"""
if client is None:
token = (bot_token or "").strip() or None
try:
client = get_slack_client(bot_token=token) if token else get_slack_client()
except ValueError:
logger.warning("Channel join check skipped: missing SLACK_BOT_TOKEN")
return {
"joined": [],
"failed": [],
"skipped_policy": [],
"error": "Missing SLACK_BOT_TOKEN",
}
config = _get_channel_join_config()
allowlist = config["allowlist"]
blocklist = config["blocklist"]
public_only = config["public_only"]
types = "public_channel" if public_only else "public_channel,private_channel"
result: dict[str, Any] = {"joined": [], "failed": [], "skipped_policy": []}
try:
channels_to_consider = []
cursor = None
while True:
data = client.conversations_list(
types=types,
exclude_archived=True,
limit=200,
cursor=cursor,
)
if not data.get("ok"):
result["error"] = (
f"conversations.list failed: {data.get('error', 'unknown')}"
)
return result
for ch in data.get("channels", []):
if not ch.get("is_member", False):
channels_to_consider.append(
{"id": ch["id"], "name": ch.get("name", "") or ch["id"]}
)
cursor = (data.get("response_metadata") or {}).get("next_cursor")
if not cursor:
break
to_join = []
for ch in channels_to_consider:
if _channel_matches_policy(ch["id"], ch["name"], allowlist, blocklist):
to_join.append(ch)
else:
result["skipped_policy"].append(ch["id"])
for ch in to_join:
try:
jdata = client.conversations_join(ch["id"])
if jdata.get("ok"):
result["joined"].append(ch["id"])
logger.info("Joined channel: %s (%s)", ch["name"], ch["id"])
else:
result["failed"].append(
{
"channel_id": ch["id"],
"channel_name": ch["name"],
"error": jdata.get("error", "unknown"),
}
)
except Exception as e:
result["failed"].append(
{
"channel_id": ch["id"],
"channel_name": ch["name"],
"error": str(e),
}
)
return result
except Exception as e:
logger.exception("Channel join check failed: %s", e)
result["error"] = str(e)
return result
def start_channel_join_background(
bot_token: Optional[str] = None,
) -> threading.Thread:
"""Start a daemon thread that runs the channel-join check every N minutes."""
def _run_loop():
config = _get_channel_join_config()
interval_seconds = config["interval_minutes"] * 60
first_run_delay = min(60, interval_seconds)
_coordinator.wait(first_run_delay)
if _coordinator.is_stopped():
return
while not _coordinator.is_stopped():
_coordinator.try_run_join_check(bot_token)
waited = 0
while waited < interval_seconds and not _coordinator.is_stopped():
_coordinator.wait(min(60, interval_seconds - waited))
waited += 60
t = threading.Thread(
target=_run_loop,
daemon=True,
name="SlackChannelJoiner",
)
t.start()
return t
def stop_channel_join_background() -> None:
"""Signal the background channel-join thread to stop."""
_coordinator.signal_stop()