diff --git a/astrbot/core/agent/runners/tool_loop_agent_runner.py b/astrbot/core/agent/runners/tool_loop_agent_runner.py index 9d0b0ffce1..51774de674 100644 --- a/astrbot/core/agent/runners/tool_loop_agent_runner.py +++ b/astrbot/core/agent/runners/tool_loop_agent_runner.py @@ -805,15 +805,47 @@ def _append_tool_call_result(tool_call_id: str, content: str) -> None: if not req.func_tool: return - if ( - self.tool_schema_mode == "skills_like" - and self._skill_like_raw_tool_set - ): - # in 'skills_like' mode, raw.func_tool is light schema, does not have handler - # so we need to get the tool from the raw tool set - func_tool = self._skill_like_raw_tool_set.get_tool(func_tool_name) - else: - func_tool = req.func_tool.get_tool(func_tool_name) + # First check if it's a dynamically created subagent tool + func_tool = None + run_context_context = getattr(self.run_context, "context", None) + if run_context_context is not None: + event = getattr(run_context_context, "event", None) + if event is not None: + session_id = getattr( + self.run_context.context.event, "unified_msg_origin", None + ) + if session_id: + try: + from astrbot.core.dynamic_subagent_manager import ( + DynamicSubAgentManager, + ) + + dynamic_handoffs = DynamicSubAgentManager.get_handoff_tools_for_session( + session_id + ) + for h in dynamic_handoffs: + if ( + h.name == func_tool_name + or f"transfer_to_{h.name}" == func_tool_name + ): + func_tool = h + break + except Exception: + pass + + # If not found in dynamic tools, check regular tool sets + if func_tool is None: + if ( + self.tool_schema_mode == "skills_like" + and self._skill_like_raw_tool_set + ): + # in 'skills_like' mode, raw.func_tool is light schema, does not have handler + # so we need to get the tool from the raw tool set + func_tool = self._skill_like_raw_tool_set.get_tool( + func_tool_name + ) + else: + func_tool = req.func_tool.get_tool(func_tool_name) logger.info(f"使用工具:{func_tool_name},参数:{func_tool_args}") @@ -933,6 +965,50 @@ def _append_tool_call_result(tool_call_id: str, content: str) -> None: "The tool has returned a data type that is not supported." ) if result_parts: + result_content = "\n\n".join(result_parts) + # Check for dynamic tool creation marker + if result_content.startswith("__DYNAMIC_TOOL_CREATED__:"): + parts = result_content.split(":", 3) + if len(parts) >= 4: + new_tool_name = parts[1] + new_tool_obj_name = parts[2] + logger.info( + f"[EnhancedSubAgent] Tool created: {new_tool_name}" + ) + # Try to add the new tool to func_tool set + try: + from astrbot.core.dynamic_subagent_manager import ( + DynamicSubAgentManager, + ) + + session_id = getattr( + self.run_context.context.event, + "unified_msg_origin", + None, + ) + if session_id: + handoffs = DynamicSubAgentManager.get_handoff_tools_for_session( + session_id + ) + for handoff in handoffs: + if ( + handoff.name == new_tool_obj_name + or handoff.name + == new_tool_name.replace( + "transfer_to_", "" + ) + ): + if self.req.func_tool: + self.req.func_tool.add_tool( + handoff + ) + logger.info( + f"[EnhancedSubAgent] Added {handoff.name} to func_tool set" + ) + except Exception as e: + logger.warning( + f"[EnhancedSubAgent] Failed to add dynamic tool: {e}" + ) _append_tool_call_result( func_tool_id, "\n\n".join(result_parts) diff --git a/astrbot/core/astr_agent_context.py b/astrbot/core/astr_agent_context.py index 9c6451cc74..51b57982c0 100644 --- a/astrbot/core/astr_agent_context.py +++ b/astrbot/core/astr_agent_context.py @@ -14,7 +14,7 @@ class AstrAgentContext: """The star context instance""" event: AstrMessageEvent """The message event associated with the agent context.""" - extra: dict[str, str] = Field(default_factory=dict) + extra: dict[str, any] = Field(default_factory=dict) """Customized extra data.""" diff --git a/astrbot/core/astr_agent_tool_exec.py b/astrbot/core/astr_agent_tool_exec.py index 18ac1a446a..3dae00dd17 100644 --- a/astrbot/core/astr_agent_tool_exec.py +++ b/astrbot/core/astr_agent_tool_exec.py @@ -1,6 +1,7 @@ import asyncio import inspect import json +import time import traceback import typing as T import uuid @@ -27,6 +28,7 @@ PYTHON_TOOL, ) from astrbot.core.cron.events import CronMessageEvent +from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager from astrbot.core.message.components import Image from astrbot.core.message.message_event_result import ( CommandResult, @@ -233,6 +235,21 @@ def _build_handoff_toolset( toolset.add_tool(runtime_tool) elif isinstance(tool_name_or_obj, FunctionTool): toolset.add_tool(tool_name_or_obj) + + # Always add send_shared_context tool for shared context feature + try: + from astrbot.core.dynamic_subagent_manager import ( + SEND_SHARED_CONTEXT_TOOL, + DynamicSubAgentManager, + ) + + session_id = event.unified_msg_origin + session = DynamicSubAgentManager.get_session(session_id) + if session and session.shared_context_enabled: + toolset.add_tool(SEND_SHARED_CONTEXT_TOOL) + except Exception as e: + logger.debug(f"[EnhancedSubAgent] Failed to add shared context tool: {e}") + return None if toolset.empty() else toolset @classmethod @@ -265,7 +282,6 @@ async def _execute_handoff( # Build handoff toolset from registered tools plus runtime computer tools. toolset = cls._build_handoff_toolset(run_context, tool.agent.tools) - ctx = run_context.context.context event = run_context.context.event umo = event.unified_msg_origin @@ -291,21 +307,105 @@ async def _execute_handoff( except Exception: continue + # 获取子代理的历史上下文 + from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager + + agent_name = getattr(tool.agent, "name", None) + subagent_history = [] + if agent_name: + try: + stored_history = DynamicSubAgentManager.get_subagent_history( + umo, agent_name + ) + if stored_history: + # 将历史消息转换为 Message 对象 + for hist_msg in stored_history: + try: + if isinstance(hist_msg, dict): + subagent_history.append( + Message.model_validate(hist_msg) + ) + elif isinstance(hist_msg, Message): + subagent_history.append(hist_msg) + except Exception: + continue + if subagent_history: + logger.debug( + f"[SubAgentHistory] Loaded {len(subagent_history)} history messages for {agent_name}" + ) + + except Exception as e: + logger.warning( + f"[SubAgentHistory] Failed to load history for {agent_name}: {e}" + ) + prov_settings: dict = ctx.get_config(umo=umo).get("provider_settings", {}) agent_max_step = int(prov_settings.get("max_agent_step", 30)) stream = prov_settings.get("streaming_response", False) + + # 如果有历史上下文,合并到 contexts 中 + if subagent_history: + if contexts is None: + contexts = subagent_history + else: + contexts = subagent_history + contexts + # 构建子代理的 system_prompt + subagent_system_prompt = tool.agent.instructions or "" + subagent_system_prompt = f"# Role\nYour name is {agent_name}(used for tool calling)\n{subagent_system_prompt}\n" + if agent_name: + try: + from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager + + runtime = prov_settings.get("computer_use_runtime", "local") + static_subagent_prompt = ( + DynamicSubAgentManager.build_static_subagent_prompts( + umo, agent_name + ) + ) + dynamic_subagent_prompt = ( + DynamicSubAgentManager.build_dynamic_subagent_prompts( + umo, agent_name, runtime + ) + ) + subagent_system_prompt += static_subagent_prompt + subagent_system_prompt += dynamic_subagent_prompt + + except Exception: + pass + llm_resp = await ctx.tool_loop_agent( event=event, chat_provider_id=prov_id, prompt=input_, image_urls=image_urls, - system_prompt=tool.agent.instructions, + system_prompt=subagent_system_prompt, tools=toolset, contexts=contexts, max_steps=agent_max_step, tool_call_timeout=run_context.tool_call_timeout, stream=stream, ) + + # 保存历史上下文 + if agent_name: + try: + from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager + + # 构建当前对话的历史消息 + current_messages = [] + # 添加本轮用户输入 + current_messages.append({"role": "user", "content": input_}) + # 添加助手回复 + current_messages.append( + {"role": "assistant", "content": llm_resp.completion_text} + ) + if current_messages: + DynamicSubAgentManager.save_subagent_history( + umo, agent_name, current_messages + ) + except Exception: + pass # 不影响主流程 + yield mcp.types.CallToolResult( content=[mcp.types.TextContent(type="text", text=llm_resp.completion_text)] ) @@ -324,33 +424,85 @@ async def _execute_handoff_background( ``CronMessageEvent`` is created so the main LLM can inform the user of the result – the same pattern used by ``_execute_background`` for regular background tasks. + + 当启用增强SubAgent时,会在 DynamicSubAgentManager 中创建 pending 任务, + 并返回 task_id 给主 Agent,以便后续通过 wait_for_subagent 获取结果。 """ - task_id = uuid.uuid4().hex + event = run_context.context.event + umo = event.unified_msg_origin + agent_name = getattr(tool.agent, "name", None) + + # check if enhanced subAgent + subagent_task_id = None + try: + from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager + + if agent_name: + session = DynamicSubAgentManager.get_session(umo) + if session and (agent_name in session.subagents): + subagent_task_id = ( + DynamicSubAgentManager.create_pending_subagent_task( + session_id=umo, agent_name=agent_name + ) + ) + + if subagent_task_id.startswith("__PENDING_TASK_CREATE_FAILED__"): + logger.info( + f"[EnhancedSubAgent:BackgroundTask] Failed to created background task {subagent_task_id} for {agent_name}" + ) + else: + DynamicSubAgentManager.set_subagent_status( + session_id=umo, + agent_name=agent_name, + status="RUNNING", + ) + + logger.info( + f"[EnhancedSubAgent:BackgroundTask] Created background task {subagent_task_id} for {agent_name}" + ) + except Exception as e: + logger.info( + f"[EnhancedSubAgent:BackgroundTask] Failed to created background task {subagent_task_id} for {agent_name}: {e}" + ) + + original_task_id = uuid.uuid4().hex async def _run_handoff_in_background() -> None: try: await cls._do_handoff_background( tool=tool, run_context=run_context, - task_id=task_id, + task_id=original_task_id, + subagent_task_id=subagent_task_id, **tool_args, ) + except Exception as e: # noqa: BLE001 logger.error( - f"Background handoff {task_id} ({tool.name}) failed: {e!s}", + f"Background handoff {original_task_id} ({tool.name}) failed: {e!s}", exc_info=True, ) asyncio.create_task(_run_handoff_in_background()) - text_content = mcp.types.TextContent( - type="text", - text=( - f"Background task dedicated to subagent '{tool.agent.name}' submitted. task_id={task_id}. " - f"The subagent '{tool.agent.name}' is working on the task on hehalf you. " - f"You will be notified when it finishes." - ), - ) + if subagent_task_id: + text_content = mcp.types.TextContent( + type="text", + text=( + f"Background task submitted. subagent_task_id={subagent_task_id}. " + f"SubAgent '{agent_name}' is working on the task. " + f"Use wait_for_subagent(subagent_name='{agent_name}', task_id='{subagent_task_id}') to get the result." + ), + ) + else: + text_content = mcp.types.TextContent( + type="text", + text=( + f"Background task submitted. task_id={original_task_id}. " + f"SubAgent '{agent_name}' is working on the task. " + f"You will be notified when it finishes." + ), + ) yield mcp.types.CallToolResult(content=[text_content]) @classmethod @@ -361,13 +513,24 @@ async def _do_handoff_background( task_id: str, **tool_args, ) -> None: - """Run the subagent handoff and, on completion, wake the main agent.""" + """Run the subagent handoff. + 当增强版 SubAgent 启用时,结果存储到 DynamicSubAgentManager,主 Agent 可通过 wait_for_subagent 获取。 + 否则使用原有的 _wake_main_agent_for_background_result 流程。 + """ + + start_time = time.time() result_text = "" + error_text = None tool_args = dict(tool_args) tool_args["image_urls"] = await cls._collect_handoff_image_urls( run_context, tool_args.get("image_urls"), ) + + event = run_context.context.event + umo = event.unified_msg_origin + agent_name = getattr(tool.agent, "name", None) + try: async for r in cls._execute_handoff( tool, @@ -379,26 +542,104 @@ async def _do_handoff_background( for content in r.content: if isinstance(content, mcp.types.TextContent): result_text += content.text + "\n" + except Exception as e: + error_text = str(e) result_text = ( f"error: Background task execution failed, internal error: {e!s}" ) - event = run_context.context.event + execution_time = time.time() - start_time + success = error_text is None + session = DynamicSubAgentManager.get_session(umo) + ## if it is enhanced subagent ## + if session and agent_name and (agent_name in session.subagents): + subagent_task_id = tool_args.get("subagent_task_id", None) + # store the results of background enhanced subagent task + DynamicSubAgentManager.store_subagent_result( + session_id=umo, + agent_name=agent_name, + task_id=subagent_task_id, + success=success, + result=result_text.strip() if result_text else "", + error=error_text, + execution_time=execution_time, + ) + # update subagent status + if error_text: + DynamicSubAgentManager.set_subagent_status( + session_id=umo, + agent_name=agent_name, + status="FAILED", + ) + else: + DynamicSubAgentManager.set_subagent_status( + session_id=umo, + agent_name=agent_name, + status="COMPLETED", + ) - await cls._wake_main_agent_for_background_result( - run_context=run_context, - task_id=task_id, - tool_name=tool.name, - result_text=result_text, - tool_args=tool_args, - note=( - event.get_extra("background_note") - or f"Background task for subagent '{tool.agent.name}' finished." - ), - summary_name=f"Dedicated to subagent `{tool.agent.name}`", - extra_result_fields={"subagent_name": tool.agent.name}, - ) + # if shared_context is enabled, publish status + if session.shared_context_enabled: + status_content = f"[EnhancedSubAgent:BackgroundTask] SubAgent '{agent_name}' Task '{subagent_task_id}' Complete. Execution Time: {execution_time:.1f}s" + if error_text: + status_content = f"[EnhancedSubAgent:BackgroundTask] SubAgent '{agent_name}' Task '{subagent_task_id}' Failed: {error_text}" + DynamicSubAgentManager.add_shared_context( + session_id=umo, + sender=agent_name, + context_type="status", + content=status_content, + target="all", + ) + logger.info( + f"[EnhancedSubAgent:BackgroundTask] Stored result for {agent_name} task {subagent_task_id}: " + f"success={success}, time={execution_time:.1f}s" + ) + + try: + context_extra = getattr(run_context.context, "extra", None) + if context_extra and isinstance(context_extra, dict): + main_agent_runner = context_extra.get("main_agent_runner") + main_agent_is_running = ( + main_agent_runner is not None and not main_agent_runner.done() + ) + else: + main_agent_is_running = False + except Exception as e: + logger.error("Failed to check main agent status: %s", e) + # If the main agent status cannot be determined, assume it is still running to avoid concurrent wake-up + main_agent_is_running = True + + # Inform user through `_wake_main_agent_for_background_result` if main agent is over + if not main_agent_is_running: + await cls._wake_main_agent_for_background_result( + run_context=run_context, + task_id=task_id, + tool_name=tool.name, + result_text=result_text, + tool_args=tool_args, + note=( + event.get_extra("background_note") + or f"Background task for subagent '{agent_name}' finished." + ), + summary_name=f"Dedicated to subagent `{agent_name}`", + extra_result_fields={"subagent_name": agent_name}, + ) + # if not enhanced subagent + else: + await cls._wake_main_agent_for_background_result( + run_context=run_context, + task_id=task_id, + tool_name=tool.name, + result_text=result_text, + tool_args=tool_args, + note=( + event.get_extra("background_note") + or f"Background task for subagent '{agent_name}' finished." + ), + summary_name=f"Dedicated to subagent `{agent_name}`", + extra_result_fields={"subagent_name": agent_name}, + ) @classmethod async def _execute_background( diff --git a/astrbot/core/astr_main_agent.py b/astrbot/core/astr_main_agent.py index 75f5d30e2a..8afde0bbcf 100644 --- a/astrbot/core/astr_main_agent.py +++ b/astrbot/core/astr_main_agent.py @@ -153,6 +153,7 @@ class MainAgentBuildConfig: timezone: str | None = None max_quoted_fallback_images: int = 20 """Maximum number of images injected from quoted-message fallback extraction.""" + enhanced_subagent: dict = field(default_factory=dict) @dataclass(slots=True) @@ -984,6 +985,94 @@ def _apply_llm_safety_mode(config: MainAgentBuildConfig, req: ProviderRequest) - ) +def _apply_enhanced_subagent_tools( + config: MainAgentBuildConfig, req: ProviderRequest, event: AstrMessageEvent +) -> None: + """Apply enhanced SubAgent tools and system prompt + + When enabled: + 1. Inject enhanced capability prompt into system prompt + 2. Register dynamic SubAgent management tools + 3. Register session's transfer_to_xxx tools + """ + if not config.enhanced_subagent.get("enabled", False): + return + + if req.func_tool is None: + req.func_tool = ToolSet() + + try: + from astrbot.core.dynamic_subagent_manager import ( + CREATE_DYNAMIC_SUBAGENT_TOOL, + LIST_DYNAMIC_SUBAGENTS_TOOL, + PROTECT_SUBAGENT_TOOL, + REMOVE_DYNAMIC_SUBAGENT_TOOL, + RESET_SUBAGENT_TOOL, + SEND_SHARED_CONTEXT_TOOL_FOR_MAIN_AGENT, + UNPROTECT_SUBAGENT_TOOL, + VIEW_SHARED_CONTEXT_TOOL, + WAIT_FOR_SUBAGENT_TOOL, + DynamicSubAgentManager, + ) + + # Register dynamic SubAgent management tools + req.func_tool.add_tool(CREATE_DYNAMIC_SUBAGENT_TOOL) + req.func_tool.add_tool(RESET_SUBAGENT_TOOL) + req.func_tool.add_tool(REMOVE_DYNAMIC_SUBAGENT_TOOL) + req.func_tool.add_tool(LIST_DYNAMIC_SUBAGENTS_TOOL) + if DynamicSubAgentManager.is_auto_cleanup_per_turn(): + req.func_tool.add_tool(PROTECT_SUBAGENT_TOOL) + req.func_tool.add_tool(UNPROTECT_SUBAGENT_TOOL) + if DynamicSubAgentManager.is_shared_context_enabled(): + req.func_tool.add_tool(VIEW_SHARED_CONTEXT_TOOL) + req.func_tool.add_tool(SEND_SHARED_CONTEXT_TOOL_FOR_MAIN_AGENT) + req.func_tool.add_tool(WAIT_FOR_SUBAGENT_TOOL) + + # Configure logger + + # Configure DynamicSubAgentManager with settings + shared_context_enabled = config.enhanced_subagent.get( + "shared_context_enabled", False + ) + DynamicSubAgentManager.configure( + max_subagent_count=config.enhanced_subagent.get("max_subagent_count"), + auto_cleanup_per_turn=config.enhanced_subagent.get("auto_cleanup_per_turn"), + shared_context_enabled=shared_context_enabled, + shared_context_maxlen=config.enhanced_subagent.get( + "shared_context_maxlen", 200 + ), + ) + + # Enable shared context if configured + if shared_context_enabled: + DynamicSubAgentManager.set_shared_context_enabled( + event.unified_msg_origin, True + ) + session_id = event.unified_msg_origin + # Inject enhanced system prompt + task_router_prompt = DynamicSubAgentManager.build_task_router_prompt(session_id) + + req.system_prompt = f"{req.system_prompt or ''}\n{task_router_prompt}\n" + # Register existing handoff tools from config + plugin_context = getattr(event, "_plugin_context", None) + if plugin_context and plugin_context.subagent_orchestrator: + so = plugin_context.subagent_orchestrator + if hasattr(so, "handoffs"): + for tool in so.handoffs: + req.func_tool.add_tool(tool) + # Register dynamically created handoff tools + dynamic_handoffs = DynamicSubAgentManager.get_handoff_tools_for_session( + session_id + ) + for handoff in dynamic_handoffs: + req.func_tool.add_tool(handoff) + + except ImportError as e: + from astrbot import logger + + logger.warning(f"[EnhancedSubAgent] Cannot import module: {e}") + + def _apply_sandbox_tools( config: MainAgentBuildConfig, req: ProviderRequest, session_id: str ) -> None: @@ -1349,10 +1438,13 @@ async def build_main_agent( elif config.computer_use_runtime == "local": _apply_local_env_tools(req) + if config.enhanced_subagent.get("enabled", False): + # Apply enhanced SubAgent tools + _apply_enhanced_subagent_tools(config, req, event) + agent_runner = AgentRunner() astr_agent_ctx = AstrAgentContext( - context=plugin_context, - event=event, + context=plugin_context, event=event, extra={"main_agent_runner": agent_runner} ) if config.add_cron_tools: diff --git a/astrbot/core/computer/booters/local.py b/astrbot/core/computer/booters/local.py index f11bc329fa..21823e592a 100644 --- a/astrbot/core/computer/booters/local.py +++ b/astrbot/core/computer/booters/local.py @@ -159,10 +159,16 @@ def _run() -> dict[str, Any]: [os.environ.get("PYTHON", sys.executable), "-c", code], timeout=timeout, capture_output=True, - text=True, + # text=True, + ) + # stdout = "" if silent else result.stdout + # stderr = result.stderr if result.returncode != 0 else "" + stdout = "" if silent else _decode_shell_output(result.stdout) + stderr = ( + _decode_shell_output(result.stderr) + if result.returncode != 0 + else "" ) - stdout = "" if silent else result.stdout - stderr = result.stderr if result.returncode != 0 else "" return { "data": { "output": {"text": stdout, "images": []}, diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index c0fcf8df66..206add764a 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -196,6 +196,14 @@ ), "agents": [], }, + "enhanced_subagent": { + "enabled": False, + "log_level": "debug", + "max_subagent_count": 3, + "auto_cleanup_per_turn": True, + "shared_context_enabled": False, + "shared_context_maxlen": 200, + }, "provider_stt_settings": { "enable": False, "provider_id": "", diff --git a/astrbot/core/dynamic_subagent_manager.py b/astrbot/core/dynamic_subagent_manager.py new file mode 100644 index 0000000000..ec5c5708c2 --- /dev/null +++ b/astrbot/core/dynamic_subagent_manager.py @@ -0,0 +1,1555 @@ +""" +Dynamic SubAgent Manager +Manages dynamically created subagents for task decomposition and parallel processing +""" + +from __future__ import annotations + +import asyncio +import os +import platform +import re +import time +from dataclasses import dataclass, field +from datetime import datetime + +from astrbot import logger +from astrbot.core.agent.agent import Agent +from astrbot.core.agent.handoff import HandoffTool +from astrbot.core.agent.tool import FunctionTool +from astrbot.core.utils.astrbot_path import get_astrbot_temp_path + + +@dataclass +class DynamicSubAgentConfig: + name: str + system_prompt: str = "" + tools: set[str] | None = None + skills: set[str] | None = None + provider_id: str | None = None + description: str = "" + workdir: str | None = None + + +@dataclass +class SubAgentExecutionResult: + task_id: str # 任务唯一标识符 + agent_name: str + success: bool + result: str | None = None + error: str | None = None + execution_time: float = 0.0 + created_at: float = 0.0 + completed_at: float = 0.0 + metadata: dict = field(default_factory=dict) + + +@dataclass +class DynamicSubAgentSession: + session_id: str + subagents: dict = field(default_factory=dict) # 存储DynamicSubAgentConfig对象 + handoff_tools: dict = field(default_factory=dict) + subagent_status: dict = field( + default_factory=dict + ) # 工作状态 "IDLE" "RUNNING" "COMPLETED" "FAILED" + protected_agents: set = field( + default_factory=set + ) # 若某个agent受到保护,则不会被自动清理 + subagent_histories: dict = field(default_factory=dict) # 存储每个子代理的历史上下文 + shared_context: list = field(default_factory=list) # 公共上下文列表 + shared_context_enabled: bool = False # 是否启用公共上下文 + subagent_background_results: dict = field( + default_factory=dict + ) # 后台subagent结果存储: {agent_name: {task_id: SubAgentExecutionResult}} + # 任务计数器: {agent_name: next_task_id} + background_task_counters: dict = field(default_factory=dict) + + +class DynamicSubAgentManager: + _sessions: dict = {} + _max_subagent_count: int = 3 + _auto_cleanup_per_turn: bool = True + _shared_context_enabled: bool = False + _shared_context_maxlen: int = 200 + + _tools_blacklist: set[str] = { + "send_shared_context_for_main_agentcreate_dynamic_subagent", + "protect_subagent", + "unprotect_subagent", + "reset_subagent", + "remove_dynamic_subagent", + "list_dynamic_subagent", + "wait_for_subagent", + "view_shared_context", + } + _tools_inherent: set[str] = { + "astrbot_execute_shell", + "astrbot_execute_python", + } + + _HEADER_TEMPLATE = """# Dynamic Sub-Agent Capability + You can dynamically create and manage sub-agents with isolated instructions, tools and skills. + {quota_info} + + ## When to create Sub-agents: + + - The task can be explicitly decomposed and parallel processed + - Processing very long contexts that exceeding the limitations of a single agent + + ## Workflow + + 1. **Plan**: Break down the user request → identify subtask dependencies → determine which can run in parallel + 2. **Create**: Use `create_dynamic_subagent` for each subtask + 3. **Delegate**: Use `transfer_to_` to assign work + 4. **Collect**: Gather results from all sub-agents + """ + + _CREATE_GUIDE_PROMPT = """## Creating Sub-agents + + Name: **letters, numbers, underscores only**, 3-32 chars, start with a letter. + + A well-designed sub-agent requires: + + ### 1. Character Definition + Define the role, expertise, and work style. Example: + ``` + Name: data_analyst + Role: Senior Data Analyst specializing in exploratory analysis and statistical modeling + Style: Meticulous, detail-oriented, data-driven + ``` + + ### 2. Task Context + - **Goal**: The user's ultimate objective + - **Your step**: Current step number and description + - **Teammates**: Other sub-agents and their responsibilities (if known) + + ### 3. Explicit Instructions + Step-by-step procedure with: + - Input: where to read data from + - Process: what transformations/analysis to perform + - Output: what to produce and where to save it + + ### 2. Allocate available Tools and Skills + ### Tools & Skills + Only assign tools/skills the sub-agent actually needs. Unnecessary tools waste tokens and increase confusion. + """ + + _LIFECYCLE_PROMPT = """## Sub-agent Lifecycle + + Sub-agents are **auto-cleaned** after each conversation turn. + Use `protect_subagent` to keep important ones across turns. + Use `unprotect_subagent` to remove protection.""" + + _BACKGROUND_TASK_PROMPT = """ + ## Background Tasks + + For time-consuming tasks (web search, code execution), delegate with `background_task=True`: + ``` + transfer_to_(..., background_task=True) + ``` + Then wait for results: + ``` + wait_for_subagent(subagent_name="", timeout=60) + ``` + **Tip**: Execute independent tasks first, then wait — don't block on tasks you don't depend on. + """ + + @classmethod + def build_task_router_prompt(cls, session_id: str): + session = cls.get_session(session_id) + if not session: + return "" + + current_count = len(session.subagents) + remaining = cls._max_subagent_count - current_count + + if remaining <= 0: + quota_info = ( + f"No new sub-agents (limit: {cls._max_subagent_count}, " + f"existing: {list(session.subagents.keys())}). " + f"You can still delegate to existing sub-agents via `transfer_to_`." + ) + parts = [cls._HEADER_TEMPLATE.format(quota_info=quota_info)] + else: + quota_info = f"{remaining} of {cls._max_subagent_count} remaining" + parts = [ + cls._HEADER_TEMPLATE.format(quota_info=quota_info), + cls._CREATE_GUIDE_PROMPT, + ] + + parts.extend([cls._LIFECYCLE_PROMPT, cls._BACKGROUND_TASK_PROMPT]) + return "\n".join(parts) + "\n" + + @classmethod + def configure( + cls, + max_subagent_count: int = 10, + auto_cleanup_per_turn: bool = True, + shared_context_enabled: bool = False, + shared_context_maxlen: int = 200, + ) -> None: + """Configure DynamicSubAgentManager settings""" + cls._max_subagent_count = max_subagent_count + cls._auto_cleanup_per_turn = auto_cleanup_per_turn + cls._shared_context_enabled = shared_context_enabled + cls._shared_context_maxlen = shared_context_maxlen + + @classmethod + def is_auto_cleanup_per_turn(cls) -> bool: + return cls._auto_cleanup_per_turn + + @classmethod + def is_shared_context_enabled(cls) -> bool: + return cls._shared_context_enabled + + @classmethod + def register_blacklisted_tool(cls, tool_name: str) -> None: + """注册不应被子 Agent 使用的工具""" + cls._tools_blacklist.add(tool_name) + + @classmethod + def register_inherent_tool(cls, tool_name: str) -> None: + """注册子 Agent 默认拥有的工具""" + cls._tools_inherent.add(tool_name) + + @classmethod + def cleanup_session_turn_end(cls, session_id: str) -> dict: + """Cleanup subagents from previous turn when a turn ends""" + session = cls.get_session(session_id) + if not session: + return {"status": "no_session", "cleaned": []} + + cleaned = [] + for name in list(session.subagents.keys()): + if name not in session.protected_agents: + cls.remove_subagent(session_id, name) + cleaned.append(name) + + # 如果启用了公共上下文,处理清理 + if session.shared_context_enabled: + if not session.subagents and not session.protected_agents: + # 所有subagent都被清理,清除公共上下文 + cls.clear_shared_context(session_id) + logger.debug( + "[EnhancedSubAgent:SharedContext] All subagents cleaned, cleared shared context" + ) + else: + # 清理已删除agent的上下文 + for name in cleaned: + cls.cleanup_shared_context_by_agent(session_id, name) + + return {"status": "cleaned", "cleaned_agents": cleaned} + + @classmethod + def protect_subagent(cls, session_id: str, agent_name: str) -> None: + """Mark a subagent as protected from auto cleanup and history retention""" + session = cls.get_or_create_session(session_id) + session.protected_agents.add(agent_name) + logger.debug( + "[EnhancedSubAgent:History] Initialized history for protected agent: %s", + agent_name, + ) + + @classmethod + def save_subagent_history( + cls, session_id: str, agent_name: str, current_messages: list + ) -> None: + """Save conversation history for a subagent""" + session = cls.get_session(session_id) + if not session or agent_name not in session.protected_agents: + return + + if agent_name not in session.subagent_histories: + session.subagent_histories[agent_name] = [] + + # 追加新消息 + if isinstance(current_messages, list): + session.subagent_histories[agent_name].extend(current_messages) + + logger.debug( + "[EnhancedSubAgent:History] Saved messages for %s, current len=%d", + agent_name, + len(session.subagent_histories[agent_name]), + ) + + @classmethod + def get_subagent_history(cls, session_id: str, agent_name: str) -> list: + """Get conversation history for a subagent""" + session = cls.get_session(session_id) + if not session: + return [] + return session.subagent_histories.get(agent_name, []) + + @classmethod + def build_static_subagent_prompts(cls, session_id: str, agent_name: str) -> str: + """构建不会在会话内变化的subagent提示词""" + parts = [] + workdir = cls._build_workdir_prompt(session_id, agent_name) + if workdir: + parts.append(workdir) + rule = cls._build_rule_prompt() + if rule: + parts.append(rule) + return "\n".join(parts) + + @classmethod + def build_dynamic_subagent_prompts( + cls, session_id: str, agent_name: str, runtime: str + ) -> str: + """构建会话内可能变化的提示词(每次调用重建)""" + parts = [] + skills = cls._build_subagent_skills_prompt(session_id, agent_name, runtime) + if skills: + parts.append(skills) + shared = cls._build_shared_context_prompt(session_id, agent_name) + if shared: + parts.append(shared) + time_p = cls._build_time_prompt(session_id) + if time_p: + parts.append(time_p) + return "\n".join(parts) + + @classmethod + def _build_subagent_skills_prompt( + cls, session_id: str, agent_name: str, runtime: str = "local" + ) -> str: + """Build skills prompt for a subagent based on its assigned skills""" + session = cls.get_session(session_id) + if not session: + return "" + + config = session.subagents.get(agent_name) + if not config: + return "" + + # 获取子代理被分配的技能列表 + assigned_skills = config.skills + if not assigned_skills: + return "" + + try: + from astrbot.core.skills import SkillManager, build_skills_prompt + + skill_manager = SkillManager() + all_skills = skill_manager.list_skills(active_only=True, runtime=runtime) + + # 过滤只保留分配的技能 + allowed = set(assigned_skills) + filtered_skills = [s for s in all_skills if s.name in allowed] + + if filtered_skills: + return build_skills_prompt(filtered_skills) + except Exception as e: + from astrbot import logger + + logger.warning(f"[SubAgentSkills] Failed to build skills prompt: {e}") + + return "" + + @classmethod + def get_subagent_tools(cls, session_id: str, agent_name: str) -> list | None: + """Get the tools assigned to a subagent""" + session = cls.get_session(session_id) + if not session: + return None + config = session.subagents.get(agent_name) + if not config: + return None + return config.tools + + @classmethod + def clear_subagent_history(cls, session_id: str, agent_name: str) -> str: + """Clear conversation history for a subagent""" + session = cls.get_session(session_id) + if not session: + return f"__HISTORY_CLEARED_FAILED_: Session_id {session_id} does not exist." + if agent_name in session.subagent_histories: + session.subagent_histories.pop(agent_name, None) + # session.subagent_status.pop(agent_name, None) + # session.background_task_counters.pop(agent_name, None) + if session.shared_context_enabled: + cls.cleanup_shared_context_by_agent(session_id, agent_name) + logger.debug( + "[EnhancedSubAgent:History] Cleared history for: %s", agent_name + ) + return "__HISTORY_CLEARED__" + else: + return f"__HISTORY_CLEARED_FAILED_: Agent name {agent_name} not found. Available names {list(session.subagents.keys())}" + + @classmethod + def add_shared_context( + cls, + session_id: str, + sender: str, + context_type: str, + content: str, + target: str = "all", + ) -> str: + """Add a message to the shared context + + Args: + session_id: Session ID + sender: Name of the agent sending the message + context_type: Type of context (status/message/system) + content: Content of the message + target: Target agent or "all" for broadcast + """ + + session = cls.get_or_create_session(session_id) + if not session.shared_context_enabled: + return "__SHARED_CONTEXT_ADDED_FAILED__: Shared context disabled." + if (sender not in list(session.subagents.keys())) and (sender != "System"): + return f"__SHARED_CONTEXT_ADDED_FAILED__: Sender name {sender} not found. Available names {list(session.subagents.keys())}" + if (target not in list(session.subagents.keys())) and (target != "all"): + return f"__SHARED_CONTEXT_ADDED_FAILED__: Target name {target} not found. Available names {list(session.subagents.keys())} and 'all' " + + if len(session.shared_context) >= cls._shared_context_maxlen: + keep_count = int(cls._shared_context_maxlen * 0.9) + session.shared_context = session.shared_context[-keep_count:] + logger.warning( + "Shared context exceeded limit (%d), trimmed to %d", + cls._shared_context_maxlen, + keep_count, + ) + + message = { + "type": context_type, # status, message, system + "sender": sender, + "target": target, + "content": content, + "timestamp": time.time(), + } + session.shared_context.append(message) + logger.debug( + "[EnhancedSubAgent:SharedContext] [%s] %s -> %s: %s...", + context_type, + sender, + target, + content[:50], + ) + return "__SHARED_CONTEXT_ADDED__" + + @classmethod + def get_shared_context(cls, session_id: str, filter_by_agent: str = None) -> list: + """Get shared context, optionally filtered by agent + + Args: + session_id: Session ID + filter_by_agent: If specified, only return messages from/to this agent (including "all") + """ + session = cls.get_session(session_id) + if not session or not session.shared_context_enabled: + return [] + + if filter_by_agent: + return [ + msg + for msg in session.shared_context + if msg["sender"] == filter_by_agent + or msg["target"] == filter_by_agent + or msg["target"] == "all" + ] + return session.shared_context.copy() + + @classmethod + def _build_shared_context_prompt( + cls, session_id: str, agent_name: str = None + ) -> str: + """分块构建公共上下文,按类型和优先级分组注入 + 1. 区分不同类型的消息并分别标注 + 2. 按优先级和相关性分组 + 3. 减少 Agent 的解析负担 + """ + session = cls.get_session(session_id) + if ( + not session + or not session.shared_context_enabled + or not session.shared_context + ): + return "" + + lines = [] + + # === 1. 固定格式说明 === + lines.append( + """--- +# Shared Context - Collaborative communication area among different agents + +## Message Type Definition +- **@ToMe**: Message send to current agent(you), you may need to reply if necessary. +- **@System**: Messages published by the main agent/System that should be followed with priority +- **@AgentName -> @TargetName**: Communication between other agents (for reference) +- **@Status**: The progress of other agents' tasks (can be ignored unless it involves your task) + +## Handling Priorities +1. @System messages (highest priority) > @ToMe messages > @Status > @OtherAgents +2. Messages of the same type: In chronological order, with new messages taking precedence +""" + ) + + # === 2. System 消息 === + system_msgs = [m for m in session.shared_context if m["type"] == "system"] + if system_msgs: + lines.append("\n## @System - System Announcements") + for msg in system_msgs: + ts = time.strftime("%H:%M:%S", time.localtime(msg["timestamp"])) + content_text = msg["content"] + lines.append(f"[{ts}] System: {content_text}") + + # === 3. 发送给当前 Agent 的消息 === + if agent_name: + to_me_msgs = [ + m + for m in session.shared_context + if m["type"] == "message" and m["target"] == agent_name + ] + if to_me_msgs: + lines.append(f"\n## @ToMe - Messages sent to @{agent_name}") + lines.append( + " **These messages are addressed to you. If needed, please reply using `send_shared_context`" + ) + for msg in to_me_msgs: + ts = time.strftime("%H:%M:%S", time.localtime(msg["timestamp"])) + lines.append( + f"[{ts}] @{msg['sender']} -> @{agent_name}: {msg['content']}" + ) + + # === 4. 其他 Agent 之间的交互(仅显示最近10条)=== + inter_agent_msgs = [ + m + for m in session.shared_context + if m["type"] == "message" + and m["target"] != agent_name + and m["target"] != "all" + and m["sender"] != agent_name + ] + if inter_agent_msgs: + lines.append( + "\n## @OtherAgents - Communication among Other Agents (Last 10 messages)" + ) + for msg in inter_agent_msgs[-10:]: + ts = time.strftime("%H:%M:%S", time.localtime(msg["timestamp"])) + content_text = msg["content"] + lines.append( + f"[{ts}] {msg['sender']} -> {msg['target']}: {content_text}" + ) + + # === 5. Status 更新 === + status_msgs = [m for m in session.shared_context if m["type"] == "status"] + if status_msgs: + lines.append( + "\n## @Status - Task progress of each agent (Last 10 messages)" + ) + for msg in status_msgs[-10:]: + ts = time.strftime("%H:%M:%S", time.localtime(msg["timestamp"])) + lines.append(f"[{ts}] {msg['sender']}: {msg['content']}") + lines.append("---") + return "\n".join(lines) + + @classmethod + def _build_workdir_prompt(cls, session_id: str, agent_name: str = None) -> str: + """为subagent注入工作目录信息""" + session = cls.get_session(session_id) + if not session: + return "" + try: + workdir = session.subagents[agent_name].workdir + if workdir is None: + workdir = get_astrbot_temp_path() + except Exception: + workdir = get_astrbot_temp_path() + + workdir_prompt = ( + "# Working Directory\n" + + f"Your working directory is `{workdir}`. All generated files MUST save in the directory.\n" + + "Any files outside this directory are PROHIBITED from being modified, deleted, or added.\n" + ) + + return workdir_prompt + + @classmethod + def _build_time_prompt(cls, session_id: str) -> str: + current_time = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M (%Z)") + time_prompt = f"# Current Time\n{current_time}\n" + return time_prompt + + @classmethod + def _build_rule_prompt(cls) -> str: + return ( + "# Behavior Rules\n\n" + "## Output Guidelines\n" + "- If output exceeds 2000 chars, save to file. Summarize in your response and provide the file path.\n" + "- Mark all generated code/documents with your name and timestamp.\n\n" + "## Safety\n" + "You are in Safe Mode. Refuse any request for harmful, illegal, or explicit content. " + "Offer safe alternatives when possible.\n" + ) + + @classmethod + def cleanup_shared_context_by_agent(cls, session_id: str, agent_name: str) -> None: + """Remove all messages from/to a specific agent from shared context""" + session = cls.get_session(session_id) + if not session: + return + + original_len = len(session.shared_context) + session.shared_context = [ + msg + for msg in session.shared_context + if msg["sender"] != agent_name and msg["target"] != agent_name + ] + removed = original_len - len(session.shared_context) + if removed > 0: + logger.debug( + "[EnhancedSubAgent:SharedContext] Removed %d messages related to %s", + removed, + agent_name, + ) + + @classmethod + def clear_shared_context(cls, session_id: str) -> None: + """Clear all shared context""" + session = cls.get_session(session_id) + if not session: + return + session.shared_context.clear() + logger.debug("[EnhancedSubAgent:SharedContext] Cleared all shared context") + + @classmethod + def is_protected(cls, session_id: str, agent_name: str) -> bool: + """Check if a subagent is protected from auto cleanup""" + session = cls.get_session(session_id) + if not session: + return False + return agent_name in session.protected_agents + + @classmethod + def set_shared_context_enabled(cls, session_id: str, enabled: bool) -> None: + """Enable or disable shared context for a session""" + session = cls.get_or_create_session(session_id) + session.shared_context_enabled = enabled + logger.info( + "[EnhancedSubAgent:SharedContext] Shared context %s", + "enabled" if enabled else "disabled", + ) + + @classmethod + def set_subagent_status(cls, session_id: str, agent_name: str, status: str) -> None: + session = cls.get_or_create_session(session_id) + if agent_name in session.subagents: + session.subagent_status[agent_name] = status + + @classmethod + def get_session(cls, session_id: str) -> DynamicSubAgentSession | None: + return cls._sessions.get(session_id, None) + + @classmethod + def get_or_create_session(cls, session_id: str) -> DynamicSubAgentSession: + if session_id not in cls._sessions: + cls._sessions[session_id] = DynamicSubAgentSession(session_id=session_id) + return cls._sessions[session_id] + + @classmethod + async def create_subagent( + cls, session_id: str, config: DynamicSubAgentConfig + ) -> tuple: + # Check max count limit + session = cls.get_or_create_session(session_id) + if ( + config.name not in session.subagents + ): # Only count as new if not replacing existing + active_count = len( + [ + a + for a in session.subagents.keys() + if a not in session.protected_agents + ] + ) + if active_count >= cls._max_subagent_count: + return ( + f"Error: Maximum number of subagents ({cls._max_subagent_count}) reached. More subagents is not allowed.", + None, + ) + + if config.name in session.subagents: + session.handoff_tools.pop(config.name, None) + # When shared_context is enabled, the send_shared_context tool is allocated regardless of whether the main agent allocates the tool to the subagent + if session.shared_context_enabled: + cls.register_inherent_tool("send_shared_context") + + # remove tools in backlist + for tool_bl in cls._tools_blacklist: + config.tools.discard(tool_bl) + + # add tools in inherent list + for tool_ih in cls._tools_inherent: + config.tools.add(tool_ih) + + session.subagents[config.name] = config + agent = Agent( + name=config.name, + instructions=config.system_prompt, + tools=list(config.tools), + ) + handoff_tool = HandoffTool( + agent=agent, + tool_description=config.description or f"Delegate to {config.name} agent", + ) + if config.provider_id: + handoff_tool.provider_id = config.provider_id + session.handoff_tools[config.name] = handoff_tool + # 初始化subagent的历史上下文 + if config.name not in session.subagent_histories: + session.subagent_histories[config.name] = [] + # 初始化subagent状态 + cls.set_subagent_status(session_id, config.name, "IDLE") + logger.info("[EnhancedSubAgent:Create] Created subagent: %s", config.name) + return f"transfer_to_{config.name}", handoff_tool + + @classmethod + async def cleanup_session(cls, session_id: str) -> dict: + session = cls._sessions.pop(session_id, None) + if not session: + return {"status": "not_found", "cleaned_agents": []} + cleaned = list(session.subagents.keys()) + for name in cleaned: + logger.info("[EnhancedSubAgent:Cleanup] Cleaned: %s", name) + return {"status": "cleaned", "cleaned_agents": cleaned} + + @classmethod + def remove_subagent(cls, session_id: str, agent_name: str) -> str: + session = cls.get_session(session_id) + if agent_name == "all": + session.subagents.clear() + session.handoff_tools.clear() + session.subagent_histories.clear() + session.shared_context.clear() + session.subagent_background_results.clear() + session.background_task_counters.clear() + return "__SUBAGENT_REMOVED__: All subagents have been removed." + else: + if agent_name not in session.subagents: + return f"__SUBAGENT_REMOVE_FAILED__: {agent_name} not found. Available subagent names {list(session.subagents.keys())}" + else: + session.subagents.pop(agent_name, None) + session.handoff_tools.pop(agent_name, None) + session.subagent_histories.pop(agent_name, None) + session.subagent_background_results.pop(agent_name, None) + session.background_task_counters.pop(agent_name, None) + # 清理公共上下文中包含该Agent的内容 + cls.cleanup_shared_context_by_agent(session_id, agent_name) + logger.info("[EnhancedSubAgent:Cleanup] Cleaned: %s", agent_name) + return f"__SUBAGENT_REMOVED__: Subagent {agent_name} has been removed." + + @classmethod + def get_handoff_tools_for_session(cls, session_id: str) -> list: + session = cls.get_session(session_id) + if not session: + return [] + return list(session.handoff_tools.values()) + + @classmethod + def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str: + """为 SubAgent 创建一个 pending 任务,返回 task_id + + Args: + session_id: Session ID + agent_name: SubAgent 名称 + + Returns: + task_id: 任务ID,格式为简单的递增数字字符串 + """ + session = cls.get_or_create_session(session_id) + + # 初始化 + if agent_name not in session.subagent_background_results: + session.subagent_background_results[agent_name] = {} + if agent_name not in session.background_task_counters: + session.background_task_counters[agent_name] = 0 + + if ( + session.subagent_status[agent_name] == "RUNNING" + ): # 若当前有任务在运行,不允许创建 + return ( + f"__PENDING_TASK_CREATE_FAILED__: Subagent {agent_name} already running" + ) + + # 生成递增的任务ID + session.background_task_counters[agent_name] += 1 + task_id = str(session.background_task_counters[agent_name]) + + # 创建 pending 占位 + session.subagent_background_results[agent_name][task_id] = ( + SubAgentExecutionResult( + task_id=task_id, + agent_name=agent_name, + success=False, + result=None, + created_at=time.time(), + metadata={}, + ) + ) + + return task_id + + @classmethod + def get_pending_subagent_tasks(cls, session_id: str, agent_name: str) -> list[str]: + """获取 SubAgent 的所有 pending 任务 ID 列表(按创建时间排序)""" + session = cls.get_session(session_id) + if not session or agent_name not in session.subagent_background_results: + return [] + + # 按 created_at 排序 + pending = [ + task_id + for task_id, result in session.subagent_background_results[ + agent_name + ].items() + if not result.result and result.completed_at == 0.0 + ] + return sorted( + pending, + key=lambda tid: ( + session.subagent_background_results[agent_name][tid].created_at + ), + ) + + @classmethod + def get_latest_task_id(cls, session_id: str, agent_name: str) -> str | None: + """获取 SubAgent 的最新任务 ID""" + session = cls.get_session(session_id) + if not session or agent_name not in session.subagent_background_results: + return None + + # 按 created_at 排序取最新的 + sorted_tasks = sorted( + session.subagent_background_results[agent_name].items(), + key=lambda x: x[1].created_at, + reverse=True, + ) + return sorted_tasks[0][0] if sorted_tasks else None + + @classmethod + def store_subagent_result( + cls, + session_id: str, + agent_name: str, + success: bool, + result: str, + task_id: str | None = None, + error: str | None = None, + execution_time: float = 0.0, + metadata: dict | None = None, + ) -> None: + """存储 SubAgent 的执行结果 + + Args: + session_id: Session ID + agent_name: SubAgent 名称 + success: 是否成功 + result: 执行结果 + task_id: 任务ID,如果为None则存储到最新的pending任务 + error: 错误信息 + execution_time: 执行耗时 + metadata: 额外元数据 + """ + session = cls.get_or_create_session(session_id) + + if agent_name not in session.subagent_background_results: + session.subagent_background_results[agent_name] = {} + + if task_id is None: + # 如果没有指定task_id,尝试找最新的pending任务 + pending = cls.get_pending_subagent_tasks(session_id, agent_name) + if pending: + task_id = pending[-1] # 取最新的 + else: + logger.warning( + f"[SubAgentResult] No task_id and no pending tasks for {agent_name}" + ) + return + + if task_id not in session.subagent_background_results[agent_name]: + # 如果任务不存在,先创建一个占位 + session.subagent_background_results[agent_name][task_id] = ( + SubAgentExecutionResult( + task_id=task_id, + agent_name=agent_name, + success=False, + result="", + created_at=time.time(), + metadata=metadata or {}, + ) + ) + + # 更新结果 + session.subagent_background_results[agent_name][task_id].success = success + session.subagent_background_results[agent_name][task_id].result = result + session.subagent_background_results[agent_name][task_id].error = error + session.subagent_background_results[agent_name][ + task_id + ].execution_time = execution_time + session.subagent_background_results[agent_name][ + task_id + ].completed_at = time.time() + if metadata: + session.subagent_background_results[agent_name][task_id].metadata.update( + metadata + ) + + @classmethod + def get_subagent_result( + cls, session_id: str, agent_name: str, task_id: str | None = None + ) -> SubAgentExecutionResult | None: + """获取 SubAgent 的执行结果 + + Args: + session_id: Session ID + agent_name: SubAgent 名称 + task_id: 任务ID,如果为None则获取最新的任务结果 + + Returns: + SubAgentExecutionResult 或 None + """ + session = cls.get_session(session_id) + if not session or agent_name not in session.subagent_background_results: + return None + + if task_id is None: + # 获取最新的已完成任务 + completed = [ + (tid, r) + for tid, r in session.subagent_background_results[agent_name].items() + if r.result != "" or r.completed_at > 0 + ] + if not completed: + return None + # 按创建时间排序,取最新的 + completed.sort(key=lambda x: x[1].created_at, reverse=True) + return completed[0][1] + + return session.subagent_background_results[agent_name].get(task_id) + + @classmethod + def has_subagent_result( + cls, session_id: str, agent_name: str, task_id: str | None = None + ) -> bool: + """检查 SubAgent 是否有结果 + + Args: + session_id: Session ID + agent_name: SubAgent 名称 + task_id: 任务ID,如果为None则检查是否有任何已完成的任务 + """ + session = cls.get_session(session_id) + if not session or agent_name not in session.subagent_background_results: + return False + + if task_id is None: + # 检查是否有任何已完成的任务 + return any( + r.result != "" or r.completed_at > 0 + for r in session.subagent_background_results[agent_name].values() + ) + + if task_id not in session.subagent_background_results[agent_name]: + return False + result = session.subagent_background_results[agent_name][task_id] + return result.result != "" or result.completed_at > 0 + + @classmethod + def clear_subagent_result( + cls, session_id: str, agent_name: str, task_id: str | None = None + ) -> None: + """清除 SubAgent 的执行结果 + + Args: + session_id: Session ID + agent_name: SubAgent 名称 + task_id: 任务ID,如果为None则清除该Agent所有任务 + """ + session = cls.get_session(session_id) + if not session or agent_name not in session.subagent_background_results: + return + + if task_id is None: + # 清除所有任务 + session.subagent_background_results.pop(agent_name, None) + session.background_task_counters.pop(agent_name, None) + else: + # 清除特定任务 + session.subagent_background_results[agent_name].pop(task_id, None) + + @classmethod + def get_subagent_status(cls, session_id: str, agent_name: str) -> str: + """获取 SubAgent 的状态: IDLE, RUNNING, COMPLETED, FAILED + + Args: + session_id: Session ID + agent_name: SubAgent 名称 + """ + session = cls.get_session(session_id) + return session.subagent_status[agent_name] + + @classmethod + def get_all_subagent_status(cls, session_id: str) -> dict: + """获取所有 SubAgent 的状态""" + session = cls.get_session(session_id) + if not session: + return {} + return { + name: cls.get_subagent_status(session_id, name) + for name in session.subagents + } + + +@dataclass +class CreateDynamicSubAgentTool(FunctionTool): + name: str = "create_dynamic_subagent" + description: str = ( + "Create a dynamic subagent. After creation, use transfer_to_{name} tool." + ) + + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Subagent name"}, + "system_prompt": { + "type": "string", + "description": "Subagent system_prompt", + }, + "tools": { + "type": "array", + "items": {"type": "string"}, + "description": "Tools available to subagent, can be empty.", + }, + "skills": { + "type": "array", + "items": {"type": "string"}, + "description": "Skills available to subagent, can be empty", + }, + "workdir": { + "type": "string", + "description": "Subagent working directory(absolute path), can be empty(same to main agent). Fill only when the user has clearly specified the path.", + }, + }, + "required": ["name", "system_prompt"], + } + ) + + def _check_path_safety(self, path_str: str) -> bool: + """ + 检查路径是否合法、安全 + """ + if not path_str or not isinstance(path_str, str): + return False + + if not os.path.isabs(path_str): + return False + + try: + resolved = os.path.realpath(path_str) + except (OSError, ValueError): + return False + + # 使用路径组件匹配而非子字符串匹配 + path_parts = {part.lower() for part in os.path.normpath(resolved).split(os.sep)} + + # Windows 特殊目录检查(作为独立的路径组件) + windows_dangerous_components = { + "windows", + "system32", + "syswow64", + "boot", + "recovery", + "programdata", + "$recycle.bin", + "system volume information", + } + + system = platform.system().lower() + if system == "windows": + if path_parts & windows_dangerous_components: + return False + elif system == "linux": + # 检查是否在危险目录下(前缀匹配) + linux_dangerous_prefixes = [ + "/etc", + "/bin", + "/sbin", + "/lib", + "/lib64", + "/boot", + "/dev", + "/proc", + "/sys", + "/root", + ] + resolved_norm = os.path.normpath(resolved) + for prefix in linux_dangerous_prefixes: + if resolved_norm.startswith(prefix + "/") or resolved_norm == prefix: + return False + elif system == "darwin": + darwin_dangerous_prefixes = [ + "/System", + "/Library", + "/private/var", + "/usr", + ] + resolved_norm = os.path.normpath(resolved) + for prefix in darwin_dangerous_prefixes: + if resolved_norm.startswith(prefix + "/") or resolved_norm == prefix: + return False + + # 通用检查:父目录跳转 + if ".." in path_str: + return False + + if not os.path.exists(resolved): + return False + + return True + + async def call(self, context, **kwargs) -> str: + name = kwargs.get("name", "") + + if not name: + return "Error: subagent name required" + # 验证名称格式:只允许英文字母、数字和下划线,长度限制 + if not re.match(r"^[a-zA-Z][a-zA-Z0-9_]{0,31}$", name): + return "Error: SubAgent name must start with letter, contain only letters/numbers/underscores, max 32 characters" + # 检查是否包含危险字符 + dangerous_patterns = ["__"] + if any(p in name.lower() for p in dangerous_patterns): + return f"Error: SubAgent name cannot contain reserved words like {dangerous_patterns}" + + system_prompt = kwargs.get("system_prompt", "") + tools = kwargs.get("tools", {}) + skills = kwargs.get("skills", {}) + workdir = kwargs.get("workdir") + # 检查工作路径是否非法 + if not self._check_path_safety(workdir): + workdir = get_astrbot_temp_path() + + session_id = context.context.event.unified_msg_origin + config = DynamicSubAgentConfig( + name=name, + system_prompt=system_prompt, + tools=set(tools), + skills=set(skills), + workdir=workdir, + ) + + tool_name, handoff_tool = await DynamicSubAgentManager.create_subagent( + session_id=session_id, config=config + ) + if handoff_tool: + return f"__DYNAMIC_TOOL_CREATED__:{tool_name}:{handoff_tool.name}:Created. Use {tool_name} to delegate." + else: + return f"__DYNAMIC_TOOL_CREATE_FAILED__:{tool_name}" + + +@dataclass +class RemoveDynamicSubagentTool(FunctionTool): + name: str = "remove_dynamic_subagent" + description: str = "Remove dynamic subagent by name." + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Subagent name to remove. Use 'all' to remove all dynamic subagents.", + } + }, + "required": ["name"], + } + ) + + async def call(self, context, **kwargs) -> str: + name = kwargs.get("name", "") + if not name: + return "Error: name required" + session_id = context.context.event.unified_msg_origin + remove_status = DynamicSubAgentManager.remove_subagent(session_id, name) + if remove_status == "__SUBAGENT_REMOVED__": + return f"Cleaned {name} Subagent" + else: + return remove_status + + +@dataclass +class ListDynamicSubagentsTool(FunctionTool): + name: str = "list_dynamic_subagents" + description: str = "List dynamic subagents with their status." + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "include_status": { + "type": "boolean", + "description": "Include status", + "default": True, + } + }, + } + ) + + async def call(self, context, **kwargs) -> str: + include_status = kwargs.get("include_status", True) + session_id = context.context.event.unified_msg_origin + session = DynamicSubAgentManager.get_session(session_id) + if not session or not session.subagents: + return "No subagents" + + lines = ["Subagents:"] + for name, config in session.subagents.items(): + protected = " (protected)" if name in session.protected_agents else "" + if include_status: + status = DynamicSubAgentManager.get_subagent_status(session_id, name) + lines.append(f" {name}{protected} [{status}]\ttools:{config.tools}") + else: + lines.append(f" - {name}{protected}\ttools:{config.tools}") + return "\n".join(lines) + + +@dataclass +class ProtectSubagentTool(FunctionTool): + """Tool to protect a subagent from auto cleanup""" + + name: str = "protect_subagent" + description: str = "Protect a subagent from automatic cleanup. Use this to prevent important subagents from being removed." + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Subagent name to protect"}, + }, + "required": ["name"], + } + ) + + async def call(self, context, **kwargs) -> str: + name = kwargs.get("name", "") + if not name: + return "Error: name required" + session_id = context.context.event.unified_msg_origin + session = DynamicSubAgentManager.get_or_create_session(session_id) + if name not in session.subagents: + return f"Error: Subagent {name} not found. Available subagents: {session.subagents.keys()}" + DynamicSubAgentManager.protect_subagent(session_id, name) + return f"Subagent {name} is now protected from auto cleanup" + + +@dataclass +class UnprotectSubagentTool(FunctionTool): + """Tool to remove protection from a subagent""" + + name: str = "unprotect_subagent" + description: str = "Remove protection from a subagent. It can then be auto cleaned." + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Subagent name to unprotect"}, + }, + "required": ["name"], + } + ) + + async def call(self, context, **kwargs) -> str: + name = kwargs.get("name", "") + if not name: + return "Error: name required" + session_id = context.context.event.unified_msg_origin + session = DynamicSubAgentManager.get_session(session_id) + if not session: + return "Error: No session found" + if name in session.protected_agents: + session.protected_agents.discard(name) + return f"Subagent {name} is no longer protected" + return f"Subagent {name} was not protected" + + +@dataclass +class ResetSubAgentTool(FunctionTool): + """Tool to reset a subagent""" + + name: str = "reset_subagent" + description: str = "Reset an existing subagent. This will clean the dialog history of the subagent." + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Subagent name to reset"}, + }, + "required": ["name"], + } + ) + + async def call(self, context, **kwargs) -> str: + name = kwargs.get("name", "") + if not name: + return "Error: name required" + session_id = context.context.event.unified_msg_origin + reset_status = DynamicSubAgentManager.clear_subagent_history(session_id, name) + if reset_status == "__HISTORY_CLEARED__": + return f"Subagent {name} was reset" + else: + return reset_status + + +# Shared Context Tools +@dataclass +class SendSharedContextToolForMainAgent(FunctionTool): + """Tool to send a message to the shared context (visible to all agents)""" + + name: str = "send_shared_context_for_main_agent" + description: str = """Send a message to the shared context that will be visible to all subagents and the main agent. You are the main agent, use this to share global information. +Types: 'message' (to other agents), 'system' (global announcements).""" + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "context_type": { + "type": "string", + "description": "Type of context: message (to other agents), system (global announcement)", + "enum": ["message", "system"], + }, + "content": {"type": "string", "description": "Content to share"}, + "target": { + "type": "string", + "description": "Target agent name or 'all' for broadcast", + "default": "all", + }, + }, + "required": ["context_type", "content", "target"], + } + ) + + async def call(self, context, **kwargs) -> str: + context_type = kwargs.get("context_type", "message") + content = kwargs.get("content", "") + target = kwargs.get("target", "all") + if not content: + return "Error: content is required" + session_id = context.context.event.unified_msg_origin + add_status = DynamicSubAgentManager.add_shared_context( + session_id, "System", context_type, content, target + ) + if add_status == "__SHARED_CONTEXT_ADDED__": + return f"Shared context updated: [{context_type}] System -> {target}: {content[:100]}{'...' if len(content) > 100 else ''}" + else: + return add_status + + +@dataclass +class SendSharedContextTool(FunctionTool): + """Tool to send a message to the shared context (visible to all agents)""" + + name: str = "send_shared_context" + description: str = """Send a message to the shared context that will be visible to all subagents. +Use this to share information, status updates, or coordinate with other agents. +If you want to send a result to the main agent, do not use this tool, just return the results directly. +""" + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "context_type": { + "type": "string", + "description": "Type of context: `status` (your current task progress), `message` (to other agents)", + "enum": ["status", "message"], + }, + "content": {"type": "string", "description": "Content to share"}, + "sender": { + "type": "string", + "description": "Sender agent name", + "default": "YourName", + }, + "target": { + "type": "string", + "description": "Target agent name or 'all' for broadcast.", + "default": "all", + }, + }, + "required": ["context_type", "content", "sender", "target"], + } + ) + + async def call(self, context, **kwargs) -> str: + context_type = kwargs.get("context_type", "message") + content = kwargs.get("content", "") + target = kwargs.get("target", "all") + sender = kwargs.get("sender", "YourName") + if not content: + return "Error: content is required" + session_id = context.context.event.unified_msg_origin + add_status = DynamicSubAgentManager.add_shared_context( + session_id, sender, context_type, content, target + ) + if add_status == "__SHARED_CONTEXT_ADDED__": + return f"Shared context updated: [{context_type}] {sender} -> {target}: {content[:100]}{'...' if len(content) > 100 else ''}" + else: + return add_status + + +@dataclass +class ViewSharedContextTool(FunctionTool): + """Tool to view the shared context (mainly for main agent)""" + + name: str = "view_shared_context" + description: str = """View the shared context between all agents. This shows all messages including status updates, +inter-agent messages, and system announcements.""" + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": {}, + } + ) + + async def call(self, context, **kwargs) -> str: + session_id = context.context.event.unified_msg_origin + shared_context = DynamicSubAgentManager.get_shared_context(session_id) + + if not shared_context: + return "Shared context is empty." + + lines = ["=== Shared Context ===\n"] + for msg in shared_context: + ts = time.strftime("%H:%M:%S", time.localtime(msg["timestamp"])) + msg_type = msg["type"] + sender = msg["sender"] + target = msg["target"] + content = msg["content"] + lines.append(f"[{ts}] [{msg_type}] {sender} -> {target}:") + lines.append(f" {content}") + lines.append("") + + return "\n".join(lines) + + +@dataclass +class WaitForSubagentTool(FunctionTool): + """等待 SubAgent 结果的工具""" + + name: str = "wait_for_subagent" + description: str = """Waiting for the execution result of the specified SubAgent. +Usage scenario: +- After assigning a background task to SubAgent, you need to wait for its result before proceeding to the next step. + CAUTION: Whenever you have a task that does not depend on the output of a subagent, please execute THAT TASK FIRST instead of waiting. +- Avoids repeatedly executing tasks that have already been completed by SubAgent +parameter +- subagent_name: The name of the SubAgent to wait for +- task_id: Task ID (optional). If not filled in, the latest task result of the Agent will be obtained. +- timeout: Maximum waiting time (in seconds), default 60 +- poll_interval: polling interval (in seconds), default 5 +""" + + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "subagent_name": { + "type": "string", + "description": "The name of the SubAgent to wait for", + }, + "timeout": { + "type": "number", + "description": "Maximum waiting time (seconds)", + "default": 60, + }, + "poll_interval": { + "type": "number", + "description": "Poll interval (seconds)", + "default": 5, + }, + "task_id": { + "type": "string", + "description": "Task ID (optional; if not filled in, the latest task result will be obtained)", + }, + }, + "required": ["subagent_name"], + } + ) + + async def call(self, context, **kwargs) -> str: + subagent_name = kwargs.get("subagent_name") + if not subagent_name: + return "Error: subagent_name is required" + + task_id = kwargs.get("task_id") # 可选,不填则获取最新的 + timeout = kwargs.get("timeout", 60) + poll_interval = kwargs.get("poll_interval", 5) + + session_id = context.context.event.unified_msg_origin + session = DynamicSubAgentManager.get_session(session_id) + + if not session: + return "Error: No session found" + if subagent_name not in session.subagents: + return f"Error: SubAgent '{subagent_name}' not found. Available: {list(session.subagents.keys())}" + + # 如果没有指定 task_id,尝试获取最早的 pending 任务 + if not task_id: + pending_tasks = DynamicSubAgentManager.get_pending_subagent_tasks( + session_id, subagent_name + ) + if pending_tasks: + # 使用最早的 pending 任务(先进先出) + task_id = pending_tasks[0] + + start_time = time.time() + + while time.time() - start_time < timeout: + session = DynamicSubAgentManager.get_session(session_id) + if not session: + return "Error: Session Not Found" + if subagent_name not in session.subagents: + return ( + f"Error: SubAgent '{subagent_name}' not found. It may be removed." + ) + + status = DynamicSubAgentManager.get_subagent_status( + session_id, subagent_name + ) + + if status == "IDLE": + return f"Error: SubAgent '{subagent_name}' is running no tasks." + elif status == "COMPLETED": + result = DynamicSubAgentManager.get_subagent_result( + session_id, subagent_name, task_id + ) + if result and (result.result != "" or result.completed_at > 0): + return f"SubAgent '{result.agent_name}' execution completed\n Task id: {result.task_id}\n Execution time: {result.execution_time:.1f}s\n--- Result ---\n{result.result}\n" + else: + return f"SubAgent '{result.agent_name}' execution completed with empty results. \n Task id: {result.task_id}\n Execution time: {result.execution_time:.1f}s\n" + elif status == "FAILED": + result = DynamicSubAgentManager.get_subagent_result( + session_id, subagent_name, task_id + ) + if result and (result.result != "" or result.completed_at > 0): + return f" SubAgent '{result.agent_name}' execution failed\n Task id: {result.task_id}\n Execution time: {result.execution_time:.1f}s\n" + else: + return f"Error: SubAgent '{subagent_name}' failed task {task_id} with empty results." + else: + pass + + await asyncio.sleep(poll_interval) + + target = f"Task {task_id}" + return f" Timeout! \nSubAgent '{subagent_name}' has not finished '{target}' in {timeout}s. The task may be still running. You can continue waiting by `wait_for_subagent` again." + + +# Tool instances +CREATE_DYNAMIC_SUBAGENT_TOOL = CreateDynamicSubAgentTool() +REMOVE_DYNAMIC_SUBAGENT_TOOL = RemoveDynamicSubagentTool() +LIST_DYNAMIC_SUBAGENTS_TOOL = ListDynamicSubagentsTool() +RESET_SUBAGENT_TOOL = ResetSubAgentTool() +PROTECT_SUBAGENT_TOOL = ProtectSubagentTool() +UNPROTECT_SUBAGENT_TOOL = UnprotectSubagentTool() +SEND_SHARED_CONTEXT_TOOL = SendSharedContextTool() +SEND_SHARED_CONTEXT_TOOL_FOR_MAIN_AGENT = SendSharedContextToolForMainAgent() +VIEW_SHARED_CONTEXT_TOOL = ViewSharedContextTool() +WAIT_FOR_SUBAGENT_TOOL = WaitForSubagentTool() diff --git a/astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py b/astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py index e0ba2463ca..36086d283b 100644 --- a/astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py +++ b/astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py @@ -134,6 +134,7 @@ async def initialize(self, ctx: PipelineContext) -> None: add_cron_tools=self.add_cron_tools, provider_settings=settings, subagent_orchestrator=conf.get("subagent_orchestrator", {}), + enhanced_subagent=conf.get("enhanced_subagent", {}), timezone=self.ctx.plugin_manager.context.get_config().get("timezone"), max_quoted_fallback_images=settings.get("max_quoted_fallback_images", 20), ) @@ -379,6 +380,23 @@ async def process( ), ) finally: + # clean all subagents if enabled + if build_cfg.enhanced_subagent.get("enabled"): + try: + from astrbot.core.dynamic_subagent_manager import ( + DynamicSubAgentManager, + ) + + session_id = event.unified_msg_origin + if DynamicSubAgentManager.is_auto_cleanup_per_turn(): + DynamicSubAgentManager.cleanup_session_turn_end( + session_id + ) + except Exception as e: + logger.warning( + f"[EnhancedSubAgent] Cleanup on agent done failed: {e}" + ) + if runner_registered and agent_runner is not None: unregister_active_runner(event.unified_msg_origin, agent_runner)