-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtg_api_guard.py
More file actions
160 lines (134 loc) · 5.68 KB
/
tg_api_guard.py
File metadata and controls
160 lines (134 loc) · 5.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/env python3
# @bigd-hook-meta
# name: tg_api_guard
# fires_on: PostToolUse
# relevant_intents: [code, telegram]
# irrelevant_intents: [bigd, pm, docx, x_tweet, git, vps, sync, memory]
# cost_score: 2
# always_fire: false
# Copyright (c) 2026 Nardo (<github-user>). AGPL-3.0 — see LICENSE
"""PostToolUse hook: catch Telegram API misuse patterns.
Catches:
1. Missing RetryAfter / rate-limit handling in send/edit/answer calls
2. Unbounded growth: dicts/lists stored in bot_data/context.chat_data
without any eviction/maxlen
3. Non-atomic file saves (write directly to target, not write-then-rename)
4. TOCTOU: os.path.exists() or Path.exists() check followed by open()/rename()
without a lock (race condition)
5. asyncio.gather without return_exceptions=True (one failure kills all)
Commits this prevents: 429a5dc (8), ff1bd59 (9), 6f65919 (11)
"""
import re
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from hook_base import run_hook
# Telegram send/edit calls — check if RetryAfter is handled nearby
_TG_SEND = re.compile(
r"(send_message|send_photo|send_document|send_audio|edit_message_text|"
r"answer_callback_query|reply_text|reply_photo)\s*\("
)
_RETRY_HANDLING = re.compile(r"RetryAfter|Flood|retry_after|telegram\.error")
# bot_data / context.chat_data assignment
_BOT_DATA_SET = re.compile(
r"(bot_data|chat_data|user_data)\s*\[.*\]\s*="
)
_HAS_MAXLEN = re.compile(r"maxlen|max_size|evict|LRU|deque|OrderedDict")
# Non-atomic file save: open(path, 'w') where path is the final file
_DIRECT_WRITE = re.compile(r"""open\s*\(\s*(\w+)\s*,\s*['"]w['"]\s*\)""")
_ATOMIC_WRITE = re.compile(r"\.tmp|tempfile|_tmp|write_then_rename|atomic")
# TOCTOU: exists() check then open/rename without lock
_EXISTS_CHECK = re.compile(r"\.(exists|is_file)\s*\(\s*\)")
_LOCK_NEARBY = re.compile(r"lock|Lock|acquire|FileLock|flock")
# asyncio.gather without return_exceptions
_GATHER_NO_EXC = re.compile(r"asyncio\.gather\s*\([^)]*\)")
_HAS_RETURN_EXC = re.compile(r"return_exceptions\s*=\s*True")
def _scan(content):
lines = content.splitlines()
warnings = []
# Check for Telegram send calls with no RetryAfter handling in the file
has_tg_send = any(_TG_SEND.search(l) for l in lines)
has_retry = any(_RETRY_HANDLING.search(l) for l in lines)
if has_tg_send and not has_retry:
warnings.append(
" No RetryAfter/TelegramError handling found, but Telegram send calls present. "
"Wrap sends in try/except telegram.error.RetryAfter to survive flood limits."
)
for i, line in enumerate(lines, 1):
stripped = line.strip()
if stripped.startswith("#"):
continue
# bot_data / chat_data assignment without eviction
if _BOT_DATA_SET.search(line):
window = "\n".join(lines[max(0, i-5):min(len(lines), i+5)])
if not _HAS_MAXLEN.search(window):
warnings.append(
f" line ~{i}: `{stripped[:80]}` — "
"bot_data/chat_data grows unbounded. "
"Use collections.deque(maxlen=N) or periodic cleanup."
)
# Direct file write (non-atomic)
if _DIRECT_WRITE.search(line):
window = "\n".join(lines[max(0, i-3):min(len(lines), i+3)])
if not _ATOMIC_WRITE.search(window):
warnings.append(
f" line ~{i}: `{stripped[:80]}` — "
"direct open(path, 'w') is non-atomic (crash = corrupt file). "
"Write to a .tmp file then os.replace() for atomicity."
)
# TOCTOU: exists() check
if _EXISTS_CHECK.search(line):
window = "\n".join(lines[max(0, i-2):min(len(lines), i+6)])
if re.search(r"\bopen\s*\(|\brename\s*\(|\bos\.replace\s*\(", window):
if not _LOCK_NEARBY.search(window):
warnings.append(
f" line ~{i}: `{stripped[:80]}` — "
"TOCTOU: exists() check followed by open()/rename() without a lock. "
"Another process can create/delete the file between check and use."
)
# asyncio.gather without return_exceptions
if _GATHER_NO_EXC.search(line) and not _HAS_RETURN_EXC.search(line):
warnings.append(
f" line ~{i}: `{stripped[:80]}` — "
"asyncio.gather() without return_exceptions=True. "
"One exception cancels all tasks; use return_exceptions=True and check results."
)
return warnings
def check(tool_name, tool_input, _input_data):
if tool_name not in ("Edit", "Write"):
return False
fp = tool_input.get("file_path", "")
if not fp.endswith(".py"):
return False
if "tg_api_guard" in fp:
return False
return True
def action(tool_name, tool_input, _input_data):
if tool_name == "Write":
content = tool_input.get("content", "")
else:
content = tool_input.get("new_string", "")
if not content:
return None
warnings = _scan(content)
if not warnings:
return None
fp = tool_input.get("file_path", "")
return (
f"TG API GUARD: Telegram API anti-patterns in `{Path(fp).name}`.\n"
+ "\n".join(warnings[:5])
)
if __name__ == "__main__":
import io
import json
_raw = sys.stdin.read()
try:
_prompt = json.loads(_raw).get("prompt", "") if _raw else ""
except Exception:
_prompt = ""
from _semantic_router import should_fire
if not should_fire(__file__, _prompt):
print("{}")
sys.exit(0)
sys.stdin = io.StringIO(_raw)
run_hook(check, action, "tg_api_guard")