-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_semantic_router.py
More file actions
350 lines (303 loc) · 12.4 KB
/
_semantic_router.py
File metadata and controls
350 lines (303 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
#!/usr/bin/env python3
"""
_semantic_router.py — FP-11 semantic hook metadata + router.
Usage (inside any hook that wants routing):
import sys, os, json
sys.path.insert(0, os.path.dirname(__file__))
from _semantic_router import should_fire, classify_prompt
hook_input = json.load(sys.stdin)
prompt = hook_input.get("prompt", "")
if not should_fire(__file__, prompt):
print("{}")
sys.exit(0)
Router reads the @bigd-hook-meta YAML block from the hook file itself.
Rule-based keyword classifier — NO LLM. Per CLAUDE.md hard rule.
Log: ~/.claude/hooks/.router_log.jsonl
"""
from __future__ import annotations
import json
import os
import re
import time
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Intent -> keyword map (rule-based, no LLM)
# ---------------------------------------------------------------------------
# Each intent has a list of keyword patterns. Any match = intent is active.
# Patterns are lowercased substring checks (fast). Regex only when needed.
INTENT_KEYWORDS: dict[str, list[str]] = {
"bigd": [
"bigd", "big-d", "bigsystemd", "inbox", "brief", "briefs",
"triage", "pipeline", "daemon", "bundle", "approval queue",
"approve", "defer", "skip", "ack",
],
"pm": [
"kalshi", "polymarket", "manifold", "prediction market",
"pm bot", "hel", "london", "vultr", "pm-london",
"trading", "trade", "bet", "market maker", "mm ", "fill",
"position", "portfolio", "odds", "orderbook",
],
"telegram": [
"telegram", "tg ", "bot message", "telegram bot", "tg bot",
"speak_hook", "memo display", "story memo",
],
"docx": [
"word doc", "docx", ".docx", "word document", "microsoft word",
"write a doc", "export to word",
],
"git": [
"git ", "commit", "push", "pull request", "branch", "merge",
"rebase", "stash", "clone", "repo", "repository", "github",
"gitignore", "git log", "git diff",
],
"code": [
"fix the", "debug", "refactor", "implement", "add feature",
"typescript", "python", "javascript", "function", "class",
"import", "module", "error in", "bug in", "crash", "exception",
"test", "unit test", "lint", "tsc", "compile", "build",
],
"meta": [
"claude.md", "hook", "agent", "skill ", "memory", "nardoworld",
"rules", "settings.json", "strict-execute", "strict-plan",
"/ship", "ship phase", "subagent",
],
"debug": [
"debug", "error:", "traceback", "exception", "stack trace",
"not working", "broken", "fails", "failure", "crash",
"502", "503", "500 error", "timeout", "connection refused",
],
"x_tweet": [
"tweet", "x thread", "post on x", "twitter", "@nardovibecoding",
"280 chars", "post this",
],
"vps": [
"vps", "ssh", "server", "hel ", "london ", "vultr", "systemd",
"launchctl", "service", "deploy", "rsync",
],
"memory": [
"remember", "recall", "memory", "nardoworld", "wiki",
"hub node", "graph", "article", "note", "lesson",
],
"sync": [
"sync", "rsync", "git pull", "git push", "vps sync",
"pm sync", "deploy",
],
"recall": [
"are we using", "do we have", "do we use", "did we",
"have we tried", "have we used", "have we installed", "have we set up",
"remember when", "do you remember", "didn't we", "wasn't that",
"weren't we", "last time we", "before we", "previously",
"already have", "already using", "already installed", "already set up",
"already tried", "we using", "we have", "using this already",
"have this already", "is x wired", "are we shipped", "are we running",
"what's our", "how did we", "where did we",
],
"debug_round": [
"round 1", "round 2", "round 3", "round 4", "round 5", "round n",
"already tried", "already ruled out", "already tested", "already debugged",
"already proved", "already disproved", "previously tried", "previously tested",
"tried that", "tried already", "tried before", "n>1 rounds",
"fresh eyes", "deeper audit", "mystery", "still broken",
"tried again", "tried this already",
],
"concept_search": [
"you told me", "i told you", "we discussed", "yesterday you",
"yesterday we", "we have a doc", "i remember you said",
"you said yesterday", "remember i said", "remember you",
"u told me", "u said", "earlier you", "earlier we",
"i remmebner", "i remmeber", "i rmemeber",
],
}
# ---------------------------------------------------------------------------
# Parser: extract @bigd-hook-meta block from hook file
# ---------------------------------------------------------------------------
_META_CACHE: dict[str, dict] = {}
def _parse_hook_meta(hook_path: str) -> dict:
"""
Parse the @bigd-hook-meta YAML comment block from a hook file.
Returns dict with keys: name, fires_on, relevant_intents,
irrelevant_intents, cost_score, always_fire.
Returns {} if no meta block found (back-compat: always fire).
"""
if hook_path in _META_CACHE:
return _META_CACHE[hook_path]
try:
text = Path(hook_path).read_text(encoding="utf-8", errors="replace")
except OSError:
_META_CACHE[hook_path] = {}
return {}
# Find the meta block: lines starting with # that include @bigd-hook-meta
lines = text.splitlines()
meta_start = -1
for i, line in enumerate(lines[:30]): # only scan first 30 lines
if "@bigd-hook-meta" in line:
meta_start = i
break
if meta_start == -1:
_META_CACHE[hook_path] = {}
return {}
# Collect comment lines after meta_start until non-comment line
meta_lines = []
for line in lines[meta_start + 1 : meta_start + 20]:
stripped = line.strip()
if stripped.startswith("#"):
meta_lines.append(stripped[1:].strip())
else:
break
# Parse simple key: value and key: [a, b, c] YAML subset
meta: dict = {}
for line in meta_lines:
if ":" not in line:
continue
key, _, raw_val = line.partition(":")
key = key.strip()
val = raw_val.split("#")[0].strip() # strip inline comments
if val.startswith("[") and val.endswith("]"):
# List: [a, b, c]
items = [x.strip().strip('"').strip("'") for x in val[1:-1].split(",")]
meta[key] = [x for x in items if x]
elif val.lower() == "true":
meta[key] = True
elif val.lower() == "false":
meta[key] = False
else:
try:
meta[key] = int(val)
except ValueError:
meta[key] = val
_META_CACHE[hook_path] = meta
return meta
# ---------------------------------------------------------------------------
# Classifier: prompt -> set of intent tags
# ---------------------------------------------------------------------------
def classify_prompt(prompt: str) -> set[str]:
"""
Classify a prompt into a set of intent tags using keyword lookup.
Returns set of matching intents. Returns {"general"} if nothing matches.
Rule-based only — no LLM.
"""
if not prompt:
return {"general"}
prompt_lower = prompt.lower()
matched: set[str] = set()
for intent, keywords in INTENT_KEYWORDS.items():
for kw in keywords:
if kw in prompt_lower:
matched.add(intent)
break # one match per intent is enough
return matched if matched else {"general"}
# ---------------------------------------------------------------------------
# Router decision
# ---------------------------------------------------------------------------
def should_fire(hook_path: str, prompt: str) -> bool:
"""
Decide whether this hook should fire for the given prompt.
Rules:
1. No meta block -> always fire (back-compat).
2. always_fire: true -> always fire.
3. relevant_intents present -> fire if ANY classified intent is in the list.
4. irrelevant_intents present -> skip if ALL classified intents are in that list.
5. Neither list -> always fire.
Logs decision to .router_log.jsonl.
"""
meta = _parse_hook_meta(hook_path)
hook_name = os.path.basename(hook_path)
if not meta:
# No metadata -> back-compat, always fire (no log to keep noise down)
return True
if meta.get("always_fire", False):
_log_decision(hook_name, prompt, set(), "fire", "always_fire=true")
return True
intents = classify_prompt(prompt)
relevant = set(meta.get("relevant_intents", []))
irrelevant = set(meta.get("irrelevant_intents", []))
if relevant:
# Fire if ANY classified intent overlaps with relevant_intents
if intents & relevant:
_log_decision(hook_name, prompt, intents, "fire",
f"matched relevant: {intents & relevant}")
return True
# No relevant match -> skip (unless no irrelevant_intents either)
_log_decision(hook_name, prompt, intents, "skip",
f"no relevant match (need {relevant}, got {intents})")
return False
if irrelevant:
# Skip only if ALL intents are in the irrelevant set
if intents and intents.issubset(irrelevant):
_log_decision(hook_name, prompt, intents, "skip",
f"all intents in irrelevant: {intents}")
return False
_log_decision(hook_name, prompt, intents, "fire",
f"not all intents irrelevant (got {intents})")
return True
# No relevant or irrelevant list -> always fire
return True
# ---------------------------------------------------------------------------
# Log
# ---------------------------------------------------------------------------
LOG_PATH = Path(__file__).parent / ".router_log.jsonl"
_LOG_COUNTER_PATH = Path(__file__).parent / ".router_log_counter"
_LOG_TRIM_CHECK_EVERY = 100 # check line count every N writes
_LOG_TRIM_THRESHOLD = 10000 # trim when file exceeds this many lines
_LOG_TRIM_KEEP = 5000 # keep last N lines after trim
_LOG_FD: Optional[int] = None
_log_write_count: int = 0
def _maybe_trim_router_log() -> None:
"""
Trim .router_log.jsonl to last _LOG_TRIM_KEEP lines when it exceeds
_LOG_TRIM_THRESHOLD lines. Called every _LOG_TRIM_CHECK_EVERY writes.
Silent on any error.
"""
global _LOG_FD
try:
if not LOG_PATH.exists():
return
lines = LOG_PATH.read_bytes().split(b"\n")
# Remove trailing empty element from final newline
if lines and lines[-1] == b"":
lines = lines[:-1]
if len(lines) <= _LOG_TRIM_THRESHOLD:
return
kept = lines[-_LOG_TRIM_KEEP:]
# Close fd before truncating
if _LOG_FD is not None:
try:
os.close(_LOG_FD)
except OSError:
pass
_LOG_FD = None
LOG_PATH.write_bytes(b"\n".join(kept) + b"\n")
except Exception:
pass # always silent
def _log_decision(hook_name: str, prompt: str, intents: set,
decision: str, reason: str) -> None:
"""Append one log row to .router_log.jsonl. Silent on failure."""
global _LOG_FD, _log_write_count
try:
row = {
"ts": time.time(),
"hook": hook_name,
"prompt_head": prompt[:80],
"intents": sorted(intents),
"decision": decision,
"reason": reason,
}
line = json.dumps(row) + "\n"
if _LOG_FD is None:
_LOG_FD = os.open(str(LOG_PATH), os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o644)
os.write(_LOG_FD, line.encode("utf-8"))
_log_write_count += 1
if _log_write_count % _LOG_TRIM_CHECK_EVERY == 0:
_maybe_trim_router_log()
except Exception:
pass # always silent
# ---------------------------------------------------------------------------
# Standalone: print classification for a prompt (for testing)
# ---------------------------------------------------------------------------
if __name__ == "__main__":
import sys
prompt_arg = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else ""
result = classify_prompt(prompt_arg)
print(f"prompt: {prompt_arg!r}")
print(f"intents: {sorted(result)}")