Skip to content

Commit 9281542

Browse files
committed
修复: 保留 end_turn 后的后台通知
1 parent 5dfe67f commit 9281542

File tree

3 files changed

+217
-17
lines changed

3 files changed

+217
-17
lines changed

agents/s13_background_tasks.py

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def __init__(self):
9191
self.tasks = {} # task_id -> {status, result, command, started_at}
9292
self._notification_queue = [] # completed task results
9393
self._lock = threading.Lock()
94+
self._condition = threading.Condition(self._lock)
9495

9596
def _record_path(self, task_id: str) -> Path:
9697
return self.dir / f"{task_id}.json"
@@ -156,14 +157,15 @@ def _execute(self, task_id: str, command: str):
156157
self.tasks[task_id]["finished_at"] = time.time()
157158
self.tasks[task_id]["result_preview"] = preview
158159
self._persist_task(task_id)
159-
with self._lock:
160+
with self._condition:
160161
self._notification_queue.append({
161162
"task_id": task_id,
162163
"status": status,
163164
"command": command[:80],
164165
"preview": preview,
165166
"output_file": str(output_path.relative_to(WORKDIR)),
166167
})
168+
self._condition.notify_all()
167169

168170
def check(self, task_id: str = None) -> str:
169171
"""Check status of one task or list all."""
@@ -189,7 +191,22 @@ def check(self, task_id: str = None) -> str:
189191

190192
def drain_notifications(self) -> list:
191193
"""Return and clear all pending completion notifications."""
192-
with self._lock:
194+
with self._condition:
195+
notifs = list(self._notification_queue)
196+
self._notification_queue.clear()
197+
return notifs
198+
199+
def _has_running_tasks_locked(self) -> bool:
200+
return any(task["status"] == "running" for task in self.tasks.values())
201+
202+
def has_running_tasks(self) -> bool:
203+
with self._condition:
204+
return self._has_running_tasks_locked()
205+
206+
def wait_for_notifications(self) -> list:
207+
with self._condition:
208+
while not self._notification_queue and self._has_running_tasks_locked():
209+
self._condition.wait()
193210
notifs = list(self._notification_queue)
194211
self._notification_queue.clear()
195212
return notifs
@@ -286,24 +303,36 @@ def run_edit(path: str, old_text: str, new_text: str) -> str:
286303
]
287304

288305

306+
def inject_background_results(messages: list, notifs: list) -> bool:
307+
if notifs and messages:
308+
notif_text = "\n".join(
309+
f"[bg:{n['task_id']}] {n['status']}: {n.get('preview') or '(no output)'}"
310+
f"{f' (output_file={n['output_file']})' if n.get('output_file') else ''}"
311+
for n in notifs
312+
)
313+
messages.append(
314+
{
315+
"role": "user",
316+
"content": f"<background-results>\n{notif_text}\n</background-results>",
317+
}
318+
)
319+
return True
320+
return False
321+
322+
289323
def agent_loop(messages: list):
290324
while True:
291-
# Drain background notifications and inject as a synthetic user/assistant
292-
# transcript pair before the next model call (teaching demo behavior).
293-
notifs = BG.drain_notifications()
294-
if notifs and messages:
295-
notif_text = "\n".join(
296-
f"[bg:{n['task_id']}] {n['status']}: {n['preview']} "
297-
f"(output_file={n['output_file']})"
298-
for n in notifs
299-
)
300-
messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
325+
inject_background_results(messages, BG.drain_notifications())
301326
response = client.messages.create(
302327
model=MODEL, system=SYSTEM, messages=messages,
303328
tools=TOOLS, max_tokens=8000,
304329
)
305330
messages.append({"role": "assistant", "content": response.content})
306331
if response.stop_reason != "tool_use":
332+
if BG.has_running_tasks() and inject_background_results(
333+
messages, BG.wait_for_notifications()
334+
):
335+
continue
307336
return
308337
results = []
309338
for block in response.content:

agents/s_full.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,21 @@ def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> st
742742
]
743743

744744

