1- from __future__ import annotations
1+ """API Endpoints for Google Agent-to-Agent (A2A) Communication Protocol.
22
3+ Implements registration, handshake, and message relay according to the A2A spec.
4+ Ref: https://google.github.io/A2A/#/documentation
35"""
4- A2A Endpoints: All endpoints in this file implement the Google Agent2Agent (A2A) protocol.
5- Any reference to 'a2a' refers exclusively to Google A2A: https://google.github.io/A2A/#/documentation
6- """
76
8- from fastapi import APIRouter , Depends , HTTPException , Request
7+ import logging
8+ from typing import Annotated , Optional
9+
10+ from fastapi import APIRouter , BackgroundTasks , Depends , HTTPException , Request , status
911
1012from app .core .security import get_current_user_id
1113from app .models .a2a import (
1214 A2AErrorResponse ,
1315 A2AHandshakeRequest ,
1416 A2AHandshakeResponse ,
17+ A2AMessage ,
1518 A2ARegistrationRequest ,
1619 A2ARegistrationResponse ,
1720 A2ARequest ,
2023)
2124from app .services .a2a_service import A2AService
2225
26+ logger = logging .getLogger (__name__ )
27+
28+
29+ def get_a2a_service ():
30+ """FastAPI dependency injector for A2AService."""
31+ return A2AService ()
32+
33+
2334router = APIRouter (prefix = "/api/v1/a2a" , tags = ["a2a" ])
2435
36+
2537@router .post ("/register" , response_model = A2ARegistrationResponse )
26- async def register_agent (payload : A2ARegistrationRequest ):
27- """Google A2A: Register this agent with the registry or another agent."""
38+ async def register_agent (
39+ request : A2ARegistrationRequest ,
40+ a2a_service : Annotated [A2AService , Depends (get_a2a_service )],
41+ # current_user: Annotated[User, Depends(get_current_active_user)] # TODO: Add auth
42+ ) -> A2ARegistrationResponse :
43+ """Register an agent instance with the A2A service.
44+
45+ Allows an agent to announce its presence and capabilities.
46+ """
47+ logger .info (f"Received A2A registration request for agent: { request .agent_id } " )
2848 try :
29- return await A2AService .register_agent (payload )
30- except Exception as exc :
31- raise HTTPException (status_code = 500 , detail = str (exc )) from exc
49+ response = await a2a_service .handle_registration (request )
50+ logger .info (f"Agent { request .agent_id } registered successfully." )
51+ return response
52+ except Exception as e :
53+ logger .exception (f"Error during A2A registration for agent { request .agent_id } " )
54+ raise HTTPException (
55+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
56+ detail = f"Registration failed: { e } " ,
57+ ) from e
58+
3259
3360@router .post ("/handshake" , response_model = A2AHandshakeResponse )
34- async def handshake (payload : A2AHandshakeRequest ):
35- """Google A2A: Perform handshake/authentication with another agent."""
61+ async def perform_handshake (
62+ request : A2AHandshakeRequest ,
63+ a2a_service : Annotated [A2AService , Depends (get_a2a_service )],
64+ # current_user: Annotated[User, Depends(get_current_active_user)] # TODO: Add auth
65+ ) -> A2AHandshakeResponse :
66+ """Initiate or respond to an A2A handshake between agents.
67+
68+ Establishes secure communication channels.
69+ """
70+ logger .info (
71+ f"Received A2A handshake request from { request .sender_id } "
72+ f"to { request .receiver_id } "
73+ )
3674 try :
37- return await A2AService .handshake (payload )
38- except Exception as exc :
39- raise HTTPException (status_code = 500 , detail = str (exc )) from exc
75+ response = await a2a_service .handle_handshake (request )
76+ logger .info (
77+ f"A2A handshake successful between { request .sender_id } "
78+ f"and { request .receiver_id } "
79+ )
80+ return response
81+ except ValueError as ve :
82+ logger .warning (f"A2A handshake validation error: { ve } " )
83+ raise HTTPException (
84+ status_code = status .HTTP_400_BAD_REQUEST , detail = f"Handshake error: { ve } "
85+ ) from ve
86+ except Exception as e :
87+ logger .exception (
88+ f"Error during A2A handshake between { request .sender_id } "
89+ f"and { request .receiver_id } "
90+ )
91+ raise HTTPException (
92+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
93+ detail = f"Handshake failed: { e } " ,
94+ ) from e
95+
96+
97+ @router .post ("/message" , status_code = status .HTTP_202_ACCEPTED )
98+ async def relay_message (
99+ request : A2AMessage ,
100+ a2a_service : Annotated [A2AService , Depends (get_a2a_service )],
101+ background_tasks : BackgroundTasks ,
102+ # current_user: Annotated[User, Depends(get_current_active_user)] # TODO: Add auth
103+ ) -> dict [str , str ]:
104+ """Receive and relay an A2A message to the intended recipient agent.
105+
106+ Uses background tasks for asynchronous delivery.
107+ """
108+ logger .info (
109+ f"Received A2A message relay request from { request .sender_id } "
110+ f"to { request .receiver_id } "
111+ )
112+ try :
113+ # Validate message structure before queueing
114+ # Minimal validation here; deeper validation in the service
115+ if not request .sender_id or not request .receiver_id or not request .payload :
116+ raise ValueError ("Invalid A2A message structure" )
117+
118+ background_tasks .add_task (a2a_service .handle_message_relay , request )
119+ logger .info (
120+ f"A2A message from { request .sender_id } to { request .receiver_id } "
121+ f"queued for relay."
122+ )
123+ return {"status" : "Message relay accepted" }
124+ except ValueError as ve :
125+ logger .warning (f"A2A message relay validation error: { ve } " )
126+ raise HTTPException (
127+ status_code = status .HTTP_400_BAD_REQUEST , detail = f"Message relay error: { ve } "
128+ ) from ve
129+ except Exception as e :
130+ logger .exception (
131+ f"Error queueing A2A message relay from { request .sender_id } "
132+ f"to { request .receiver_id } "
133+ )
134+ raise HTTPException (
135+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
136+ detail = f"Message relay failed: { e } " ,
137+ ) from e
138+
40139
41140@router .post ("/send" , response_model = A2AResponse )
42141async def send_a2a (
@@ -51,6 +150,7 @@ async def send_a2a(
51150 except Exception as exc :
52151 raise HTTPException (status_code = 500 , detail = str (exc )) from exc
53152
153+
54154@router .post ("/receive" , response_model = A2AResponse )
55155async def receive_a2a (
56156 request : Request ,
@@ -64,6 +164,7 @@ async def receive_a2a(
64164 except Exception as exc :
65165 raise HTTPException (status_code = 500 , detail = str (exc )) from exc
66166
167+
67168@router .get ("/status/{agent_id}" , response_model = A2AStatusResponse )
68169async def agent_status (agent_id : str ):
69170 """Google A2A: Get the current status of this agent."""
@@ -72,7 +173,11 @@ async def agent_status(agent_id: str):
72173 except Exception as exc :
73174 raise HTTPException (status_code = 500 , detail = str (exc )) from exc
74175
75- @router .get ("/error" , response_model = A2AErrorResponse )
76- async def error_response (error : str , code : int = None ):
176+
177+ @router .post ("/error" , response_model = A2AErrorResponse )
178+ async def error_response (error : str , code : Optional [int ] = None ):
77179 """Google A2A: Return a protocol-compliant error response."""
78- return await A2AService .error_response (error , code )
180+ try :
181+ return await A2AService .error_response (error , code )
182+ except Exception as exc :
183+ raise HTTPException (status_code = 500 , detail = str (exc )) from exc
0 commit comments