Skip to content

Commit e85e13b

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
chore: add create agent engine a2a replay test
FUTURE_COPYBARA_INTEGRATE_REVIEW=#6618 from googleapis:release-please--branches--main 731accb PiperOrigin-RevId: 907394942
1 parent 68f053e commit e85e13b

5 files changed

Lines changed: 729 additions & 31 deletions

File tree

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
# pylint: disable=protected-access,bad-continuation,missing-function-docstring
16+
17+
import contextlib
18+
import os
19+
from unittest import mock
20+
from tests.unit.vertexai.genai.replays import pytest_helper
21+
from vertexai._genai import types
22+
from vertexai.agent_engines.templates.a2a import default_a2a_agent
23+
import pytest
24+
25+
26+
pytest.importorskip(
27+
"a2a.types", reason="a2a-sdk not installed, skipping A2A Agent tests"
28+
)
29+
from a2a.types import AgentInterface
30+
from a2a.utils.constants import TransportProtocol
31+
32+
33+
def test_create_a2a_agent(client, is_replay_mode):
34+
# Use the autopush environment.
35+
36+
client._api_client._http_options.base_url = (
37+
"https://us-central1-autopush-aiplatform.sandbox.googleapis.com/"
38+
)
39+
40+
my_agent = default_a2a_agent()
41+
42+
my_agent.agent_card.supported_interfaces.append(
43+
AgentInterface(
44+
url="http://localhost:8888/",
45+
protocol_binding=TransportProtocol.HTTP_JSON,
46+
protocol_version="1.0",
47+
)
48+
)
49+
50+
51+
staging_bucket = os.environ["GCS_BUCKET"]
52+
53+
# In replay mode, GCS operations are mocked and blob.open("rb") returns a mock
54+
# that fails when cloudpickle.load expects bytes. We mock _upload_agent_engine
55+
# to skip this verification step, which is not needed when replaying API calls.
56+
upload_patch = (
57+
mock.patch("vertexai._genai._agent_engines_utils._upload_agent_engine")
58+
if is_replay_mode
59+
else contextlib.nullcontext()
60+
)
61+
62+
with upload_patch:
63+
agent_engine = client.agent_engines.create(
64+
agent=my_agent,
65+
config={
66+
"staging_bucket": staging_bucket,
67+
"display_name": "test-a2a-agent",
68+
"http_options": {"api_version": "v1beta1"},
69+
"requirements": [
70+
"google-cloud-aiplatform[agent_engines] @ git+https://github.com/googleapis/python-aiplatform.git@copybara_903097201",
71+
"a2a-sdk",
72+
"sse-starlette",
73+
],
74+
},
75+
)
76+
77+
78+
assert isinstance(agent_engine, types.AgentEngine)
79+
assert agent_engine.api_resource.display_name == "test-a2a-agent"
80+
81+
# Clean up resources.
82+
client.agent_engines.delete(name=agent_engine.api_resource.name, force=True)
83+
84+
85+
pytestmark = pytest_helper.setup(
86+
file=__file__,
87+
globals_for_file=globals(),
88+
)
89+
90+
pytest_plugins = ("pytest_asyncio",)
91+
92+
93+

