-
Notifications
You must be signed in to change notification settings - Fork 10.9k
Expand file tree
/
Copy pathcode.py
More file actions
251 lines (198 loc) · 9.76 KB
/
Copy pathcode.py
File metadata and controls
251 lines (198 loc) · 9.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#!/usr/bin/env python3
"""
s03_permission.py - Permission System
Three gates inserted before tool execution:
Gate 1: Hard deny list (rm -rf /, sudo, ...)
Gate 2: Rule matching (write outside workspace? destructive cmd?)
Gate 3: User approval (pause and wait for confirmation)
+-------+ +--------+ +--------+ +--------+ +------+
| Tool | -> | Gate 1 | -> | Gate 2 | -> | Gate 3 | -> | Exec |
| call | | deny? | | match? | | allow? | | |
+-------+ +--------+ +--------+ +--------+ +------+
| | | |
v v v v
(normal) (blocked) (ask user) (user says no?)
Only one line added to the agent loop:
if not check_permission(block):
continue
Builds on s02 (multi-tool). Usage:
python s03_permission/code.py
Needs: pip install anthropic python-dotenv + ANTHROPIC_API_KEY in .env
"""
import os, subprocess
from pathlib import Path
try:
    import readline

    # Apply the line-editing settings in a fixed order.
    # NOTE(review): these look like the GNU readline 8-bit/meta switches,
    # presumably enabled so non-ASCII input can be typed and edited at the
    # prompt -- confirm against the readline documentation.
    for _readline_setting in (
        "set bind-tty-special-chars off",
        "set input-meta on",
        "set output-meta on",
        "set convert-meta off",
    ):
        readline.parse_and_bind(_readline_setting)
except ImportError:
    # No readline module available: carry on without these settings.
    pass
from anthropic import Anthropic
from dotenv import load_dotenv
# Module-level configuration, executed once at import time.
load_dotenv(override=True)

# Read the optional custom endpoint once; it is needed twice below.
_base_url = os.getenv("ANTHROPIC_BASE_URL")
if _base_url:
    # When a custom endpoint is configured, drop ANTHROPIC_AUTH_TOKEN from the
    # process environment before the client is constructed.
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

# The workspace is the directory the script was started from.
WORKDIR = Path.cwd()
client = Anthropic(base_url=_base_url)
# MODEL_ID is mandatory: a missing variable raises KeyError at startup.
MODEL = os.environ["MODEL_ID"]
SYSTEM = f"You are a coding agent at {WORKDIR}. All destructive operations require user approval."
# ═══════════════════════════════════════════════════════════
# FROM s02 (unchanged): Tool Implementations
# ═══════════════════════════════════════════════════════════
def safe_path(p: str) -> Path:
    """Resolve ``p`` against the workspace and reject anything outside it.

    Args:
        p: A path, interpreted relative to ``WORKDIR``.

    Returns:
        The fully resolved path.

    Raises:
        ValueError: If the resolved path is not located under ``WORKDIR``.
    """
    candidate = (WORKDIR / p).resolve()
    if candidate.is_relative_to(WORKDIR):
        return candidate
    raise ValueError(f"Path escapes workspace: {p}")
def run_bash(command: str) -> str:
    """Run ``command`` through the shell inside the workspace.

    The command runs with ``WORKDIR`` as its working directory and a
    120-second timeout. stdout and stderr are concatenated (stdout first),
    stripped, and truncated to 50000 characters.

    Args:
        command: The shell command line to execute.

    Returns:
        The captured output, ``"(no output)"`` when nothing was printed, or an
        ``"Error: ..."`` string when the command timed out or could not be run.
    """
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"
    except Exception as e:
        # Mirror the other tool handlers: report failures (e.g. OSError or an
        # output decoding error) as text instead of crashing the agent loop.
        return f"Error: {e}"
    out = (r.stdout + r.stderr).strip()
    return out[:50000] if out else "(no output)"
def run_read(path: str, limit: int | None = None) -> str:
    """Return the text of a workspace file, optionally truncated by line count.

    Args:
        path: File path, validated through ``safe_path``.
        limit: Maximum number of lines to return. ``None``, ``0`` and negative
            values all mean "no limit".

    Returns:
        The file's lines joined with ``"\\n"``. When the file has more than
        ``limit`` lines, the first ``limit`` lines are followed by a
        ``"... (N more lines)"`` marker. Any failure is returned as an
        ``"Error: ..."`` string.
    """
    try:
        lines = safe_path(path).read_text().splitlines()
        # Only a positive limit truncates: a negative value would otherwise
        # slice from the end and report a wrong "more lines" count.
        if limit is not None and 0 < limit < len(lines):
            hidden = len(lines) - limit
            lines = lines[:limit] + [f"... ({hidden} more lines)"]
        return "\n".join(lines)
    except Exception as e:
        return f"Error: {e}"
