Commit cadedd4

feat(a2a): A2A orchestrator + Level 4 peer/MCP wiring + L1 model pin
Full A2A deploy bundle for the Wave 9 / Phase A migration. Source-verified against vertexai/preview/reasoning_engines/templates/a2a.py.

deploy_a2a.py
Shared deploy script for any ADK root_agent. Wraps it with vertexai.preview.reasoning_engines.A2aAgent (which mints api_mode='a2a_extension' on the engine), uploads the agent dir as extra_packages, and forwards the SMITHERY_API_KEY / LEVEL_1_A2A_* env vars into the deployed container so MCP and A2A peer routing keep working without per-deploy edits.

a2a_orchestrator/
Pro-tier orchestrator deployed to us-central1 (resource id 6139044558605910016). Three agents in one engine:
- root: gemini-2.5-pro + BuiltInPlanner thinking; five consult_level_* function tools (A2A peer calls to Levels 1-4 plus 2b in asia-southeast1).
- chart_agent: gemini-2.5-flash + BuiltInCodeExecutor for matplotlib charts (split off because Gemini forbids mixing built-in tools with function tools on one LlmAgent — the same constraint as Level 4).
- writer_agent: gemini-2.5-flash for the final Markdown brief.

level_4_agent/
data_fetcher_agent refactored from "google_search + load_web_page + consult + gahmen" (four paths) to "consult_level_1 + gahmen" (two paths). Every external lookup now goes through one of two inter-system protocols — A2A or MCP — so the agent demonstrates both ADK 2.0 patterns at once. agent_creator switched to Flash + BuiltInPlanner thinking (Pro isn't available in asia-southeast1; Flash + native thinking is the proven combination per analyst_agent's own comment).

level_4_agent/remote_tools.py
Defines consult_level_1 — the A2A peer-consult FunctionTool wrapping on_message_send against the deployed Level 1 engine. Reads the peer resource id from the LEVEL_1_A2A_ENGINE_ID env var (auto-forwarded by deploy_a2a.py) so redeploys don't require code edits here.

level_1_agent/agent.py
Pinned the model from gemini-3.1-flash-lite-preview (which only resolves via GOOGLE_CLOUD_LOCATION=global) to gemini-2.5-flash. Vertex Agent Engine force-overwrites GOOGLE_CLOUD_LOCATION to the engine's deploy region (templates/a2a.py:241-245), so preview aliases 404 in asia-southeast1. gemini-2.5-flash works in both regions.
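The env-forwarding step deploy_a2a.py performs can be sketched like this. This is a hypothetical illustration only — deploy_a2a.py itself is not rendered in this diff, and `FORWARD_EXACT` / `forwarded_env_vars` are made-up names, not its actual API:

```python
import os

# Hypothetical sketch of the forwarding rule described above: pass
# SMITHERY_API_KEY plus anything starting with LEVEL_ into the container.
FORWARD_EXACT = {"SMITHERY_API_KEY"}
FORWARD_PREFIXES = ("LEVEL_",)


def forwarded_env_vars(environ: dict[str, str]) -> dict[str, str]:
    """Pick the deploy-shell env vars that should travel into the container."""
    return {
        key: value
        for key, value in environ.items()
        if key in FORWARD_EXACT or key.startswith(FORWARD_PREFIXES)
    }


print(forwarded_env_vars({"LEVEL_1_A2A_ENGINE_ID": "123", "PATH": "/bin"}))
# → {'LEVEL_1_A2A_ENGINE_ID': '123'}
```

Because the filter is prefix-based, a redeploy that mints a new engine ID only needs a new `LEVEL_X_A2A_ENGINE_ID` export in the deploy shell, never a code edit.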
1 parent 2be26ce commit cadedd4

7 files changed

Lines changed: 1311 additions & 52 deletions

File tree

a2a_orchestrator/__init__.py

Whitespace-only changes.

a2a_orchestrator/agent.py

Lines changed: 465 additions & 0 deletions

a2a_orchestrator/remote_tools.py

Lines changed: 250 additions & 0 deletions
"""A2A tools for the orchestrator — five `consult_level_*` FunctionTool wrappers.

Each function calls `on_message_send` against a separately deployed Vertex
Agent Engine over the A2A protocol. The Levels live in `asia-southeast1`;
this orchestrator lives in `us-central1` (Pro is regional). So every consult
is a cross-region A2A call: ~150-200ms RTT added to the inference time, all
authenticated by the appspot SA's `roles/aiplatform.user` (no per-resource
IAM grants needed — see CLAUDE.md "Why A2A on Vertex sidesteps NTU's
setIamPolicy deny").

Resource IDs are baked in as defaults BUT can be overridden at deploy time
via env vars (deploy_a2a.py auto-forwards anything starting with LEVEL_).
That way Phase A redeploys of any Level (which mint new IDs) don't require
a code edit here — set `LEVEL_X_A2A_ENGINE_ID` in the deploy shell.

Why these IDs are correct for THIS deploy: pulled from
`adk/tests/integration/a2a_engines.py` after Phase A finished 2026-04-28.
"""
from __future__ import annotations

import logging
import os

import vertexai

logger = logging.getLogger(__name__)

# All five Level engines live in asia-southeast1.
_LEVEL_REGION = os.environ.get("LEVEL_REGION", "asia-southeast1")
_PROJECT_NUMBER = os.environ.get("LEVEL_PROJECT_NUMBER", "888142536377")

# Defaults are the post-Phase-A engine IDs (verified 2026-04-28). Override
# any of these by setting the corresponding env var in the deploy shell.
_LEVEL_IDS = {
    "level_1": os.environ.get("LEVEL_1_A2A_ENGINE_ID", "2134899737420103680"),
    "level_2": os.environ.get("LEVEL_2_A2A_ENGINE_ID", "2181061633600651264"),
    "level_2b": os.environ.get("LEVEL_2B_A2A_ENGINE_ID", "1635000178781978624"),
    "level_3": os.environ.get("LEVEL_3_A2A_ENGINE_ID", "1988532749530562560"),
    "level_4": os.environ.get("LEVEL_4_A2A_ENGINE_ID", "4048929579052564480"),
}

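The override path can be sanity-checked in isolation — a minimal sketch using the same read pattern as `_LEVEL_IDS` (the ID `9999000000000000000` is a made-up placeholder):

```python
import os

# Made-up engine ID standing in for one minted by a Phase A redeploy.
os.environ["LEVEL_1_A2A_ENGINE_ID"] = "9999000000000000000"

# Same pattern as _LEVEL_IDS: the env var wins, the baked-in default is
# only the fallback.
level_ids = {
    "level_1": os.environ.get("LEVEL_1_A2A_ENGINE_ID", "2134899737420103680"),
}
print(level_ids["level_1"])  # → 9999000000000000000
```

Note the dict is built at import time, so the env var must be set before the module is imported — which deploy_a2a.py guarantees by injecting it into the container environment.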
# One Vertex client per region; cached to avoid rebuilding auth state on every
# call. Region-bound: A2aAgent constructs a region-specific URL at set_up.
_clients: dict[str, vertexai.Client] = {}


def _client(region: str) -> vertexai.Client:
    if region not in _clients:
        _clients[region] = vertexai.Client(location=region)
    return _clients[region]


def _full_name(slug: str) -> str:
    rid = _LEVEL_IDS[slug]
    return (
        f"projects/{_PROJECT_NUMBER}/locations/{_LEVEL_REGION}/"
        f"reasoningEngines/{rid}"
    )

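For reference, the fully qualified resource name `_full_name` assembles has this shape (values are the defaults above):

```python
# Defaults from remote_tools.py; a deploy-shell override swaps only the
# trailing engine ID.
project_number = "888142536377"
region = "asia-southeast1"
engine_id = "2134899737420103680"

# Shape consumed by client.agent_engines.get(name=...).
name = f"projects/{project_number}/locations/{region}/reasoningEngines/{engine_id}"
print(name)
# → projects/888142536377/locations/asia-southeast1/reasoningEngines/2134899737420103680
```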
def _extract_a2a_text(response) -> str:
    """Pull the agent's text reply out of an `on_message_send` response.

    A2A `on_message_send` returns `list[tuple[Task | Message, str | None]]`.
    Text lives at `Task.artifacts[*].parts[*].root.text` (preferred) or
    `Task.history[-1].parts[*].root.text` (fallback for the last agent
    message). Mirrors the helper in the swarm repo's
    `tests/integration/a2a_engines.py`.
    """
    chunks: list[str] = []

    def _from_parts(parts) -> None:
        for p in parts or []:
            root = getattr(p, "root", p)
            text = getattr(root, "text", None)
            if text:
                chunks.append(text)

    if response is None:
        return ""

    if hasattr(response, "parts"):
        _from_parts(response.parts)
        return "\n".join(chunks)

    items = response if isinstance(response, list) else [response]
    for entry in items:
        item = entry[0] if (isinstance(entry, tuple) and len(entry) >= 1) else entry
        if item is None:
            continue

        artifacts = getattr(item, "artifacts", None)
        if artifacts:
            for art in artifacts:
                _from_parts(getattr(art, "parts", None))
            if chunks:
                continue

        history = getattr(item, "history", None)
        if history:
            for msg in reversed(history):
                role = getattr(msg, "role", None)
                role_value = getattr(role, "value", role)
                if str(role_value) == "agent":
                    _from_parts(getattr(msg, "parts", None))
                    break
            if chunks:
                continue

        _from_parts(getattr(item, "parts", None))

    return "\n".join(chunks)

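The attribute-walking pattern above can be exercised without the A2A SDK. A condensed, artifacts-only re-implementation with stand-in objects (illustrative only — real `Task` / `Part` objects come from the A2A types, and `extract_text` here is a stripped-down stand-in for `_extract_a2a_text`):

```python
from types import SimpleNamespace as NS


def extract_text(response) -> str:
    """Condensed, artifacts-only version of the extractor above."""
    chunks: list[str] = []
    items = response if isinstance(response, list) else [response]
    for entry in items:
        # Unwrap the (Task | Message, context_id) tuple shape.
        item = entry[0] if isinstance(entry, tuple) else entry
        for art in getattr(item, "artifacts", None) or []:
            for p in getattr(art, "parts", None) or []:
                text = getattr(getattr(p, "root", p), "text", None)
                if text:
                    chunks.append(text)
    return "\n".join(chunks)


# Stand-in for one (Task, context_id) tuple in an on_message_send response:
# text lives at artifacts[*].parts[*].root.text.
part = NS(root=NS(text="GDP grew 2.8% in Q3."))
task = NS(artifacts=[NS(parts=[part])])
print(extract_text([(task, None)]))  # → GDP grew 2.8% in Q3.
```

The `getattr(..., default)` chains are what make the real helper tolerant of both `Task` and bare `Message` responses, and of parts with or without a `.root` wrapper.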
async def _consult(slug: str, query: str) -> str:
    """Generic A2A consult helper. The five public consult_level_* functions
    below are thin wrappers — each binds a slug so Gemini sees them as
    distinct, well-named tools (with their own docstrings driving routing).
    """
    try:
        remote = _client(_LEVEL_REGION).agent_engines.get(name=_full_name(slug))
    except Exception as exc:  # noqa: BLE001
        logger.exception("Failed to bind %s engine", slug)
        return f"[error] Could not reach {slug} engine: {exc}"

    if not hasattr(remote, "on_message_send"):
        return (
            f"[error] {slug} engine has no on_message_send — its deploy may "
            "have used the legacy AdkApp template instead of A2aAgent. "
            "Check check_a2a.py output."
        )

    try:
        response = await remote.on_message_send(
            # A2A message IDs identify a single send; suffix with random hex
            # so repeat consults to the same Level don't reuse one ID.
            messageId=f"orchestrator-consult-{slug}-{os.urandom(8).hex()}",
            role="user",
            parts=[{"kind": "text", "text": query}],
        )
    except Exception as exc:  # noqa: BLE001
        logger.exception("on_message_send to %s failed", slug)
        return f"[error] A2A call to {slug} failed: {exc}"

    text = _extract_a2a_text(response)
    if not text:
        return f"[empty] {slug} returned no text parts."
    return text

# ----------------------------------------------------------------------------
# Public consult tools — one per Level. Docstrings drive Gemini's routing
# decisions; keep them precise and prescriptive.
# ----------------------------------------------------------------------------


async def consult_level_1(query: str) -> str:
    """Consult the Level 1 agent (single LlmAgent + google_search) over A2A.

    Use when:
    - the question is a simple factual lookup answerable with one web search,
    - you want a direct answer (Level 1 returns a natural-language reply with
      inline source attribution),
    - the question is NOT Singapore-specific (use consult_level_4 for SG —
      it has authoritative gahmen MCP data).

    Examples of well-suited queries:
    - "What was Apple's Q4 2025 revenue?"
    - "When did the EU AI Act take effect?"
    - "Who is the current CEO of Tesla?"
    """
    return await _consult("level_1", query)


async def consult_level_2(query: str) -> str:
    """Consult the Level 2 agent (Day Planner / Strategic Problem-Solver) over A2A.

    Use when:
    - the question is a planning task (study schedule, project breakdown,
      multi-step task decomposition),
    - the user wants a structured timetable or sequenced action list,
    - the question requires a "decompose → look up → assemble" workflow.

    Examples of well-suited queries:
    - "Plan a focused 2-hour study block on solid-state batteries."
    - "Break down 'launch a podcast' into the next 4 weeks of milestones."
    """
    return await _consult("level_2", query)


async def consult_level_2b(query: str) -> str:
    """Consult the Level 2b agent (Graph Router / classify-then-route) over A2A.

    Use when:
    - the question is a short customer-support style message that needs
      triage (bug / billing / feature-request / greeting),
    - you want a category label + the routing decision visible.

    Examples of well-suited queries:
    - "My credit card was charged twice — what should I do?"
    - "Hey can the dashboard support dark mode?"

    Note: Level 2b returns JSON-shaped routing output (e.g.,
    `{"category": "BILLING"}`). For chat-style answers, prefer other levels.
    """
    return await _consult("level_2b", query)


async def consult_level_3(query: str) -> str:
    """Consult the Level 3 agent (Research Coordinator with sub-agents) over A2A.

    Use when:
    - the question is a multi-aspect research topic where you want a
      STRUCTURED brief (key findings, patterns, contradictions, gaps),
    - the question benefits from a coordinator that delegates to a
      search agent + analyst + writer (Level 3's internal pipeline),
    - you want explicit confidence/uncertainty surfaced in the answer.

    Examples of well-suited queries:
    - "Compare mRNA vs viral-vector vaccine platforms — efficacy, safety,
      manufacturing scale."
    - "What's the latest in solid-state battery commercialisation?"
    """
    return await _consult("level_3", query)


async def consult_level_4(query: str) -> str:
    """Consult the Level 4 agent (Self-Evolving BI + gahmen MCP + A2A) over A2A.

    Use when:
    - the question involves SINGAPORE-specific data (any SG ministry,
      agency, dataset, indicator — Level 4 has gahmen MCP tools that
      access SingStat and data.gov.sg directly),
    - the question is a business-intelligence query needing computation
      + charts (Level 4 has a code-executor sub-agent for pandas /
      matplotlib),
    - the question might need a NEW specialist agent created at runtime
      (Level 4 has the agent_creator with native thinking).

    Examples of well-suited queries:
    - "What's Singapore's resident unemployment rate over the last 4
      quarters? Compute the QoQ delta."
    - "Compare HDB resale prices vs private property prices in 2025."
    - "Build a specialist that pulls Singapore weather observations."

    Note: Level 4 internally consults Level 1 via A2A for non-SG web
    queries and uses gahmen MCP for SG data. So this consult can result
    in TWO inter-system calls under the hood — you pay for that
    transitively in latency.
    """
    return await _consult("level_4", query)
