Skip to content

Commit eae2235

Browse files
committed
feat: add A2A tool for invoking remote agents at runtime
1 parent f536077 commit eae2235

File tree

6 files changed

+347
-0
lines changed

6 files changed

+347
-0
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ dependencies = [
2323
"mcp==1.26.0",
2424
"langchain-mcp-adapters==0.2.1",
2525
"pillow>=12.1.1",
26+
"a2a-sdk>=0.2.0",
2627
]
2728

2829
classifiers = [

src/uipath_langchain/agent/tools/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Tool creation and management for LowCode agents."""
22

3+
from .a2a import create_a2a_agent_tools
34
from .context_tool import create_context_tool
45
from .escalation_tool import create_escalation_tool
56
from .extraction_tool import create_ixp_extraction_tool
@@ -18,6 +19,7 @@
1819
)
1920

2021
__all__ = [
22+
"create_a2a_agent_tools",
2123
"create_tools_from_resources",
2224
"create_tool_node",
2325
"create_context_tool",
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""A2A (Agent-to-Agent) tools."""
2+
3+
from .a2a_tool import create_a2a_agent_tools
4+
5+
__all__ = [
6+
"create_a2a_agent_tools",
7+
]
Lines changed: 313 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,313 @@
1+
"""A2A singleton tool — one tool per remote agent.
2+
3+
Each tool maintains conversation context (task_id/context_id) across calls
4+
using deterministic persistence via LangGraph graph state (tools_storage).
5+
6+
Authentication uses the UiPath SDK Bearer token, resolved lazily on first call.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import asyncio
12+
import json
13+
from logging import getLogger
14+
from uuid import uuid4
15+
16+
import httpx
17+
from a2a.client import Client
18+
from a2a.types import (
19+
AgentCard,
20+
Message,
21+
Part,
22+
Role,
23+
Task,
24+
TaskArtifactUpdateEvent,
25+
TaskState,
26+
TextPart,
27+
)
28+
from langchain_core.messages import ToolCall, ToolMessage
29+
from langchain_core.tools import BaseTool
30+
from langgraph.types import Command
31+
from pydantic import BaseModel, Field
32+
from uipath._utils._ssl_context import get_httpx_client_kwargs
33+
from uipath.agent.models.agent import AgentA2aResourceConfig
34+
35+
from uipath_langchain.agent.react.types import AgentGraphState
36+
from uipath_langchain.agent.tools.base_uipath_structured_tool import (
37+
BaseUiPathStructuredTool,
38+
)
39+
from uipath_langchain.agent.tools.tool_node import (
40+
ToolWrapperMixin,
41+
ToolWrapperReturnType,
42+
)
43+
from uipath_langchain.agent.tools.utils import sanitize_tool_name
44+
45+
logger = getLogger(__name__)
46+
47+
48+
class A2aToolInput(BaseModel):
49+
"""Input schema for A2A agent tool."""
50+
51+
message: str = Field(description="The message to send to the remote agent.")
52+
53+
54+
class A2aStructuredToolWithWrapper(BaseUiPathStructuredTool, ToolWrapperMixin):
55+
pass
56+
57+
58+
def _extract_text(obj: Task | Message) -> str:
59+
"""Extract text content from a Task or Message response."""
60+
parts: list[Part] = []
61+
62+
if isinstance(obj, Message):
63+
parts = obj.parts or []
64+
elif isinstance(obj, Task):
65+
if obj.status and obj.status.state == TaskState.input_required:
66+
if obj.status.message:
67+
parts = obj.status.message.parts or []
68+
else:
69+
if obj.artifacts:
70+
for artifact in obj.artifacts:
71+
parts.extend(artifact.parts or [])
72+
if not parts and obj.status and obj.status.message:
73+
parts = obj.status.message.parts or []
74+
if not parts and obj.history:
75+
for msg in reversed(obj.history):
76+
if msg.role == Role.agent:
77+
parts = msg.parts or []
78+
break
79+
80+
texts = []
81+
for part in parts:
82+
if isinstance(part.root, TextPart):
83+
texts.append(part.root.text)
84+
return "\n".join(texts) if texts else ""
85+
86+
87+
def _format_response(text: str, state: str) -> str:
88+
"""Build a structured tool response the LLM can act on."""
89+
return json.dumps({"agent_response": text, "task_state": state})
90+
91+
92+
def _build_description(card: AgentCard) -> str:
93+
"""Build a tool description from an agent card."""
94+
parts = []
95+
if card.description:
96+
parts.append(card.description)
97+
if card.skills:
98+
for skill in card.skills:
99+
skill_desc = skill.name or ""
100+
if skill.description:
101+
skill_desc += f": {skill.description}"
102+
if skill_desc:
103+
parts.append(f"Skill: {skill_desc}")
104+
return " | ".join(parts) if parts else f"Remote A2A agent at {card.url}"
105+
106+
107+
def _resolve_a2a_url(config: AgentA2aResourceConfig) -> str:
108+
"""Resolve the A2A endpoint URL from the cached agent card."""
109+
if config.cached_agent_card and "url" in config.cached_agent_card:
110+
return config.cached_agent_card["url"]
111+
return ""
112+
113+
114+
async def create_a2a_agent_tools(
115+
resources: list[AgentA2aResourceConfig],
116+
) -> list[BaseTool]:
117+
"""Create A2A tools from a list of A2A resource configurations.
118+
119+
Each enabled A2A resource becomes a single tool representing the remote agent.
120+
Conversation context (task_id/context_id) is persisted in LangGraph graph state.
121+
122+
Args:
123+
resources: List of A2A resource configurations from agent.json.
124+
125+
Returns:
126+
List of BaseTool instances, one per enabled A2A resource.
127+
"""
128+
tools: list[BaseTool] = []
129+
130+
for resource in resources:
131+
if resource.is_enabled is False:
132+
logger.info("Skipping disabled A2A resource '%s'", resource.name)
133+
continue
134+
135+
logger.info("Creating A2A tool for resource '%s'", resource.name)
136+
tool = _create_a2a_tool(resource)
137+
tools.append(tool)
138+
139+
return tools
140+
141+
142+
async def _send_a2a_message(
143+
client: Client,
144+
a2a_url: str,
145+
*,
146+
message: str,
147+
task_id: str | None,
148+
context_id: str | None,
149+
) -> tuple[str, str, str | None, str | None]:
150+
"""Send a message to a remote A2A agent and return the response.
151+
152+
Returns:
153+
Tuple of (response_text, task_state, new_task_id, new_context_id).
154+
"""
155+
if task_id or context_id:
156+
logger.info(
157+
"A2A continue task=%s context=%s to %s", task_id, context_id, a2a_url
158+
)
159+
else:
160+
logger.info("A2A new message to %s", a2a_url)
161+
162+
a2a_message = Message(
163+
role=Role.user,
164+
parts=[Part(root=TextPart(text=message))],
165+
message_id=str(uuid4()),
166+
task_id=task_id,
167+
context_id=context_id,
168+
)
169+
170+
try:
171+
text = ""
172+
state = "unknown"
173+
new_task_id = task_id
174+
new_context_id = context_id
175+
176+
async for event in client.send_message(a2a_message):
177+
if isinstance(event, Message):
178+
text = _extract_text(event)
179+
new_context_id = event.context_id
180+
state = "completed"
181+
break
182+
else:
183+
task, update = event
184+
new_task_id = task.id
185+
new_context_id = task.context_id
186+
state = task.status.state.value if task.status else "unknown"
187+
if update is None:
188+
text = _extract_text(task)
189+
break
190+
elif isinstance(update, TaskArtifactUpdateEvent):
191+
for part in update.artifact.parts or []:
192+
if isinstance(part.root, TextPart):
193+
text += part.root.text
194+
195+
return (text or "No response received.", state, new_task_id, new_context_id)
196+
197+
except Exception as e:
198+
logger.exception("A2A request to %s failed", a2a_url)
199+
return (f"Error: {e}", "error", task_id, context_id)
200+
201+
202+
def _create_a2a_tool(config: AgentA2aResourceConfig) -> BaseTool:
203+
"""Create a single LangChain tool for A2A communication.
204+
205+
Conversation context (task_id/context_id) is persisted deterministically
206+
in LangGraph's graph state via tools_storage, ensuring reliable
207+
multi-turn conversations with the remote agent.
208+
"""
209+
if config.cached_agent_card:
210+
agent_card = AgentCard(**config.cached_agent_card)
211+
else:
212+
agent_card = AgentCard(
213+
url="",
214+
name=config.name,
215+
description=config.description or "",
216+
version="1.0.0",
217+
skills=[],
218+
capabilities={},
219+
default_input_modes=["text/plain"],
220+
default_output_modes=["text/plain"],
221+
)
222+
223+
raw_name = agent_card.name or config.name
224+
tool_name = sanitize_tool_name(raw_name)
225+
tool_description = _build_description(agent_card)
226+
a2a_url = _resolve_a2a_url(config)
227+
228+
_lock = asyncio.Lock()
229+
_client: Client | None = None
230+
_http_client: httpx.AsyncClient | None = None
231+
232+
async def _ensure_client() -> Client:
233+
nonlocal _client, _http_client
234+
if _client is None:
235+
async with _lock:
236+
if _client is None:
237+
from a2a.client import ClientConfig, ClientFactory
238+
from uipath.platform import UiPath
239+
240+
sdk = UiPath()
241+
client_kwargs = get_httpx_client_kwargs(
242+
headers={"Authorization": f"Bearer {sdk._config.secret}"},
243+
)
244+
_http_client = httpx.AsyncClient(**client_kwargs)
245+
_client = await ClientFactory.connect(
246+
a2a_url,
247+
client_config=ClientConfig(
248+
httpx_client=_http_client,
249+
streaming=False,
250+
),
251+
)
252+
return _client
253+
254+
metadata = {
255+
"tool_type": "a2a",
256+
"display_name": raw_name,
257+
"slug": config.slug,
258+
}
259+
260+
async def _send(*, message: str) -> str:
261+
client = await _ensure_client()
262+
text, state, _, _ = await _send_a2a_message(
263+
client, a2a_url, message=message, task_id=None, context_id=None
264+
)
265+
return _format_response(text, state)
266+
267+
async def _a2a_wrapper(
268+
tool: BaseTool,
269+
call: ToolCall,
270+
state: AgentGraphState,
271+
) -> ToolWrapperReturnType:
272+
prior = state.inner_state.tools_storage.get(tool.name) or {}
273+
task_id = prior.get("task_id")
274+
context_id = prior.get("context_id")
275+
276+
client = await _ensure_client()
277+
text, task_state, new_task_id, new_context_id = await _send_a2a_message(
278+
client,
279+
a2a_url,
280+
message=call["args"]["message"],
281+
task_id=task_id,
282+
context_id=context_id,
283+
)
284+
285+
return Command(
286+
update={
287+
"messages": [
288+
ToolMessage(
289+
content=_format_response(text, task_state),
290+
name=call["name"],
291+
tool_call_id=call["id"],
292+
)
293+
],
294+
"inner_state": {
295+
"tools_storage": {
296+
tool.name: {
297+
"task_id": new_task_id,
298+
"context_id": new_context_id,
299+
}
300+
}
301+
},
302+
}
303+
)
304+
305+
tool = A2aStructuredToolWithWrapper(
306+
name=tool_name,
307+
description=tool_description,
308+
coroutine=_send,
309+
args_schema=A2aToolInput,
310+
metadata=metadata,
311+
)
312+
tool.set_tool_wrappers(awrapper=_a2a_wrapper)
313+
return tool

src/uipath_langchain/agent/tools/tool_factory.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from langchain_core.language_models import BaseChatModel
66
from langchain_core.tools import BaseTool
77
from uipath.agent.models.agent import (
8+
AgentA2aResourceConfig,
89
AgentContextResourceConfig,
910
AgentEscalationResourceConfig,
1011
AgentIntegrationToolResourceConfig,
@@ -18,6 +19,7 @@
1819

1920
from uipath_langchain.chat.hitl import REQUIRE_CONVERSATIONAL_CONFIRMATION
2021

22+
from .a2a import create_a2a_agent_tools
2123
from .context_tool import create_context_tool
2224
from .escalation_tool import create_escalation_tool
2325
from .extraction_tool import create_ixp_extraction_tool
@@ -96,4 +98,7 @@ async def _build_tool_for_resource(
9698
elif isinstance(resource, AgentIxpVsEscalationResourceConfig):
9799
return create_ixp_escalation_tool(resource)
98100

101+
elif isinstance(resource, AgentA2aResourceConfig):
102+
return await create_a2a_agent_tools([resource])
103+
99104
return None

0 commit comments

Comments
 (0)