From f67dc5b82570d246d7b1105eeee86dbd49f26ab2 Mon Sep 17 00:00:00 2001 From: Dylan Savage Date: Thu, 12 Mar 2026 12:52:02 -0700 Subject: [PATCH 1/9] Feat: new control interface for crew, added crew orchestrator node --- nodes/src/nodes/agent_crewai/IGlobal.py | 18 +- nodes/src/nodes/agent_crewai/IInstance.py | 18 +- nodes/src/nodes/agent_crewai/crew.md | 107 ++++++ nodes/src/nodes/agent_crewai/crewai.py | 70 ++-- .../nodes/agent_crewai/orchestrator_driver.py | 358 ++++++++++++++++++ nodes/src/nodes/agent_crewai/services.json | 21 +- .../agent_crewai/services.orchestrator.json | 73 ++++ .../lib/rocketlib/__init__.py | 2 + .../rocketlib-python/lib/rocketlib/types.py | 29 ++ 9 files changed, 669 insertions(+), 27 deletions(-) create mode 100644 nodes/src/nodes/agent_crewai/crew.md create mode 100644 nodes/src/nodes/agent_crewai/orchestrator_driver.py create mode 100644 nodes/src/nodes/agent_crewai/services.orchestrator.json diff --git a/nodes/src/nodes/agent_crewai/IGlobal.py b/nodes/src/nodes/agent_crewai/IGlobal.py index 19b897822..bd1e3bf8a 100644 --- a/nodes/src/nodes/agent_crewai/IGlobal.py +++ b/nodes/src/nodes/agent_crewai/IGlobal.py @@ -26,12 +26,14 @@ import os from typing import Any -from rocketlib import IGlobalBase, OPEN_MODE +from rocketlib import IGlobalBase, IJson, OPEN_MODE class IGlobal(IGlobalBase): process: Any = None agent: Any = None + role: str = 'Assistant' + task_description: str = '' def beginGlobal(self) -> None: if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: @@ -46,10 +48,20 @@ def beginGlobal(self) -> None: self.process = Process.sequential - from .crewai import CrewDriver + conn_config = IJson.toDict(self.glb.connConfig) if self.glb.connConfig else {} + conn_config = conn_config if isinstance(conn_config, dict) else {} - self.agent = CrewDriver(self, process=self.process) + if self.glb.logicalType == 'agent_crewai_orchestrator': + from .orchestrator_driver import OrchestratorDriver + self.agent = OrchestratorDriver(self) + else: 
+ self.role = str(conn_config.get('role') or 'Assistant').strip() or 'Assistant' + self.task_description = str(conn_config.get('task_description') or '').strip() + from .crewai import CrewDriver + self.agent = CrewDriver(self, process=self.process, role=self.role, task_description=self.task_description) def endGlobal(self) -> None: self.agent = None self.process = None + self.role = 'Assistant' + self.task_description = '' diff --git a/nodes/src/nodes/agent_crewai/IInstance.py b/nodes/src/nodes/agent_crewai/IInstance.py index ab92e5eea..90e4f5430 100644 --- a/nodes/src/nodes/agent_crewai/IInstance.py +++ b/nodes/src/nodes/agent_crewai/IInstance.py @@ -42,8 +42,24 @@ def writeQuestions(self, question: Question): self.IGlobal.agent.run_agent(self, question, emit_answers_lane=True) def invoke(self, param: Any) -> Any: # noqa: ANN401 - # Only intercept tool.* control-plane operations; otherwise fall back. op = param.get('op') if isinstance(param, dict) else getattr(param, 'op', None) + + # crewai.describe fan-out: only sub-agents (CrewDriver) respond — guarded by hasattr. + # OrchestratorDriver has no describe() so it silently falls through. + if isinstance(op, str) and op == 'crewai.describe' and hasattr(self.IGlobal.agent, 'describe'): + descriptor = self.IGlobal.agent.describe(self) + existing = getattr(param, 'agents', None) + if isinstance(existing, list): + existing.append(descriptor) + try: + param.agents = existing + except Exception: + pass + return param + return [descriptor] + + # tool.* control-plane operations for agent-as-tool. 
if isinstance(op, str) and op.startswith('tool.'): return self.IGlobal.agent.handle_invoke(self, param) + return super().invoke(param) diff --git a/nodes/src/nodes/agent_crewai/crew.md b/nodes/src/nodes/agent_crewai/crew.md new file mode 100644 index 000000000..ca78450e6 --- /dev/null +++ b/nodes/src/nodes/agent_crewai/crew.md @@ -0,0 +1,107 @@ +# CrewAI Nodes — Topology Guide + +## Node Types + +### `agent_crewai` (Sub-Agent / Standalone) + +Channels: `llm` (required), `tool` (optional) + +**Standalone mode** (no orchestrator above): receives questions on the pipeline lane and runs a single-agent CrewAI Crew. + +**Sub-agent mode** (connected to an orchestrator via `crewai` channel): responds to `crewai.describe` with its role, task description, and LLM/tool invoke handle. The orchestrator assembles it into a hierarchical Crew. + +Config: +- `role` — short role name shown to the manager (e.g. `Financial Analyst`). Default: `Assistant`. +- `task_description` — what this agent should do. If blank, the full incoming question is used as the task. +- `instructions` — optional additional instructions (applied to every question). + +--- + +### `agent_crewai_orchestrator` (Orchestrator) + +Channels: `llm` (required), `crewai` (required, min 1), `tool` (optional) + +Always the top-level driver. On each question: +1. Fans out `crewai.describe` to all nodes on the `crewai` channel +2. Builds a hierarchical CrewAI Crew with each connected sub-agent +3. Kicks off all sub-tasks with `async_execution=True` +4. The manager (this node's own LLM) synthesizes sub-agent outputs +5. Emits the final answer on the pipeline + +Config: +- `instructions` — optional delegation guidance appended to every question. + +**Does NOT respond to `crewai.describe`** — orchestrators cannot be wired as sub-agents of another orchestrator. 
+ +--- + +## Topology Map + +### Flat Crew (standard use) + +``` +[Orchestrator A] ──crewai──► [Sub-agent B] (own llm + tools) + ──crewai──► [Sub-agent C] (own llm + tools) + ──llm────► [LLM_A] +``` + +- A receives a question → fans out `crewai.describe` → B and C respond +- A builds hierarchical Crew: B and C as async sub-agents, A's LLM as manager +- B and C tasks run in parallel; each uses its own `llm`/`tool` channels +- Manager synthesizes outputs → answer emitted on A's pipeline lane + +--- + +### Depth via tool channel + +``` +[Orchestrator A] ──crewai──► [Sub-agent B] ──tool──► [Orchestrator C] + ──crewai──► [Sub-agent D] + ──crewai──► [Sub-agent E] +``` + +- B's task can invoke C as a tool during execution +- Calling C triggers C's full `_run()`, which fans out its own `crewai.describe` to D and E +- C assembles D and E into its own hierarchical Crew and returns a synthesized result to B as a tool call string +- C does NOT emit to the pipeline answers lane (`emit_answers_lane=False` for tool invocations) +- D and E's LLM/tool calls are routed through their own engine channels + +--- + +### What a sub-agent cannot do + +``` +[Sub-agent B] ──crewai──► ??? ← IMPOSSIBLE +``` + +`agent_crewai` has no `crewai` invoke channel. Sub-agents cannot directly have CrewAI sub-agents. To add depth, put an orchestrator on their `tool` channel instead (see above). 
+ +--- + +## A→B→C Reference Cases + +| Wiring | What happens | +|--------|-------------| +| A(orch) → B(sub) via crewai, B(sub) → C(sub) via crewai | **IMPOSSIBLE** — B has no `crewai` channel | +| A(orch) → B(sub) via crewai, B(sub) → C(sub) via tool | C runs as a standalone single-agent Crew when B invokes it as a tool | +| A(orch) → B(sub) via crewai, B(sub) → C(orch) via tool | C assembles its own hierarchical Crew of its sub-agents when B calls it as a tool | +| A(orch) → C(orch) via crewai | C silently skipped (orchestrator has no `describe()`) — A sees 0 descriptors → RuntimeError | +| A(orch) → B(sub) via crewai, B(sub) → A via tool | Circular — A re-enters `run_agent`, overwrites `_current_pSelf`. Undefined behavior. | + +--- + +## Known Constraints + +1. **One level of CrewAI sub-agents per Crew.** `crewai.describe` fan-out is one level deep — only nodes directly on the orchestrator's `crewai` channel are discovered. Multi-level orchestration requires the `tool` channel. + +2. **Sub-agent tasks run in parallel, tool calls are serial.** `async_execution=True` applies within one Crew's flat sub-agent list. Tool-channel depth is blocking — each nested Crew must complete before the calling sub-agent continues. + +3. **No orchestrator-as-sub-agent.** Wiring an orchestrator to another orchestrator's `crewai` channel produces no result (no `describe()` method). The outer orchestrator raises `RuntimeError: no sub-agents connected` if no valid sub-agents respond. Use the `tool` channel for nested orchestrators. + +4. **Empty `task_description`.** If a sub-agent has no configured task, the full incoming question becomes its task. All such sub-agents receive identical work — differentiate them with explicit `task_description` values. + +5. **Defensive tool filter.** If a sub-agent node is accidentally wired to both the `crewai` and `tool` channels of the same orchestrator, its `run_agent` tool is automatically excluded from the manager's tool repertoire. 
The orchestrator uses `crewai` channel only for Crew assembly and never calls sub-agents directly as tools. + +6. **Circular tool wiring causes undefined behavior.** Do not wire a node as a tool to an orchestrator that is already above it in the Crew chain. + +7. **Nested orchestrator output size.** When an orchestrator returns as a tool result, its full synthesized answer enters the calling sub-agent's LLM context as a tool response. Deep nesting with verbose outputs can bloat context rapidly. diff --git a/nodes/src/nodes/agent_crewai/crewai.py b/nodes/src/nodes/agent_crewai/crewai.py index 3ac81e46c..e2ccb40c0 100644 --- a/nodes/src/nodes/agent_crewai/crewai.py +++ b/nodes/src/nodes/agent_crewai/crewai.py @@ -5,7 +5,7 @@ # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights -# to use, copy, merge, publish, distribute, sublicense, and/or sell +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # @@ -17,7 +17,7 @@ # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OF OTHER DEALINGS IN THE +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # ============================================================================= @@ -36,22 +36,52 @@ from ai.common.agent.types import AgentHost, AgentInput, AgentRunResult from rocketlib import ToolDescriptor +_DEFAULT_GOAL = 'Complete the assigned task using available tools.' 
+_DEFAULT_BACKSTORY = ( + 'You are a specialized agent in a multi-agent pipeline. ' + 'Use the tools available to you and complete your assigned task thoroughly.' +) +_DEFAULT_EXPECTED_OUTPUT = 'A thorough, accurate result for the assigned task.' + class CrewDriver(AgentBase): FRAMEWORK = 'crewai' - def __init__(self, iGlobal: Any, *, process: Any = None): + def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant', task_description: str = ''): """ Initialize the CrewDriver. + + iGlobal is passed to AgentBase so it can load config/instructions. """ super().__init__(iGlobal) self._process = process + self._role = role + self._task_description = task_description + + def describe(self, pSelf: Any) -> Any: + """ + Return a DescribeResponse for crewai.describe fan-out. + + Called by IInstance.invoke() when the orchestrator fans out crewai.describe. + Stores the full pSelf IInstance in `invoke` so AgentHostServices(d.invoke) + can call d.invoke.instance.* correctly. + """ + from rocketlib.types import IInvokeCrew + + pipe_type = pSelf.instance.pipeType + node_id = str(pipe_type.get('id') if isinstance(pipe_type, dict) else getattr(pipe_type, 'id', '')) or '' + return IInvokeCrew.DescribeResponse( + role=self._role, + task_description=self._task_description, + node_id=node_id, + invoke=pSelf, + ) def _bind_framework_llm( self, *, host: AgentHost, - call_llm_text: Callable[..., str], + call_llm: Callable[..., str], ctx: Dict[str, Any], ) -> Any: @@ -70,7 +100,7 @@ def call( **kwargs: Any, ) -> Union[str, Any]: stop_words = getattr(self, 'stop', None) - return call_llm_text(messages, stop_words=stop_words) + return call_llm(messages, stop_words=stop_words) return HostInvokeLLM() @@ -169,7 +199,7 @@ def _run( ctx: Dict[str, Any], ) -> AgentRunResult: run_id = ctx.get('run_id', '') - debug('agent_crewai driver _run start run_id={} prompt_len={}'.format(run_id, len(agent_input.question.getPrompt() or ''))) + debug('agent_crewai driver _run start 
run_id={}'.format(run_id)) from crewai import Agent, Crew, Task # type: ignore @@ -179,14 +209,14 @@ def _call_llm_text(messages: Any, stop_words: Any = None) -> str: return self.call_host_llm( host=host, messages=messages, - question_role='You are a helpful assistant.', + question_role=self._role, stop_words=stop_words, ) def _invoke_tool(tool_name: str, input: Any = None, kwargs: Optional[Dict[str, Any]] = None) -> Any: # noqa: A002 return self.invoke_host_tool(host=host, tool_name=tool_name, input=input, kwargs=kwargs) - llm = self._bind_framework_llm(host=host, call_llm_text=_call_llm_text, ctx=ctx) + llm = self._bind_framework_llm(host=host, call_llm=_call_llm_text, ctx=ctx) tools_for_agent = self._bind_framework_tools( host=host, tool_descriptors=tool_descriptors, @@ -196,28 +226,24 @@ def _invoke_tool(tool_name: str, input: Any = None, kwargs: Optional[Dict[str, A ) agent_obj = Agent( - role='Assistant', - goal='Solve the user request using available tools when helpful.', - backstory=('You are an agent node in a tool-invocation hierarchy. You may call tools wired to you via the host tools interface. When a tool is needed, call it; otherwise respond directly. 
Follow any additional instructions exactly.'), + role=self._role, + goal=_DEFAULT_GOAL, + backstory=_DEFAULT_BACKSTORY, tools=tools_for_agent, llm=llm, verbose=False, ) - desc_parts = [ - 'You are executing inside an agent pipeline.', - 'Use tools when needed (and only those available to you).', - '', - 'User request:', - _safe_str(agent_input.question.getPrompt() or ''), - ] - desc = '\n'.join(desc_parts).strip() + # Use configured task_description if set, otherwise fall back to the incoming prompt + from ai.common.agent._internal.utils import extract_prompt + prompt = extract_prompt(agent_input.question) if hasattr(agent_input, 'question') else '' + task_text = self._task_description or prompt or '' - desc = desc.replace('{', '{{').replace('}', '}}') + desc = task_text.replace('{', '{{').replace('}', '}}') task_obj = Task( - description=desc, - expected_output='A helpful, accurate response.', + description=desc or 'Complete the user request.', + expected_output=_DEFAULT_EXPECTED_OUTPUT, agent=agent_obj, markdown=False, ) diff --git a/nodes/src/nodes/agent_crewai/orchestrator_driver.py b/nodes/src/nodes/agent_crewai/orchestrator_driver.py new file mode 100644 index 000000000..11c53dc86 --- /dev/null +++ b/nodes/src/nodes/agent_crewai/orchestrator_driver.py @@ -0,0 +1,358 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial 
portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +""" +CrewAI Orchestrator driver — builds a hierarchical Crew from connected sub-agent nodes. + +Fans out `crewai.describe` to all nodes on the 'crewai' invoke channel, assembles +each into a CrewAI Agent + Task, and kicks off a hierarchical Crew with this node +acting as the manager. + +Does NOT implement `describe()` — orchestrators cannot be used as sub-agents. +""" + +from __future__ import annotations + +import json +from typing import Any, Callable, Dict, List, Optional, Union + +from rocketlib import debug + +from ai.common.agent import AgentBase +from ai.common.agent.types import AgentHost, AgentInput, AgentRunResult +from ai.common.tools import ToolsBase + +from .crewai import _safe_str + +_MGR_ROLE = 'Orchestrator' +_MGR_GOAL = 'Coordinate specialized sub-agents to fully solve the user request.' +_MGR_BACKSTORY = ( + 'You are a senior orchestrator managing a team of specialized agents. ' + 'Delegate tasks to the right agent and synthesize their outputs into a final answer.' +) +_DEFAULT_GOAL = 'Complete the assigned task using available tools.' +_DEFAULT_BACKSTORY = ( + 'You are a specialized agent in a multi-agent pipeline. ' + 'Use the tools available to you and complete your assigned task thoroughly.' +) +_DEFAULT_EXPECTED_OUTPUT = 'A thorough, accurate result for the assigned task.' 
+ + +class OrchestratorDriver(AgentBase): + FRAMEWORK = 'crewai_orchestrator' + + def __init__(self, iGlobal: Any): + """ + Initialize the OrchestratorDriver. + + iGlobal is passed to AgentBase so it can load config/instructions. + """ + super().__init__(iGlobal) + # Stash for pSelf — needed in _run() to call pSelf.instance.invoke('crewai', ...). + # Not thread-safe; safe because pipeline runs are sequential per node instance. + self._current_pSelf: Any = None + + def run_agent(self, pSelf: Any, question: Any, *, emit_answers_lane: bool = True) -> Any: + """ + Override to stash pSelf before delegating to AgentBase.run_agent(). + + pSelf is needed inside _run() to fan out crewai.describe. + """ + self._current_pSelf = pSelf + try: + return super().run_agent(pSelf, question, emit_answers_lane=emit_answers_lane) + finally: + self._current_pSelf = None + + def _bind_framework_llm( + self, + *, + host: AgentHost, + call_llm: Callable[..., str], + ctx: Dict[str, Any], + ) -> Any: + + from crewai import BaseLLM + + class HostInvokeLLM(BaseLLM): + def __init__(self): + super().__init__(model='RocketRide-host-llm', temperature=None) + + def call( + self, + messages: Union[str, List[Dict[str, str]]], + tools: Optional[List[dict]] = None, + callbacks: Optional[List[Any]] = None, + available_functions: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> Union[str, Any]: + stop_words = getattr(self, 'stop', None) + return call_llm(messages, stop_words=stop_words) + + return HostInvokeLLM() + + def _bind_framework_tools( + self, + *, + host: AgentHost, + tool_descriptors: List[ToolsBase.ToolDescriptor], + invoke_tool: Callable[..., Any], + log_tool_call: Callable[..., None], + ctx: Dict[str, Any], + ) -> List[Any]: + + from crewai.tools import BaseTool + from pydantic import BaseModel, ConfigDict, Field, create_model + + class _ToolInput(BaseModel): + input: Any = Field(default=None, description='Tool input payload') + model_config = ConfigDict(extra='allow') + + def 
_make_args_schema(input_schema: Optional[Dict[str, Any]]) -> type[BaseModel]: + if not isinstance(input_schema, dict): + return _ToolInput + props = input_schema.get('properties', {}) + if not props: + return _ToolInput + required_keys = set(input_schema.get('required', [])) + field_defs: Dict[str, Any] = {} + for key, prop in props.items(): + desc = prop.get('description', '') + if key in required_keys: + field_defs[key] = (Any, Field(..., description=desc)) + else: + default = prop.get('default', None) + field_defs[key] = (Any, Field(default=default, description=desc)) + try: + return create_model( + '_DynToolInput', + __config__=ConfigDict(extra='allow'), + **field_defs, + ) + except Exception: + return _ToolInput + + class HostTool(BaseTool): + name: str + description: str + args_schema: type[BaseModel] = _ToolInput + + def _run(self, input: Any = None, **kwargs: Any) -> str: + try: + out = invoke_tool(self.name, input=input, kwargs=kwargs) + except Exception as e: + out = {'error': str(e), 'type': type(e).__name__} + + try: + log_tool_call(tool_name=self.name, input={'input': input, **kwargs}, output=out) + except Exception: + pass + + try: + return json.dumps(out, default=str) if isinstance(out, (dict, list)) else _safe_str(out) + except Exception: + return _safe_str(out) + + tools = [] + for td in tool_descriptors: + name = td.get('name', '') if isinstance(td, dict) else getattr(td, 'name', '') + desc = td.get('description', '') if isinstance(td, dict) else getattr(td, 'description', '') + if not name: + continue + if not desc: + desc = f'Invoke host tool: {name}' + input_schema = td.get('input_schema') if isinstance(td, dict) else None + if isinstance(input_schema, dict): + try: + schema_text = json.dumps(input_schema, ensure_ascii=False) + except Exception: + schema_text = '' + if schema_text: + desc = f'{desc}\n\nTool input schema (JSON): {schema_text}' + + schema_cls = _make_args_schema(input_schema) + tools.append(HostTool(name=name, description=desc, 
args_schema=schema_cls)) + return tools + + def _run( + self, + *, + agent_input: AgentInput, + host: AgentHost, + ctx: Dict[str, Any], + ) -> AgentRunResult: + from crewai import Agent, Crew, Process, Task + from rocketlib.types import IInvokeCrew + from ai.common.agent._internal.host import AgentHostServices + from ai.common.agent._internal.utils import extract_prompt + + run_id = ctx.get('run_id', '') + debug('agent_crewai_orchestrator _run start run_id={}'.format(run_id)) + + pSelf = self._current_pSelf + + # 1. Fan-out crewai.describe to all connected sub-agents. + req = IInvokeCrew.Describe() + try: + result = pSelf.instance.invoke('crewai', req) + except Exception: + result = req + + # Normalize: engine may return the mutated Describe object or a plain list. + if hasattr(result, 'agents'): + descriptors = result.agents or [] + elif isinstance(result, list): + descriptors = result + else: + descriptors = [] + descriptors = [d for d in descriptors if d is not None] + + if not descriptors: + raise RuntimeError('CrewAI Orchestrator: no sub-agents connected on the crewai channel') + + # 2. Build the manager's LLM (uses this orchestrator's own llm channel). + def _mgr_call_llm(messages: Any, stop_words: Any = None, _h: AgentHost = host) -> str: + return self._call_host_llm( + host=_h, + messages=messages, + question_role=_MGR_ROLE, + stop_words=stop_words, + ) + + manager_llm = self._bind_framework_llm(host=host, call_llm=_mgr_call_llm, ctx=ctx) + + # 3. Build per-sub-agent Agent + Task. + # d.invoke is the sub-agent's full pSelf IInstance — AgentHostServices requires invoker.instance.* + # Default-arg capture (_h, _d) prevents closure-in-loop bugs. 
+ prompt = extract_prompt(agent_input.question) if hasattr(agent_input, 'question') else '' + + sub_agents: List[Any] = [] + sub_tasks: List[Any] = [] + + for d in descriptors: + sub_host = AgentHostServices(d.invoke) + + def _sub_call_llm( + messages: Any, + stop_words: Any = None, + _h: Any = sub_host, + _role: str = d.role, + ) -> str: + return self._call_host_llm( + host=_h, + messages=messages, + question_role=_role, + stop_words=stop_words, + ) + + def _sub_invoke_tool( + tool_name: str, + input: Any = None, # noqa: A002 + kwargs: Optional[Dict[str, Any]] = None, + _h: Any = sub_host, + ) -> Any: + return self._invoke_host_tool(host=_h, tool_name=tool_name, input=input, kwargs=kwargs) + + sub_tool_descs = self._discover_tools(host=sub_host) + sub_llm = self._bind_framework_llm(host=sub_host, call_llm=_sub_call_llm, ctx=ctx) + sub_tools = self._bind_framework_tools( + host=sub_host, + tool_descriptors=sub_tool_descs, + invoke_tool=_sub_invoke_tool, + log_tool_call=lambda **_: None, + ctx=ctx, + ) + + agent_obj = Agent( + role=d.role, + goal=_DEFAULT_GOAL, + backstory=_DEFAULT_BACKSTORY, + tools=sub_tools, + llm=sub_llm, + verbose=False, + ) + + task_text = d.task_description or prompt or '' + task_desc = task_text.replace('{', '{{').replace('}', '}}') + + task_obj = Task( + description=task_desc or 'Complete the user request.', + expected_output=_DEFAULT_EXPECTED_OUTPUT, + agent=agent_obj, + async_execution=True, + ) + + sub_agents.append(agent_obj) + sub_tasks.append(task_obj) + + # 4. Build manager agent tools — exclude any tool whose name starts with a sub-agent's + # node_id prefix (defensive filter: prevents sub-agents accidentally wired to both + # 'crewai' and 'tool' channels from appearing as callable tools for the manager). + sub_agent_prefixes = {f'{d.node_id}.' 
for d in descriptors if d.node_id} + all_mgr_tool_descs = self._discover_tools(host=host) + + def _is_sub_agent_tool(td: Any) -> bool: + name = td.get('name', '') if isinstance(td, dict) else getattr(td, 'name', '') + return any(name.startswith(p) for p in sub_agent_prefixes) + + mgr_tool_descs = [t for t in all_mgr_tool_descs if not _is_sub_agent_tool(t)] + + def _mgr_invoke_tool( + tool_name: str, + input: Any = None, # noqa: A002 + kwargs: Optional[Dict[str, Any]] = None, + ) -> Any: + return self._invoke_host_tool(host=host, tool_name=tool_name, input=input, kwargs=kwargs) + + mgr_tools = self._bind_framework_tools( + host=host, + tool_descriptors=mgr_tool_descs, + invoke_tool=_mgr_invoke_tool, + log_tool_call=lambda **_: None, + ctx=ctx, + ) + + manager_agent = Agent( + role=_MGR_ROLE, + goal=_MGR_GOAL, + backstory=_MGR_BACKSTORY, + tools=mgr_tools, + llm=manager_llm, + verbose=False, + allow_delegation=True, + ) + + # 5. Assemble and kick off the hierarchical Crew. + crew = Crew( + agents=[manager_agent] + sub_agents, + tasks=sub_tasks, + process=Process.hierarchical, + manager_agent=manager_agent, + verbose=False, + ) + + debug('agent_crewai_orchestrator kicking off crew with {} sub-agents run_id={}'.format(len(sub_agents), run_id)) + result = crew.kickoff() + + final_text = _safe_str(getattr(result, 'raw', None)) or _safe_str(result) + return final_text, result diff --git a/nodes/src/nodes/agent_crewai/services.json b/nodes/src/nodes/agent_crewai/services.json index d7d977e8d..7d9ed0f8f 100644 --- a/nodes/src/nodes/agent_crewai/services.json +++ b/nodes/src/nodes/agent_crewai/services.json @@ -34,7 +34,9 @@ "profiles": { "default": { "agent_description": "", - "instructions": [] + "instructions": [], + "role": "Assistant", + "task_description": "" } } }, @@ -58,7 +60,19 @@ "format": "textarea" } }, + "role": { + "type": "string", + "title": "Role", + "description": "Short role name for this agent when used in a CrewAI Orchestrator pipeline (e.g. 
'Financial Analyst')." + }, + "task_description": { + "type": "string", + "format": "textarea", + "title": "Task", + "description": "What this agent should do when used in a CrewAI Orchestrator pipeline. If blank, the incoming question is used." + }, "agent_crewai.default": { "object": "default", "properties": ["agent_description", "instructions"] }, + "agent_crewai.subagent": { "object": "default", "properties": ["role", "task_description"] }, "agent_crewai.profile": { "title": "Profile", "type": "string", @@ -72,6 +86,11 @@ "section": "Pipe", "title": "CrewAI", "properties": ["agent_description", "instructions"] + }, + { + "section": "Sub-Agent Settings", + "title": "Settings when used in a CrewAI Orchestrator pipeline", + "properties": ["agent_crewai.subagent"] } ] } diff --git a/nodes/src/nodes/agent_crewai/services.orchestrator.json b/nodes/src/nodes/agent_crewai/services.orchestrator.json new file mode 100644 index 000000000..123d1e3ba --- /dev/null +++ b/nodes/src/nodes/agent_crewai/services.orchestrator.json @@ -0,0 +1,73 @@ +{ + "title": "CrewAI Orchestrator", + "protocol": "agent_crewai_orchestrator://", + "classType": ["agent", "tool"], + "capabilities": ["invoke"], + "register": "filter", + "node": "python", + "path": "nodes.agent_crewai", + "prefix": "agent", + "description": [ + "Multi-agent orchestrator using CrewAI hierarchical process.", + "Fans out to connected CrewAI sub-agent nodes, assembles a Crew, and synthesizes their outputs.", + "Can be invoked as a tool (`.run_agent`) for nested orchestration." 
+ ], + "icon": "crewai.svg", + "invoke": { + "llm": { + "description": "LLM used by the orchestrator manager agent", + "min": 1 + }, + "crewai": { + "description": "Connected CrewAI sub-agent nodes", + "min": 1 + }, + "tool": { + "description": "Optional direct tools available to the orchestrator manager", + "min": 0 + } + }, + "lanes": { + "questions": ["answers"] + }, + "input": [ + { + "lane": "questions", + "output": [{ "lane": "answers" }] + } + ], + "preconfig": { + "default": "default", + "profiles": { + "default": { + "instructions": [] + } + } + }, + "fields": { + "instructions": { + "type": "array", + "title": "Instructions", + "description": "Additional instructions to guide the orchestrator's delegation strategy.", + "items": { + "type": "string", + "format": "textarea" + } + }, + "agent_crewai_orchestrator.default": { "object": "default", "properties": ["instructions"] }, + "agent_crewai_orchestrator.profile": { + "title": "Profile", + "type": "string", + "default": "default", + "enum": [["default", "Default"]], + "conditional": [{ "value": "default", "properties": ["agent_crewai_orchestrator.default"] }] + } + }, + "shape": [ + { + "section": "Pipe", + "title": "CrewAI Orchestrator", + "properties": ["instructions"] + } + ] +} diff --git a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/__init__.py b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/__init__.py index fd657261f..a7bfc213a 100644 --- a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/__init__.py +++ b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/__init__.py @@ -72,6 +72,7 @@ from .types import IInvoke from .types import IInvokeLLM from .types import IInvokeTool +from .types import IInvokeCrew from .types import IJson from .types import OPEN_MODE from .types import PROTOCOL_CAPS @@ -110,6 +111,7 @@ 'IInvoke', 'IInvokeLLM', 'IInvokeTool', + 'IInvokeCrew', 'IJson', 'ILoader', 'isAppMonitor', diff --git 
a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py index 5b7a3782b..5416249b2 100644 --- a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py +++ b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py @@ -261,6 +261,35 @@ class Validate(BaseModel): model_config = ConfigDict(extra='allow') +class IInvokeCrew(IInvoke): + """ + Control-plane type for CrewAI sub-agent discovery (crewai.describe fan-out). + + Each sub-agent connected on the 'crewai' channel appends a DescribeResponse + to Describe.agents when the orchestrator fans out a Describe request. + """ + + op: str = 'crewai.describe' + model_config = ConfigDict(extra='allow') + + class Describe(BaseModel): + """Fan-out request: each connected sub-agent appends its descriptor.""" + + op: str = Field(default='crewai.describe', frozen=True) + agents: List[Any] = Field(default_factory=list) + model_config = ConfigDict(extra='allow') + + class DescribeResponse(BaseModel): + """Sub-agent descriptor returned in response to crewai.describe.""" + + op: str = Field(default='crewai.describe', frozen=True) + role: str + task_description: str + node_id: str = '' # pSelf.instance.pipeType['id'] — used to filter sub-agents from tool list + invoke: Any = Field(default=None) # full pSelf IInstance — passed to AgentHostServices(d.invoke) + model_config = ConfigDict(extra='allow') + + class IJson(Impl_IJson): """ A wrapper class for IJson that provides utility methods for handling JSON-like structures. 
From 87fb8cc2dc5aeb9ba916640983b59bdcfd4a6b76 Mon Sep 17 00:00:00 2001 From: Dylan Savage Date: Thu, 19 Mar 2026 16:15:35 -0700 Subject: [PATCH 2/9] orchestrator wip --- nodes/src/nodes/agent_crewai/IGlobal.py | 2 +- nodes/src/nodes/agent_crewai/crewai.py | 305 ++++++++++++--- .../nodes/agent_crewai/orchestrator_driver.py | 358 ------------------ 3 files changed, 254 insertions(+), 411 deletions(-) delete mode 100644 nodes/src/nodes/agent_crewai/orchestrator_driver.py diff --git a/nodes/src/nodes/agent_crewai/IGlobal.py b/nodes/src/nodes/agent_crewai/IGlobal.py index bd1e3bf8a..567d222a3 100644 --- a/nodes/src/nodes/agent_crewai/IGlobal.py +++ b/nodes/src/nodes/agent_crewai/IGlobal.py @@ -52,7 +52,7 @@ def beginGlobal(self) -> None: conn_config = conn_config if isinstance(conn_config, dict) else {} if self.glb.logicalType == 'agent_crewai_orchestrator': - from .orchestrator_driver import OrchestratorDriver + from .crewai import OrchestratorDriver self.agent = OrchestratorDriver(self) else: self.role = str(conn_config.get('role') or 'Assistant').strip() or 'Assistant' diff --git a/nodes/src/nodes/agent_crewai/crewai.py b/nodes/src/nodes/agent_crewai/crewai.py index e2ccb40c0..f20e6f8c7 100644 --- a/nodes/src/nodes/agent_crewai/crewai.py +++ b/nodes/src/nodes/agent_crewai/crewai.py @@ -22,7 +22,12 @@ # ============================================================================= """ -CrewAI driver implementing the shared `ai.common.agent.AGENT` interface. +CrewAI drivers implementing the shared `ai.common.agent.AGENT` interface. + +Contains: + - CrewAgentBase: shared LLM/tool-binding logic + - CrewDriver: sub-agent mode / standalone single-agent Crew + - OrchestratorDriver: hierarchical multi-agent Crew """ from __future__ import annotations @@ -36,52 +41,38 @@ from ai.common.agent.types import AgentHost, AgentInput, AgentRunResult from rocketlib import ToolDescriptor -_DEFAULT_GOAL = 'Complete the assigned task using available tools.' 
-_DEFAULT_BACKSTORY = ( - 'You are a specialized agent in a multi-agent pipeline. ' - 'Use the tools available to you and complete your assigned task thoroughly.' -) -_DEFAULT_EXPECTED_OUTPUT = 'A thorough, accurate result for the assigned task.' +# ── Shared utilities ────────────────────────────────────────────────────────── -class CrewDriver(AgentBase): - FRAMEWORK = 'crewai' +def _safe_str(v: Any) -> str: + try: + return '' if v is None else str(v) + except Exception: + return '' - def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant', task_description: str = ''): - """ - Initialize the CrewDriver. - iGlobal is passed to AgentBase so it can load config/instructions. - """ - super().__init__(iGlobal) - self._process = process - self._role = role - self._task_description = task_description +_DEFAULT_GOAL = ( + 'Complete the assigned task using the minimum tool calls needed. ' + 'Once you have sufficient information to answer, stop immediately and return your Final Answer.' +) +_DEFAULT_BACKSTORY = ( + 'You are a specialized agent in a multi-agent pipeline. ' + 'Use tools when necessary. As soon as you have enough information to answer, stop and provide your Final Answer. ' + 'Do not make additional tool calls beyond what is needed.' +) +_DEFAULT_EXPECTED_OUTPUT = 'A clear, direct answer to the assigned task.' - def describe(self, pSelf: Any) -> Any: - """ - Return a DescribeResponse for crewai.describe fan-out. - Called by IInstance.invoke() when the orchestrator fans out crewai.describe. - Stores the full pSelf IInstance in `invoke` so AgentHostServices(d.invoke) - can call d.invoke.instance.* correctly. 
- """ - from rocketlib.types import IInvokeCrew +# ── CrewAgentBase ───────────────────────────────────────────────────────────── - pipe_type = pSelf.instance.pipeType - node_id = str(pipe_type.get('id') if isinstance(pipe_type, dict) else getattr(pipe_type, 'id', '')) or '' - return IInvokeCrew.DescribeResponse( - role=self._role, - task_description=self._task_description, - node_id=node_id, - invoke=pSelf, - ) +class CrewAgentBase(AgentBase): + """Shared base for CrewDriver and OrchestratorDriver.""" def _bind_framework_llm( self, *, host: AgentHost, - call_llm: Callable[..., str], + call_llm_text: Callable[..., str], ctx: Dict[str, Any], ) -> Any: @@ -100,7 +91,7 @@ def call( **kwargs: Any, ) -> Union[str, Any]: stop_words = getattr(self, 'stop', None) - return call_llm(messages, stop_words=stop_words) + return call_llm_text(messages, stop_words=stop_words) return HostInvokeLLM() @@ -110,12 +101,11 @@ def _bind_framework_tools( host: AgentHost, tool_descriptors: List[ToolDescriptor], invoke_tool: Callable[..., Any], - log_tool_call: Callable[..., None], ctx: Dict[str, Any], ) -> List[Any]: from crewai.tools import BaseTool - from pydantic import BaseModel, ConfigDict, Field, create_model # noqa: E501 + from pydantic import BaseModel, ConfigDict, Field, create_model class _ToolInput(BaseModel): input: Any = Field(default=None, description='Tool input payload') @@ -160,11 +150,6 @@ def _run(self, input: Any = None, **kwargs: Any) -> str: except Exception as e: out = {'error': str(e), 'type': type(e).__name__} - try: - log_tool_call(tool_name=self.name, input={'input': input, **kwargs}, output=out) - except Exception: - pass - try: return json.dumps(out, default=str) if isinstance(out, (dict, list)) else _safe_str(out) except Exception: @@ -191,6 +176,38 @@ def _run(self, input: Any = None, **kwargs: Any) -> str: tools.append(HostTool(name=name, description=desc, args_schema=schema_cls)) return tools + +# ── CrewDriver 
──────────────────────────────────────────────────────────────── + +class CrewDriver(CrewAgentBase): + """Sub-agent mode / standalone single-agent Crew.""" + + FRAMEWORK = 'crewai' + + def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant', task_description: str = ''): + super().__init__(iGlobal) + self._process = process + self._role = role + self._task_description = task_description + + def describe(self, pSelf: Any) -> Any: + """Return a DescribeResponse for crewai.describe fan-out. + + Called by IInstance.invoke() when the orchestrator fans out crewai.describe. + Stores the full pSelf IInstance in `invoke` so AgentHostServices(d.invoke) + can call d.invoke.instance.* correctly. + """ + from rocketlib.types import IInvokeCrew + + pipe_type = pSelf.instance.pipeType + node_id = str(pipe_type.get('id') if isinstance(pipe_type, dict) else getattr(pipe_type, 'id', '')) or '' + return IInvokeCrew.DescribeResponse( + role=self._role, + task_description=self._task_description, + node_id=node_id, + invoke=pSelf, + ) + def _run( self, *, @@ -201,7 +218,7 @@ def _run( run_id = ctx.get('run_id', '') debug('agent_crewai driver _run start run_id={}'.format(run_id)) - from crewai import Agent, Crew, Task # type: ignore + from crewai import Agent, Crew, Task tool_descriptors = self.discover_tools(host=host) @@ -216,12 +233,11 @@ def _call_llm_text(messages: Any, stop_words: Any = None) -> str: def _invoke_tool(tool_name: str, input: Any = None, kwargs: Optional[Dict[str, Any]] = None) -> Any: # noqa: A002 return self.invoke_host_tool(host=host, tool_name=tool_name, input=input, kwargs=kwargs) - llm = self._bind_framework_llm(host=host, call_llm=_call_llm_text, ctx=ctx) + llm = self._bind_framework_llm(host=host, call_llm_text=_call_llm_text, ctx=ctx) tools_for_agent = self._bind_framework_tools( host=host, tool_descriptors=tool_descriptors, invoke_tool=_invoke_tool, - log_tool_call=lambda **_: None, ctx=ctx, ) @@ -234,7 +250,6 @@ def 
_invoke_tool(tool_name: str, input: Any = None, kwargs: Optional[Dict[str, A
             verbose=False,
         )
 
-        # Use configured task_description if set, otherwise fall back to the incoming prompt
         from ai.common.agent._internal.utils import extract_prompt
         prompt = extract_prompt(agent_input.question) if hasattr(agent_input, 'question') else ''
         task_text = self._task_description or prompt or ''
@@ -318,8 +333,194 @@ def _on_any_event(source, event):
         return final_text, result
 
 
-def _safe_str(v: Any) -> str:
-    try:
-        return '' if v is None else str(v)
-    except Exception:
-        return ''
+# ── OrchestratorDriver ────────────────────────────────────────────────────────
+
+_MGR_ROLE = 'Orchestrator'
+_MGR_GOAL = (
+    'Coordinate the team to complete the user request. '
+    'Delegate to each available agent exactly once. '
+    'Accept their first response without asking for revisions or improvements. '
+    'When all agents have responded, synthesize and return the final answer.'
+)
+_MGR_BACKSTORY = (
+    'You are a senior orchestrator managing a team of specialized agents. '
+    'Delegate tasks to the right agent and synthesize their outputs into a final answer.'
+)
+
+
+class OrchestratorDriver(CrewAgentBase):
+    """Hierarchical multi-agent Crew.
+
+    Fans out `crewai.describe` to all nodes on the 'crewai' invoke channel,
+    assembles each into a CrewAI Agent + Task, and kicks off a hierarchical
+    Crew with this node acting as the manager.
+
+    Does NOT implement `describe()` — orchestrators cannot be used as sub-agents.
+    """
+
+    FRAMEWORK = 'crewai_orchestrator'
+
+    def __init__(self, iGlobal: Any):
+        super().__init__(iGlobal)
+        # Stash for pSelf — needed in _run() to call pSelf.instance.invoke('crewai', ...).
+        # Not thread-safe; safe because pipeline runs are sequential per node instance.
+ self._current_pSelf: Any = None + + def run_agent(self, pSelf: Any, question: Any, *, emit_answers_lane: bool = True) -> Any: + """Override to stash pSelf before delegating to AgentBase.run_agent().""" + self._current_pSelf = pSelf + try: + return super().run_agent(pSelf, question, emit_answers_lane=emit_answers_lane) + finally: + self._current_pSelf = None + + def _run( + self, + *, + agent_input: AgentInput, + host: AgentHost, + ctx: Dict[str, Any], + ) -> AgentRunResult: + from crewai import Agent, Crew, Process, Task + from rocketlib.types import IInvokeCrew + from ai.common.agent._internal.host import AgentHostServices + + run_id = ctx.get('run_id', '') + prompt = _safe_str(agent_input.question.getPrompt() if hasattr(agent_input, 'question') else '') + debug('agent_crewai_orchestrator _run start run_id={} prompt_len={}'.format(run_id, len(prompt))) + + pSelf = self._current_pSelf + + # 1. Discover all connected sub-agents via per-node invoke (mirrors the tool + # discovery pattern in AgentHostServices.Tools.__init__). + # A no-nodeId invoke stops at the first successful handler, so we iterate + # each crewai node individually with nodeId= to reach all of them. + crewai_node_ids = pSelf.instance.getControllerNodeIds('crewai') + if not crewai_node_ids: + raise RuntimeError('CrewAI Orchestrator: no sub-agents connected on the crewai channel') + + descriptors = [] + for node_id in crewai_node_ids: + req = IInvokeCrew.Describe() + try: + pSelf.instance.invoke('crewai', req, nodeId=node_id) + except Exception: + pass + for agent_desc in req.agents: + if agent_desc is not None: + descriptors.append(agent_desc) + + if not descriptors: + raise RuntimeError('CrewAI Orchestrator: no sub-agents responded to crewai.describe') + + # 2. Build the manager's LLM (uses this orchestrator's own llm channel). 
+ def _mgr_call_llm_text(messages: Any, stop_words: Any = None, _h: AgentHost = host) -> str: + return self.call_host_llm( + host=_h, + messages=messages, + question_role=_MGR_ROLE, + stop_words=stop_words, + ) + + manager_llm = self._bind_framework_llm(host=host, call_llm_text=_mgr_call_llm_text, ctx=ctx) + + # 3. Build per-sub-agent Agent + Task. + # d.invoke is the sub-agent's full pSelf IInstance. + # Default-arg capture (_h, _role) prevents closure-in-loop bugs. + sub_agents: List[Any] = [] + sub_tasks: List[Any] = [] + + for d in descriptors: + sub_host = AgentHostServices(d.invoke) + + def _sub_call_llm_text( + messages: Any, + stop_words: Any = None, + _h: Any = sub_host, + _role: str = d.role, + ) -> str: + return self.call_host_llm( + host=_h, + messages=messages, + question_role=_role, + stop_words=stop_words, + ) + + def _sub_invoke_tool( + tool_name: str, + input: Any = None, # noqa: A002 + kwargs: Optional[Dict[str, Any]] = None, + _h: Any = sub_host, + ) -> Any: + return self.invoke_host_tool(host=_h, tool_name=tool_name, input=input, kwargs=kwargs) + + sub_tool_descs = self.discover_tools(host=sub_host) + sub_llm = self._bind_framework_llm(host=sub_host, call_llm_text=_sub_call_llm_text, ctx=ctx) + sub_tools = self._bind_framework_tools( + host=sub_host, + tool_descriptors=sub_tool_descs, + invoke_tool=_sub_invoke_tool, + ctx=ctx, + ) + + agent_obj = Agent( + role=d.role, + goal=_DEFAULT_GOAL, + backstory=_DEFAULT_BACKSTORY, + tools=sub_tools, + llm=sub_llm, + verbose=False, + max_iter=3, + allow_delegation=False, + ) + + task_text = d.task_description or '' + if not task_text: + task_text = prompt or 'Complete the user request.' + elif prompt: + task_text = f'{task_text}\n\nUser request: {prompt}' + task_desc = task_text.replace('{', '{{').replace('}', '}}') + + task_obj = Task( + description=task_desc, + expected_output=_DEFAULT_EXPECTED_OUTPUT, + agent=agent_obj, + ) + + sub_agents.append(agent_obj) + sub_tasks.append(task_obj) + + # 4. 
Build manager agent. The user's prompt goes into backstory (background context) + # rather than the goal so it doesn't drive active reasoning on every LLM call. + # The goal stays generic: delegate once, return the result. + manager_backstory = _MGR_BACKSTORY + if prompt: + escaped_prompt = prompt.replace('{', '{{').replace('}', '}}') + manager_backstory = f'{_MGR_BACKSTORY}\n\nBackground context — user request: {escaped_prompt}' + + manager_agent = Agent( + role=_MGR_ROLE, + goal=_MGR_GOAL, + backstory=manager_backstory, + llm=manager_llm, + verbose=False, + allow_delegation=True, + max_iter=len(descriptors) + 2, + ) + + # 5. Assemble and kick off the hierarchical Crew. + crew = Crew( + agents=sub_agents, + tasks=sub_tasks, + process=Process.hierarchical, + manager_agent=manager_agent, + planning=True, + planning_llm=manager_llm, + verbose=False, + ) + + debug('agent_crewai_orchestrator kicking off crew with {} sub-agents run_id={}'.format(len(sub_agents), run_id)) + result = crew.kickoff(inputs={'user_request': prompt} if prompt else {}) + + final_text = _safe_str(getattr(result, 'raw', None)) or _safe_str(result) + return final_text, result diff --git a/nodes/src/nodes/agent_crewai/orchestrator_driver.py b/nodes/src/nodes/agent_crewai/orchestrator_driver.py deleted file mode 100644 index 11c53dc86..000000000 --- a/nodes/src/nodes/agent_crewai/orchestrator_driver.py +++ /dev/null @@ -1,358 +0,0 @@ -# ============================================================================= -# MIT License -# Copyright (c) 2026 Aparavi Software AG -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the 
following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -# ============================================================================= - -""" -CrewAI Orchestrator driver — builds a hierarchical Crew from connected sub-agent nodes. - -Fans out `crewai.describe` to all nodes on the 'crewai' invoke channel, assembles -each into a CrewAI Agent + Task, and kicks off a hierarchical Crew with this node -acting as the manager. - -Does NOT implement `describe()` — orchestrators cannot be used as sub-agents. -""" - -from __future__ import annotations - -import json -from typing import Any, Callable, Dict, List, Optional, Union - -from rocketlib import debug - -from ai.common.agent import AgentBase -from ai.common.agent.types import AgentHost, AgentInput, AgentRunResult -from ai.common.tools import ToolsBase - -from .crewai import _safe_str - -_MGR_ROLE = 'Orchestrator' -_MGR_GOAL = 'Coordinate specialized sub-agents to fully solve the user request.' -_MGR_BACKSTORY = ( - 'You are a senior orchestrator managing a team of specialized agents. ' - 'Delegate tasks to the right agent and synthesize their outputs into a final answer.' -) -_DEFAULT_GOAL = 'Complete the assigned task using available tools.' -_DEFAULT_BACKSTORY = ( - 'You are a specialized agent in a multi-agent pipeline. ' - 'Use the tools available to you and complete your assigned task thoroughly.' 
-) -_DEFAULT_EXPECTED_OUTPUT = 'A thorough, accurate result for the assigned task.' - - -class OrchestratorDriver(AgentBase): - FRAMEWORK = 'crewai_orchestrator' - - def __init__(self, iGlobal: Any): - """ - Initialize the OrchestratorDriver. - - iGlobal is passed to AgentBase so it can load config/instructions. - """ - super().__init__(iGlobal) - # Stash for pSelf — needed in _run() to call pSelf.instance.invoke('crewai', ...). - # Not thread-safe; safe because pipeline runs are sequential per node instance. - self._current_pSelf: Any = None - - def run_agent(self, pSelf: Any, question: Any, *, emit_answers_lane: bool = True) -> Any: - """ - Override to stash pSelf before delegating to AgentBase.run_agent(). - - pSelf is needed inside _run() to fan out crewai.describe. - """ - self._current_pSelf = pSelf - try: - return super().run_agent(pSelf, question, emit_answers_lane=emit_answers_lane) - finally: - self._current_pSelf = None - - def _bind_framework_llm( - self, - *, - host: AgentHost, - call_llm: Callable[..., str], - ctx: Dict[str, Any], - ) -> Any: - - from crewai import BaseLLM - - class HostInvokeLLM(BaseLLM): - def __init__(self): - super().__init__(model='RocketRide-host-llm', temperature=None) - - def call( - self, - messages: Union[str, List[Dict[str, str]]], - tools: Optional[List[dict]] = None, - callbacks: Optional[List[Any]] = None, - available_functions: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> Union[str, Any]: - stop_words = getattr(self, 'stop', None) - return call_llm(messages, stop_words=stop_words) - - return HostInvokeLLM() - - def _bind_framework_tools( - self, - *, - host: AgentHost, - tool_descriptors: List[ToolsBase.ToolDescriptor], - invoke_tool: Callable[..., Any], - log_tool_call: Callable[..., None], - ctx: Dict[str, Any], - ) -> List[Any]: - - from crewai.tools import BaseTool - from pydantic import BaseModel, ConfigDict, Field, create_model - - class _ToolInput(BaseModel): - input: Any = Field(default=None, 
description='Tool input payload') - model_config = ConfigDict(extra='allow') - - def _make_args_schema(input_schema: Optional[Dict[str, Any]]) -> type[BaseModel]: - if not isinstance(input_schema, dict): - return _ToolInput - props = input_schema.get('properties', {}) - if not props: - return _ToolInput - required_keys = set(input_schema.get('required', [])) - field_defs: Dict[str, Any] = {} - for key, prop in props.items(): - desc = prop.get('description', '') - if key in required_keys: - field_defs[key] = (Any, Field(..., description=desc)) - else: - default = prop.get('default', None) - field_defs[key] = (Any, Field(default=default, description=desc)) - try: - return create_model( - '_DynToolInput', - __config__=ConfigDict(extra='allow'), - **field_defs, - ) - except Exception: - return _ToolInput - - class HostTool(BaseTool): - name: str - description: str - args_schema: type[BaseModel] = _ToolInput - - def _run(self, input: Any = None, **kwargs: Any) -> str: - try: - out = invoke_tool(self.name, input=input, kwargs=kwargs) - except Exception as e: - out = {'error': str(e), 'type': type(e).__name__} - - try: - log_tool_call(tool_name=self.name, input={'input': input, **kwargs}, output=out) - except Exception: - pass - - try: - return json.dumps(out, default=str) if isinstance(out, (dict, list)) else _safe_str(out) - except Exception: - return _safe_str(out) - - tools = [] - for td in tool_descriptors: - name = td.get('name', '') if isinstance(td, dict) else getattr(td, 'name', '') - desc = td.get('description', '') if isinstance(td, dict) else getattr(td, 'description', '') - if not name: - continue - if not desc: - desc = f'Invoke host tool: {name}' - input_schema = td.get('input_schema') if isinstance(td, dict) else None - if isinstance(input_schema, dict): - try: - schema_text = json.dumps(input_schema, ensure_ascii=False) - except Exception: - schema_text = '' - if schema_text: - desc = f'{desc}\n\nTool input schema (JSON): {schema_text}' - - schema_cls = 
_make_args_schema(input_schema) - tools.append(HostTool(name=name, description=desc, args_schema=schema_cls)) - return tools - - def _run( - self, - *, - agent_input: AgentInput, - host: AgentHost, - ctx: Dict[str, Any], - ) -> AgentRunResult: - from crewai import Agent, Crew, Process, Task - from rocketlib.types import IInvokeCrew - from ai.common.agent._internal.host import AgentHostServices - from ai.common.agent._internal.utils import extract_prompt - - run_id = ctx.get('run_id', '') - debug('agent_crewai_orchestrator _run start run_id={}'.format(run_id)) - - pSelf = self._current_pSelf - - # 1. Fan-out crewai.describe to all connected sub-agents. - req = IInvokeCrew.Describe() - try: - result = pSelf.instance.invoke('crewai', req) - except Exception: - result = req - - # Normalize: engine may return the mutated Describe object or a plain list. - if hasattr(result, 'agents'): - descriptors = result.agents or [] - elif isinstance(result, list): - descriptors = result - else: - descriptors = [] - descriptors = [d for d in descriptors if d is not None] - - if not descriptors: - raise RuntimeError('CrewAI Orchestrator: no sub-agents connected on the crewai channel') - - # 2. Build the manager's LLM (uses this orchestrator's own llm channel). - def _mgr_call_llm(messages: Any, stop_words: Any = None, _h: AgentHost = host) -> str: - return self._call_host_llm( - host=_h, - messages=messages, - question_role=_MGR_ROLE, - stop_words=stop_words, - ) - - manager_llm = self._bind_framework_llm(host=host, call_llm=_mgr_call_llm, ctx=ctx) - - # 3. Build per-sub-agent Agent + Task. - # d.invoke is the sub-agent's full pSelf IInstance — AgentHostServices requires invoker.instance.* - # Default-arg capture (_h, _d) prevents closure-in-loop bugs. 
- prompt = extract_prompt(agent_input.question) if hasattr(agent_input, 'question') else '' - - sub_agents: List[Any] = [] - sub_tasks: List[Any] = [] - - for d in descriptors: - sub_host = AgentHostServices(d.invoke) - - def _sub_call_llm( - messages: Any, - stop_words: Any = None, - _h: Any = sub_host, - _role: str = d.role, - ) -> str: - return self._call_host_llm( - host=_h, - messages=messages, - question_role=_role, - stop_words=stop_words, - ) - - def _sub_invoke_tool( - tool_name: str, - input: Any = None, # noqa: A002 - kwargs: Optional[Dict[str, Any]] = None, - _h: Any = sub_host, - ) -> Any: - return self._invoke_host_tool(host=_h, tool_name=tool_name, input=input, kwargs=kwargs) - - sub_tool_descs = self._discover_tools(host=sub_host) - sub_llm = self._bind_framework_llm(host=sub_host, call_llm=_sub_call_llm, ctx=ctx) - sub_tools = self._bind_framework_tools( - host=sub_host, - tool_descriptors=sub_tool_descs, - invoke_tool=_sub_invoke_tool, - log_tool_call=lambda **_: None, - ctx=ctx, - ) - - agent_obj = Agent( - role=d.role, - goal=_DEFAULT_GOAL, - backstory=_DEFAULT_BACKSTORY, - tools=sub_tools, - llm=sub_llm, - verbose=False, - ) - - task_text = d.task_description or prompt or '' - task_desc = task_text.replace('{', '{{').replace('}', '}}') - - task_obj = Task( - description=task_desc or 'Complete the user request.', - expected_output=_DEFAULT_EXPECTED_OUTPUT, - agent=agent_obj, - async_execution=True, - ) - - sub_agents.append(agent_obj) - sub_tasks.append(task_obj) - - # 4. Build manager agent tools — exclude any tool whose name starts with a sub-agent's - # node_id prefix (defensive filter: prevents sub-agents accidentally wired to both - # 'crewai' and 'tool' channels from appearing as callable tools for the manager). - sub_agent_prefixes = {f'{d.node_id}.' 
for d in descriptors if d.node_id} - all_mgr_tool_descs = self._discover_tools(host=host) - - def _is_sub_agent_tool(td: Any) -> bool: - name = td.get('name', '') if isinstance(td, dict) else getattr(td, 'name', '') - return any(name.startswith(p) for p in sub_agent_prefixes) - - mgr_tool_descs = [t for t in all_mgr_tool_descs if not _is_sub_agent_tool(t)] - - def _mgr_invoke_tool( - tool_name: str, - input: Any = None, # noqa: A002 - kwargs: Optional[Dict[str, Any]] = None, - ) -> Any: - return self._invoke_host_tool(host=host, tool_name=tool_name, input=input, kwargs=kwargs) - - mgr_tools = self._bind_framework_tools( - host=host, - tool_descriptors=mgr_tool_descs, - invoke_tool=_mgr_invoke_tool, - log_tool_call=lambda **_: None, - ctx=ctx, - ) - - manager_agent = Agent( - role=_MGR_ROLE, - goal=_MGR_GOAL, - backstory=_MGR_BACKSTORY, - tools=mgr_tools, - llm=manager_llm, - verbose=False, - allow_delegation=True, - ) - - # 5. Assemble and kick off the hierarchical Crew. - crew = Crew( - agents=[manager_agent] + sub_agents, - tasks=sub_tasks, - process=Process.hierarchical, - manager_agent=manager_agent, - verbose=False, - ) - - debug('agent_crewai_orchestrator kicking off crew with {} sub-agents run_id={}'.format(len(sub_agents), run_id)) - result = crew.kickoff() - - final_text = _safe_str(getattr(result, 'raw', None)) or _safe_str(result) - return final_text, result From 224dc8411b151cfc50cb9fdaa4bc9c4a8acc380d Mon Sep 17 00:00:00 2001 From: Dylan Savage Date: Thu, 2 Apr 2026 09:50:09 -0700 Subject: [PATCH 3/9] feat(agent_crewai): expose expert config fields for both node types Add goal, backstory, expected_output, max_iter to agent_crewai (CrewDriver) and goal, backstory, max_iter to agent_crewai_orchestrator (OrchestratorDriver). All fields fall back to existing defaults when blank/zero. 
Co-Authored-By: Claude Sonnet 4.6 --- nodes/src/nodes/agent_crewai/IGlobal.py | 17 ++++++++- nodes/src/nodes/agent_crewai/crewai.py | 33 +++++++++++------ nodes/src/nodes/agent_crewai/services.json | 35 ++++++++++++++++++- .../agent_crewai/services.orchestrator.json | 29 +++++++++++++-- 4 files changed, 100 insertions(+), 14 deletions(-) diff --git a/nodes/src/nodes/agent_crewai/IGlobal.py b/nodes/src/nodes/agent_crewai/IGlobal.py index 567d222a3..08d0d5583 100644 --- a/nodes/src/nodes/agent_crewai/IGlobal.py +++ b/nodes/src/nodes/agent_crewai/IGlobal.py @@ -34,6 +34,10 @@ class IGlobal(IGlobalBase): agent: Any = None role: str = 'Assistant' task_description: str = '' + goal: str = '' + backstory: str = '' + expected_output: str = '' + max_iter: int = 0 def beginGlobal(self) -> None: if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: @@ -51,17 +55,28 @@ def beginGlobal(self) -> None: conn_config = IJson.toDict(self.glb.connConfig) if self.glb.connConfig else {} conn_config = conn_config if isinstance(conn_config, dict) else {} + self.goal = str(conn_config.get('goal') or '').strip() + self.backstory = str(conn_config.get('backstory') or '').strip() + self.max_iter = int(conn_config.get('max_iter') or 0) + if self.glb.logicalType == 'agent_crewai_orchestrator': from .crewai import OrchestratorDriver self.agent = OrchestratorDriver(self) else: self.role = str(conn_config.get('role') or 'Assistant').strip() or 'Assistant' self.task_description = str(conn_config.get('task_description') or '').strip() + self.expected_output = str(conn_config.get('expected_output') or '').strip() from .crewai import CrewDriver - self.agent = CrewDriver(self, process=self.process, role=self.role, task_description=self.task_description) + self.agent = CrewDriver(self, process=self.process, role=self.role, task_description=self.task_description, + goal=self.goal, backstory=self.backstory, expected_output=self.expected_output, + max_iter=self.max_iter) def endGlobal(self) -> None: 
self.agent = None self.process = None self.role = 'Assistant' self.task_description = '' + self.goal = '' + self.backstory = '' + self.expected_output = '' + self.max_iter = 0 diff --git a/nodes/src/nodes/agent_crewai/crewai.py b/nodes/src/nodes/agent_crewai/crewai.py index f20e6f8c7..bdc781e25 100644 --- a/nodes/src/nodes/agent_crewai/crewai.py +++ b/nodes/src/nodes/agent_crewai/crewai.py @@ -184,11 +184,16 @@ class CrewDriver(CrewAgentBase): FRAMEWORK = 'crewai' - def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant', task_description: str = ''): + def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant', task_description: str = '', + goal: str = '', backstory: str = '', expected_output: str = '', max_iter: int = 0): super().__init__(iGlobal) self._process = process self._role = role self._task_description = task_description + self._goal = goal + self._backstory = backstory + self._expected_output = expected_output + self._max_iter = max_iter def describe(self, pSelf: Any) -> Any: """Return a DescribeResponse for crewai.describe fan-out. 
@@ -241,14 +246,17 @@ def _invoke_tool(tool_name: str, input: Any = None, kwargs: Optional[Dict[str, A ctx=ctx, ) - agent_obj = Agent( + agent_kwargs: Dict[str, Any] = dict( role=self._role, - goal=_DEFAULT_GOAL, - backstory=_DEFAULT_BACKSTORY, + goal=self._goal or _DEFAULT_GOAL, + backstory=self._backstory or _DEFAULT_BACKSTORY, tools=tools_for_agent, llm=llm, verbose=False, ) + if self._max_iter > 0: + agent_kwargs['max_iter'] = self._max_iter + agent_obj = Agent(**agent_kwargs) from ai.common.agent._internal.utils import extract_prompt prompt = extract_prompt(agent_input.question) if hasattr(agent_input, 'question') else '' @@ -258,7 +266,7 @@ def _invoke_tool(tool_name: str, input: Any = None, kwargs: Optional[Dict[str, A task_obj = Task( description=desc or 'Complete the user request.', - expected_output=_DEFAULT_EXPECTED_OUTPUT, + expected_output=self._expected_output or _DEFAULT_EXPECTED_OUTPUT, agent=agent_obj, markdown=False, ) @@ -362,6 +370,7 @@ class OrchestratorDriver(CrewAgentBase): def __init__(self, iGlobal: Any): super().__init__(iGlobal) + self._iGlobal = iGlobal # Stash for pSelf — needed in _run() to call pSelf.instance.invoke('crewai', ...). # Not thread-safe; safe because pipeline runs are sequential per node instance. self._current_pSelf: Any = None @@ -493,20 +502,24 @@ def _sub_invoke_tool( # 4. Build manager agent. The user's prompt goes into backstory (background context) # rather than the goal so it doesn't drive active reasoning on every LLM call. # The goal stays generic: delegate once, return the result. 
- manager_backstory = _MGR_BACKSTORY + ig = self._iGlobal + base_backstory = ig.backstory or _MGR_BACKSTORY if prompt: escaped_prompt = prompt.replace('{', '{{').replace('}', '}}') - manager_backstory = f'{_MGR_BACKSTORY}\n\nBackground context — user request: {escaped_prompt}' + manager_backstory = f'{base_backstory}\n\nBackground context — user request: {escaped_prompt}' + else: + manager_backstory = base_backstory - manager_agent = Agent( + mgr_kwargs: Dict[str, Any] = dict( role=_MGR_ROLE, - goal=_MGR_GOAL, + goal=ig.goal or _MGR_GOAL, backstory=manager_backstory, llm=manager_llm, verbose=False, allow_delegation=True, - max_iter=len(descriptors) + 2, + max_iter=ig.max_iter if ig.max_iter > 0 else len(descriptors) + 2, ) + manager_agent = Agent(**mgr_kwargs) # 5. Assemble and kick off the hierarchical Crew. crew = Crew( diff --git a/nodes/src/nodes/agent_crewai/services.json b/nodes/src/nodes/agent_crewai/services.json index 7d9ed0f8f..93c58cd47 100644 --- a/nodes/src/nodes/agent_crewai/services.json +++ b/nodes/src/nodes/agent_crewai/services.json @@ -36,7 +36,11 @@ "agent_description": "", "instructions": [], "role": "Assistant", - "task_description": "" + "task_description": "", + "goal": "", + "backstory": "", + "expected_output": "", + "max_iter": 0 } } }, @@ -71,8 +75,32 @@ "title": "Task", "description": "What this agent should do when used in a CrewAI Orchestrator pipeline. If blank, the incoming question is used." }, + "goal": { + "type": "string", + "format": "textarea", + "title": "Goal", + "description": "What this agent is trying to achieve. Leave blank to use the default." + }, + "backstory": { + "type": "string", + "format": "textarea", + "title": "Backstory", + "description": "Background context for this agent's persona. Leave blank to use the default." + }, + "expected_output": { + "type": "string", + "format": "textarea", + "title": "Expected Output", + "description": "Description of the expected output format. Leave blank to use the default." 
+ }, + "max_iter": { + "type": "number", + "title": "Max Iterations", + "description": "Maximum agent iterations before forced stop. 0 uses the framework default (25)." + }, "agent_crewai.default": { "object": "default", "properties": ["agent_description", "instructions"] }, "agent_crewai.subagent": { "object": "default", "properties": ["role", "task_description"] }, + "agent_crewai.expert": { "object": "default", "properties": ["goal", "backstory", "expected_output", "max_iter"] }, "agent_crewai.profile": { "title": "Profile", "type": "string", @@ -91,6 +119,11 @@ "section": "Sub-Agent Settings", "title": "Settings when used in a CrewAI Orchestrator pipeline", "properties": ["agent_crewai.subagent"] + }, + { + "section": "Advanced", + "title": "Expert CrewAI settings", + "properties": ["agent_crewai.expert"] } ] } diff --git a/nodes/src/nodes/agent_crewai/services.orchestrator.json b/nodes/src/nodes/agent_crewai/services.orchestrator.json index 123d1e3ba..25214bfb2 100644 --- a/nodes/src/nodes/agent_crewai/services.orchestrator.json +++ b/nodes/src/nodes/agent_crewai/services.orchestrator.json @@ -40,7 +40,10 @@ "default": "default", "profiles": { "default": { - "instructions": [] + "instructions": [], + "goal": "", + "backstory": "", + "max_iter": 0 } } }, @@ -54,7 +57,24 @@ "format": "textarea" } }, - "agent_crewai_orchestrator.default": { "object": "default", "properties": ["instructions"] }, + "goal": { + "type": "string", + "format": "textarea", + "title": "Manager Goal", + "description": "What the orchestrator manager agent is trying to achieve. Leave blank to use the default." + }, + "backstory": { + "type": "string", + "format": "textarea", + "title": "Manager Backstory", + "description": "Background context for the orchestrator's persona. Leave blank to use the default." + }, + "max_iter": { + "type": "number", + "title": "Manager Max Iterations", + "description": "Maximum iterations for the orchestrator manager agent. 
0 defaults to number of sub-agents + 2." + }, + "agent_crewai_orchestrator.default": { "object": "default", "properties": ["instructions", "goal", "backstory", "max_iter"] }, "agent_crewai_orchestrator.profile": { "title": "Profile", "type": "string", @@ -68,6 +88,11 @@ "section": "Pipe", "title": "CrewAI Orchestrator", "properties": ["instructions"] + }, + { + "section": "Advanced", + "title": "Expert manager settings", + "properties": ["goal", "backstory", "max_iter"] } ] } From 6a8e85e562a25e5af8eca60bbd8b48a620e60277 Mon Sep 17 00:00:00 2001 From: Dylan Savage Date: Thu, 2 Apr 2026 12:28:01 -0700 Subject: [PATCH 4/9] chore: cleanup modified crew code --- nodes/src/nodes/agent_crewai/IGlobal.py | 7 +- nodes/src/nodes/agent_crewai/crewai.py | 84 +++++++++++------- nodes/src/nodes/agent_crewai/services.json | 86 +++++++++++-------- .../agent_crewai/services.orchestrator.json | 46 +++++----- 4 files changed, 126 insertions(+), 97 deletions(-) diff --git a/nodes/src/nodes/agent_crewai/IGlobal.py b/nodes/src/nodes/agent_crewai/IGlobal.py index 08d0d5583..78b6f4b26 100644 --- a/nodes/src/nodes/agent_crewai/IGlobal.py +++ b/nodes/src/nodes/agent_crewai/IGlobal.py @@ -37,7 +37,6 @@ class IGlobal(IGlobalBase): goal: str = '' backstory: str = '' expected_output: str = '' - max_iter: int = 0 def beginGlobal(self) -> None: if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: @@ -53,11 +52,9 @@ def beginGlobal(self) -> None: self.process = Process.sequential conn_config = IJson.toDict(self.glb.connConfig) if self.glb.connConfig else {} - conn_config = conn_config if isinstance(conn_config, dict) else {} self.goal = str(conn_config.get('goal') or '').strip() self.backstory = str(conn_config.get('backstory') or '').strip() - self.max_iter = int(conn_config.get('max_iter') or 0) if self.glb.logicalType == 'agent_crewai_orchestrator': from .crewai import OrchestratorDriver @@ -68,8 +65,7 @@ def beginGlobal(self) -> None: self.expected_output = 
str(conn_config.get('expected_output') or '').strip() from .crewai import CrewDriver self.agent = CrewDriver(self, process=self.process, role=self.role, task_description=self.task_description, - goal=self.goal, backstory=self.backstory, expected_output=self.expected_output, - max_iter=self.max_iter) + goal=self.goal, backstory=self.backstory, expected_output=self.expected_output) def endGlobal(self) -> None: self.agent = None @@ -79,4 +75,3 @@ def endGlobal(self) -> None: self.goal = '' self.backstory = '' self.expected_output = '' - self.max_iter = 0 diff --git a/nodes/src/nodes/agent_crewai/crewai.py b/nodes/src/nodes/agent_crewai/crewai.py index bdc781e25..2bd2e95b6 100644 --- a/nodes/src/nodes/agent_crewai/crewai.py +++ b/nodes/src/nodes/agent_crewai/crewai.py @@ -51,14 +51,15 @@ def _safe_str(v: Any) -> str: return '' -_DEFAULT_GOAL = ( - 'Complete the assigned task using the minimum tool calls needed. ' - 'Once you have sufficient information to answer, stop immediately and return your Final Answer.' -) +def _escape_braces(text: str) -> str: + """Escape curly braces so CrewAI doesn't treat them as template variables.""" + return text.replace('{', '{{').replace('}', '}}') + + +_DEFAULT_GOAL = 'Complete the assigned task to the best of your ability.' _DEFAULT_BACKSTORY = ( - 'You are a specialized agent in a multi-agent pipeline. ' - 'Use tools when necessary. As soon as you have enough information to answer, stop and provide your Final Answer. ' - 'Do not make additional tool calls beyond what is needed.' + 'You are a specialized agent in a multi-agent pipeline with access to tools. ' + 'Use your tools and reasoning to complete tasks effectively.' ) _DEFAULT_EXPECTED_OUTPUT = 'A clear, direct answer to the assigned task.' @@ -75,7 +76,11 @@ def _bind_framework_llm( call_llm_text: Callable[..., str], ctx: Dict[str, Any], ) -> Any: + """Wrap the host LLM channel as a CrewAI-compatible BaseLLM instance. 
+ The returned HostInvokeLLM delegates all calls back through + ``call_llm_text``, which routes to the engine's llm invoke channel. + """ from crewai import BaseLLM class HostInvokeLLM(BaseLLM): @@ -103,7 +108,12 @@ def _bind_framework_tools( invoke_tool: Callable[..., Any], ctx: Dict[str, Any], ) -> List[Any]: + """Convert host tool descriptors into CrewAI BaseTool instances. + Each tool's JSON Schema is embedded in the description so CrewAI can + pass structured arguments. A dynamic Pydantic args_schema is built per + tool to preserve real parameter names through CrewAI's argument filter. + """ from crewai.tools import BaseTool from pydantic import BaseModel, ConfigDict, Field, create_model @@ -185,7 +195,12 @@ class CrewDriver(CrewAgentBase): FRAMEWORK = 'crewai' def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant', task_description: str = '', - goal: str = '', backstory: str = '', expected_output: str = '', max_iter: int = 0): + goal: str = '', backstory: str = '', expected_output: str = ''): + """Initialise the driver with per-node config loaded from connConfig. + + All string fields default to empty; empty values fall back to the + module-level ``_DEFAULT_*`` constants at run time. + """ super().__init__(iGlobal) self._process = process self._role = role @@ -193,7 +208,6 @@ def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant' self._goal = goal self._backstory = backstory self._expected_output = expected_output - self._max_iter = max_iter def describe(self, pSelf: Any) -> Any: """Return a DescribeResponse for crewai.describe fan-out. @@ -220,6 +234,13 @@ def _run( host: AgentHost, ctx: Dict[str, Any], ) -> AgentRunResult: + """Execute a single-agent CrewAI Crew and return the result text. + + Builds a one-agent, one-task Crew using the host's LLM and tool + channels. If ``task_description`` is blank the incoming prompt is used + as the task. 
All config fields fall back to ``_DEFAULT_*`` constants + when empty. + """ run_id = ctx.get('run_id', '') debug('agent_crewai driver _run start run_id={}'.format(run_id)) @@ -246,7 +267,7 @@ def _invoke_tool(tool_name: str, input: Any = None, kwargs: Optional[Dict[str, A ctx=ctx, ) - agent_kwargs: Dict[str, Any] = dict( + agent_obj = Agent( role=self._role, goal=self._goal or _DEFAULT_GOAL, backstory=self._backstory or _DEFAULT_BACKSTORY, @@ -254,15 +275,12 @@ def _invoke_tool(tool_name: str, input: Any = None, kwargs: Optional[Dict[str, A llm=llm, verbose=False, ) - if self._max_iter > 0: - agent_kwargs['max_iter'] = self._max_iter - agent_obj = Agent(**agent_kwargs) from ai.common.agent._internal.utils import extract_prompt prompt = extract_prompt(agent_input.question) if hasattr(agent_input, 'question') else '' task_text = self._task_description or prompt or '' - desc = task_text.replace('{', '{{').replace('}', '}}') + desc = _escape_braces(task_text) task_obj = Task( description=desc or 'Complete the user request.', @@ -329,15 +347,7 @@ def _on_any_event(source, event): result = crew.kickoff() - final_text = '' - if hasattr(result, 'raw'): - try: - final_text = _safe_str(getattr(result, 'raw')) - except Exception: - final_text = '' - if not final_text: - final_text = _safe_str(result) - + final_text = _safe_str(getattr(result, 'raw', None)) or _safe_str(result) return final_text, result @@ -346,9 +356,7 @@ def _on_any_event(source, event): _MGR_ROLE = 'Orchestrator' _MGR_GOAL = ( 'Coordinate the team to complete the user request. ' - 'Delegate to each available agent exactly once.' - 'Accept their first response without asking for revisions or improvements. ' - 'When all agents have responded, synthesize and return the final answer.' + 'Delegate to the appropriate agents and synthesize their outputs into a final answer.' ) _MGR_BACKSTORY = ( 'You are a senior orchestrator managing a team of specialized agents. 
' @@ -369,6 +377,12 @@ class OrchestratorDriver(CrewAgentBase): FRAMEWORK = 'crewai_orchestrator' def __init__(self, iGlobal: Any): + """Initialise the orchestrator driver. + + Stores a reference to iGlobal for accessing expert config fields at + run time, and initialises the pSelf stash used to capture the engine + context across the run_agent → _run call boundary. + """ super().__init__(iGlobal) self._iGlobal = iGlobal # Stash for pSelf — needed in _run() to call pSelf.instance.invoke('crewai', ...). @@ -390,6 +404,15 @@ def _run( host: AgentHost, ctx: Dict[str, Any], ) -> AgentRunResult: + """Fan out crewai.describe to all connected sub-agents and run a hierarchical Crew. + + Steps: + 1. Collect descriptors from each sub-agent node via per-node crewai.describe invoke. + 2. Build a CrewAI Agent + Task per descriptor, routing LLM/tool calls back through + each sub-agent's own engine channels. + 3. Build the manager agent using this node's LLM channel and expert config. + 4. Kick off a hierarchical Crew and return the synthesised result. 
+ """ from crewai import Agent, Crew, Process, Task from rocketlib.types import IInvokeCrew from ai.common.agent._internal.host import AgentHostServices @@ -479,7 +502,7 @@ def _sub_invoke_tool( tools=sub_tools, llm=sub_llm, verbose=False, - max_iter=3, + max_iter=5, allow_delegation=False, ) @@ -505,21 +528,20 @@ def _sub_invoke_tool( ig = self._iGlobal base_backstory = ig.backstory or _MGR_BACKSTORY if prompt: - escaped_prompt = prompt.replace('{', '{{').replace('}', '}}') + escaped_prompt = _escape_braces(prompt) manager_backstory = f'{base_backstory}\n\nBackground context — user request: {escaped_prompt}' else: manager_backstory = base_backstory - mgr_kwargs: Dict[str, Any] = dict( + manager_agent = Agent( role=_MGR_ROLE, goal=ig.goal or _MGR_GOAL, backstory=manager_backstory, llm=manager_llm, verbose=False, allow_delegation=True, - max_iter=ig.max_iter if ig.max_iter > 0 else len(descriptors) + 2, + max_iter=5, ) - manager_agent = Agent(**mgr_kwargs) # 5. Assemble and kick off the hierarchical Crew. 
crew = Crew( diff --git a/nodes/src/nodes/agent_crewai/services.json b/nodes/src/nodes/agent_crewai/services.json index 93c58cd47..6a068b432 100644 --- a/nodes/src/nodes/agent_crewai/services.json +++ b/nodes/src/nodes/agent_crewai/services.json @@ -1,7 +1,7 @@ { "title": "CrewAI", "protocol": "agent_crewai://", - "classType": ["agent", "tool"], + "classType": ["agent", "tool", "crewai"], "capabilities": ["invoke"], "register": "filter", "node": "python", @@ -35,12 +35,12 @@ "default": { "agent_description": "", "instructions": [], + "expert_mode": false, "role": "Assistant", "task_description": "", "goal": "", "backstory": "", - "expected_output": "", - "max_iter": 0 + "expected_output": "" } } }, @@ -64,66 +64,76 @@ "format": "textarea" } }, + "expert_mode": { + "type": "boolean", + "title": "Expert Mode", + "description": "Expose CrewAI Agent and Task configuration directly.", + "default": false, + "enum": [ + [false, "Off"], + [true, "On"] + ], + "conditional": [ + { + "value": true, + "properties": [ + "agent_crewai.agent_config_header", + "role", "goal", "backstory", + "agent_crewai.task_config_header", + "task_description", "expected_output" + ] + } + ] + }, + "agent_crewai.agent_config_header": { + "type": "null", + "title": "Agent Config", + "default": null + }, "role": { "type": "string", "title": "Role", - "description": "Short role name for this agent when used in a CrewAI Orchestrator pipeline (e.g. 'Financial Analyst')." - }, - "task_description": { - "type": "string", - "format": "textarea", - "title": "Task", - "description": "What this agent should do when used in a CrewAI Orchestrator pipeline. If blank, the incoming question is used." + "description": "Agent role name (e.g. 'Financial Analyst'). Maps to CrewAI Agent(role=...)." }, "goal": { "type": "string", "format": "textarea", "title": "Goal", - "description": "What this agent is trying to achieve. Leave blank to use the default." + "description": "What this agent is trying to achieve. 
Maps to CrewAI Agent(goal=...)." }, "backstory": { "type": "string", "format": "textarea", "title": "Backstory", - "description": "Background context for this agent's persona. Leave blank to use the default." + "description": "Background context for this agent's persona. Maps to CrewAI Agent(backstory=...)." + }, + "agent_crewai.task_config_header": { + "type": "null", + "title": "Task Config", + "default": null + }, + "task_description": { + "type": "string", + "format": "textarea", + "title": "Task", + "description": "What this agent should do. If blank, the incoming question is used. Maps to CrewAI Task(description=...)." }, "expected_output": { "type": "string", "format": "textarea", "title": "Expected Output", - "description": "Description of the expected output format. Leave blank to use the default." + "description": "Description of the expected output format. Maps to CrewAI Task(expected_output=...)." }, - "max_iter": { - "type": "number", - "title": "Max Iterations", - "description": "Maximum agent iterations before forced stop. 0 uses the framework default (25)." 
- }, - "agent_crewai.default": { "object": "default", "properties": ["agent_description", "instructions"] }, - "agent_crewai.subagent": { "object": "default", "properties": ["role", "task_description"] }, - "agent_crewai.expert": { "object": "default", "properties": ["goal", "backstory", "expected_output", "max_iter"] }, - "agent_crewai.profile": { - "title": "Profile", - "type": "string", - "default": "default", - "enum": [["default", "Default"]], - "conditional": [{ "value": "default", "properties": ["agent_crewai.default"] }] + "agent_crewai.default": { + "object": "default", + "properties": ["agent_description", "instructions", "expert_mode"] } }, "shape": [ { "section": "Pipe", "title": "CrewAI", - "properties": ["agent_description", "instructions"] - }, - { - "section": "Sub-Agent Settings", - "title": "Settings when used in a CrewAI Orchestrator pipeline", - "properties": ["agent_crewai.subagent"] - }, - { - "section": "Advanced", - "title": "Expert CrewAI settings", - "properties": ["agent_crewai.expert"] + "properties": ["agent_crewai.default"] } ] } diff --git a/nodes/src/nodes/agent_crewai/services.orchestrator.json b/nodes/src/nodes/agent_crewai/services.orchestrator.json index 25214bfb2..c27b0eca9 100644 --- a/nodes/src/nodes/agent_crewai/services.orchestrator.json +++ b/nodes/src/nodes/agent_crewai/services.orchestrator.json @@ -41,9 +41,9 @@ "profiles": { "default": { "instructions": [], + "expert_mode": false, "goal": "", - "backstory": "", - "max_iter": 0 + "backstory": "" } } }, @@ -57,42 +57,44 @@ "format": "textarea" } }, + "expert_mode": { + "type": "boolean", + "title": "Expert Mode", + "description": "Expose CrewAI manager Agent configuration directly.", + "default": false, + "enum": [ + [false, "Off"], + [true, "On"] + ], + "conditional": [ + { + "value": true, + "properties": ["goal", "backstory"] + } + ] + }, "goal": { "type": "string", "format": "textarea", "title": "Manager Goal", - "description": "What the orchestrator manager agent is 
trying to achieve. Leave blank to use the default." + "description": "What the orchestrator manager is trying to achieve. Maps to CrewAI Agent(goal=...)." }, "backstory": { "type": "string", "format": "textarea", "title": "Manager Backstory", - "description": "Background context for the orchestrator's persona. Leave blank to use the default." - }, - "max_iter": { - "type": "number", - "title": "Manager Max Iterations", - "description": "Maximum iterations for the orchestrator manager agent. 0 defaults to number of sub-agents + 2." + "description": "Background context for the orchestrator's persona. Maps to CrewAI Agent(backstory=...)." }, - "agent_crewai_orchestrator.default": { "object": "default", "properties": ["instructions", "goal", "backstory", "max_iter"] }, - "agent_crewai_orchestrator.profile": { - "title": "Profile", - "type": "string", - "default": "default", - "enum": [["default", "Default"]], - "conditional": [{ "value": "default", "properties": ["agent_crewai_orchestrator.default"] }] + "agent_crewai_orchestrator.default": { + "object": "default", + "properties": ["instructions", "expert_mode"] } }, "shape": [ { "section": "Pipe", "title": "CrewAI Orchestrator", - "properties": ["instructions"] - }, - { - "section": "Advanced", - "title": "Expert manager settings", - "properties": ["goal", "backstory", "max_iter"] + "properties": ["agent_crewai_orchestrator.default"] } ] } From 6892ad2c8d579ac6401eefcaeb03b57374380a58 Mon Sep 17 00:00:00 2001 From: Dylan Savage Date: Thu, 2 Apr 2026 12:45:59 -0700 Subject: [PATCH 5/9] Chore: Delete Crew readme --- nodes/src/nodes/agent_crewai/crew.md | 107 --------------------------- 1 file changed, 107 deletions(-) delete mode 100644 nodes/src/nodes/agent_crewai/crew.md diff --git a/nodes/src/nodes/agent_crewai/crew.md b/nodes/src/nodes/agent_crewai/crew.md deleted file mode 100644 index ca78450e6..000000000 --- a/nodes/src/nodes/agent_crewai/crew.md +++ /dev/null @@ -1,107 +0,0 @@ -# CrewAI Nodes — Topology Guide - 
-## Node Types - -### `agent_crewai` (Sub-Agent / Standalone) - -Channels: `llm` (required), `tool` (optional) - -**Standalone mode** (no orchestrator above): receives questions on the pipeline lane and runs a single-agent CrewAI Crew. - -**Sub-agent mode** (connected to an orchestrator via `crewai` channel): responds to `crewai.describe` with its role, task description, and LLM/tool invoke handle. The orchestrator assembles it into a hierarchical Crew. - -Config: -- `role` — short role name shown to the manager (e.g. `Financial Analyst`). Default: `Assistant`. -- `task_description` — what this agent should do. If blank, the full incoming question is used as the task. -- `instructions` — optional additional instructions (applied to every question). - ---- - -### `agent_crewai_orchestrator` (Orchestrator) - -Channels: `llm` (required), `crewai` (required, min 1), `tool` (optional) - -Always the top-level driver. On each question: -1. Fans out `crewai.describe` to all nodes on the `crewai` channel -2. Builds a hierarchical CrewAI Crew with each connected sub-agent -3. Kicks off all sub-tasks with `async_execution=True` -4. The manager (this node's own LLM) synthesizes sub-agent outputs -5. Emits the final answer on the pipeline - -Config: -- `instructions` — optional delegation guidance appended to every question. - -**Does NOT respond to `crewai.describe`** — orchestrators cannot be wired as sub-agents of another orchestrator. 
- ---- - -## Topology Map - -### Flat Crew (standard use) - -``` -[Orchestrator A] ──crewai──► [Sub-agent B] (own llm + tools) - ──crewai──► [Sub-agent C] (own llm + tools) - ──llm────► [LLM_A] -``` - -- A receives a question → fans out `crewai.describe` → B and C respond -- A builds hierarchical Crew: B and C as async sub-agents, A's LLM as manager -- B and C tasks run in parallel; each uses its own `llm`/`tool` channels -- Manager synthesizes outputs → answer emitted on A's pipeline lane - ---- - -### Depth via tool channel - -``` -[Orchestrator A] ──crewai──► [Sub-agent B] ──tool──► [Orchestrator C] - ──crewai──► [Sub-agent D] - ──crewai──► [Sub-agent E] -``` - -- B's task can invoke C as a tool during execution -- Calling C triggers C's full `_run()`, which fans out its own `crewai.describe` to D and E -- C assembles D and E into its own hierarchical Crew and returns a synthesized result to B as a tool call string -- C does NOT emit to the pipeline answers lane (`emit_answers_lane=False` for tool invocations) -- D and E's LLM/tool calls are routed through their own engine channels - ---- - -### What a sub-agent cannot do - -``` -[Sub-agent B] ──crewai──► ??? ← IMPOSSIBLE -``` - -`agent_crewai` has no `crewai` invoke channel. Sub-agents cannot directly have CrewAI sub-agents. To add depth, put an orchestrator on their `tool` channel instead (see above). 
- ---- - -## A→B→C Reference Cases - -| Wiring | What happens | -|--------|-------------| -| A(orch) → B(sub) via crewai, B(sub) → C(sub) via crewai | **IMPOSSIBLE** — B has no `crewai` channel | -| A(orch) → B(sub) via crewai, B(sub) → C(sub) via tool | C runs as a standalone single-agent Crew when B invokes it as a tool | -| A(orch) → B(sub) via crewai, B(sub) → C(orch) via tool | C assembles its own hierarchical Crew of its sub-agents when B calls it as a tool | -| A(orch) → C(orch) via crewai | C silently skipped (orchestrator has no `describe()`) — A sees 0 descriptors → RuntimeError | -| A(orch) → B(sub) via crewai, B(sub) → A via tool | Circular — A re-enters `run_agent`, overwrites `_current_pSelf`. Undefined behavior. | - ---- - -## Known Constraints - -1. **One level of CrewAI sub-agents per Crew.** `crewai.describe` fan-out is one level deep — only nodes directly on the orchestrator's `crewai` channel are discovered. Multi-level orchestration requires the `tool` channel. - -2. **Sub-agent tasks run in parallel, tool calls are serial.** `async_execution=True` applies within one Crew's flat sub-agent list. Tool-channel depth is blocking — each nested Crew must complete before the calling sub-agent continues. - -3. **No orchestrator-as-sub-agent.** Wiring an orchestrator to another orchestrator's `crewai` channel produces no result (no `describe()` method). The outer orchestrator raises `RuntimeError: no sub-agents connected` if no valid sub-agents respond. Use the `tool` channel for nested orchestrators. - -4. **Empty `task_description`.** If a sub-agent has no configured task, the full incoming question becomes its task. All such sub-agents receive identical work — differentiate them with explicit `task_description` values. - -5. **Defensive tool filter.** If a sub-agent node is accidentally wired to both the `crewai` and `tool` channels of the same orchestrator, its `run_agent` tool is automatically excluded from the manager's tool repertoire. 
The orchestrator uses `crewai` channel only for Crew assembly and never calls sub-agents directly as tools. - -6. **Circular tool wiring causes undefined behavior.** Do not wire a node as a tool to an orchestrator that is already above it in the Crew chain. - -7. **Nested orchestrator output size.** When an orchestrator returns as a tool result, its full synthesized answer enters the calling sub-agent's LLM context as a tool response. Deep nesting with verbose outputs can bloat context rapidly. From fc08a8800e9f74cb70657c83454b4593112ab9cb Mon Sep 17 00:00:00 2001 From: Dylan Savage Date: Thu, 2 Apr 2026 13:12:43 -0700 Subject: [PATCH 6/9] chore: rename 'expert' mode to 'advanced' mode --- nodes/src/nodes/agent_crewai/services.json | 15 +++++---------- .../nodes/agent_crewai/services.orchestrator.json | 14 +++++--------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/nodes/src/nodes/agent_crewai/services.json b/nodes/src/nodes/agent_crewai/services.json index 6a068b432..fd9c8031d 100644 --- a/nodes/src/nodes/agent_crewai/services.json +++ b/nodes/src/nodes/agent_crewai/services.json @@ -35,7 +35,7 @@ "default": { "agent_description": "", "instructions": [], - "expert_mode": false, + "advanced_mode": false, "role": "Assistant", "task_description": "", "goal": "", @@ -64,9 +64,9 @@ "format": "textarea" } }, - "expert_mode": { + "advanced_mode": { "type": "boolean", - "title": "Expert Mode", + "title": "Advanced Mode", "description": "Expose CrewAI Agent and Task configuration directly.", "default": false, "enum": [ @@ -76,12 +76,7 @@ "conditional": [ { "value": true, - "properties": [ - "agent_crewai.agent_config_header", - "role", "goal", "backstory", - "agent_crewai.task_config_header", - "task_description", "expected_output" - ] + "properties": ["agent_crewai.agent_config_header", "role", "goal", "backstory", "agent_crewai.task_config_header", "task_description", "expected_output"] } ] }, @@ -126,7 +121,7 @@ }, "agent_crewai.default": { "object": 
"default", - "properties": ["agent_description", "instructions", "expert_mode"] + "properties": ["agent_description", "instructions", "advanced_mode"] } }, "shape": [ diff --git a/nodes/src/nodes/agent_crewai/services.orchestrator.json b/nodes/src/nodes/agent_crewai/services.orchestrator.json index c27b0eca9..1bcb44355 100644 --- a/nodes/src/nodes/agent_crewai/services.orchestrator.json +++ b/nodes/src/nodes/agent_crewai/services.orchestrator.json @@ -7,11 +7,7 @@ "node": "python", "path": "nodes.agent_crewai", "prefix": "agent", - "description": [ - "Multi-agent orchestrator using CrewAI hierarchical process.", - "Fans out to connected CrewAI sub-agent nodes, assembles a Crew, and synthesizes their outputs.", - "Can be invoked as a tool (`.run_agent`) for nested orchestration." - ], + "description": ["Multi-agent orchestrator using CrewAI hierarchical process.", "Fans out to connected CrewAI sub-agent nodes, assembles a Crew, and synthesizes their outputs.", "Can be invoked as a tool (`.run_agent`) for nested orchestration."], "icon": "crewai.svg", "invoke": { "llm": { @@ -41,7 +37,7 @@ "profiles": { "default": { "instructions": [], - "expert_mode": false, + "advanced_mode": false, "goal": "", "backstory": "" } @@ -57,9 +53,9 @@ "format": "textarea" } }, - "expert_mode": { + "advanced_mode": { "type": "boolean", - "title": "Expert Mode", + "title": "Advanced Mode", "description": "Expose CrewAI manager Agent configuration directly.", "default": false, "enum": [ @@ -87,7 +83,7 @@ }, "agent_crewai_orchestrator.default": { "object": "default", - "properties": ["instructions", "expert_mode"] + "properties": ["instructions", "advanced_mode"] } }, "shape": [ From 56555e9fbe4f3b3129499f98132a38a0ae545e10 Mon Sep 17 00:00:00 2001 From: Dylan Savage Date: Thu, 2 Apr 2026 15:24:41 -0700 Subject: [PATCH 7/9] chore: remove dead code found by code rabbit --- nodes/src/nodes/agent_crewai/crewai.py | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) 
diff --git a/nodes/src/nodes/agent_crewai/crewai.py b/nodes/src/nodes/agent_crewai/crewai.py index 2bd2e95b6..066b39389 100644 --- a/nodes/src/nodes/agent_crewai/crewai.py +++ b/nodes/src/nodes/agent_crewai/crewai.py @@ -44,6 +44,7 @@ # ── Shared utilities ────────────────────────────────────────────────────────── + def _safe_str(v: Any) -> str: try: return '' if v is None else str(v) @@ -57,15 +58,13 @@ def _escape_braces(text: str) -> str: _DEFAULT_GOAL = 'Complete the assigned task to the best of your ability.' -_DEFAULT_BACKSTORY = ( - 'You are a specialized agent in a multi-agent pipeline with access to tools. ' - 'Use your tools and reasoning to complete tasks effectively.' -) +_DEFAULT_BACKSTORY = 'You are a specialized agent in a multi-agent pipeline with access to tools. Use your tools and reasoning to complete tasks effectively.' _DEFAULT_EXPECTED_OUTPUT = 'A clear, direct answer to the assigned task.' # ── CrewAgentBase ───────────────────────────────────────────────────────────── + class CrewAgentBase(AgentBase): """Shared base for CrewDriver and OrchestratorDriver.""" @@ -189,13 +188,13 @@ def _run(self, input: Any = None, **kwargs: Any) -> str: # ── CrewDriver ──────────────────────────────────────────────────────────────── + class CrewDriver(CrewAgentBase): """Sub-agent mode / standalone single-agent Crew.""" FRAMEWORK = 'crewai' - def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant', task_description: str = '', - goal: str = '', backstory: str = '', expected_output: str = ''): + def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant', task_description: str = '', goal: str = '', backstory: str = '', expected_output: str = ''): """Initialise the driver with per-node config loaded from connConfig. 
All string fields default to empty; empty values fall back to the @@ -276,9 +275,7 @@ def _invoke_tool(tool_name: str, input: Any = None, kwargs: Optional[Dict[str, A verbose=False, ) - from ai.common.agent._internal.utils import extract_prompt - prompt = extract_prompt(agent_input.question) if hasattr(agent_input, 'question') else '' - task_text = self._task_description or prompt or '' + task_text = self._task_description or '' desc = _escape_braces(task_text) @@ -354,14 +351,8 @@ def _on_any_event(source, event): # ── OrchestratorDriver ──────────────────────────────────────────────────────── _MGR_ROLE = 'Orchestrator' -_MGR_GOAL = ( - 'Coordinate the team to complete the user request. ' - 'Delegate to the appropriate agents and synthesize their outputs into a final answer.' -) -_MGR_BACKSTORY = ( - 'You are a senior orchestrator managing a team of specialized agents. ' - 'Delegate tasks to the right agent and synthesize their outputs into a final answer.' -) +_MGR_GOAL = 'Coordinate the team to complete the user request. Delegate to the appropriate agents and synthesize their outputs into a final answer.' +_MGR_BACKSTORY = 'You are a senior orchestrator managing a team of specialized agents. Delegate tasks to the right agent and synthesize their outputs into a final answer.' 
class OrchestratorDriver(CrewAgentBase): From 5a310491467a5a8b2eb9ee50832e2ea4d6e3028c Mon Sep 17 00:00:00 2001 From: Dylan Savage Date: Mon, 6 Apr 2026 14:37:18 -0700 Subject: [PATCH 8/9] Fix: Rename orchestrator to Manager; remove tool invoke from manager node --- nodes/src/nodes/agent_crewai/IGlobal.py | 11 +++---- nodes/src/nodes/agent_crewai/IInstance.py | 2 +- nodes/src/nodes/agent_crewai/crewai.py | 30 +++++++++---------- ...rchestrator.json => services.manager.json} | 24 +++++++-------- 4 files changed, 32 insertions(+), 35 deletions(-) rename nodes/src/nodes/agent_crewai/{services.orchestrator.json => services.manager.json} (59%) diff --git a/nodes/src/nodes/agent_crewai/IGlobal.py b/nodes/src/nodes/agent_crewai/IGlobal.py index 78b6f4b26..49cb343ca 100644 --- a/nodes/src/nodes/agent_crewai/IGlobal.py +++ b/nodes/src/nodes/agent_crewai/IGlobal.py @@ -56,16 +56,17 @@ def beginGlobal(self) -> None: self.goal = str(conn_config.get('goal') or '').strip() self.backstory = str(conn_config.get('backstory') or '').strip() - if self.glb.logicalType == 'agent_crewai_orchestrator': - from .crewai import OrchestratorDriver - self.agent = OrchestratorDriver(self) + if self.glb.logicalType == 'agent_crewai_manager': + from .crewai import ManagerDriver + + self.agent = ManagerDriver(self) else: self.role = str(conn_config.get('role') or 'Assistant').strip() or 'Assistant' self.task_description = str(conn_config.get('task_description') or '').strip() self.expected_output = str(conn_config.get('expected_output') or '').strip() from .crewai import CrewDriver - self.agent = CrewDriver(self, process=self.process, role=self.role, task_description=self.task_description, - goal=self.goal, backstory=self.backstory, expected_output=self.expected_output) + + self.agent = CrewDriver(self, process=self.process, role=self.role, task_description=self.task_description, goal=self.goal, backstory=self.backstory, expected_output=self.expected_output) def endGlobal(self) -> None: self.agent 
= None diff --git a/nodes/src/nodes/agent_crewai/IInstance.py b/nodes/src/nodes/agent_crewai/IInstance.py index 90e4f5430..57946a9ef 100644 --- a/nodes/src/nodes/agent_crewai/IInstance.py +++ b/nodes/src/nodes/agent_crewai/IInstance.py @@ -45,7 +45,7 @@ def invoke(self, param: Any) -> Any: # noqa: ANN401 op = param.get('op') if isinstance(param, dict) else getattr(param, 'op', None) # crewai.describe fan-out: only sub-agents (CrewDriver) respond — guarded by hasattr. - # OrchestratorDriver has no describe() so it silently falls through. + # ManagerDriver has no describe() so it silently falls through. if isinstance(op, str) and op == 'crewai.describe' and hasattr(self.IGlobal.agent, 'describe'): descriptor = self.IGlobal.agent.describe(self) existing = getattr(param, 'agents', None) diff --git a/nodes/src/nodes/agent_crewai/crewai.py b/nodes/src/nodes/agent_crewai/crewai.py index 066b39389..2b703cdac 100644 --- a/nodes/src/nodes/agent_crewai/crewai.py +++ b/nodes/src/nodes/agent_crewai/crewai.py @@ -27,7 +27,7 @@ Contains: - CrewAgentBase: shared LLM/tool-binding logic - CrewDriver: sub-agent mode / standalone single-agent Crew - - OrchestratorDriver: hierarchical multi-agent Crew + - ManagerDriver: hierarchical multi-agent Crew """ from __future__ import annotations @@ -66,7 +66,7 @@ def _escape_braces(text: str) -> str: class CrewAgentBase(AgentBase): - """Shared base for CrewDriver and OrchestratorDriver.""" + """Shared base for CrewDriver and ManagerDriver.""" def _bind_framework_llm( self, @@ -211,7 +211,7 @@ def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant' def describe(self, pSelf: Any) -> Any: """Return a DescribeResponse for crewai.describe fan-out. - Called by IInstance.invoke() when the orchestrator fans out crewai.describe. + Called by IInstance.invoke() when the manager fans out crewai.describe. Stores the full pSelf IInstance in `invoke` so AgentHostServices(d.invoke) can call d.invoke.instance.* correctly. 
""" @@ -348,27 +348,27 @@ def _on_any_event(source, event): return final_text, result -# ── OrchestratorDriver ──────────────────────────────────────────────────────── +# ── ManagerDriver ───────────────────────────────────────────────────────────── -_MGR_ROLE = 'Orchestrator' +_MGR_ROLE = 'Manager' _MGR_GOAL = 'Coordinate the team to complete the user request. Delegate to the appropriate agents and synthesize their outputs into a final answer.' -_MGR_BACKSTORY = 'You are a senior orchestrator managing a team of specialized agents. Delegate tasks to the right agent and synthesize their outputs into a final answer.' +_MGR_BACKSTORY = 'You are a senior manager coordinating a team of specialized agents. Delegate tasks to the right agent and synthesize their outputs into a final answer.' -class OrchestratorDriver(CrewAgentBase): +class ManagerDriver(CrewAgentBase): """Hierarchical multi-agent Crew. Fans out `crewai.describe` to all nodes on the 'crewai' invoke channel, assembles each into a CrewAI Agent + Task, and kicks off a hierarchical Crew with this node acting as the manager. - Does NOT implement `describe()` — orchestrators cannot be used as sub-agents. + Does NOT implement `describe()` — the manager cannot be used as a sub-agent. """ - FRAMEWORK = 'crewai_orchestrator' + FRAMEWORK = 'crewai_manager' def __init__(self, iGlobal: Any): - """Initialise the orchestrator driver. + """Initialise the manager driver. 
Stores a reference to iGlobal for accessing expert config fields at run time, and initialises the pSelf stash used to capture the engine @@ -410,7 +410,7 @@ def _run( run_id = ctx.get('run_id', '') prompt = _safe_str(agent_input.question.getPrompt() if hasattr(agent_input, 'question') else '') - debug('agent_crewai_orchestrator _run start run_id={} prompt_len={}'.format(run_id, len(prompt))) + debug('agent_crewai_manager _run start run_id={} prompt_len={}'.format(run_id, len(prompt))) pSelf = self._current_pSelf @@ -420,7 +420,7 @@ def _run( # each crewai node individually with nodeId= to reach all of them. crewai_node_ids = pSelf.instance.getControllerNodeIds('crewai') if not crewai_node_ids: - raise RuntimeError('CrewAI Orchestrator: no sub-agents connected on the crewai channel') + raise RuntimeError('CrewAI Manager: no sub-agents connected on the crewai channel') descriptors = [] for node_id in crewai_node_ids: @@ -434,9 +434,9 @@ def _run( descriptors.append(agent_desc) if not descriptors: - raise RuntimeError('CrewAI Orchestrator: no sub-agents responded to crewai.describe') + raise RuntimeError('CrewAI Manager: no sub-agents responded to crewai.describe') - # 2. Build the manager's LLM (uses this orchestrator's own llm channel). + # 2. Build the manager's LLM (uses this node's own llm channel). 
def _mgr_call_llm_text(messages: Any, stop_words: Any = None, _h: AgentHost = host) -> str: return self.call_host_llm( host=_h, @@ -545,7 +545,7 @@ def _sub_invoke_tool( verbose=False, ) - debug('agent_crewai_orchestrator kicking off crew with {} sub-agents run_id={}'.format(len(sub_agents), run_id)) + debug('agent_crewai_manager kicking off crew with {} sub-agents run_id={}'.format(len(sub_agents), run_id)) result = crew.kickoff(inputs={'user_request': prompt} if prompt else {}) final_text = _safe_str(getattr(result, 'raw', None)) or _safe_str(result) diff --git a/nodes/src/nodes/agent_crewai/services.orchestrator.json b/nodes/src/nodes/agent_crewai/services.manager.json similarity index 59% rename from nodes/src/nodes/agent_crewai/services.orchestrator.json rename to nodes/src/nodes/agent_crewai/services.manager.json index 1bcb44355..df3e014da 100644 --- a/nodes/src/nodes/agent_crewai/services.orchestrator.json +++ b/nodes/src/nodes/agent_crewai/services.manager.json @@ -1,26 +1,22 @@ { - "title": "CrewAI Orchestrator", - "protocol": "agent_crewai_orchestrator://", + "title": "CrewAI Manager", + "protocol": "agent_crewai_manager://", "classType": ["agent", "tool"], "capabilities": ["invoke"], "register": "filter", "node": "python", "path": "nodes.agent_crewai", "prefix": "agent", - "description": ["Multi-agent orchestrator using CrewAI hierarchical process.", "Fans out to connected CrewAI sub-agent nodes, assembles a Crew, and synthesizes their outputs.", "Can be invoked as a tool (`.run_agent`) for nested orchestration."], + "description": ["Multi-agent manager using CrewAI hierarchical process.", "Fans out to connected CrewAI sub-agent nodes, assembles a Crew, and synthesizes their outputs.", "Can be invoked as a tool (`.run_agent`) for nested orchestration."], "icon": "crewai.svg", "invoke": { "llm": { - "description": "LLM used by the orchestrator manager agent", + "description": "LLM used by the manager agent", "min": 1 }, "crewai": { "description":
"Connected CrewAI sub-agent nodes", "min": 1 - }, - "tool": { - "description": "Optional direct tools available to the orchestrator manager", - "min": 0 } }, "lanes": { @@ -47,7 +43,7 @@ "instructions": { "type": "array", "title": "Instructions", - "description": "Additional instructions to guide the orchestrator's delegation strategy.", + "description": "Additional instructions to guide the manager's delegation strategy.", "items": { "type": "string", "format": "textarea" @@ -73,15 +69,15 @@ "type": "string", "format": "textarea", "title": "Manager Goal", - "description": "What the orchestrator manager is trying to achieve. Maps to CrewAI Agent(goal=...)." + "description": "What the manager is trying to achieve. Maps to CrewAI Agent(goal=...)." }, "backstory": { "type": "string", "format": "textarea", "title": "Manager Backstory", - "description": "Background context for the orchestrator's persona. Maps to CrewAI Agent(backstory=...)." + "description": "Background context for the manager's persona. Maps to CrewAI Agent(backstory=...)." }, - "agent_crewai_orchestrator.default": { + "agent_crewai_manager.default": { "object": "default", "properties": ["instructions", "advanced_mode"] } @@ -89,8 +85,8 @@ "shape": [ { "section": "Pipe", - "title": "CrewAI Orchestrator", - "properties": ["agent_crewai_orchestrator.default"] + "title": "CrewAI Manager", + "properties": ["agent_crewai_manager.default"] } ] } From 3e8ad38284e890017ca387da31fef1ad0fdef958 Mon Sep 17 00:00:00 2001 From: Dylan Savage Date: Mon, 6 Apr 2026 15:35:30 -0700 Subject: [PATCH 9/9] fix: propagate sub-agent advanced config through crewai.describe() goal, backstory, expected_output, and instructions were all loaded from connConfig but not included in DescribeResponse, causing the manager to silently fall back to hardcoded defaults for every sub-agent.
- Add goal, backstory, expected_output, instructions to DescribeResponse - Populate them in CrewDriver.describe() - Use d.goal/backstory/expected_output in ManagerDriver sub-agent build - Append instructions to sub-agent backstory (substitutes for the run_agent() path that is bypassed in manager mode) Co-Authored-By: Claude Sonnet 4.6 --- nodes/src/nodes/agent_crewai/crewai.py | 15 ++++++++++++--- .../rocketlib-python/lib/rocketlib/types.py | 4 ++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/nodes/src/nodes/agent_crewai/crewai.py b/nodes/src/nodes/agent_crewai/crewai.py index 2b703cdac..0768300f9 100644 --- a/nodes/src/nodes/agent_crewai/crewai.py +++ b/nodes/src/nodes/agent_crewai/crewai.py @@ -222,6 +222,10 @@ def describe(self, pSelf: Any) -> Any: return IInvokeCrew.DescribeResponse( role=self._role, task_description=self._task_description, + goal=self._goal, + backstory=self._backstory, + expected_output=self._expected_output, + instructions=list(self._instructions), node_id=node_id, invoke=pSelf, ) @@ -486,10 +490,15 @@ def _sub_invoke_tool( ctx=ctx, ) + sub_backstory = d.backstory or _DEFAULT_BACKSTORY + sub_instructions = [i.strip() for i in (d.instructions or []) if i and i.strip()] + if sub_instructions: + sub_backstory = sub_backstory + '\n\nInstructions:\n' + '\n'.join(f'- {i}' for i in sub_instructions) + agent_obj = Agent( role=d.role, - goal=_DEFAULT_GOAL, - backstory=_DEFAULT_BACKSTORY, + goal=d.goal or _DEFAULT_GOAL, + backstory=sub_backstory, tools=sub_tools, llm=sub_llm, verbose=False, @@ -506,7 +515,7 @@ def _sub_invoke_tool( task_obj = Task( description=task_desc, - expected_output=_DEFAULT_EXPECTED_OUTPUT, + expected_output=d.expected_output or _DEFAULT_EXPECTED_OUTPUT, agent=agent_obj, ) diff --git a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py index 5416249b2..11a237908 100644 --- 
a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py +++ b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py @@ -285,6 +285,10 @@ class DescribeResponse(BaseModel): op: str = Field(default='crewai.describe', frozen=True) role: str task_description: str + goal: str = '' + backstory: str = '' + expected_output: str = '' + instructions: List[str] = Field(default_factory=list) node_id: str = '' # pSelf.instance.pipeType['id'] — used to filter sub-agents from tool list invoke: Any = Field(default=None) # full pSelf IInstance — passed to AgentHostServices(d.invoke) model_config = ConfigDict(extra='allow')