vertexai/_genai/_agent_engines_utils.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -633,9 +633,9 @@ def _generate_class_methods_spec_or_raise(
633633
class_method = _to_proto(schema_dict)
634634
class_method[_MODE_KEY_IN_SCHEMA] = mode
635635
if hasattr(agent, "agent_card"):
636-
class_method[_A2A_AGENT_CARD] = getattr(
637-
agent, "agent_card"
638-
).model_dump_json()
636+
class_method[_A2A_AGENT_CARD] = json_format.MessageToJson(
637+
getattr(agent, "agent_card")
638+
)
639639
class_methods_spec.append(class_method)
640640

641641
return class_methods_spec
@@ -1234,9 +1234,16 @@ def _upload_agent_engine(
12341234
cloudpickle.dump(agent, f)
12351235
except Exception as e:
12361236
url = "https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/develop/custom#deployment-considerations"
1237-
raise TypeError(
1238-
f"Failed to serialize agent engine. Visit {url} for details."
1239-
) from e
1237+
error_msg = f"Failed to serialize agent engine. Visit {url} for details."
1238+
if "google._upb._message" in str(e) or "Descriptor" in str(e):
1239+
error_msg += (
1240+
" This is often caused by protobuf objects (like Part, AgentCard) "
1241+
"being imported at the global module level. Please move these "
1242+
"imports inside the functions or methods where they are used. "
1243+
"Alternatively, you can import the entire module: "
1244+
"`from a2a import types`."
1245+
)
1246+
raise TypeError(error_msg) from e
12401247
with blob.open("rb") as f:
12411248
try:
12421249
_ = cloudpickle.load(f)
@@ -1796,13 +1803,6 @@ async def _method(self, **kwargs) -> Any: # type: ignore[no-untyped-def]
17961803
if not hasattr(a2a_agent_card, "preferred_transport"):
17971804
a2a_agent_card.preferred_transport = TransportProtocol.http_json
17981805

1799-
# AE cannot support streaming yet. Turn off streaming for now.
1800-
if a2a_agent_card.capabilities and a2a_agent_card.capabilities.streaming:
1801-
raise ValueError(
1802-
"Streaming is not supported in Agent Engine, please change "
1803-
"a2a_agent_card.capabilities.streaming to False."
1804-
)
1805-
18061806
if not hasattr(a2a_agent_card.capabilities, "streaming"):
18071807
a2a_agent_card.capabilities.streaming = False
18081808

vertexai/_genai/agent_engines.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2475,10 +2475,12 @@ def _create_config(
24752475
agent_card = getattr(agent, "agent_card")
24762476
if agent_card:
24772477
try:
2478-
agent_engine_spec["agent_card"] = agent_card.model_dump(
2479-
exclude_none=True
2478+
from google.protobuf import json_format
2479+
2480+
agent_engine_spec["agent_card"] = json_format.MessageToDict(
2481+
agent_card
24802482
)
2481-
except TypeError as e:
2483+
except Exception as e:
24822484
raise ValueError(
24832485
f"Failed to convert agent card to dict (serialization error): {e}"
24842486
) from e

vertexai/agent_engines/_agent_engines.py

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -119,23 +119,28 @@
119119
try:
120120
from a2a.types import (
121121
AgentCard,
122-
TransportProtocol,
122+
AgentInterface,
123123
Message,
124124
TaskIdParams,
125125
TaskQueryParams,
126126
)
127+
from a2a.utils.constants import TransportProtocol, PROTOCOL_VERSION_CURRENT
127128
from a2a.client import ClientConfig, ClientFactory
128129

129130
AgentCard = AgentCard
131+
AgentInterface = AgentInterface
130132
TransportProtocol = TransportProtocol
133+
PROTOCOL_VERSION_CURRENT = PROTOCOL_VERSION_CURRENT
131134
Message = Message
132135
ClientConfig = ClientConfig
133136
ClientFactory = ClientFactory
134137
TaskIdParams = TaskIdParams
135138
TaskQueryParams = TaskQueryParams
136139
except (ImportError, AttributeError):
137140
AgentCard = None
141+
AgentInterface = None
138142
TransportProtocol = None
143+
PROTOCOL_VERSION_CURRENT = None
139144
Message = None
140145
ClientConfig = None
141146
ClientFactory = None
@@ -1216,9 +1221,16 @@ def _upload_agent_engine(
12161221
cloudpickle.dump(agent_engine, f)
12171222
except Exception as e:
12181223
url = "https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/develop/custom#deployment-considerations"
1219-
raise TypeError(
1220-
f"Failed to serialize agent engine. Visit {url} for details."
1221-
) from e
1224+
error_msg = f"Failed to serialize agent engine. Visit {url} for details."
1225+
if "google._upb._message" in str(e) or "Descriptor" in str(e):
1226+
error_msg += (
1227+
" This is often caused by protobuf objects (like Part, AgentCard) "
1228+
"being imported at the global module level. Please move these "
1229+
"imports inside the functions or methods where they are used. "
1230+
"Alternatively, you can import the entire module: "
1231+
"`from a2a import types as a2a_types`."
1232+
)
1233+
raise TypeError(error_msg) from e
12221234
with blob.open("rb") as f:
12231235
try:
12241236
_ = cloudpickle.load(f)
@@ -1736,16 +1748,23 @@ async def _method(self, **kwargs) -> Any:
17361748

17371749
# A2A + AE integration currently only supports Rest API.
17381750
if (
1739-
a2a_agent_card.preferred_transport
1740-
and a2a_agent_card.preferred_transport != TransportProtocol.http_json
1751+
a2a_agent_card.supported_interfaces
1752+
and a2a_agent_card.supported_interfaces[0].protocol_binding
1753+
!= TransportProtocol.HTTP_JSON
17411754
):
17421755
raise ValueError(
1743-
"Only HTTP+JSON is supported for preferred transport on agent card "
1756+
"Only HTTP+JSON is supported for primary interface on agent card "
17441757
)
17451758

1746-
# Set preferred transport to HTTP+JSON if not set.
1747-
if not hasattr(a2a_agent_card, "preferred_transport"):
1748-
a2a_agent_card.preferred_transport = TransportProtocol.http_json
1759+
# Set primary interface to HTTP+JSON if not set.
1760+
if not a2a_agent_card.supported_interfaces:
1761+
a2a_agent_card.supported_interfaces = []
1762+
a2a_agent_card.supported_interfaces.append(
1763+
AgentInterface(
1764+
protocol_binding=TransportProtocol.HTTP_JSON,
1765+
protocol_version=PROTOCOL_VERSION_CURRENT,
1766+
)
1767+
)
17491768

17501769
# AE cannot support streaming yet. Turn off streaming for now.
17511770
if a2a_agent_card.capabilities and a2a_agent_card.capabilities.streaming:
@@ -1759,12 +1778,13 @@ async def _method(self, **kwargs) -> Any:
17591778

17601779
# agent_card is set on the class_methods before set_up is invoked.
17611780
# Ensure that the agent_card url is set correctly before the client is created.
1762-
a2a_agent_card.url = f"https://{initializer.global_config.api_endpoint}/v1beta1/{self.resource_name}/a2a"
1781+
url = f"https://{initializer.global_config.api_endpoint}/v1beta1/{self.resource_name}/a2a"
1782+
a2a_agent_card.supported_interfaces[0].url = url
17631783

17641784
# Using a2a client, inject the auth token from the global config.
17651785
config = ClientConfig(
17661786
supported_transports=[
1767-
TransportProtocol.http_json,
1787+
TransportProtocol.HTTP_JSON,
17681788
],
17691789
use_client_preference=True,
17701790
httpx_client=httpx.AsyncClient(
@@ -1977,9 +1997,11 @@ def _generate_class_methods_spec_or_raise(
19771997
class_method[_MODE_KEY_IN_SCHEMA] = mode
19781998
# A2A agent card is a special case, when running in A2A mode,
19791999
if hasattr(agent_engine, "agent_card"):
1980-
class_method[_A2A_AGENT_CARD] = getattr(
1981-
agent_engine, "agent_card"
1982-
).model_dump_json()
2000+
from google.protobuf import json_format
2001+
2002+
class_method[_A2A_AGENT_CARD] = json_format.MessageToJson(
2003+
getattr(agent_engine, "agent_card")
2004+
)
19832005
class_methods_spec.append(class_method)
19842006

19852007
return class_methods_spec

0 commit comments

Comments
 (0)