Skip to content

Commit d500945

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
feat: add A2A support in Agent Engine
PiperOrigin-RevId: 805046180
1 parent e3a2ebe commit d500945

9 files changed

Lines changed: 698 additions & 8 deletions

File tree

setup.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@
142142
"google-adk >= 1.0.0, < 2.0.0",
143143
]
144144

145+
a2a_extra_require = [
146+
"a2a-sdk >= 0.3.4",
147+
]
148+
145149
reasoning_engine_extra_require = [
146150
"cloudpickle >= 3.0, < 4.0",
147151
"google-cloud-trace < 2",
@@ -325,6 +329,7 @@
325329
"ray": ray_extra_require,
326330
"ray_testing": ray_testing_extra_require,
327331
"adk": adk_extra_require,
332+
"a2a": a2a_extra_require,
328333
"reasoningengine": reasoning_engine_extra_require,
329334
"agent_engines": agent_engines_extra_require,
330335
"evaluation": evaluation_extra_require,

tests/unit/vertex_langchain/test_agent_engines.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3482,7 +3482,7 @@ def test_update_class_methods_spec_with_registered_operation_not_found(self):
34823482
"register the API methods: "
34833483
"https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/develop/custom#custom-methods. "
34843484
"Error: {Unsupported api mode: `UNKNOWN_API_MODE`, "
3485-
"Supported modes are: ``, `async`, `async_stream`, "
3485+
"Supported modes are: ``, `a2a_extension`, `async`, `async_stream`, "
34863486
"`bidi_stream`, `stream`.}"
34873487
),
34883488
),

tests/unit/vertexai/genai/test_agent_engines.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2073,7 +2073,7 @@ def test_update_agent_engine_description(self, mock_await_operation):
20732073
"register the API methods: "
20742074
"https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/develop/custom#custom-methods. "
20752075
"Error: {Unsupported api mode: `UNKNOWN_API_MODE`, "
2076-
"Supported modes are: ``, `async`, `async_stream`, `stream`.}"
2076+
"Supported modes are: ``, `a2a_extension`, `async`, `async_stream`, `stream`.}"
20772077
),
20782078
),
20792079
],

vertexai/_genai/_agent_engines_utils.py

Lines changed: 179 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
Union,
4545
)
4646

47+
import httpx
48+
4749
import proto
4850

4951
from google.api_core import exceptions
@@ -104,6 +106,32 @@
104106
Session = Any
105107

106108

