forked from shareAI-lab/learn-claude-code
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode.py
More file actions
616 lines (522 loc) · 24.6 KB
/
code.py
File metadata and controls
616 lines (522 loc) · 24.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
#!/usr/bin/env python3
"""
s09_memory.py - Memory System
Persistent, cross-session knowledge for the coding agent.
Storage:
.memory/
MEMORY.md ← index (one line per memory, ≤200 lines)
feedback_tabs.md ← individual memory files (Markdown + YAML frontmatter)
user_profile.md
project_facts.md
Flow in agent_loop:
1. Load MEMORY.md index into SYSTEM prompt (cheap, always present)
2. Select relevant memories by filename/description → inject content
3. Run compression pipeline from s08
4. After each turn ends → extract new memories from original messages
5. Periodically consolidate (Dream)
Builds on s08 (context compact). Usage:
python s09_memory/code.py
Needs: pip install anthropic python-dotenv + ANTHROPIC_API_KEY in .env
"""
import os, subprocess, json, time, re
from pathlib import Path
try:
import readline
readline.parse_and_bind('set bind-tty-special-chars off')
except ImportError:
pass
from anthropic import Anthropic
from dotenv import load_dotenv
load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
WORKDIR = Path.cwd()
MEMORY_DIR = WORKDIR / ".memory"; MEMORY_DIR.mkdir(exist_ok=True)
MEMORY_INDEX = MEMORY_DIR / "MEMORY.md"
SKILLS_DIR = WORKDIR / "skills"
TRANSCRIPT_DIR = WORKDIR / ".transcripts"
TOOL_RESULTS_DIR = WORKDIR / ".task_outputs" / "tool-results"
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
# ═══════════════════════════════════════════════════════════
# NEW in s09: Memory System
# ═══════════════════════════════════════════════════════════
MEMORY_TYPES = ["user", "feedback", "project", "reference"]
def _parse_frontmatter(text: str) -> tuple[dict, str]:
if not text.startswith("---"):
return {}, text
parts = text.split("---", 2)
if len(parts) < 3:
return {}, text
meta = {}
for line in parts[1].strip().splitlines():
if ":" in line:
k, v = line.split(":", 1)
meta[k.strip()] = v.strip().strip('"').strip("'")
return meta, parts[2].strip()
def write_memory_file(name: str, mem_type: str, description: str, body: str):
"""Write a single memory file with YAML frontmatter."""
slug = name.lower().replace(" ", "-").replace("/", "-")
filename = f"{slug}.md"
filepath = MEMORY_DIR / filename
filepath.write_text(
f"---\nname: {name}\ndescription: {description}\ntype: {mem_type}\n---\n\n{body}\n"
)
_rebuild_index()
return filepath
def _rebuild_index():
"""Rebuild MEMORY.md index from all memory files."""
lines = []
for f in sorted(MEMORY_DIR.glob("*.md")):
if f.name == "MEMORY.md":
continue
raw = f.read_text()
meta, body = _parse_frontmatter(raw)
name = meta.get("name", f.stem)
desc = meta.get("description", body.split("\n")[0][:80])
lines.append(f"- [{name}]({f.name}) — {desc}")
MEMORY_INDEX.write_text("\n".join(lines) + "\n" if lines else "")
def read_memory_index() -> str:
"""Read MEMORY.md index (injected into SYSTEM every turn)."""
if not MEMORY_INDEX.exists():
return ""
text = MEMORY_INDEX.read_text().strip()
return text if text else ""
def read_memory_file(filename: str) -> str | None:
"""Read a single memory file's full content."""
path = MEMORY_DIR / filename
if not path.exists():
return None
return path.read_text()
def list_memory_files() -> list[dict]:
"""List all memory files with metadata."""
result = []
for f in sorted(MEMORY_DIR.glob("*.md")):
if f.name == "MEMORY.md":
continue
raw = f.read_text()
meta, body = _parse_frontmatter(raw)
result.append({
"filename": f.name,
"name": meta.get("name", f.stem),
"description": meta.get("description", ""),
"type": meta.get("type", "user"),
"body": body,
})
return result
def select_relevant_memories(messages: list, max_items: int = 5) -> list[str]:
"""Select relevant memory filenames by matching recent conversation against
memory names/descriptions. Uses a simple LLM call (or falls back to keyword
matching on name+description)."""
files = list_memory_files()
if not files:
return []
# Collect recent user text for context
recent_texts = []
for msg in reversed(messages):
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, list):
content = " ".join(
str(getattr(b, "text", "")) for b in content
if getattr(b, "type", None) == "text"
)
if isinstance(content, str):
recent_texts.append(content)
if len(recent_texts) >= 3:
break
recent = " ".join(reversed(recent_texts))[:2000]
if not recent.strip():
return []
# Build catalog of name + description for LLM to choose from
catalog_lines = []
for i, f in enumerate(files):
catalog_lines.append(f"{i}: {f['name']} — {f['description']}")
catalog = "\n".join(catalog_lines)
prompt = (
"Given the recent conversation and the memory catalog below, "
"select the indices of memories that are clearly relevant. "
"Return ONLY a JSON array of integers, e.g. [0, 3]. "
"If none are relevant, return [].\n\n"
f"Recent conversation:\n{recent}\n\n"
f"Memory catalog:\n{catalog}"
)
try:
response = client.messages.create(
model=MODEL,
messages=[{"role": "user", "content": prompt}],
max_tokens=200,
)
text = response.content[0].text.strip()
# Extract JSON array from response
match = re.search(r'\[.*?\]', text, re.DOTALL)
if match:
indices = json.loads(match.group())
selected = []
for idx in indices:
if isinstance(idx, int) and 0 <= idx < len(files):
selected.append(files[idx]["filename"])
if len(selected) >= max_items:
break
return selected
except Exception:
pass
# Fallback: keyword matching on name + description
keywords = [w.lower() for w in recent.split() if len(w) > 3]
selected = []
for f in files:
text = (f["name"] + " " + f["description"]).lower()
if any(kw in text for kw in keywords):
selected.append(f["filename"])
if len(selected) >= max_items:
break
return selected
def load_memories(messages: list) -> str:
"""Load relevant memory content for injection into context."""
selected_files = select_relevant_memories(messages)
if not selected_files:
return ""
parts = ["<relevant_memories>"]
for filename in selected_files:
content = read_memory_file(filename)
if content:
parts.append(content)
parts.append("</relevant_memories>")
return "\n\n".join(parts)
def extract_memories(messages: list):
"""Extract new memories from recent dialogue. Runs after each turn."""
# Collect recent conversation text
dialogue_parts = []
for msg in messages[-10:]:
role = msg.get("role", "?")
content = msg.get("content", "")
if isinstance(content, list):
content = " ".join(
str(getattr(b, "text", "")) for b in content
if getattr(b, "type", None) == "text"
)
if isinstance(content, str) and content.strip():
dialogue_parts.append(f"{role}: {content}")
dialogue = "\n".join(dialogue_parts)
if not dialogue.strip():
return
# Check existing memories to avoid duplicates
existing = list_memory_files()
existing_desc = "\n".join(f"- {m['name']}: {m['description']}" for m in existing) if existing else "(none)"
prompt = (
"Extract user preferences, constraints, or project facts from this dialogue.\n"
"Return a JSON array. Each item: {name, type, description, body}.\n"
"- name: short kebab-case identifier (e.g. 'user-preference-tabs')\n"
"- type: one of 'user' (user preference), 'feedback' (guidance), "
"'project' (project fact), 'reference' (external pointer)\n"
"- description: one-line summary for index lookup\n"
"- body: full detail in markdown\n"
"If nothing new or already covered by existing memories, return [].\n\n"
f"Existing memories:\n{existing_desc}\n\n"
f"Dialogue:\n{dialogue[:4000]}"
)
try:
response = client.messages.create(
model=MODEL, messages=[{"role": "user", "content": prompt}], max_tokens=800
)
text = response.content[0].text.strip()
# Extract JSON array from response
match = re.search(r'\[.*\]', text, re.DOTALL)
if not match:
return
items = json.loads(match.group())
if not items:
return
count = 0
for mem in items:
name = mem.get("name", f"memory_{int(time.time())}")
mem_type = mem.get("type", "user")
desc = mem.get("description", "")
body = mem.get("body", "")
if desc and body:
write_memory_file(name, mem_type, desc, body)
count += 1
if count:
print(f"\n\033[33m[Memory: extracted {count} new memories]\033[0m")
except Exception:
pass
CONSOLIDATE_THRESHOLD = 10
def consolidate_memories():
"""Merge duplicate/stale memories. Triggered when file count ≥ threshold."""
files = list_memory_files()
if len(files) < CONSOLIDATE_THRESHOLD:
return
catalog = "\n\n".join(
f"## {f['filename']}\nname: {f['name']}\ndescription: {f['description']}\n{f['body']}"
for f in files
)
prompt = (
"Consolidate the following memory files. Rules:\n"
"1. Merge duplicates into one\n"
"2. Remove outdated/contradicted memories\n"
"3. Keep the total under 30 memories\n"
"4. Preserve important user preferences above all\n"
"Return a JSON array. Each item: {name, type, description, body}.\n\n"
f"{catalog[:16000]}"
)
try:
response = client.messages.create(
model=MODEL, messages=[{"role": "user", "content": prompt}], max_tokens=3000
)
text = response.content[0].text.strip()
match = re.search(r'\[.*\]', text, re.DOTALL)
if not match:
return
items = json.loads(match.group())
# Remove old memory files (keep MEMORY.md)
for f in MEMORY_DIR.glob("*.md"):
if f.name != "MEMORY.md":
f.unlink()
for mem in items:
name = mem.get("name", f"memory_{int(time.time())}")
mem_type = mem.get("type", "user")
desc = mem.get("description", "")
body = mem.get("body", "")
if desc and body:
write_memory_file(name, mem_type, desc, body)
print(f"\n\033[33m[Memory: consolidated {len(files)} → {len(items)} memories]\033[0m")
except Exception:
pass
# Build SYSTEM with memory index
def build_system() -> str:
index = read_memory_index()
memories_section = f"\n\nMemories available:\n{index}" if index else ""
return (
f"You are a coding agent at {WORKDIR}."
f"{memories_section}\n"
"Relevant memories are injected below. Respect user preferences from memory.\n"
"When the user says 'remember' or expresses a clear preference, extract it as a memory."
)
SYSTEM = build_system()
SUB_SYSTEM = (
f"You are a coding agent at {WORKDIR}. "
"Complete the task you were given, then return a concise summary. "
"Do not delegate further."
)
# ═══════════════════════════════════════════════════════════
# FROM s02-s08 (skeleton): Basic tools
# ═══════════════════════════════════════════════════════════
def safe_path(p: str) -> Path:
path = (WORKDIR / p).resolve()
if not path.is_relative_to(WORKDIR): raise ValueError(f"Path escapes workspace: {p}")
return path
def run_bash(command: str) -> str:
try:
r = subprocess.run(command, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=120)
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired: return "Error: Timeout (120s)"
def run_read(path: str, limit: int | None = None) -> str:
try:
lines = safe_path(path).read_text().splitlines()
if limit and limit < len(lines): lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
return "\n".join(lines)
except Exception as e: return f"Error: {e}"
def run_write(path: str, content: str) -> str:
try:
file_path = safe_path(path); file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content); return f"Wrote {len(content)} bytes to {path}"
except Exception as e: return f"Error: {e}"
def run_edit(path: str, old_text: str, new_text: str) -> str:
try:
file_path = safe_path(path)
text = file_path.read_text()
if old_text not in text: return f"Error: text not found in {path}"
file_path.write_text(text.replace(old_text, new_text, 1))
return f"Edited {path}"
except Exception as e: return f"Error: {e}"
def run_glob(pattern: str) -> str:
import glob as g
try:
results = []
for match in g.glob(pattern, root_dir=WORKDIR):
if (WORKDIR / match).resolve().is_relative_to(WORKDIR):
results.append(match)
return "\n".join(results) if results else "(no matches)"
except Exception as e: return f"Error: {e}"
def extract_text(content) -> str:
if not isinstance(content, list): return str(content)
return "\n".join(getattr(b, "text", "") for b in content if getattr(b, "type", None) == "text")
# Subagent (simplified from s06-s07)
SUB_TOOLS = [
{"name": "bash", "description": "Run a shell command.",
"input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
{"name": "read_file", "description": "Read file contents.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
{"name": "write_file", "description": "Write content to a file.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
]
SUB_HANDLERS = {"bash": run_bash, "read_file": run_read, "write_file": run_write}
def spawn_subagent(task: str) -> str:
print(f"\n\033[35m[Subagent spawned]\033[0m")
messages = [{"role": "user", "content": task}]
for _ in range(30):
response = client.messages.create(model=MODEL, system=SUB_SYSTEM,
messages=messages, tools=SUB_TOOLS, max_tokens=8000)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use": break
results = []
for block in response.content:
if block.type == "tool_use":
handler = SUB_HANDLERS.get(block.name)
output = handler(**block.input) if handler else f"Unknown: {block.name}"
print(f" \033[90m[sub] {block.name}: {str(output)[:100]}\033[0m")
results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
messages.append({"role": "user", "content": results})
result = extract_text(messages[-1]["content"])
if not result:
for msg in reversed(messages):
if msg["role"] == "assistant":
result = extract_text(msg["content"])
if result: break
if not result: result = "Subagent stopped after 30 turns without final answer."
print(f"\033[35m[Subagent done]\033[0m")
return result
# ═══════════════════════════════════════════════════════════
# FROM s08 (skeleton): Compaction pipeline
# ═══════════════════════════════════════════════════════════
CONTEXT_LIMIT = 50000; KEEP_RECENT = 3; PERSIST_THRESHOLD = 30000
def estimate_size(msgs): return len(str(msgs))
def snip_compact(msgs, mx=50):
if len(msgs) <= mx: return msgs
return msgs[:3] + [{"role": "user", "content": f"[snipped {len(msgs)-mx} msgs]"}] + msgs[-(mx-3):]
def collect_tool_results(msgs):
blocks = []
for mi, msg in enumerate(msgs):
if msg.get("role") != "user" or not isinstance(msg.get("content"), list): continue
for bi, block in enumerate(msg["content"]):
if isinstance(block, dict) and block.get("type") == "tool_result": blocks.append((mi, bi, block))
return blocks
def micro_compact(msgs):
tr = collect_tool_results(msgs)
if len(tr) <= KEEP_RECENT: return msgs
for _, _, b in tr[:-KEEP_RECENT]:
if len(b.get("content", "")) > 120: b["content"] = "[Earlier tool result compacted.]"
return msgs
def persist_large(tid, out):
if len(out) <= PERSIST_THRESHOLD: return out
TOOL_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
p = TOOL_RESULTS_DIR / f"{tid}.txt"
if not p.exists(): p.write_text(out)
return f"<persisted-output>\nFull: {p}\nPreview:\n{out[:2000]}\n</persisted-output>"
def tool_result_budget(msgs, mx=200_000):
last = msgs[-1] if msgs else None
if not last or last.get("role") != "user" or not isinstance(last.get("content"), list): return msgs
blocks = [(i, b) for i, b in enumerate(last["content"]) if isinstance(b, dict) and b.get("type") == "tool_result"]
total = sum(len(str(b.get("content", ""))) for _, b in blocks)
if total <= mx: return msgs
for _, block in sorted(blocks, key=lambda p: len(str(p[1].get("content", ""))), reverse=True):
if total <= mx: break
c = str(block.get("content", ""))
if len(c) <= PERSIST_THRESHOLD: continue
block["content"] = persist_large(block.get("tool_use_id", "?"), c)
total = sum(len(str(b.get("content", ""))) for _, b in blocks)
return msgs
def write_transcript(msgs):
TRANSCRIPT_DIR.mkdir(parents=True, exist_ok=True)
p = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
with p.open("w") as f:
for m in msgs: f.write(json.dumps(m, default=str) + "\n")
return p
def summarize_history(msgs):
conv = json.dumps(msgs, default=str)[:80000]
r = client.messages.create(model=MODEL, messages=[{"role": "user", "content":
"Summarize this coding-agent conversation so work can continue.\n"
"Preserve: 1. current goal, 2. key findings, 3. files changed, 4. remaining work, 5. user constraints.\n\n" + conv}],
max_tokens=2000)
return r.content[0].text.strip()
def compact_history(msgs):
write_transcript(msgs)
summary = summarize_history(msgs)
return [{"role": "user", "content": f"[Compacted]\n\n{summary}"}]
def reactive_compact(msgs):
write_transcript(msgs)
summary = summarize_history(msgs)
return [{"role": "user", "content": f"[Reactive compact]\n\n{summary}"}, *msgs[-5:]]
# ═══════════════════════════════════════════════════════════
# Tool Definitions (skeleton — fewer tools to focus on memory)
# ═══════════════════════════════════════════════════════════
TOOLS = [
{"name": "bash", "description": "Run a shell command.",
"input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
{"name": "read_file", "description": "Read file contents.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
{"name": "write_file", "description": "Write content to a file.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
{"name": "edit_file", "description": "Replace exact text in a file once.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
{"name": "glob", "description": "Find files matching a glob pattern.",
"input_schema": {"type": "object", "properties": {"pattern": {"type": "string"}}, "required": ["pattern"]}},
{"name": "task", "description": "Launch a subagent to handle a subtask.",
"input_schema": {"type": "object", "properties": {"description": {"type": "string"}}, "required": ["description"]}},
]
TOOL_HANDLERS = {
"bash": run_bash, "read_file": run_read, "write_file": run_write,
"edit_file": run_edit, "glob": run_glob, "task": spawn_subagent,
}
# ═══════════════════════════════════════════════════════════
# agent_loop — s09: inject memories + extract after each turn
# ═══════════════════════════════════════════════════════════
MAX_REACTIVE_RETRIES = 1
def agent_loop(messages: list):
reactive_retries = 0
while True:
# s09: rebuild system with current memory index + relevant memories
system = build_system()
memories_content = load_memories(messages)
if memories_content:
system += "\n\n" + memories_content
# s09: save pre-compression snapshot for accurate memory extraction
pre_compress = [m if isinstance(m, dict) else {"role": m.get("role",""),
"content": str(m.get("content",""))} for m in messages]
# s08: compression pipeline (budget → snip → micro)
messages[:] = tool_result_budget(messages)
messages[:] = snip_compact(messages)
messages[:] = micro_compact(messages)
if estimate_size(messages) > CONTEXT_LIMIT:
print("[auto compact]")
messages[:] = compact_history(messages)
try:
response = client.messages.create(
model=MODEL, system=system, messages=messages, tools=TOOLS, max_tokens=8000
)
reactive_retries = 0
except Exception as e:
if ("prompt_too_long" in str(e).lower() or "too many tokens" in str(e).lower()) and reactive_retries < MAX_REACTIVE_RETRIES:
print("[reactive compact]")
messages[:] = reactive_compact(messages)
reactive_retries += 1
continue
raise
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
# s09: extract from pre-compression snapshot for full fidelity
extract_memories(pre_compress)
consolidate_memories()
return
results = []
for block in response.content:
if block.type != "tool_use": continue
print(f"\033[36m> {block.name}\033[0m")
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input) if handler else f"Unknown: {block.name}"
print(str(output)[:200])
results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
messages.append({"role": "user", "content": results})
if __name__ == "__main__":
print("s09: Memory — persistent cross-session knowledge")
print("输入问题,回车发送。输入 q 退出。\n")
history = []
while True:
try: query = input("\033[36ms09 >> \033[0m")
except (EOFError, KeyboardInterrupt): break
if query.strip().lower() in ("q", "exit", ""): break
history.append({"role": "user", "content": query})
agent_loop(history)
for block in history[-1]["content"]:
if getattr(block, "type", None) == "text": print(block.text)
print()