Skip to content

Commit 4f3368f

Browse files
committed
feat: 添加有向无环图(DAG)编排器,自动处理数据依赖
1 parent 99b5d63 commit 4f3368f

3 files changed

Lines changed: 47 additions & 30 deletions

File tree

astrbot/core/subagent_dag.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ async def _execute_single_node(
244244
else:
245245
node.status = DAGNodeStatus.FAILED
246246
node.error = result.error or "Unknown error"
247+
node.result = result.result or ""
247248
return
248249

249250
session = SubAgentManager.get_session(session_id)
@@ -406,9 +407,9 @@ def _format_dag_result(
406407
f" ✓ {nid} ({node.agent_name}) — {node.execution_time:.1f}s"
407408
)
408409
if node.result:
409-
preview = node.result[:300]
410-
if len(node.result) > 300:
411-
preview += "..."
410+
preview = node.result[:1000]
411+
if len(node.result) > 1000:
412+
preview += "...[truncated]"
412413
lines.append(f" {preview}")
413414
elif node.status == DAGNodeStatus.FAILED:
414415
lines.append(
@@ -417,6 +418,11 @@ def _format_dag_result(
417418
)
418419
if node.error:
419420
lines.append(f" Error: {node.error}")
421+
if node.result:
422+
preview = node.result[:1000]
423+
if len(node.result) > 1000:
424+
preview += "...[truncated]"
425+
lines.append(f" Output: {preview}")
420426
elif node.status == DAGNodeStatus.SKIPPED:
421427
lines.append(
422428
f" ⊘ {nid} ({node.agent_name}) — skipped (dependency failed)"

astrbot/core/subagent_manager.py

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ def cleanup_session_turn_end(cls, session_id: str) -> dict:
259259
if not session.subagents and not session.protected_agents:
260260
# 所有subagent都被清理,清除公共上下文
261261
cls.clear_shared_context(session_id)
262-
logger.debug(
262+
logger.info(
263263
"[SubAgent:SharedContext] All subagents cleaned, cleared shared context"
264264
)
265265
else:
@@ -279,7 +279,6 @@ def cleanup_session_turn_end(cls, session_id: str) -> dict:
279279

280280
# 每轮结束时顺便清理全局过期会话
281281
cls.cleanup_expired_sessions()
282-
283282
return {"status": "cleaned", "cleaned_agents": cleaned}
284283

285284
@classmethod
@@ -734,27 +733,21 @@ def _build_time_prompt(cls) -> str:
734733

735734
_TASK_STATUS_PROMPT = (
736735
"# Task Status Reporting\n"
737-
"At the end of your task, you MUST self-audit before reporting status.\n"
738-
"\n"
739-
"SUCCESS protocol — verify ALL:\n"
740-
"- No tool returned an error or empty result unexpectedly\n"
741-
"- The output directly answers the task prompt\n"
742-
"- You are confident the result is not a guess or placeholder\n"
743-
"- If files were created: they exist on disk and their content is "
744-
"correct and complete enough for another agent to continue the work\n"
745-
"<task_status><result>SUCCESS</result></task_status>\n"
746-
"\n"
747-
"FAILURE protocol — if ANY tool failed or you cannot complete:\n"
748-
"<task_status><result>FAILURE</result>"
749-
"<reason>specific reason</reason></task_status>\n"
750-
"\n"
751-
"Common mistakes to avoid:\n"
752-
"- Do NOT report SUCCESS when a command returned an error\n"
753-
"- Do NOT report SUCCESS when you only completed part of the task\n"
754-
"- Do NOT report SUCCESS when generated files are missing or incomplete\n"
755-
"- Do NOT guess status — if uncertain, use FAILURE\n"
756-
"\n"
757-
"Put the XML block first, then your result or error details.\n"
736+
"At the end of your task, self-audit before giving your final answer.\n"
737+
"## SUCCESS — use only when ALL of these are true:\n"
738+
"- Every tool call succeeded; no unexpected error or empty result\n"
739+
"- Your output directly answers the task you were assigned\n"
740+
"- You are confident the result is accurate, not a guess or placeholder\n"
741+
"- If you created files: ensure they exist on disk, and their content is correct and complete\n"
742+
"If all pass, put this EXACT line FIRST, then your result:\n"
743+
"[TASK RESULT: SUCCESS]\n"
744+
"## FAILURE — use if ANY tool failed, or you cannot complete the task:\n"
745+
"[TASK RESULT: FAILURE]\n"
746+
"[FAILURE REASON: <one-line explanation>]\n"
747+
"## Reporting Marker Rules\n"
748+
"- The marker MUST be exactly `[TASK RESULT: SUCCESS]` or `[TASK RESULT: FAILURE]` — do not change it\n"
749+
"- The marker MUST be on its own line, at the very top of your response\n"
750+
"- When uncertain between success and failure, choose failure\n"
758751
)
759752

760753
@classmethod
@@ -771,7 +764,10 @@ def _build_rule_prompt(cls) -> str:
771764
"- Mark all generated code/documents with your name and timestamp (if given).\n"
772765
)
773766
)
774-
return base + cls._TASK_STATUS_PROMPT
767+
if cls._dag_enabled:
768+
return base + cls._TASK_STATUS_PROMPT
769+
else:
770+
return base
775771

776772
@classmethod
777773
def cleanup_shared_context_by_agent(cls, session_id: str, agent_name: str) -> None:

astrbot/core/subagent_tools.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -740,15 +740,28 @@ async def _launch_dag_node(node, _sid, injected_context, task_id):
740740
result_text += c.text + chr(10)
741741

742742
# Detect task status from subagent output.
743-
# Priority: 1) XML task_status 2) error: prefix 3) empty
743+
# Priority: 1) [TASK RESULT: ...] marker 2) error: prefix 3) empty
744744
success = True
745+
error_reason = None
745746
stripped = result_text.strip()
746747
status_match = re.search(
747-
r"<task_status>\s*<result>\s*(SUCCESS|FAILURE)\s*</result>",
748+
r"\[TASK\s*RESULT\s*:\s*(SUCCESS|FAILURE)\]",
748749
stripped,
750+
re.IGNORECASE,
749751
)
750752
if status_match:
751-
success = status_match.group(1) == "SUCCESS"
753+
success = status_match.group(1).upper() == "SUCCESS"
754+
if not success:
755+
# Extract failure reason for concise error reporting
756+
reason_match = re.search(
757+
r"\[FAILURE\s*REASON\s*:\s*(.+?)\]",
758+
stripped,
759+
re.IGNORECASE,
760+
)
761+
if reason_match:
762+
error_reason = reason_match.group(1).strip()
763+
else:
764+
error_reason = "No reason provided"
752765
elif not stripped or stripped.lower().startswith("error:"):
753766
success = False
754767

@@ -759,6 +772,7 @@ async def _launch_dag_node(node, _sid, injected_context, task_id):
759772
result_text,
760773
task_id=task_id,
761774
execution_time=0.0,
775+
error=error_reason,
762776
)
763777
except Exception as e:
764778
logger.error(f"[SubAgent:DAG] Launch error for {node.agent_name}: {e}")
@@ -776,6 +790,7 @@ async def _launch_dag_node(node, _sid, injected_context, task_id):
776790
result = await SubAgentDAGEngine.execute_dag(
777791
ctx=dag_ctx,
778792
session_id=session_id,
793+
max_inject_length=cfg.get("dag_max_inject_length", 4000),
779794
launch_fn=_launch_dag_node,
780795
)
781796
dag_ctx.status = "COMPLETED" if result["failed"] == 0 else "FAILED"

0 commit comments

Comments
 (0)