|
| 1 | +"""A2A singleton tool — one tool per remote agent. |
| 2 | +
|
| 3 | +Each tool maintains conversation context (task_id/context_id) across calls |
| 4 | +using deterministic persistence via LangGraph graph state (tools_storage). |
| 5 | +
|
| 6 | +Authentication uses the UiPath SDK Bearer token, resolved lazily on first call. |
| 7 | +""" |
| 8 | + |
| 9 | +from __future__ import annotations |
| 10 | + |
| 11 | +import asyncio |
| 12 | +import json |
| 13 | +from logging import getLogger |
| 14 | +from uuid import uuid4 |
| 15 | + |
| 16 | +import httpx |
| 17 | +from a2a.client import Client |
| 18 | +from a2a.types import ( |
| 19 | + AgentCard, |
| 20 | + Message, |
| 21 | + Part, |
| 22 | + Role, |
| 23 | + Task, |
| 24 | + TaskArtifactUpdateEvent, |
| 25 | + TaskState, |
| 26 | + TextPart, |
| 27 | +) |
| 28 | +from langchain_core.messages import ToolCall, ToolMessage |
| 29 | +from langchain_core.tools import BaseTool |
| 30 | +from langgraph.types import Command |
| 31 | +from pydantic import BaseModel, Field |
| 32 | +from uipath.agent.models.agent import AgentA2aResourceConfig |
| 33 | + |
| 34 | +from uipath_langchain.agent.react.types import AgentGraphState |
| 35 | +from uipath_langchain.agent.tools.base_uipath_structured_tool import ( |
| 36 | + BaseUiPathStructuredTool, |
| 37 | +) |
| 38 | +from uipath_langchain.agent.tools.tool_node import ( |
| 39 | + ToolWrapperMixin, |
| 40 | + ToolWrapperReturnType, |
| 41 | +) |
| 42 | +from uipath_langchain.agent.tools.utils import sanitize_tool_name |
| 43 | + |
| 44 | +logger = getLogger(__name__) |
| 45 | + |
| 46 | + |
| 47 | +class A2aToolInput(BaseModel): |
| 48 | + """Input schema for A2A agent tool.""" |
| 49 | + |
| 50 | + message: str = Field(description="The message to send to the remote agent.") |
| 51 | + |
| 52 | + |
| 53 | +class A2aStructuredToolWithWrapper(BaseUiPathStructuredTool, ToolWrapperMixin): |
| 54 | + pass |
| 55 | + |
| 56 | + |
| 57 | +def _extract_text(obj: Task | Message) -> str: |
| 58 | + """Extract text content from a Task or Message response.""" |
| 59 | + parts: list[Part] = [] |
| 60 | + |
| 61 | + if isinstance(obj, Message): |
| 62 | + parts = obj.parts or [] |
| 63 | + elif isinstance(obj, Task): |
| 64 | + if obj.status and obj.status.state == TaskState.input_required: |
| 65 | + if obj.status.message: |
| 66 | + parts = obj.status.message.parts or [] |
| 67 | + else: |
| 68 | + if obj.artifacts: |
| 69 | + for artifact in obj.artifacts: |
| 70 | + parts.extend(artifact.parts or []) |
| 71 | + if not parts and obj.status and obj.status.message: |
| 72 | + parts = obj.status.message.parts or [] |
| 73 | + if not parts and obj.history: |
| 74 | + for msg in reversed(obj.history): |
| 75 | + if msg.role == Role.agent: |
| 76 | + parts = msg.parts or [] |
| 77 | + break |
| 78 | + |
| 79 | + texts = [] |
| 80 | + for part in parts: |
| 81 | + if isinstance(part.root, TextPart): |
| 82 | + texts.append(part.root.text) |
| 83 | + return "\n".join(texts) if texts else "" |
| 84 | + |
| 85 | + |
| 86 | +def _format_response(text: str, state: str) -> str: |
| 87 | + """Build a structured tool response the LLM can act on.""" |
| 88 | + return json.dumps({"agent_response": text, "task_state": state}) |
| 89 | + |
| 90 | + |
| 91 | +def _build_description(card: AgentCard) -> str: |
| 92 | + """Build a tool description from an agent card.""" |
| 93 | + parts = [] |
| 94 | + if card.description: |
| 95 | + parts.append(card.description) |
| 96 | + if card.skills: |
| 97 | + for skill in card.skills: |
| 98 | + skill_desc = skill.name or "" |
| 99 | + if skill.description: |
| 100 | + skill_desc += f": {skill.description}" |
| 101 | + if skill_desc: |
| 102 | + parts.append(f"Skill: {skill_desc}") |
| 103 | + return " | ".join(parts) if parts else f"Remote A2A agent at {card.url}" |
| 104 | + |
| 105 | + |
| 106 | +def _resolve_a2a_url(config: AgentA2aResourceConfig) -> str: |
| 107 | + """Resolve the A2A endpoint URL from config.""" |
| 108 | + a2a_url = getattr(config, "a2a_url", None) |
| 109 | + if a2a_url: |
| 110 | + return a2a_url |
| 111 | + return config.agent_card_url.replace("/.well-known/agent-card.json", "") |
| 112 | + |
| 113 | + |
| 114 | +async def create_a2a_agent_tools( |
| 115 | + resources: list[AgentA2aResourceConfig], |
| 116 | +) -> list[BaseTool]: |
| 117 | + """Create A2A tools from a list of A2A resource configurations. |
| 118 | +
|
| 119 | + Each enabled A2A resource becomes a single tool representing the remote agent. |
| 120 | + Conversation context (task_id/context_id) is persisted in LangGraph graph state. |
| 121 | +
|
| 122 | + Args: |
| 123 | + resources: List of A2A resource configurations from agent.json. |
| 124 | +
|
| 125 | + Returns: |
| 126 | + List of BaseTool instances, one per enabled A2A resource. |
| 127 | + """ |
| 128 | + tools: list[BaseTool] = [] |
| 129 | + |
| 130 | + for resource in resources: |
| 131 | + if resource.is_enabled is False: |
| 132 | + logger.info("Skipping disabled A2A resource '%s'", resource.name) |
| 133 | + continue |
| 134 | + if resource.is_active is False: |
| 135 | + logger.info("Skipping inactive A2A resource '%s'", resource.name) |
| 136 | + continue |
| 137 | + |
| 138 | + logger.info("Creating A2A tool for resource '%s'", resource.name) |
| 139 | + tool = _create_a2a_tool(resource) |
| 140 | + tools.append(tool) |
| 141 | + |
| 142 | + return tools |
| 143 | + |
| 144 | + |
| 145 | +async def _send_a2a_message( |
| 146 | + client: Client, |
| 147 | + a2a_url: str, |
| 148 | + *, |
| 149 | + message: str, |
| 150 | + task_id: str | None, |
| 151 | + context_id: str | None, |
| 152 | +) -> tuple[str, str, str | None, str | None]: |
| 153 | + """Send a message to a remote A2A agent and return the response. |
| 154 | +
|
| 155 | + Returns: |
| 156 | + Tuple of (response_text, task_state, new_task_id, new_context_id). |
| 157 | + """ |
| 158 | + if task_id or context_id: |
| 159 | + logger.info( |
| 160 | + "A2A continue task=%s context=%s to %s", task_id, context_id, a2a_url |
| 161 | + ) |
| 162 | + else: |
| 163 | + logger.info("A2A new message to %s", a2a_url) |
| 164 | + |
| 165 | + a2a_message = Message( |
| 166 | + role=Role.user, |
| 167 | + parts=[Part(root=TextPart(text=message))], |
| 168 | + message_id=str(uuid4()), |
| 169 | + task_id=task_id, |
| 170 | + context_id=context_id, |
| 171 | + ) |
| 172 | + |
| 173 | + try: |
| 174 | + text = "" |
| 175 | + state = "unknown" |
| 176 | + new_task_id = task_id |
| 177 | + new_context_id = context_id |
| 178 | + |
| 179 | + async for event in client.send_message(a2a_message): |
| 180 | + if isinstance(event, Message): |
| 181 | + text = _extract_text(event) |
| 182 | + new_context_id = event.context_id |
| 183 | + state = "completed" |
| 184 | + break |
| 185 | + else: |
| 186 | + task, update = event |
| 187 | + new_task_id = task.id |
| 188 | + new_context_id = task.context_id |
| 189 | + state = task.status.state.value if task.status else "unknown" |
| 190 | + if update is None: |
| 191 | + text = _extract_text(task) |
| 192 | + break |
| 193 | + elif isinstance(update, TaskArtifactUpdateEvent): |
| 194 | + for part in update.artifact.parts or []: |
| 195 | + if isinstance(part.root, TextPart): |
| 196 | + text += part.root.text |
| 197 | + |
| 198 | + return (text or "No response received.", state, new_task_id, new_context_id) |
| 199 | + |
| 200 | + except Exception as e: |
| 201 | + logger.exception("A2A request to %s failed", a2a_url) |
| 202 | + return (f"Error: {e}", "error", task_id, context_id) |
| 203 | + |
| 204 | + |
| 205 | +def _create_a2a_tool(config: AgentA2aResourceConfig) -> BaseTool: |
| 206 | + """Create a single LangChain tool for A2A communication. |
| 207 | +
|
| 208 | + Conversation context (task_id/context_id) is persisted deterministically |
| 209 | + in LangGraph's graph state via tools_storage, ensuring reliable |
| 210 | + multi-turn conversations with the remote agent. |
| 211 | + """ |
| 212 | + if config.cached_agent_card: |
| 213 | + agent_card = AgentCard(**config.cached_agent_card) |
| 214 | + else: |
| 215 | + agent_card = AgentCard( |
| 216 | + url=config.agent_card_url, |
| 217 | + name=config.name, |
| 218 | + description=config.description or "", |
| 219 | + version="1.0.0", |
| 220 | + skills=[], |
| 221 | + capabilities={}, |
| 222 | + default_input_modes=["text/plain"], |
| 223 | + default_output_modes=["text/plain"], |
| 224 | + ) |
| 225 | + |
| 226 | + raw_name = agent_card.name or config.name |
| 227 | + tool_name = sanitize_tool_name(raw_name) |
| 228 | + tool_description = _build_description(agent_card) |
| 229 | + a2a_url = _resolve_a2a_url(config) |
| 230 | + |
| 231 | + _lock = asyncio.Lock() |
| 232 | + _client: Client | None = None |
| 233 | + _http_client: httpx.AsyncClient | None = None |
| 234 | + |
| 235 | + async def _ensure_client() -> Client: |
| 236 | + nonlocal _client, _http_client |
| 237 | + if _client is None: |
| 238 | + async with _lock: |
| 239 | + if _client is None: |
| 240 | + from a2a.client import ClientConfig, ClientFactory |
| 241 | + from uipath.platform import UiPath |
| 242 | + |
| 243 | + sdk = UiPath() |
| 244 | + _http_client = httpx.AsyncClient( |
| 245 | + timeout=120, |
| 246 | + headers={"Authorization": f"Bearer {sdk._config.secret}"}, |
| 247 | + ) |
| 248 | + _client = await ClientFactory.connect( |
| 249 | + a2a_url, |
| 250 | + client_config=ClientConfig( |
| 251 | + httpx_client=_http_client, |
| 252 | + streaming=False, |
| 253 | + ), |
| 254 | + ) |
| 255 | + return _client |
| 256 | + |
| 257 | + metadata = { |
| 258 | + "tool_type": "a2a", |
| 259 | + "display_name": raw_name, |
| 260 | + "slug": config.slug, |
| 261 | + } |
| 262 | + |
| 263 | + async def _send(*, message: str) -> str: |
| 264 | + client = await _ensure_client() |
| 265 | + text, state, _, _ = await _send_a2a_message( |
| 266 | + client, a2a_url, message=message, task_id=None, context_id=None |
| 267 | + ) |
| 268 | + return _format_response(text, state) |
| 269 | + |
| 270 | + async def _a2a_wrapper( |
| 271 | + tool: BaseTool, |
| 272 | + call: ToolCall, |
| 273 | + state: AgentGraphState, |
| 274 | + ) -> ToolWrapperReturnType: |
| 275 | + prior = state.inner_state.tools_storage.get(tool.name) or {} |
| 276 | + task_id = prior.get("task_id") |
| 277 | + context_id = prior.get("context_id") |
| 278 | + |
| 279 | + client = await _ensure_client() |
| 280 | + text, task_state, new_task_id, new_context_id = await _send_a2a_message( |
| 281 | + client, |
| 282 | + a2a_url, |
| 283 | + message=call["args"]["message"], |
| 284 | + task_id=task_id, |
| 285 | + context_id=context_id, |
| 286 | + ) |
| 287 | + |
| 288 | + return Command( |
| 289 | + update={ |
| 290 | + "messages": [ |
| 291 | + ToolMessage( |
| 292 | + content=_format_response(text, task_state), |
| 293 | + name=call["name"], |
| 294 | + tool_call_id=call["id"], |
| 295 | + ) |
| 296 | + ], |
| 297 | + "inner_state": { |
| 298 | + "tools_storage": { |
| 299 | + tool.name: { |
| 300 | + "task_id": new_task_id, |
| 301 | + "context_id": new_context_id, |
| 302 | + } |
| 303 | + } |
| 304 | + }, |
| 305 | + } |
| 306 | + ) |
| 307 | + |
| 308 | + tool = A2aStructuredToolWithWrapper( |
| 309 | + name=tool_name, |
| 310 | + description=tool_description, |
| 311 | + coroutine=_send, |
| 312 | + args_schema=A2aToolInput, |
| 313 | + metadata=metadata, |
| 314 | + ) |
| 315 | + tool.set_tool_wrappers(awrapper=_a2a_wrapper) |
| 316 | + return tool |
0 commit comments