def run_write(path: str, content: str) -> str:
    """Write ``content`` to a workspace file, creating parent directories.

    Args:
        path: Destination path, validated through ``safe_path``.
        content: Text to write; an existing file is replaced.

    Returns:
        ``"Wrote N bytes to <path>"`` on success, where N is the size of the
        file on disk after the write, or an ``"Error: ..."`` string on failure.
    """
    try:
        file_path = safe_path(path)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_text(content)
        # len(content) counts characters, not bytes; report the real on-disk
        # size so the message is accurate for non-ASCII text.
        written = file_path.stat().st_size
        return f"Wrote {written} bytes to {path}"
    except Exception as e:
        return f"Error: {e}"
def run_edit(path: str, old_text: str, new_text: str) -> str:
    """Replace the first occurrence of ``old_text`` in a workspace file.

    Args:
        path: File to edit, validated through ``safe_path``.
        old_text: Exact text to look for.
        new_text: Text that replaces the first match.

    Returns:
        ``"Edited <path>"`` on success, an error string when ``old_text`` does
        not occur in the file, or ``"Error: ..."`` for any other failure.
    """
    try:
        target = safe_path(path)
        current = target.read_text()
        if old_text in current:
            # Only the first occurrence is rewritten.
            updated = current.replace(old_text, new_text, 1)
            target.write_text(updated)
            return f"Edited {path}"
        return f"Error: text not found in {path}"
    except Exception as e:
        return f"Error: {e}"
def run_glob(pattern: str) -> str:
    """List workspace files matching ``pattern``.

    The pattern is expanded relative to ``WORKDIR``; matches whose resolved
    location falls outside the workspace are dropped.

    Args:
        pattern: A glob pattern.

    Returns:
        The matches joined by newlines, ``"(no matches)"`` when there are
        none, or an ``"Error: ..."`` string on failure.
    """
    import glob as g
    try:
        inside = [
            hit
            for hit in g.glob(pattern, root_dir=WORKDIR)
            if (WORKDIR / hit).resolve().is_relative_to(WORKDIR)
        ]
        if not inside:
            return "(no matches)"
        return "\n".join(inside)
    except Exception as e:
        return f"Error: {e}"
# ═══════════════════════════════════════════════════════════
# FROM s02 (unchanged): Tool Definitions & Dispatch
# ═══════════════════════════════════════════════════════════
def _tool_spec(name: str, description: str, properties: dict, required: list) -> dict:
    """Build one tool definition from a compact ``{argument: json_type}`` map."""
    return {
        "name": name,
        "description": description,
        "input_schema": {
            "type": "object",
            "properties": {arg: {"type": kind} for arg, kind in properties.items()},
            "required": list(required),
        },
    }


# Tool definitions passed to the model on every request (see ``tools=TOOLS``).
TOOLS = [
    _tool_spec("bash", "Run a shell command.",
               {"command": "string"}, ["command"]),
    _tool_spec("read_file", "Read file contents.",
               {"path": "string", "limit": "integer"}, ["path"]),
    _tool_spec("write_file", "Write content to a file.",
               {"path": "string", "content": "string"}, ["path", "content"]),
    _tool_spec("edit_file", "Replace exact text in a file once.",
               {"path": "string", "old_text": "string", "new_text": "string"},
               ["path", "old_text", "new_text"]),
    _tool_spec("glob", "Find files matching a glob pattern.",
               {"pattern": "string"}, ["pattern"]),
]
# Dispatch table: tool name (as declared in TOOLS) -> implementing function.
TOOL_HANDLERS = dict(
    bash=run_bash,
    read_file=run_read,
    write_file=run_write,
    edit_file=run_edit,
    glob=run_glob,
)
# ═══════════════════════════════════════════════════════════
# NEW in s03: Three-Gate Permission Pipeline
# ═══════════════════════════════════════════════════════════
# Gate 1: Hard deny list — always forbidden
DENY_LIST = ["rm -rf /", "sudo", "shutdown", "reboot", "mkfs", "dd if=", "> /dev/sda"]
def check_deny_list(command: str) -> str | None:
for pattern in DENY_LIST:
if pattern in command:
return f"Blocked: '{pattern}' is on the deny list"
return None
# Gate 2: Rule matching — context-dependent checks
# Substrings that make a bash command count as potentially destructive.
_DESTRUCTIVE_MARKERS = ("rm ", "> /etc/", "chmod 777")


def _writes_outside_workspace(args: dict) -> bool:
    """Return True when the tool's ``path`` argument resolves outside WORKDIR."""
    target = (WORKDIR / args.get("path", "")).resolve()
    return not target.is_relative_to(WORKDIR)


def _looks_destructive(args: dict) -> bool:
    """Return True when the ``command`` argument contains a destructive marker."""
    command = args.get("command", "")
    return any(marker in command for marker in _DESTRUCTIVE_MARKERS)


