Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=protected-access,bad-continuation,missing-function-docstring

import contextlib
import os
from unittest import mock
from tests.unit.vertexai.genai.replays import pytest_helper
from vertexai._genai import types
from vertexai.agent_engines.templates.a2a import default_a2a_agent
import pytest


pytest.importorskip(
"a2a.types", reason="a2a-sdk not installed, skipping A2A Agent tests"
)
from a2a.types import AgentInterface
from a2a.utils.constants import TransportProtocol


def test_create_a2a_agent(client, is_replay_mode):
# Use the autopush environment.

client._api_client._http_options.base_url = (
"https://us-central1-autopush-aiplatform.sandbox.googleapis.com/"
)

my_agent = default_a2a_agent()

my_agent.agent_card.supported_interfaces.append(
AgentInterface(
url="http://localhost:8888/",
protocol_binding=TransportProtocol.HTTP_JSON,
protocol_version="1.0",
)
)


staging_bucket = os.environ["GCS_BUCKET"]

# In replay mode, GCS operations are mocked and blob.open("rb") returns a mock
# that fails when cloudpickle.load expects bytes. We mock _upload_agent_engine
# to skip this verification step, which is not needed when replaying API calls.
upload_patch = (
mock.patch("vertexai._genai._agent_engines_utils._upload_agent_engine")
if is_replay_mode
else contextlib.nullcontext()
)

with upload_patch:
agent_engine = client.agent_engines.create(
agent=my_agent,
config={
"staging_bucket": staging_bucket,
"display_name": "test-a2a-agent",
"http_options": {"api_version": "v1beta1"},
"requirements": [
"google-cloud-aiplatform[agent_engines] @ git+https://github.com/googleapis/python-aiplatform.git@copybara_903097201",
"a2a-sdk",
"sse-starlette",
],
},
)


assert isinstance(agent_engine, types.AgentEngine)
assert agent_engine.api_resource.display_name == "test-a2a-agent"

# Clean up resources.
client.agent_engines.delete(name=agent_engine.api_resource.name, force=True)


pytestmark = pytest_helper.setup(
file=__file__,
globals_for_file=globals(),
)

pytest_plugins = ("pytest_asyncio",)



