diff --git a/nodes/src/nodes/agent_crewai/IGlobal.py b/nodes/src/nodes/agent_crewai/IGlobal.py index 19b897822..49cb343ca 100644 --- a/nodes/src/nodes/agent_crewai/IGlobal.py +++ b/nodes/src/nodes/agent_crewai/IGlobal.py @@ -26,12 +26,17 @@ import os from typing import Any -from rocketlib import IGlobalBase, OPEN_MODE +from rocketlib import IGlobalBase, IJson, OPEN_MODE class IGlobal(IGlobalBase): process: Any = None agent: Any = None + role: str = 'Assistant' + task_description: str = '' + goal: str = '' + backstory: str = '' + expected_output: str = '' def beginGlobal(self) -> None: if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: @@ -46,10 +51,28 @@ def beginGlobal(self) -> None: self.process = Process.sequential - from .crewai import CrewDriver + conn_config = IJson.toDict(self.glb.connConfig) if self.glb.connConfig else {} - self.agent = CrewDriver(self, process=self.process) + self.goal = str(conn_config.get('goal') or '').strip() + self.backstory = str(conn_config.get('backstory') or '').strip() + + if self.glb.logicalType == 'agent_crewai_manager': + from .crewai import ManagerDriver + + self.agent = ManagerDriver(self) + else: + self.role = str(conn_config.get('role') or 'Assistant').strip() or 'Assistant' + self.task_description = str(conn_config.get('task_description') or '').strip() + self.expected_output = str(conn_config.get('expected_output') or '').strip() + from .crewai import CrewDriver + + self.agent = CrewDriver(self, process=self.process, role=self.role, task_description=self.task_description, goal=self.goal, backstory=self.backstory, expected_output=self.expected_output) def endGlobal(self) -> None: self.agent = None self.process = None + self.role = 'Assistant' + self.task_description = '' + self.goal = '' + self.backstory = '' + self.expected_output = '' diff --git a/nodes/src/nodes/agent_crewai/IInstance.py b/nodes/src/nodes/agent_crewai/IInstance.py index ab92e5eea..57946a9ef 100644 --- a/nodes/src/nodes/agent_crewai/IInstance.py 
+++ b/nodes/src/nodes/agent_crewai/IInstance.py @@ -42,8 +42,24 @@ def writeQuestions(self, question: Question): self.IGlobal.agent.run_agent(self, question, emit_answers_lane=True) def invoke(self, param: Any) -> Any: # noqa: ANN401 - # Only intercept tool.* control-plane operations; otherwise fall back. op = param.get('op') if isinstance(param, dict) else getattr(param, 'op', None) + + # crewai.describe fan-out: only sub-agents (CrewDriver) respond — guarded by hasattr. + # ManagerDriver has no describe() so it silently falls through. + if isinstance(op, str) and op == 'crewai.describe' and hasattr(self.IGlobal.agent, 'describe'): + descriptor = self.IGlobal.agent.describe(self) + existing = getattr(param, 'agents', None) + if isinstance(existing, list): + existing.append(descriptor) + try: + param.agents = existing + except Exception: + pass + return param + return [descriptor] + + # tool.* control-plane operations for agent-as-tool. if isinstance(op, str) and op.startswith('tool.'): return self.IGlobal.agent.handle_invoke(self, param) + return super().invoke(param) diff --git a/nodes/src/nodes/agent_crewai/crewai.py b/nodes/src/nodes/agent_crewai/crewai.py index 3ac81e46c..0768300f9 100644 --- a/nodes/src/nodes/agent_crewai/crewai.py +++ b/nodes/src/nodes/agent_crewai/crewai.py @@ -5,7 +5,7 @@ # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights -# to use, copy, merge, publish, distribute, sublicense, and/or sell +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # @@ -17,12 +17,17 @@ # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OF OTHER DEALINGS IN THE +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # ============================================================================= """ -CrewAI driver implementing the shared `ai.common.agent.AGENT` interface. +CrewAI drivers implementing the shared `ai.common.agent.AGENT` interface. + +Contains: + - CrewAgentBase: shared LLM/tool-binding logic + - CrewDriver: sub-agent mode / standalone single-agent Crew + - ManagerDriver: hierarchical multi-agent Crew """ from __future__ import annotations @@ -37,15 +42,31 @@ from rocketlib import ToolDescriptor -class CrewDriver(AgentBase): - FRAMEWORK = 'crewai' +# ── Shared utilities ────────────────────────────────────────────────────────── - def __init__(self, iGlobal: Any, *, process: Any = None): - """ - Initialize the CrewDriver. - """ - super().__init__(iGlobal) - self._process = process + +def _safe_str(v: Any) -> str: + try: + return '' if v is None else str(v) + except Exception: + return '' + + +def _escape_braces(text: str) -> str: + """Escape curly braces so CrewAI doesn't treat them as template variables.""" + return text.replace('{', '{{').replace('}', '}}') + + +_DEFAULT_GOAL = 'Complete the assigned task to the best of your ability.' +_DEFAULT_BACKSTORY = 'You are a specialized agent in a multi-agent pipeline with access to tools. Use your tools and reasoning to complete tasks effectively.' +_DEFAULT_EXPECTED_OUTPUT = 'A clear, direct answer to the assigned task.' 
+ + +# ── CrewAgentBase ───────────────────────────────────────────────────────────── + + +class CrewAgentBase(AgentBase): + """Shared base for CrewDriver and ManagerDriver.""" def _bind_framework_llm( self, @@ -54,7 +75,11 @@ def _bind_framework_llm( call_llm_text: Callable[..., str], ctx: Dict[str, Any], ) -> Any: + """Wrap the host LLM channel as a CrewAI-compatible BaseLLM instance. + The returned HostInvokeLLM delegates all calls back through + ``call_llm_text``, which routes to the engine's llm invoke channel. + """ from crewai import BaseLLM class HostInvokeLLM(BaseLLM): @@ -80,12 +105,16 @@ def _bind_framework_tools( host: AgentHost, tool_descriptors: List[ToolDescriptor], invoke_tool: Callable[..., Any], - log_tool_call: Callable[..., None], ctx: Dict[str, Any], ) -> List[Any]: + """Convert host tool descriptors into CrewAI BaseTool instances. + Each tool's JSON Schema is embedded in the description so CrewAI can + pass structured arguments. A dynamic Pydantic args_schema is built per + tool to preserve real parameter names through CrewAI's argument filter. 
+ """ from crewai.tools import BaseTool - from pydantic import BaseModel, ConfigDict, Field, create_model # noqa: E501 + from pydantic import BaseModel, ConfigDict, Field, create_model class _ToolInput(BaseModel): input: Any = Field(default=None, description='Tool input payload') @@ -130,11 +159,6 @@ def _run(self, input: Any = None, **kwargs: Any) -> str: except Exception as e: out = {'error': str(e), 'type': type(e).__name__} - try: - log_tool_call(tool_name=self.name, input={'input': input, **kwargs}, output=out) - except Exception: - pass - try: return json.dumps(out, default=str) if isinstance(out, (dict, list)) else _safe_str(out) except Exception: @@ -161,6 +185,51 @@ def _run(self, input: Any = None, **kwargs: Any) -> str: tools.append(HostTool(name=name, description=desc, args_schema=schema_cls)) return tools + +# ── CrewDriver ──────────────────────────────────────────────────────────────── + + +class CrewDriver(CrewAgentBase): + """Sub-agent mode / standalone single-agent Crew.""" + + FRAMEWORK = 'crewai' + + def __init__(self, iGlobal: Any, *, process: Any = None, role: str = 'Assistant', task_description: str = '', goal: str = '', backstory: str = '', expected_output: str = ''): + """Initialise the driver with per-node config loaded from connConfig. + + All string fields default to empty; empty values fall back to the + module-level ``_DEFAULT_*`` constants at run time. + """ + super().__init__(iGlobal) + self._process = process + self._role = role + self._task_description = task_description + self._goal = goal + self._backstory = backstory + self._expected_output = expected_output + + def describe(self, pSelf: Any) -> Any: + """Return a DescribeResponse for crewai.describe fan-out. + + Called by IInstance.invoke() when the manager fans out crewai.describe. + Stores the full pSelf IInstance in `invoke` so AgentHostServices(d.invoke) + can call d.invoke.instance.* correctly. 
+ """ + from rocketlib.types import IInvokeCrew + + pipe_type = pSelf.instance.pipeType + node_id = str(pipe_type.get('id') if isinstance(pipe_type, dict) else getattr(pipe_type, 'id', '')) or '' + return IInvokeCrew.DescribeResponse( + role=self._role, + task_description=self._task_description, + goal=self._goal, + backstory=self._backstory, + expected_output=self._expected_output, + instructions=list(self._instructions), + node_id=node_id, + invoke=pSelf, + ) + def _run( self, *, @@ -168,10 +237,17 @@ def _run( host: AgentHost, ctx: Dict[str, Any], ) -> AgentRunResult: + """Execute a single-agent CrewAI Crew and return the result text. + + Builds a one-agent, one-task Crew using the host's LLM and tool + channels. If ``task_description`` is blank the incoming prompt is used + as the task. All config fields fall back to ``_DEFAULT_*`` constants + when empty. + """ run_id = ctx.get('run_id', '') - debug('agent_crewai driver _run start run_id={} prompt_len={}'.format(run_id, len(agent_input.question.getPrompt() or ''))) + debug('agent_crewai driver _run start run_id={}'.format(run_id)) - from crewai import Agent, Crew, Task # type: ignore + from crewai import Agent, Crew, Task tool_descriptors = self.discover_tools(host=host) @@ -179,7 +255,7 @@ def _call_llm_text(messages: Any, stop_words: Any = None) -> str: return self.call_host_llm( host=host, messages=messages, - question_role='You are a helpful assistant.', + question_role=self._role, stop_words=stop_words, ) @@ -191,33 +267,25 @@ def _invoke_tool(tool_name: str, input: Any = None, kwargs: Optional[Dict[str, A host=host, tool_descriptors=tool_descriptors, invoke_tool=_invoke_tool, - log_tool_call=lambda **_: None, ctx=ctx, ) agent_obj = Agent( - role='Assistant', - goal='Solve the user request using available tools when helpful.', - backstory=('You are an agent node in a tool-invocation hierarchy. You may call tools wired to you via the host tools interface. 
When a tool is needed, call it; otherwise respond directly. Follow any additional instructions exactly.'), + role=self._role, + goal=self._goal or _DEFAULT_GOAL, + backstory=self._backstory or _DEFAULT_BACKSTORY, tools=tools_for_agent, llm=llm, verbose=False, ) - desc_parts = [ - 'You are executing inside an agent pipeline.', - 'Use tools when needed (and only those available to you).', - '', - 'User request:', - _safe_str(agent_input.question.getPrompt() or ''), - ] - desc = '\n'.join(desc_parts).strip() + task_text = self._task_description or '' - desc = desc.replace('{', '{{').replace('}', '}}') + desc = _escape_braces(task_text) task_obj = Task( - description=desc, - expected_output='A helpful, accurate response.', + description=desc or 'Complete the user request.', + expected_output=self._expected_output or _DEFAULT_EXPECTED_OUTPUT, agent=agent_obj, markdown=False, ) @@ -280,20 +348,214 @@ def _on_any_event(source, event): result = crew.kickoff() - final_text = '' - if hasattr(result, 'raw'): + final_text = _safe_str(getattr(result, 'raw', None)) or _safe_str(result) + return final_text, result + + +# ── ManagerDriver ───────────────────────────────────────────────────────────── + +_MGR_ROLE = 'Manager' +_MGR_GOAL = 'Coordinate the team to complete the user request. Delegate to the appropriate agents and synthesize their outputs into a final answer.' +_MGR_BACKSTORY = 'You are a senior manager coordinating a team of specialized agents. Delegate tasks to the right agent and synthesize their outputs into a final answer.' + + +class ManagerDriver(CrewAgentBase): + """Hierarchical multi-agent Crew. + + Fans out `crewai.describe` to all nodes on the 'crewai' invoke channel, + assembles each into a CrewAI Agent + Task, and kicks off a hierarchical + Crew with this node acting as the manager. + + Does NOT implement `describe()` — the manager cannot be used as a sub-agent. 
+ """ + + FRAMEWORK = 'crewai_manager' + + def __init__(self, iGlobal: Any): + """Initialise the manager driver. + + Stores a reference to iGlobal for accessing expert config fields at + run time, and initialises the pSelf stash used to capture the engine + context across the run_agent → _run call boundary. + """ + super().__init__(iGlobal) + self._iGlobal = iGlobal + # Stash for pSelf — needed in _run() to call pSelf.instance.invoke('crewai', ...). + # Not thread-safe; safe because pipeline runs are sequential per node instance. + self._current_pSelf: Any = None + + def run_agent(self, pSelf: Any, question: Any, *, emit_answers_lane: bool = True) -> Any: + """Override to stash pSelf before delegating to AgentBase.run_agent().""" + self._current_pSelf = pSelf + try: + return super().run_agent(pSelf, question, emit_answers_lane=emit_answers_lane) + finally: + self._current_pSelf = None + + def _run( + self, + *, + agent_input: AgentInput, + host: AgentHost, + ctx: Dict[str, Any], + ) -> AgentRunResult: + """Fan out crewai.describe to all connected sub-agents and run a hierarchical Crew. + + Steps: + 1. Collect descriptors from each sub-agent node via per-node crewai.describe invoke. + 2. Build a CrewAI Agent + Task per descriptor, routing LLM/tool calls back through + each sub-agent's own engine channels. + 3. Build the manager agent using this node's LLM channel and expert config. + 4. Kick off a hierarchical Crew and return the synthesised result. + """ + from crewai import Agent, Crew, Process, Task + from rocketlib.types import IInvokeCrew + from ai.common.agent._internal.host import AgentHostServices + + run_id = ctx.get('run_id', '') + prompt = _safe_str(agent_input.question.getPrompt() if hasattr(agent_input, 'question') else '') + debug('agent_crewai_manager _run start run_id={} prompt_len={}'.format(run_id, len(prompt))) + + pSelf = self._current_pSelf + + # 1. 
Discover all connected sub-agents via per-node invoke (mirrors the tool + # discovery pattern in AgentHostServices.Tools.__init__). + # A no-nodeId invoke stops at the first successful handler, so we iterate + # each crewai node individually with nodeId= to reach all of them. + crewai_node_ids = pSelf.instance.getControllerNodeIds('crewai') + if not crewai_node_ids: + raise RuntimeError('CrewAI Manager: no sub-agents connected on the crewai channel') + + descriptors = [] + for node_id in crewai_node_ids: + req = IInvokeCrew.Describe() try: - final_text = _safe_str(getattr(result, 'raw')) + pSelf.instance.invoke('crewai', req, nodeId=node_id) except Exception: - final_text = '' - if not final_text: - final_text = _safe_str(result) + pass + for agent_desc in req.agents: + if agent_desc is not None: + descriptors.append(agent_desc) - return final_text, result + if not descriptors: + raise RuntimeError('CrewAI Manager: no sub-agents responded to crewai.describe') + # 2. Build the manager's LLM (uses this node's own llm channel). + def _mgr_call_llm_text(messages: Any, stop_words: Any = None, _h: AgentHost = host) -> str: + return self.call_host_llm( + host=_h, + messages=messages, + question_role=_MGR_ROLE, + stop_words=stop_words, + ) -def _safe_str(v: Any) -> str: - try: - return '' if v is None else str(v) - except Exception: - return '' + manager_llm = self._bind_framework_llm(host=host, call_llm_text=_mgr_call_llm_text, ctx=ctx) + + # 3. Build per-sub-agent Agent + Task. + # d.invoke is the sub-agent's full pSelf IInstance. + # Default-arg capture (_h, _role) prevents closure-in-loop bugs. 
+ sub_agents: List[Any] = [] + sub_tasks: List[Any] = [] + + for d in descriptors: + sub_host = AgentHostServices(d.invoke) + + def _sub_call_llm_text( + messages: Any, + stop_words: Any = None, + _h: Any = sub_host, + _role: str = d.role, + ) -> str: + return self.call_host_llm( + host=_h, + messages=messages, + question_role=_role, + stop_words=stop_words, + ) + + def _sub_invoke_tool( + tool_name: str, + input: Any = None, # noqa: A002 + kwargs: Optional[Dict[str, Any]] = None, + _h: Any = sub_host, + ) -> Any: + return self.invoke_host_tool(host=_h, tool_name=tool_name, input=input, kwargs=kwargs) + + sub_tool_descs = self.discover_tools(host=sub_host) + sub_llm = self._bind_framework_llm(host=sub_host, call_llm_text=_sub_call_llm_text, ctx=ctx) + sub_tools = self._bind_framework_tools( + host=sub_host, + tool_descriptors=sub_tool_descs, + invoke_tool=_sub_invoke_tool, + ctx=ctx, + ) + + sub_backstory = d.backstory or _DEFAULT_BACKSTORY + sub_instructions = [i.strip() for i in (d.instructions or []) if i and i.strip()] + if sub_instructions: + sub_backstory = sub_backstory + '\n\nInstructions:\n' + '\n'.join(f'- {i}' for i in sub_instructions) + + agent_obj = Agent( + role=d.role, + goal=d.goal or _DEFAULT_GOAL, + backstory=sub_backstory, + tools=sub_tools, + llm=sub_llm, + verbose=False, + max_iter=5, + allow_delegation=False, + ) + + task_text = d.task_description or '' + if not task_text: + task_text = prompt or 'Complete the user request.' + elif prompt: + task_text = f'{task_text}\n\nUser request: {prompt}' + task_desc = task_text.replace('{', '{{').replace('}', '}}') + + task_obj = Task( + description=task_desc, + expected_output=d.expected_output or _DEFAULT_EXPECTED_OUTPUT, + agent=agent_obj, + ) + + sub_agents.append(agent_obj) + sub_tasks.append(task_obj) + + # 4. Build manager agent. The user's prompt goes into backstory (background context) + # rather than the goal so it doesn't drive active reasoning on every LLM call. 
+ # The goal stays generic: delegate once, return the result. + ig = self._iGlobal + base_backstory = ig.backstory or _MGR_BACKSTORY + if prompt: + escaped_prompt = _escape_braces(prompt) + manager_backstory = f'{base_backstory}\n\nBackground context — user request: {escaped_prompt}' + else: + manager_backstory = base_backstory + + manager_agent = Agent( + role=_MGR_ROLE, + goal=ig.goal or _MGR_GOAL, + backstory=manager_backstory, + llm=manager_llm, + verbose=False, + allow_delegation=True, + max_iter=5, + ) + + # 5. Assemble and kick off the hierarchical Crew. + crew = Crew( + agents=sub_agents, + tasks=sub_tasks, + process=Process.hierarchical, + manager_agent=manager_agent, + planning=True, + planning_llm=manager_llm, + verbose=False, + ) + + debug('agent_crewai_manager kicking off crew with {} sub-agents run_id={}'.format(len(sub_agents), run_id)) + result = crew.kickoff(inputs={'user_request': prompt} if prompt else {}) + + final_text = _safe_str(getattr(result, 'raw', None)) or _safe_str(result) + return final_text, result diff --git a/nodes/src/nodes/agent_crewai/services.json b/nodes/src/nodes/agent_crewai/services.json index d7d977e8d..fd9c8031d 100644 --- a/nodes/src/nodes/agent_crewai/services.json +++ b/nodes/src/nodes/agent_crewai/services.json @@ -1,7 +1,7 @@ { "title": "CrewAI", "protocol": "agent_crewai://", - "classType": ["agent", "tool"], + "classType": ["agent", "tool", "crewai"], "capabilities": ["invoke"], "register": "filter", "node": "python", @@ -34,7 +34,13 @@ "profiles": { "default": { "agent_description": "", - "instructions": [] + "instructions": [], + "advanced_mode": false, + "role": "Assistant", + "task_description": "", + "goal": "", + "backstory": "", + "expected_output": "" } } }, @@ -58,20 +64,71 @@ "format": "textarea" } }, - "agent_crewai.default": { "object": "default", "properties": ["agent_description", "instructions"] }, - "agent_crewai.profile": { - "title": "Profile", + "advanced_mode": { + "type": "boolean", + "title": 
"Advanced Mode", + "description": "Expose CrewAI Agent and Task configuration directly.", + "default": false, + "enum": [ + [false, "Off"], + [true, "On"] + ], + "conditional": [ + { + "value": true, + "properties": ["agent_crewai.agent_config_header", "role", "goal", "backstory", "agent_crewai.task_config_header", "task_description", "expected_output"] + } + ] + }, + "agent_crewai.agent_config_header": { + "type": "null", + "title": "Agent Config", + "default": null + }, + "role": { + "type": "string", + "title": "Role", + "description": "Agent role name (e.g. 'Financial Analyst'). Maps to CrewAI Agent(role=...)." + }, + "goal": { + "type": "string", + "format": "textarea", + "title": "Goal", + "description": "What this agent is trying to achieve. Maps to CrewAI Agent(goal=...)." + }, + "backstory": { "type": "string", - "default": "default", - "enum": [["default", "Default"]], - "conditional": [{ "value": "default", "properties": ["agent_crewai.default"] }] + "format": "textarea", + "title": "Backstory", + "description": "Background context for this agent's persona. Maps to CrewAI Agent(backstory=...)." + }, + "agent_crewai.task_config_header": { + "type": "null", + "title": "Task Config", + "default": null + }, + "task_description": { + "type": "string", + "format": "textarea", + "title": "Task", + "description": "What this agent should do. If blank, the incoming question is used. Maps to CrewAI Task(description=...)." + }, + "expected_output": { + "type": "string", + "format": "textarea", + "title": "Expected Output", + "description": "Description of the expected output format. Maps to CrewAI Task(expected_output=...)." 
+ }, + "agent_crewai.default": { + "object": "default", + "properties": ["agent_description", "instructions", "advanced_mode"] } }, "shape": [ { "section": "Pipe", "title": "CrewAI", - "properties": ["agent_description", "instructions"] + "properties": ["agent_crewai.default"] } ] } diff --git a/nodes/src/nodes/agent_crewai/services.manager.json b/nodes/src/nodes/agent_crewai/services.manager.json new file mode 100644 index 000000000..df3e014da --- /dev/null +++ b/nodes/src/nodes/agent_crewai/services.manager.json @@ -0,0 +1,92 @@ +{ + "title": "CrewAI Manager", + "protocol": "agent_crewai_manager://", + "classType": ["agent", "tool"], + "capabilities": ["invoke"], + "register": "filter", + "node": "python", + "path": "nodes.agent_crewai", + "prefix": "agent", + "description": ["Multi-agent manager using CrewAI hierarchical process.", "Fans out to connected CrewAI sub-agent nodes, assembles a Crew, and synthesizes their outputs.", "Can be invoked as a tool (`.run_agent`) for nested orchestration."], + "icon": "crewai.svg", + "invoke": { + "llm": { + "description": "LLM used by the manager manager agent", + "min": 1 + }, + "crewai": { + "description": "Connected CrewAI sub-agent nodes", + "min": 1 + } + }, + "lanes": { + "questions": ["answers"] + }, + "input": [ + { + "lane": "questions", + "output": [{ "lane": "answers" }] + } + ], + "preconfig": { + "default": "default", + "profiles": { + "default": { + "instructions": [], + "advanced_mode": false, + "goal": "", + "backstory": "" + } + } + }, + "fields": { + "instructions": { + "type": "array", + "title": "Instructions", + "description": "Additional instructions to guide the manager's delegation strategy.", + "items": { + "type": "string", + "format": "textarea" + } + }, + "advanced_mode": { + "type": "boolean", + "title": "Advanced Mode", + "description": "Expose CrewAI manager Agent configuration directly.", + "default": false, + "enum": [ + [false, "Off"], + [true, "On"] + ], + "conditional": [ + { + "value": 
true, + "properties": ["goal", "backstory"] + } + ] + }, + "goal": { + "type": "string", + "format": "textarea", + "title": "Manager Goal", + "description": "What the manager is trying to achieve. Maps to CrewAI Agent(goal=...)." + }, + "backstory": { + "type": "string", + "format": "textarea", + "title": "Manager Backstory", + "description": "Background context for the manager's persona. Maps to CrewAI Agent(backstory=...)." + }, + "agent_crewai_manager.default": { + "object": "default", + "properties": ["instructions", "advanced_mode"] + } + }, + "shape": [ + { + "section": "Pipe", + "title": "CrewAI Manager", + "properties": ["agent_crewai_manager.default"] + } + ] +} diff --git a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/__init__.py b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/__init__.py index fd657261f..a7bfc213a 100644 --- a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/__init__.py +++ b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/__init__.py @@ -72,6 +72,7 @@ from .types import IInvoke from .types import IInvokeLLM from .types import IInvokeTool +from .types import IInvokeCrew from .types import IJson from .types import OPEN_MODE from .types import PROTOCOL_CAPS @@ -110,6 +111,7 @@ 'IInvoke', 'IInvokeLLM', 'IInvokeTool', + 'IInvokeCrew', 'IJson', 'ILoader', 'isAppMonitor', diff --git a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py index 5b7a3782b..11a237908 100644 --- a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py +++ b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/types.py @@ -261,6 +261,39 @@ class Validate(BaseModel): model_config = ConfigDict(extra='allow') +class IInvokeCrew(IInvoke): + """ + Control-plane type for CrewAI sub-agent discovery (crewai.describe fan-out).
+ + Each sub-agent connected on the 'crewai' channel appends a DescribeResponse + to Describe.agents when the orchestrator fans out a Describe request. + """ + + op: str = 'crewai.describe' + model_config = ConfigDict(extra='allow') + + class Describe(BaseModel): + """Fan-out request: each connected sub-agent appends its descriptor.""" + + op: str = Field(default='crewai.describe', frozen=True) + agents: List[Any] = Field(default_factory=list) + model_config = ConfigDict(extra='allow') + + class DescribeResponse(BaseModel): + """Sub-agent descriptor returned in response to crewai.describe.""" + + op: str = Field(default='crewai.describe', frozen=True) + role: str + task_description: str + goal: str = '' + backstory: str = '' + expected_output: str = '' + instructions: List[str] = Field(default_factory=list) + node_id: str = '' # pSelf.instance.pipeType['id'] — used to filter sub-agents from tool list + invoke: Any = Field(default=None) # full pSelf IInstance — passed to AgentHostServices(d.invoke) + model_config = ConfigDict(extra='allow') + + class IJson(Impl_IJson): """ A wrapper class for IJson that provides utility methods for handling JSON-like structures.