# Each rule names the tools it applies to, a predicate over the tool's
# arguments, and the reason shown to the user when the predicate fires.
PERMISSION_RULES = [
    {
        "tools": ["write_file", "edit_file"],
        "check": _writes_outside_workspace,
        "message": "Writing outside workspace",
    },
    {
        "tools": ["bash"],
        "check": _looks_destructive,
        "message": "Potentially destructive command",
    },
]
def check_rules(tool_name: str, args: dict) -> str | None:
    """Gate 2: find the first permission rule triggered by this tool call.

    Args:
        tool_name: Name of the tool being invoked.
        args: The tool's input arguments.

    Returns:
        The ``"message"`` of the first rule in ``PERMISSION_RULES`` that lists
        ``tool_name`` and whose ``"check"`` predicate accepts ``args``, or
        ``None`` when no rule applies.
    """
    triggered = (
        rule["message"]
        for rule in PERMISSION_RULES
        if tool_name in rule["tools"] and rule["check"](args)
    )
    # The generator is lazy, so evaluation stops at the first matching rule.
    return next(triggered, None)
# Gate 3: User approval — wait for confirmation after rule match
def ask_user(tool_name: str, args: dict, reason: str) -> str:
    """Gate 3: show the pending tool call and ask the user to approve it.

    Args:
        tool_name: Name of the tool awaiting approval.
        args: The tool's input arguments, echoed to the user.
        reason: Why approval is required (the matched rule's message).

    Returns:
        ``"allow"`` if the user answers ``y`` or ``yes`` (case-insensitive,
        surrounding whitespace ignored); ``"deny"`` for any other answer, and
        also when stdin is closed and no answer can be read.
    """
    print(f"\n\033[33m⚠ {reason}\033[0m")
    print(f" Tool: {tool_name}({args})")
    try:
        choice = input(" Allow? [y/N] ").strip().lower()
    except EOFError:
        # No interactive input available: fail closed rather than crash.
        print()
        return "deny"
    return "allow" if choice in ("y", "yes") else "deny"
# Pipeline: all three gates chained
def check_permission(block) -> bool:
    """Run a tool call through the three permission gates.

    Args:
        block: A tool-use block exposing ``name`` (tool name) and ``input``
            (dict of tool arguments).

    Returns:
        ``True`` when the call may execute; ``False`` when it was blocked by
        the deny list or rejected by the user.
    """
    tool, args = block.name, block.input

    # Gate 1: the hard deny list applies to shell commands only.
    if tool == "bash":
        blocked = check_deny_list(args.get("command", ""))
        if blocked:
            print(f"\n\033[31m⛔ {blocked}\033[0m")
            return False

    # Gate 2: no rule triggered means the call proceeds without asking.
    matched = check_rules(tool, args)
    if not matched:
        return True

    # Gate 3: a triggered rule needs explicit user approval.
    return ask_user(tool, args, matched) != "deny"
# ═══════════════════════════════════════════════════════════
# agent_loop — same as s02, with check_permission() inserted
# ═══════════════════════════════════════════════════════════
def agent_loop(messages: list):
    """Drive the model/tool conversation until the model stops calling tools.

    Each iteration sends ``messages`` to the model, appends the assistant
    reply, and, if the model requested tools, runs every tool call through
    ``check_permission`` before executing it. All tool results of one turn are
    appended as a single user message.

    Args:
        messages: Conversation history; mutated in place.

    Returns:
        ``None``, once a response arrives whose stop reason is not
        ``"tool_use"``.
    """
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            print(f"\033[36m> {block.name}\033[0m")
            # s03 change: run through permission pipeline before executing
            if not check_permission(block):
                results.append({"type": "tool_result", "tool_use_id": block.id,
                                "content": "Permission denied."})
                continue
            handler = TOOL_HANDLERS.get(block.name)
            if handler is None:
                output = f"Unknown: {block.name}"
            else:
                try:
                    output = handler(**block.input)
                except Exception as e:
                    # A failing handler (e.g. TypeError from missing or
                    # unexpected arguments) must still yield a tool_result;
                    # otherwise the assistant message appended above is left
                    # with an unanswered tool_use block in the history.
                    output = f"Error: {e}"
            print(str(output)[:200])
            results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
        messages.append({"role": "user", "content": results})
if __name__ == "__main__":
    # Interactive REPL: one shared history is kept across all turns.
    print("s03: Permission")
    print("输入问题,回车发送。输入 q 退出。\n")
    history = []
    quit_words = ("q", "exit", "")
    while True:
        try:
            query = input("\033[36ms03 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C at the prompt ends the session.
            break
        if query.strip().lower() in quit_words:
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        # Print the text parts of the last message in the history.
        for block in history[-1]["content"]:
            if getattr(block, "type", None) != "text":
                continue
            print(block.text)
        print()