745+
def inject_background_results(messages: list, notifs: list) -> bool:
746+
if notifs:
747+
txt = "\n".join(
748+
f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
749+
)
750+
messages.append(
751+
{
752+
"role": "user",
753+
"content": f"<background-results>\n{txt}\n</background-results>",
754+
}
755+
)
756+
return True
757+
return False
758+
759+
745760
# === SECTION: agent_loop ===
746761
def agent_loop(messages: list):
747762
rounds_without_todo = 0
@@ -752,11 +767,7 @@ def agent_loop(messages: list):
752767
print("[auto-compact triggered]")
753768
messages[:] = auto_compact(messages)
754769
# s08: drain background notifications
755-
notifs = BG.drain()
756-
if notifs:
757-
txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs)
758-
messages.append({"role": "user", "content": f"<background-results>\n{txt}\n</background-results>"})
759-
messages.append({"role": "assistant", "content": "Noted background results."})
770+
inject_background_results(messages, BG.drain())
760771
# s10: check lead inbox
761772
inbox = BUS.read_inbox("lead")
762773
if inbox:
@@ -769,6 +780,10 @@ def agent_loop(messages: list):
769780
)
770781
messages.append({"role": "assistant", "content": response.content})
771782
if response.stop_reason != "tool_use":
783+
if BG.has_running_tasks() and inject_background_results(
784+
messages, BG.wait_for_notifications()
785+
):
786+
continue
772787
return
773788
# Tool execution
774789
results = []
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import os
2+
import sys
3+
import types
4+
import unittest
5+
from pathlib import Path
6+
from types import SimpleNamespace
7+
8+
9+
REPO_ROOT = Path(__file__).resolve().parents[1]
10+
if str(REPO_ROOT) not in sys.path:
11+
sys.path.insert(0, str(REPO_ROOT))
12+
13+
os.environ.setdefault("MODEL_ID", "test-model")
14+
15+
fake_anthropic = types.ModuleType("anthropic")
16+
17+
18+
class FakeAnthropic:
19+
def __init__(self, *args, **kwargs):
20+
self.messages = SimpleNamespace(create=None)
21+
22+
23+
setattr(fake_anthropic, "Anthropic", FakeAnthropic)
24+
sys.modules.setdefault("anthropic", fake_anthropic)
25+
26+
fake_dotenv = types.ModuleType("dotenv")
27+
setattr(fake_dotenv, "load_dotenv", lambda *args, **kwargs: None)
28+
sys.modules.setdefault("dotenv", fake_dotenv)
29+
30+
import agents.s13_background_tasks as s13_background_tasks
31+
import agents.s_full as s_full
32+
33+
34+
class FakeMessagesAPI:
35+
def __init__(self, responses):
36+
self._responses = iter(responses)
37+
self.call_count = 0
38+
39+
def create(self, **kwargs):
40+
self.call_count += 1
41+
return next(self._responses)
42+
43+
44+
class FakeS13BackgroundManager:
45+
def __init__(self):
46+
self._running = True
47+
self.wait_called = False
48+
49+
def drain_notifications(self):
50+
return []
51+
52+
def has_running_tasks(self):
53+
return self._running
54+
55+
def wait_for_notifications(self):
56+
self.wait_called = True
57+
self._running = False
58+
return [
59+
{
60+
"task_id": "bg-1",
61+
"status": "completed",
62+
"preview": "done",
63+
"output_file": ".runtime-tasks/bg-1.log",
64+
}
65+
]
66+
67+
68+
class FakeSFullBackgroundManager:
69+
def __init__(self):
70+
self._running = True
71+
self.wait_called = False
72+
73+
def drain(self):
74+
return []
75+
76+
def has_running_tasks(self):
77+
return self._running
78+
79+
def wait_for_notifications(self):
80+
self.wait_called = True
81+
self._running = False
82+
return [{"task_id": "bg-1", "status": "completed", "result": "done"}]
83+
84+
85+
class BackgroundNotificationTests(unittest.TestCase):
86+
def test_s13_agent_loop_waits_for_background_results_after_end_turn(self):
87+
messages = [{"role": "user", "content": "Run tests in the background"}]
88+
fake_bg = FakeS13BackgroundManager()
89+
fake_api = FakeMessagesAPI(
90+
[
91+
SimpleNamespace(
92+
stop_reason="end_turn", content="Started background work."
93+
),
94+
SimpleNamespace(
95+
stop_reason="end_turn", content="Background work completed."
96+
),
97+
]
98+
)
99+
original_bg = s13_background_tasks.BG
100+
original_client = s13_background_tasks.client
101+
try:
102+
s13_background_tasks.BG = fake_bg
103+
s13_background_tasks.client = SimpleNamespace(messages=fake_api)
104+
s13_background_tasks.agent_loop(messages)
105+
finally:
106+
s13_background_tasks.BG = original_bg
107+
s13_background_tasks.client = original_client
108+
109+
self.assertTrue(fake_bg.wait_called)
110+
self.assertEqual(fake_api.call_count, 2)
111+
self.assertTrue(
112+
any(
113+
message["role"] == "user"
114+
and isinstance(message["content"], str)
115+
and "<background-results>" in message["content"]
116+
for message in messages
117+
)
118+
)
119+
120+
def test_s_full_agent_loop_waits_for_background_results_after_end_turn(self):
121+
messages = [{"role": "user", "content": "Run tests in the background"}]
122+
fake_bg = FakeSFullBackgroundManager()
123+
fake_api = FakeMessagesAPI(
124+
[
125+
SimpleNamespace(
126+
stop_reason="end_turn", content="Started background work."
127+
),
128+
SimpleNamespace(
129+
stop_reason="end_turn", content="Background work completed."
130+
),
131+
]
132+
)
133+
original_bg = s_full.BG
134+
original_client = s_full.client
135+
try:
136+
s_full.BG = fake_bg
137+
s_full.client = SimpleNamespace(messages=fake_api)
138+
s_full.agent_loop(messages)
139+
finally:
140+
s_full.BG = original_bg
141+
s_full.client = original_client
142+
143+
self.assertTrue(fake_bg.wait_called)
144+
self.assertEqual(fake_api.call_count, 2)
145+
self.assertTrue(
146+
any(
147+
message["role"] == "user"
148+
and isinstance(message["content"], str)
149+
and "<background-results>" in message["content"]
150+
for message in messages
151+
)
152+
)
153+
154+
155+
if __name__ == "__main__":
156+
unittest.main()

0 commit comments

Comments
 (0)