26 changes: 13 additions & 13 deletions vertexai/_genai/_agent_engines_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,9 @@ def _generate_class_methods_spec_or_raise(
class_method = _to_proto(schema_dict)
class_method[_MODE_KEY_IN_SCHEMA] = mode
if hasattr(agent, "agent_card"):
class_method[_A2A_AGENT_CARD] = getattr(
agent, "agent_card"
).model_dump_json()
class_method[_A2A_AGENT_CARD] = json_format.MessageToJson(
getattr(agent, "agent_card")
)
class_methods_spec.append(class_method)

return class_methods_spec
Expand Down Expand Up @@ -1234,9 +1234,16 @@ def _upload_agent_engine(
cloudpickle.dump(agent, f)
except Exception as e:
url = "https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/develop/custom#deployment-considerations"
raise TypeError(
f"Failed to serialize agent engine. Visit {url} for details."
) from e
error_msg = f"Failed to serialize agent engine. Visit {url} for details."
if "google._upb._message" in str(e) or "Descriptor" in str(e):
error_msg += (
" This is often caused by protobuf objects (like Part, AgentCard) "
"being imported at the global module level. Please move these "
"imports inside the functions or methods where they are used. "
"Alternatively, you can import the entire module: "
"`from a2a import types`."
)
raise TypeError(error_msg) from e
with blob.open("rb") as f:
try:
_ = cloudpickle.load(f)
Expand Down Expand Up @@ -1796,13 +1803,6 @@ async def _method(self, **kwargs) -> Any: # type: ignore[no-untyped-def]
if not hasattr(a2a_agent_card, "preferred_transport"):
a2a_agent_card.preferred_transport = TransportProtocol.http_json

# AE cannot support streaming yet. Turn off streaming for now.
if a2a_agent_card.capabilities and a2a_agent_card.capabilities.streaming:
raise ValueError(
"Streaming is not supported in Agent Engine, please change "
"a2a_agent_card.capabilities.streaming to False."
)

if not hasattr(a2a_agent_card.capabilities, "streaming"):
a2a_agent_card.capabilities.streaming = False

Expand Down
8 changes: 5 additions & 3 deletions vertexai/_genai/agent_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -2475,10 +2475,12 @@ def _create_config(
agent_card = getattr(agent, "agent_card")
if agent_card:
try:
agent_engine_spec["agent_card"] = agent_card.model_dump(
exclude_none=True
from google.protobuf import json_format

agent_engine_spec["agent_card"] = json_format.MessageToDict(
agent_card
)
except TypeError as e:
except Exception as e:
raise ValueError(
f"Failed to convert agent card to dict (serialization error): {e}"
) from e
Expand Down
52 changes: 37 additions & 15 deletions vertexai/agent_engines/_agent_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,28 @@
try:
from a2a.types import (
AgentCard,
TransportProtocol,
AgentInterface,
Message,
TaskIdParams,
TaskQueryParams,
)
from a2a.utils.constants import TransportProtocol, PROTOCOL_VERSION_CURRENT
from a2a.client import ClientConfig, ClientFactory

AgentCard = AgentCard
AgentInterface = AgentInterface
TransportProtocol = TransportProtocol
PROTOCOL_VERSION_CURRENT = PROTOCOL_VERSION_CURRENT
Message = Message
ClientConfig = ClientConfig
ClientFactory = ClientFactory
TaskIdParams = TaskIdParams
TaskQueryParams = TaskQueryParams
except (ImportError, AttributeError):
AgentCard = None
AgentInterface = None
TransportProtocol = None
PROTOCOL_VERSION_CURRENT = None
Message = None
ClientConfig = None
ClientFactory = None
Expand Down Expand Up @@ -1216,9 +1221,16 @@ def _upload_agent_engine(
cloudpickle.dump(agent_engine, f)
except Exception as e:
url = "https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/develop/custom#deployment-considerations"
raise TypeError(
f"Failed to serialize agent engine. Visit {url} for details."
) from e
error_msg = f"Failed to serialize agent engine. Visit {url} for details."
if "google._upb._message" in str(e) or "Descriptor" in str(e):
error_msg += (
" This is often caused by protobuf objects (like Part, AgentCard) "
"being imported at the global module level. Please move these "
"imports inside the functions or methods where they are used. "
"Alternatively, you can import the entire module: "
"`from a2a import types as a2a_types`."
)
raise TypeError(error_msg) from e
with blob.open("rb") as f:
try:
_ = cloudpickle.load(f)
Expand Down Expand Up @@ -1736,16 +1748,23 @@ async def _method(self, **kwargs) -> Any:

# A2A + AE integration currently only supports Rest API.
if (
a2a_agent_card.preferred_transport
and a2a_agent_card.preferred_transport != TransportProtocol.http_json
a2a_agent_card.supported_interfaces
and a2a_agent_card.supported_interfaces[0].protocol_binding
!= TransportProtocol.HTTP_JSON
):
raise ValueError(
"Only HTTP+JSON is supported for preferred transport on agent card "
"Only HTTP+JSON is supported for primary interface on agent card "
)

# Set preferred transport to HTTP+JSON if not set.
if not hasattr(a2a_agent_card, "preferred_transport"):
a2a_agent_card.preferred_transport = TransportProtocol.http_json
# Set primary interface to HTTP+JSON if not set.
if not a2a_agent_card.supported_interfaces:
a2a_agent_card.supported_interfaces = []
a2a_agent_card.supported_interfaces.append(
AgentInterface(
protocol_binding=TransportProtocol.HTTP_JSON,
protocol_version=PROTOCOL_VERSION_CURRENT,
)
)

# AE cannot support streaming yet. Turn off streaming for now.
if a2a_agent_card.capabilities and a2a_agent_card.capabilities.streaming:
Expand All @@ -1759,12 +1778,13 @@ async def _method(self, **kwargs) -> Any:

# agent_card is set on the class_methods before set_up is invoked.
# Ensure that the agent_card url is set correctly before the client is created.
a2a_agent_card.url = f"https://{initializer.global_config.api_endpoint}/v1beta1/{self.resource_name}/a2a"
url = f"https://{initializer.global_config.api_endpoint}/v1beta1/{self.resource_name}/a2a"
a2a_agent_card.supported_interfaces[0].url = url

# Using a2a client, inject the auth token from the global config.
config = ClientConfig(
supported_transports=[
TransportProtocol.http_json,
TransportProtocol.HTTP_JSON,
],
use_client_preference=True,
httpx_client=httpx.AsyncClient(
Expand Down Expand Up @@ -1977,9 +1997,11 @@ def _generate_class_methods_spec_or_raise(
class_method[_MODE_KEY_IN_SCHEMA] = mode
# A2A agent card is a special case, when running in A2A mode,
if hasattr(agent_engine, "agent_card"):
class_method[_A2A_AGENT_CARD] = getattr(
agent_engine, "agent_card"
).model_dump_json()
from google.protobuf import json_format

class_method[_A2A_AGENT_CARD] = json_format.MessageToJson(
getattr(agent_engine, "agent_card")
)
class_methods_spec.append(class_method)

return class_methods_spec
Expand Down
Loading
Loading