Skip to content

Commit 7d53479

Browse files
AstrBot Local and claude
committed
feat: refactor agent runner, enhance follow-up, skill manager and history saver
- Remove empty completion retry logic from ToolLoopAgentRunner
- Enhance astr_main_agent with additional context handling
- Extend follow_up pipeline stage with new processing logic
- Add new capabilities to skill_manager
- Improve history_saver utility

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 657ddcc commit 7d53479

5 files changed

Lines changed: 163 additions & 113 deletions

File tree

astrbot/core/agent/runners/tool_loop_agent_runner.py

Lines changed: 14 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
)
2929
from astrbot.core.provider.entities import (
3030
LLMResponse,
31-
LLM_CONTROL_CODE_EMPTY_COMPLETION_RETRY,
3231
LLM_CONTROL_CODE_UNKNOWN_TOOL_CALL,
3332
ProviderRequest,
3433
ToolCallsResult,
@@ -88,10 +87,6 @@ def _get_persona_custom_error_message(self) -> str | None:
8887
event = getattr(self.run_context.context, "event", None)
8988
return extract_persona_custom_error_message_from_event(event)
9089

91-
@staticmethod
92-
def _is_empty_completion_retry(resp: LLMResponse) -> bool:
93-
return resp.control_code == LLM_CONTROL_CODE_EMPTY_COMPLETION_RETRY
94-
9590
@staticmethod
9691
def _is_unknown_tool_call(resp: LLMResponse) -> bool:
9792
return resp.control_code == LLM_CONTROL_CODE_UNKNOWN_TOOL_CALL
@@ -257,24 +252,6 @@ async def _iter_llm_responses_with_fallback(
257252
yield resp
258253
continue
259254

260-
if self._is_empty_completion_retry(resp):
261-
# Empty/unparseable response from model, retry same provider once
262-
logger.warning(
263-
"Chat Model %s returned empty/unparseable completion, retrying once...",
264-
candidate_id,
265-
)
266-
try:
267-
async for retry_resp in self._iter_llm_responses(include_model=idx == 0):
268-
if retry_resp.is_chunk:
269-
yield retry_resp
270-
continue
271-
yield retry_resp
272-
return
273-
except Exception as retry_exc:
274-
logger.warning("Retry also failed: %s", retry_exc)
275-
last_exception = retry_exc
276-
break
277-
278255
if (
279256
resp.role == "err"
280257
and not has_stream_output
@@ -294,80 +271,6 @@ async def _iter_llm_responses_with_fallback(
294271
return
295272
except Exception as exc: # noqa: BLE001
296273
last_exception = exc
297-
_exc_str = str(exc).lower()
298-
# Auto-compress context when model_max_prompt_tokens_exceeded
299-
if (
300-
"model_max_prompt_tokens_exceeded" in _exc_str
301-
or "prompt token count" in _exc_str
302-
or "tokens_exceeded" in _exc_str
303-
or "context_length_exceeded" in _exc_str
304-
or ("token" in _exc_str and "exceed" in _exc_str)
305-
):
306-
logger.warning(
307-
"Chat Model %s: token limit exceeded, forcing context compression and retrying...",
308-
candidate_id,
309-
)
310-
try:
311-
from astrbot.core.agent.context.truncator import ContextTruncator
312-
_truncator = ContextTruncator()
313-
_before_total = len(self.run_context.messages)
314-
# Aggressively halve until small enough (up to 5 rounds)
315-
_compression_success = False
316-
for _halve_round in range(5):
317-
_before = len(self.run_context.messages)
318-
self.run_context.messages = _truncator.truncate_by_halving(
319-
self.run_context.messages
320-
)
321-
_after = len(self.run_context.messages)
322-
logger.info(
323-
"Forced context truncation round %d: %d -> %d messages.",
324-
_halve_round + 1, _before, _after,
325-
)
326-
if _after <= 4:
327-
break # Can't shrink further
328-
# Retry same candidate
329-
try:
330-
async for resp in self._iter_llm_responses(include_model=idx == 0):
331-
if resp.is_chunk:
332-
has_stream_output = True
333-
yield resp
334-
continue
335-
yield resp
336-
logger.info(
337-
"Context truncation succeeded after %d round(s): %d -> %d messages.",
338-
_halve_round + 1, _before_total, _after,
339-
)
340-
_compression_success = True
341-
return
342-
if has_stream_output:
343-
_compression_success = True
344-
return
345-
_compression_success = True
346-
break # succeeded without stream output
347-
except Exception as retry_exc:
348-
_exc_str2 = str(retry_exc).lower()
349-
if not (
350-
"model_max_prompt_tokens_exceeded" in _exc_str2
351-
or "prompt token count" in _exc_str2
352-
or "tokens_exceeded" in _exc_str2
353-
or "context_length_exceeded" in _exc_str2
354-
or ("token" in _exc_str2 and "exceed" in _exc_str2)
355-
):
356-
last_exception = retry_exc
357-
logger.warning(
358-
"Chat Model %s retry after compression failed: %s",
359-
candidate_id, retry_exc,
360-
)
361-
break
362-
last_exception = retry_exc
363-
logger.warning(
364-
"Chat Model %s still token-exceeded after round %d, halving again...",
365-
candidate_id, _halve_round + 1,
366-
)
367-
continue
368-
except Exception as compress_exc:
369-
logger.error("Failed to compress context: %s", compress_exc)
370-
continue
371274
logger.warning(
372275
"Chat Model %s request error: %s",
373276
candidate_id,
@@ -404,15 +307,21 @@ def follow_up(
404307
*,
405308
message_text: str,
406309
) -> FollowUpTicket | None:
407-
"""Queue a follow-up message for the next tool result."""
310+
"""Queue a follow-up message to be injected into the next tool result.
311+
312+
Returns None if the agent is already done (message arrived too late) or
313+
if the message text is empty.
314+
"""
408315
if self.done():
316+
logger.debug("follow_up: agent already done, message discarded.")
409317
return None
410318
text = (message_text or "").strip()
411319
if not text:
412320
return None
413321
ticket = FollowUpTicket(seq=self._follow_up_seq, text=text)
414322
self._follow_up_seq += 1
415323
self._pending_follow_ups.append(ticket)
324+
logger.debug("follow_up: queued ticket seq=%d, pending=%d", ticket.seq, len(self._pending_follow_ups))
416325
return ticket
417326

418327
def _resolve_unconsumed_follow_ups(self) -> None:
@@ -431,15 +340,16 @@ def _consume_follow_up_notice(self) -> str:
431340
for ticket in follow_ups:
432341
ticket.consumed = True
433342
ticket.resolved.set()
343+
434344
follow_up_lines = "\n".join(
435345
f"{idx}. {ticket.text}" for idx, ticket in enumerate(follow_ups, start=1)
436346
)
347+
count = len(follow_ups)
348+
plural = "messages" if count > 1 else "message"
437349
return (
438-
"\n\n[FOLLOW-UP] The user sent additional message(s) while you were working. "
439-
"Treat these as supplementary instructions for the current task — DO NOT stop "
440-
"or restart the current operation. Instead, seamlessly incorporate them into "
441-
"your ongoing work. Continue the task flow without interrupting it. "
442-
"Do NOT acknowledge receipt explicitly; just act on them naturally.\n"
350+
f"\n\n[FOLLOW-UP x{count}] The user sent {count} {plural} while you were working. "
351+
"Incorporate them as supplementary instructions seamlessly — "
352+
"do NOT stop, restart, or explicitly acknowledge receipt; just act naturally.\n"
443353
f"{follow_up_lines}"
444354
)
445355

@@ -778,7 +688,7 @@ async def step_until_done(
778688
self.run_context.messages.append(
779689
Message(
780690
role="user",
781-
content="工具调用次数已达到上限,请停止使用工具,并根据已经收集到的信息,对你的任务和发现进行总结,然后直接回复用户。",
691+
content="工具调用次数已达到上限,请停止使用工具,并根据已经收集到的信息,对你的任务和发现进行总结,然后直接回复用户。(Tool call limit reached. Stop using tools and summarize your findings directly for the user.)",
782692
)
783693
)
784694
# 再执行最后一步

astrbot/core/astr_main_agent.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
from astrbot.core.platform.astr_message_event import AstrMessageEvent
6060
from astrbot.core.provider import Provider
6161
from astrbot.core.provider.entities import ProviderRequest
62-
from astrbot.core.skills.skill_manager import SkillManager, build_skills_prompt
62+
from astrbot.core.skills.skill_manager import SkillManager, build_skills_prompt, get_skills_fingerprint
6363
from astrbot.core.star.context import Context
6464
from astrbot.core.star.star_handler import star_map
6565
from astrbot.core.tools.cron_tools import (
@@ -353,6 +353,33 @@ async def _ensure_persona_and_skills(
353353
"You cannot use shell or Python to perform skills. "
354354
"If you need to use these capabilities, ask the user to enable Computer Use in the AstrBot WebUI -> Config."
355355
)
356+
357+
# --- Dynamic skill update detection (lightweight) ---
358+
# Compute current fingerprint using only stat() calls (no file I/O).
359+
# If the fingerprint changed since the last turn (stored on the event extra),
360+
# inject a one-line system reminder into extra_user_content_parts so the LLM
361+
# is aware that the skill list may have changed. This avoids rebuilding the
362+
# full system prompt mid-conversation (which many providers ignore anyway).
363+
current_fp = get_skills_fingerprint()
364+
prev_fp = event.get_extra("_skills_fp")
365+
if prev_fp is not None and prev_fp != current_fp:
366+
# Skills changed since the last request in this session — notify the LLM.
367+
skill_names = [s.name for s in skills]
368+
skills_list_str = ", ".join(skill_names) if skill_names else "(none)"
369+
req.extra_user_content_parts.append(
370+
TextPart(
371+
text=(
372+
"<system_reminder>"
373+
"The available skill list has been updated since the last turn. "
374+
f"Current active skills: {skills_list_str}. "
375+
"Please refer to this updated list for any skill-related requests."
376+
"</system_reminder>"
377+
)
378+
)
379+
)
380+
logger.debug("Skills fingerprint changed (%s -> %s), injected update reminder.", prev_fp, current_fp)
381+
event.set_extra("_skills_fp", current_fp)
382+
# --- end dynamic skill update detection ---
356383
tmgr = plugin_context.get_llm_tool_manager()
357384

358385
# inject toolset in the persona

astrbot/core/pipeline/process_stage/follow_up.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,28 @@ def register_active_runner(umo: str, runner: AgentRunner) -> None:
4141
def unregister_active_runner(umo: str, runner: AgentRunner) -> None:
4242
if _ACTIVE_AGENT_RUNNERS.get(umo) is runner:
4343
_ACTIVE_AGENT_RUNNERS.pop(umo, None)
44+
# Best-effort cleanup: if no follow-up state is pending any more, drop the
45+
# UMO entry to avoid accumulating stale entries when sessions end abnormally.
46+
state = _FOLLOW_UP_ORDER_STATE.get(umo)
47+
if state is not None:
48+
statuses = state.get("statuses")
49+
if not statuses:
50+
_FOLLOW_UP_ORDER_STATE.pop(umo, None)
51+
else:
52+
# There are still pending/active entries — notify the condition so
53+
# any waiter in _activate_and_wait_follow_up_turn can re-check and
54+
# potentially hit the timeout branch.
55+
condition = state.get("condition")
56+
if isinstance(condition, asyncio.Condition):
57+
async def _notify_condition(cond: asyncio.Condition) -> None:
58+
async with cond:
59+
cond.notify_all()
60+
try:
61+
loop = asyncio.get_event_loop()
62+
if loop.is_running():
63+
loop.create_task(_notify_condition(condition))
64+
except RuntimeError:
65+
pass
4466

4567

4668
def _get_follow_up_order_state(umo: str) -> dict[str, object]:
@@ -108,6 +130,10 @@ async def _mark_follow_up_consumed(umo: str, seq: int) -> None:
108130
_FOLLOW_UP_ORDER_STATE.pop(umo, None)
109131

110132

133+
_FOLLOW_UP_WAIT_TIMEOUT: float = 30.0
134+
"""Max seconds a follow-up turn will wait for its predecessor to finish."""
135+
136+
111137
async def _activate_and_wait_follow_up_turn(umo: str, seq: int) -> None:
112138
state = _FOLLOW_UP_ORDER_STATE.get(umo)
113139
if not state:
@@ -121,12 +147,27 @@ async def _activate_and_wait_follow_up_turn(umo: str, seq: int) -> None:
121147
statuses[seq] = "active"
122148

123149
# Strict ordering: only the head (`next_turn`) can continue.
150+
# Use a timeout to guard against predecessor runner crashes.
151+
deadline = asyncio.get_event_loop().time() + _FOLLOW_UP_WAIT_TIMEOUT
124152
while True:
125153
next_turn = state["next_turn"]
126154
assert isinstance(next_turn, int)
127155
if next_turn == seq:
128156
break
129-
await condition.wait()
157+
remaining = deadline - asyncio.get_event_loop().time()
158+
if remaining <= 0:
159+
# Predecessor never finished; forcibly advance to avoid permanent hang.
160+
logger.warning(
161+
"Follow-up wait timeout for umo=%s seq=%s; advancing turn to prevent hang.",
162+
umo,
163+
seq,
164+
)
165+
state["next_turn"] = seq
166+
break
167+
try:
168+
await asyncio.wait_for(condition.wait(), timeout=remaining)
169+
except asyncio.TimeoutError:
170+
pass # re-check on next iteration
130171

131172

132173
async def _finish_follow_up_turn(umo: str, seq: int) -> None:
@@ -152,8 +193,19 @@ async def _monitor_follow_up_ticket(
152193
ticket: FollowUpTicket,
153194
order_seq: int,
154195
) -> None:
155-
"""Advance consumed slots immediately on resolution to avoid wake-order drift."""
196+
"""Advance consumed slots immediately on resolution to avoid wake-order drift.
197+
198+
Only marks the order slot consumed here when *ticket* was consumed by the
199+
runner (i.e. injected into a tool-result). If the ticket was *not* consumed
200+
(i.e. the runner finished without ever flushing pending follow-ups),
201+
``prepare_follow_up_capture`` handles the mark via its own branch, so we
202+
must not double-call ``_mark_follow_up_consumed`` here.
203+
"""
156204
await ticket.resolved.wait()
205+
# Guard: only act when consumed=True AND prepare_follow_up_capture has not
206+
# already handled this seq (it sets consumed_marked and calls us via the
207+
# captured branch). The state dict check inside _mark_follow_up_consumed
208+
# is idempotent, so a double-call is safe, but we skip it when not needed.
157209
if ticket.consumed:
158210
await _mark_follow_up_consumed(umo, order_seq)
159211

astrbot/core/skills/skill_manager.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,3 +464,44 @@ def install_skill_from_zip(self, zip_path: str, *, overwrite: bool = True) -> st
464464

465465
self.set_skill_active(skill_name, True)
466466
return skill_name
467+
468+
469+
# ---------------------------------------------------------------------------
470+
# Lightweight skill-set fingerprint for dynamic update detection
471+
# ---------------------------------------------------------------------------
472+
473+
def get_skills_fingerprint(skills_root: str | None = None) -> str:
    """Return a short fingerprint of the current skill set.

    Only filesystem metadata is consulted (the config file's mtime and a
    listing of the skills directory); no file contents are read.

    The fingerprint changes whenever:
    - the modification time of the skills config file changes, or
    - the set of sub-directories of the skills root that contain a
      ``SKILL.md`` changes (a folder is added, removed, or renamed).

    Edits to the *contents* of an existing ``SKILL.md`` are not detected.

    Args:
        skills_root: Directory to scan for skill folders. When omitted or
            empty, the value of ``get_astrbot_skills_path()`` is used.

    Returns:
        A 12-character hexadecimal digest. It is a change detector only,
        not a security token.
    """
    import hashlib

    data_path = Path(get_astrbot_data_path())
    config_path = str(data_path / SKILLS_CONFIG_FILENAME)
    root = Path(skills_root or get_astrbot_skills_path())

    parts: list[str] = []

    # 1. Config file mtime (presumably catches installs / enable-disable,
    #    assuming those operations rewrite the config file).
    try:
        parts.append(str(os.path.getmtime(config_path)))
    except OSError:
        parts.append("no-config")

    # 2. Names of skill dirs that have a SKILL.md. Hashing the names rather
    #    than just their count means that swapping one skill for another
    #    (same count, different folder) still changes the fingerprint.
    try:
        skill_names = sorted(
            entry.name
            for entry in root.iterdir()
            if entry.is_dir() and (entry / "SKILL.md").exists()
        )
    except OSError:
        # An unreadable or missing root is treated like an empty one.
        skill_names = []
    parts.append(str(len(skill_names)))
    # "/" can never occur inside a single directory entry name, so the
    # joined string is unambiguous.
    parts.append("/".join(skill_names))

    raw = "|".join(parts)
    # surrogateescape: directory names that are not valid UTF-8 are decoded
    # by the OS layer with lone surrogates, which a strict encode rejects.
    return hashlib.md5(raw.encode("utf-8", "surrogateescape")).hexdigest()[:12]

0 commit comments

Comments
 (0)