Skip to content

Commit b8c3dff

Browse files
committed
feat: add A2A tool for invoking remote agents at runtime
1 parent f536077 commit b8c3dff

6 files changed

Lines changed: 348 additions & 2 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-langchain"
3-
version = "0.9.24"
3+
version = "0.9.25"
44
description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"
@@ -23,6 +23,7 @@ dependencies = [
2323
"mcp==1.26.0",
2424
"langchain-mcp-adapters==0.2.1",
2525
"pillow>=12.1.1",
26+
"a2a-sdk>=0.2.0",
2627
]
2728

2829
classifiers = [

src/uipath_langchain/agent/tools/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Tool creation and management for LowCode agents."""
22

3+
from .a2a import create_a2a_agent_tools
34
from .context_tool import create_context_tool
45
from .escalation_tool import create_escalation_tool
56
from .extraction_tool import create_ixp_extraction_tool
@@ -18,6 +19,7 @@
1819
)
1920

2021
__all__ = [
22+
"create_a2a_agent_tools",
2123
"create_tools_from_resources",
2224
"create_tool_node",
2325
"create_context_tool",
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""A2A (Agent-to-Agent) tools."""
2+
3+
from .a2a_tool import create_a2a_agent_tools
4+
5+
__all__ = [
6+
"create_a2a_agent_tools",
7+
]
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
"""A2A singleton tool — one tool per remote agent.
2+
3+
Each tool maintains conversation context (task_id/context_id) across calls
4+
using deterministic persistence via LangGraph graph state (tools_storage).
5+
6+
Authentication uses the UiPath SDK Bearer token, resolved lazily on first call.
7+
"""
8+
9+
import asyncio
10+
import json
11+
from logging import getLogger
12+
from uuid import uuid4
13+
14+
import httpx
15+
from a2a.client import Client
16+
from a2a.types import (
17+
AgentCard,
18+
Message,
19+
Part,
20+
Role,
21+
Task,
22+
TaskArtifactUpdateEvent,
23+
TaskState,
24+
TextPart,
25+
)
26+
from langchain_core.messages import ToolCall, ToolMessage
27+
from langchain_core.tools import BaseTool
28+
from langgraph.types import Command
29+
from pydantic import BaseModel, Field
30+
from uipath._utils._ssl_context import get_httpx_client_kwargs
31+
from uipath.agent.models.agent import AgentA2aResourceConfig
32+
33+
from uipath_langchain.agent.react.types import AgentGraphState
34+
from uipath_langchain.agent.tools.base_uipath_structured_tool import (
35+
BaseUiPathStructuredTool,
36+
)
37+
from uipath_langchain.agent.tools.tool_node import (
38+
ToolWrapperMixin,
39+
ToolWrapperReturnType,
40+
)
41+
from uipath_langchain.agent.tools.utils import sanitize_tool_name
42+
43+
logger = getLogger(__name__)
44+
45+
46+
class A2aToolInput(BaseModel):
47+
"""Input schema for A2A agent tool."""
48+
49+
message: str = Field(description="The message to send to the remote agent.")
50+
51+
52+
class A2aStructuredToolWithWrapper(BaseUiPathStructuredTool, ToolWrapperMixin):
53+
pass
54+
55+
56+
def _extract_text(obj: Task | Message) -> str:
57+
"""Extract text content from a Task or Message response."""
58+
parts: list[Part] = []
59+
60+
if isinstance(obj, Message):
61+
parts = obj.parts or []
62+
elif isinstance(obj, Task):
63+
if obj.status and obj.status.state == TaskState.input_required:
64+
if obj.status.message:
65+
parts = obj.status.message.parts or []
66+
else:
67+
if obj.artifacts:
68+
for artifact in obj.artifacts:
69+
parts.extend(artifact.parts or [])
70+
if not parts and obj.status and obj.status.message:
71+
parts = obj.status.message.parts or []
72+
if not parts and obj.history:
73+
for msg in reversed(obj.history):
74+
if msg.role == Role.agent:
75+
parts = msg.parts or []
76+
break
77+
78+
texts = []
79+
for part in parts:
80+
if isinstance(part.root, TextPart):
81+
texts.append(part.root.text)
82+
return "\n".join(texts) if texts else ""
83+
84+
85+
def _format_response(text: str, state: str) -> str:
86+
"""Build a structured tool response the LLM can act on."""
87+
return json.dumps({"agent_response": text, "task_state": state})
88+
89+
90+
def _build_description(card: AgentCard) -> str:
91+
"""Build a tool description from an agent card."""
92+
parts = []
93+
if card.description:
94+
parts.append(card.description)
95+
if card.skills:
96+
for skill in card.skills:
97+
skill_desc = skill.name or ""
98+
if skill.description:
99+
skill_desc += f": {skill.description}"
100+
if skill_desc:
101+
parts.append(f"Skill: {skill_desc}")
102+
return " | ".join(parts) if parts else f"Remote A2A agent at {card.url}"
103+
104+
105+
def _resolve_a2a_url(config: AgentA2aResourceConfig) -> str:
106+
"""Resolve the A2A endpoint URL from the cached agent card."""
107+
if config.cached_agent_card and "url" in config.cached_agent_card:
108+
return config.cached_agent_card["url"]
109+
return ""
110+
111+
112+
async def create_a2a_agent_tools(
113+
resources: list[AgentA2aResourceConfig],
114+
) -> list[BaseTool]:
115+
"""Create A2A tools from a list of A2A resource configurations.
116+
117+
Each enabled A2A resource becomes a single tool representing the remote agent.
118+
Conversation context (task_id/context_id) is persisted in LangGraph graph state.
119+
120+
Args:
121+
resources: List of A2A resource configurations from agent.json.
122+
123+
Returns:
124+
List of BaseTool instances, one per enabled A2A resource.
125+
"""
126+
tools: list[BaseTool] = []
127+
128+
for resource in resources:
129+
if resource.is_enabled is False:
130+
logger.info("Skipping disabled A2A resource '%s'", resource.name)
131+
continue
132+
133+
logger.info("Creating A2A tool for resource '%s'", resource.name)
134+
tool = _create_a2a_tool(resource)
135+
tools.append(tool)
136+
137+
return tools
138+
139+
140+
async def _send_a2a_message(
141+
client: Client,
142+
a2a_url: str,
143+
*,
144+
message: str,
145+
task_id: str | None,
146+
context_id: str | None,
147+
) -> tuple[str, str, str | None, str | None]:
148+
"""Send a message to a remote A2A agent and return the response.
149+
150+
Returns:
151+
Tuple of (response_text, task_state, new_task_id, new_context_id).
152+
"""
153+
if task_id or context_id:
154+
logger.info(
155+
"A2A continue task=%s context=%s to %s", task_id, context_id, a2a_url
156+
)
157+
else:
158+
logger.info("A2A new message to %s", a2a_url)
159+
160+
a2a_message = Message(
161+
role=Role.user,
162+
parts=[Part(root=TextPart(text=message))],
163+
message_id=str(uuid4()),
164+
task_id=task_id,
165+
context_id=context_id,
166+
)
167+
168+
try:
169+
text = ""
170+
state = "unknown"
171+
new_task_id = task_id
172+
new_context_id = context_id
173+
174+
async for event in client.send_message(a2a_message):
175+
if isinstance(event, Message):
176+
text = _extract_text(event)
177+
new_context_id = event.context_id
178+
state = "completed"
179+
break
180+
else:
181+
task, update = event
182+
new_task_id = task.id
183+
new_context_id = task.context_id
184+
state = task.status.state.value if task.status else "unknown"
185+
if update is None:
186+
text = _extract_text(task)
187+
break
188+
elif isinstance(update, TaskArtifactUpdateEvent):
189+
for part in update.artifact.parts or []:
190+
if isinstance(part.root, TextPart):
191+
text += part.root.text
192+
193+
return (text or "No response received.", state, new_task_id, new_context_id)
194+
195+
except Exception as e:
196+
logger.exception("A2A request to %s failed", a2a_url)
197+
return (f"Error: {e}", "error", task_id, context_id)
198+
199+
200+
def _create_a2a_tool(config: AgentA2aResourceConfig) -> BaseTool:
201+
"""Create a single LangChain tool for A2A communication.
202+
203+
Conversation context (task_id/context_id) is persisted deterministically
204+
in LangGraph's graph state via tools_storage, ensuring reliable
205+
multi-turn conversations with the remote agent.
206+
"""
207+
if config.cached_agent_card:
208+
agent_card = AgentCard(**config.cached_agent_card)
209+
else:
210+
agent_card = AgentCard(
211+
url="",
212+
name=config.name,
213+
description=config.description or "",
214+
version="1.0.0",
215+
skills=[],
216+
capabilities={},
217+
default_input_modes=["text/plain"],
218+
default_output_modes=["text/plain"],
219+
)
220+
221+
raw_name = agent_card.name or config.name
222+
tool_name = sanitize_tool_name(raw_name)
223+
tool_description = _build_description(agent_card)
224+
a2a_url = _resolve_a2a_url(config)
225+
226+
_lock = asyncio.Lock()
227+
_client: Client | None = None
228+
_http_client: httpx.AsyncClient | None = None
229+
230+
async def _ensure_client() -> Client:
231+
nonlocal _client, _http_client
232+
if _client is None:
233+
async with _lock:
234+
if _client is None:
235+
from a2a.client import ClientConfig, ClientFactory
236+
from uipath.platform import UiPath
237+
238+
sdk = UiPath()
239+
client_kwargs = get_httpx_client_kwargs(
240+
headers={"Authorization": f"Bearer {sdk._config.secret}"},
241+
)
242+
client_kwargs["timeout"] = httpx.Timeout(300.0, connect=10.0)
243+
_http_client = httpx.AsyncClient(**client_kwargs)
244+
_client = await ClientFactory.connect(
245+
agent_card,
246+
client_config=ClientConfig(
247+
httpx_client=_http_client,
248+
streaming=False,
249+
),
250+
)
251+
return _client
252+
253+
metadata = {
254+
"tool_type": "a2a",
255+
"display_name": raw_name,
256+
"slug": config.slug,
257+
}
258+
259+
async def _send(*, message: str) -> str:
260+
client = await _ensure_client()
261+
text, state, _, _ = await _send_a2a_message(
262+
client, a2a_url, message=message, task_id=None, context_id=None
263+
)
264+
return _format_response(text, state)
265+
266+
async def _a2a_wrapper(
267+
tool: BaseTool,
268+
call: ToolCall,
269+
state: AgentGraphState,
270+
) -> ToolWrapperReturnType:
271+
prior = state.inner_state.tools_storage.get(tool.name) or {}
272+
task_id = prior.get("task_id")
273+
context_id = prior.get("context_id")
274+
275+
client = await _ensure_client()
276+
text, task_state, new_task_id, new_context_id = await _send_a2a_message(
277+
client,
278+
a2a_url,
279+
message=call["args"]["message"],
280+
task_id=task_id,
281+
context_id=context_id,
282+
)
283+
284+
return Command(
285+
update={
286+
"messages": [
287+
ToolMessage(
288+
content=_format_response(text, task_state),
289+
name=call["name"],
290+
tool_call_id=call["id"],
291+
)
292+
],
293+
"inner_state": {
294+
"tools_storage": {
295+
tool.name: {
296+
"task_id": new_task_id,
297+
"context_id": new_context_id,
298+
}
299+
}
300+
},
301+
}
302+
)
303+
304+
tool = A2aStructuredToolWithWrapper(
305+
name=tool_name,
306+
description=tool_description,
307+
coroutine=_send,
308+
args_schema=A2aToolInput,
309+
metadata=metadata,
310+
)
311+
tool.set_tool_wrappers(awrapper=_a2a_wrapper)
312+
return tool

src/uipath_langchain/agent/tools/tool_factory.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from langchain_core.language_models import BaseChatModel
66
from langchain_core.tools import BaseTool
77
from uipath.agent.models.agent import (
8+
AgentA2aResourceConfig,
89
AgentContextResourceConfig,
910
AgentEscalationResourceConfig,
1011
AgentIntegrationToolResourceConfig,
@@ -18,6 +19,7 @@
1819

1920
from uipath_langchain.chat.hitl import REQUIRE_CONVERSATIONAL_CONFIRMATION
2021

22+
from .a2a import create_a2a_agent_tools
2123
from .context_tool import create_context_tool
2224
from .escalation_tool import create_escalation_tool
2325
from .extraction_tool import create_ixp_extraction_tool
@@ -96,4 +98,7 @@ async def _build_tool_for_resource(
9698
elif isinstance(resource, AgentIxpVsEscalationResourceConfig):
9799
return create_ixp_escalation_tool(resource)
98100

101+
elif isinstance(resource, AgentA2aResourceConfig):
102+
return await create_a2a_agent_tools([resource])
103+
99104
return None

0 commit comments

Comments
 (0)