109+
try:
110+
from a2a.types import (
111+
AgentCard,
112+
TransportProtocol,
113+
Message,
114+
TaskIdParams,
115+
TaskQueryParams,
116+
)
117+
from a2a.client import ClientConfig, ClientFactory
118+
119+
AgentCard = AgentCard
120+
TransportProtocol = TransportProtocol
121+
Message = Message
122+
ClientConfig = ClientConfig
123+
ClientFactory = ClientFactory
124+
TaskIdParams = TaskIdParams
125+
TaskQueryParams = TaskQueryParams
126+
except (ImportError, AttributeError):
127+
AgentCard = None
128+
TransportProtocol = None
129+
Message = None
130+
ClientConfig = None
131+
ClientFactory = None
132+
TaskIdParams = None
133+
TaskQueryParams = None
134+
107135
_ACTIONS_KEY = "actions"
108136
_ACTION_APPEND = "append"
109137
_AGENT_FRAMEWORK_ATTR = "agent_framework"
@@ -148,6 +176,8 @@
148176
_REQUIREMENTS_FILE = "requirements.txt"
149177
_STANDARD_API_MODE = ""
150178
_STREAM_API_MODE = "stream"
179+
_A2A_EXTENSION_MODE = "a2a_extension"
180+
_A2A_AGENT_CARD = "a2a_agent_card"
151181
_WARNINGS_KEY = "warnings"
152182
_WARNING_MISSING = "missing"
153183
_WARNING_INCOMPATIBLE = "incompatible"
@@ -506,11 +536,32 @@ def _generate_class_methods_spec_or_raise(
506536

507537
class_method = _to_proto(schema_dict)
508538
class_method[_MODE_KEY_IN_SCHEMA] = mode
539+
if hasattr(agent, "agent_card"):
540+
class_method[_A2A_AGENT_CARD] = getattr(
541+
agent, "agent_card"
542+
).model_dump_json()
509543
class_methods_spec.append(class_method)
510544

511545
return class_methods_spec
512546

513547

548+
def _is_pydantic_serializable(param: inspect.Parameter) -> bool:
549+
"""Checks if the parameter is pydantic serializable."""
550+
551+
if param.annotation == inspect.Parameter.empty:
552+
return True
553+
554+
if isinstance(param.annotation, str):
555+
return False
556+
557+
pydantic = _import_pydantic_or_raise()
558+
try:
559+
pydantic.TypeAdapter(param.annotation)
560+
return True
561+
except Exception:
562+
return False
563+
564+
514565
def _generate_schema(
515566
f: Callable[..., Any],
516567
*,
@@ -572,7 +623,7 @@ def _generate_schema(
572623
)
573624
# For a bidi endpoint, it requires an asyncio.Queue as the input, but
574625
# it is not JSON serializable. We hence exclude it from the schema.
575-
and param.annotation != asyncio.Queue
626+
and param.annotation != asyncio.Queue and _is_pydantic_serializable(param)
576627
}
577628
parameters = pydantic.create_model(f.__name__, **fields_dict).schema()
578629
# Postprocessing
@@ -890,6 +941,7 @@ def _register_api_methods_or_raise(
890941
_ASYNC_API_MODE: _wrap_async_query_operation,
891942
_STREAM_API_MODE: _wrap_stream_query_operation,
892943
_ASYNC_STREAM_API_MODE: _wrap_async_stream_query_operation,
944+
_A2A_EXTENSION_MODE: _wrap_a2a_operation,
893945
}
894946
if isinstance(wrap_operation_fn, dict) and api_mode in wrap_operation_fn:
895947
# Override the default function with user-specified function if it exists.
@@ -906,7 +958,13 @@ def _register_api_methods_or_raise(
906958
)
907959

908960
# Bind the method to the object.
909-
method = _wrap_operation(method_name=method_name) # type: ignore[call-arg]
961+
if api_mode == _A2A_EXTENSION_MODE:
962+
agent_card = operation_schema.get(_A2A_AGENT_CARD)
963+
method = _wrap_operation(
964+
method_name=method_name, agent_card=agent_card
965+
) # type: ignore[call-arg]
966+
else:
967+
method = _wrap_operation(method_name=method_name) # type: ignore[call-arg]
910968
method.__name__ = method_name
911969
if method_description and isinstance(method_description, str):
912970
method.__doc__ = method_description
@@ -1522,6 +1580,125 @@ async def _method(self: genai_types.AgentEngine, **kwargs) -> AsyncIterator[Any]
15221580
return _method
15231581

15241582

1583+
def _wrap_a2a_operation(method_name: str, agent_card: str) -> Callable[..., list]:
1584+
"""Wraps an Agent Engine method, creating a callable for A2A API.
1585+
1586+
Args:
1587+
method_name: The name of the Agent Engine method to call.
1588+
agent_card: The agent card to use for the A2A API call.
1589+
Example:
1590+
{'additionalInterfaces': None,
1591+
'capabilities': {'extensions': None,
1592+
'pushNotifications': None,
1593+
'stateTransitionHistory': None,
1594+
'streaming': False},
1595+
'defaultInputModes': ['text'],
1596+
'defaultOutputModes': ['text'],
1597+
'description': (
1598+
'A helpful assistant agent that can answer questions.'
1599+
),
1600+
'documentationUrl': None,
1601+
'iconUrl': None,
1602+
'name': 'Q&A Agent',
1603+
'preferredTransport': 'JSONRPC',
1604+
'protocolVersion': '0.3.0',
1605+
'provider': None,
1606+
'security': None,
1607+
'securitySchemes': None,
1608+
'signatures': None,
1609+
'skills': [{
1610+
'description': (
1611+
'A helpful assistant agent that can answer questions.'
1612+
),
1613+
'examples': ['Who is leading 2025 F1 Standings?',
1614+
'Where can i find an active volcano?'],
1615+
'id': 'question_answer',
1616+
'inputModes': None,
1617+
'name': 'Q&A Agent',
1618+
'outputModes': None,
1619+
'security': None,
1620+
'tags': ['Question-Answer']}],
1621+
'supportsAuthenticatedExtendedCard': True,
1622+
'url': 'http://localhost:8080/',
1623+
'version': '1.0.0'}
1624+
Returns:
1625+
A callable object that executes the method on the Agent Engine via
1626+
the A2A API.
1627+
"""
1628+
1629+
async def _method(self, **kwargs) -> Any:
1630+
"""Wraps an Agent Engine method, creating a callable for A2A API."""
1631+
if not self.api_client:
1632+
raise ValueError("api_client is not initialized.")
1633+
if not self.api_resource:
1634+
raise ValueError("api_resource is not initialized.")
1635+
a2a_agent_card = AgentCard(**json.loads(agent_card))
1636+
# A2A + AE integration currently only supports Rest API.
1637+
if (
1638+
a2a_agent_card.preferred_transport
1639+
and a2a_agent_card.preferred_transport != TransportProtocol.http_json
1640+
):
1641+
raise ValueError(
1642+
"Only HTTP+JSON is supported for preferred transport on agent card "
1643+
)
1644+
1645+
# Set preferred transport to HTTP+JSON if not set.
1646+
if not hasattr(a2a_agent_card, "preferred_transport"):
1647+
a2a_agent_card.preferred_transport = TransportProtocol.http_json
1648+
1649+
# AE cannot support streaming yet. Turn off streaming for now.
1650+
if a2a_agent_card.capabilities and a2a_agent_card.capabilities.streaming:
1651+
raise ValueError(
1652+
"Streaming is not supported in Agent Engine, please change "
1653+
"a2a_agent_card.capabilities.streaming to False."
1654+
)
1655+
1656+
if not hasattr(a2a_agent_card.capabilities, "streaming"):
1657+
a2a_agent_card.capabilities.streaming = False
1658+
1659+
# agent_card is set on the class_methods before set_up is invoked.
1660+
# Ensure that the agent_card url is set correctly before the client is created.
1661+
base_url = self.api_client._api_client._http_options.base_url.rstrip("/")
1662+
api_version = self.api_client._api_client._http_options.api_version
1663+
a2a_agent_card.url = f"{base_url}/{api_version}/{self.api_resource.name}/a2a"
1664+
1665+
# Using a2a client, inject the auth token from the global config.
1666+
config = ClientConfig(
1667+
supported_transports=[
1668+
TransportProtocol.http_json,
1669+
],
1670+
use_client_preference=True,
1671+
httpx_client=httpx.AsyncClient(
1672+
headers={
1673+
"Authorization": (
1674+
f"Bearer {self.api_client._api_client._credentials.token}"
1675+
)
1676+
}
1677+
),
1678+
)
1679+
factory = ClientFactory(config)
1680+
client = factory.create(a2a_agent_card)
1681+
1682+
if method_name == "on_message_send":
1683+
response = client.send_message(Message(**kwargs))
1684+
chunks = []
1685+
async for chunk in response:
1686+
chunks.append(chunk)
1687+
return chunks
1688+
elif method_name == "on_get_task":
1689+
response = await client.get_task(TaskQueryParams(**kwargs))
1690+
elif method_name == "on_cancel_task":
1691+
response = await client.cancel_task(TaskIdParams(**kwargs))
1692+
elif method_name == "handle_authenticated_agent_card":
1693+
response = await client.get_card()
1694+
else:
1695+
raise ValueError(f"Unknown method name: {method_name}")
1696+
1697+
return response
1698+
1699+
return _method
1700+
1701+
15251702
def _yield_parsed_json(http_response: google_genai_types.HttpResponse) -> Iterator[Any]:
15261703
"""Converts the body of the HTTP Response message to JSON format.
15271704

vertexai/_genai/agent_engines.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,6 +1335,7 @@ def _register_api_methods(
13351335
"async": _agent_engines_utils._wrap_async_query_operation,
13361336
"stream": _agent_engines_utils._wrap_stream_query_operation,
13371337
"async_stream": _agent_engines_utils._wrap_async_stream_query_operation,
1338+
"a2a_extension": _agent_engines_utils._wrap_a2a_operation,
13381339
},
13391340
)
13401341
except Exception as e:

0 commit comments

Comments
 (0)