Skip to content

Commit e90b119

Browse files
google-genai-botcopybara-github
authored andcommitted
fix(a2a): Preserve execution metadata in final events
Preserve critical execution metadata (invocation_id, author, and event_id) in synthesized final A2A events to ensure clients can correctly correlate them with the preceding execution stream. PiperOrigin-RevId: 932423755
1 parent 71b936b commit e90b119

2 files changed

Lines changed: 97 additions & 0 deletions

File tree

src/google/adk/a2a/executor/a2a_agent_executor.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,10 @@ async def _handle_request(
251251
)
252252

253253
task_result_aggregator = TaskResultAggregator()
254+
last_adk_event = None
254255
async with Aclosing(runner.run_async(**vars(run_request))) as agen:
255256
async for adk_event in agen:
257+
last_adk_event = adk_event
256258
for a2a_event in self._config.event_converter(
257259
adk_event,
258260
invocation_context,
@@ -270,6 +272,22 @@ async def _handle_request(
270272
task_result_aggregator.process_event(e)
271273
await event_queue.enqueue_event(e)
272274

275+
# Build metadata for final event to preserve invocation_id and event_id.
276+
final_metadata = {
277+
_get_adk_metadata_key('app_name'): runner.app_name,
278+
_get_adk_metadata_key('user_id'): run_request.user_id,
279+
_get_adk_metadata_key('session_id'): run_request.session_id,
280+
}
281+
if last_adk_event:
282+
for key, attr in [
283+
('invocation_id', 'invocation_id'),
284+
('author', 'author'),
285+
('event_id', 'id'),
286+
]:
287+
val = getattr(last_adk_event, attr, None)
288+
if val is not None:
289+
final_metadata[_get_adk_metadata_key(key)] = val
290+
273291
# publish the task result event - this is final
274292
if (
275293
task_result_aggregator.task_state == TaskState.working
@@ -287,6 +305,7 @@ async def _handle_request(
287305
artifact_id=platform_uuid.new_uuid(),
288306
parts=task_result_aggregator.task_status_message.parts,
289307
),
308+
metadata=final_metadata,
290309
)
291310
)
292311
# public the final status update event
@@ -299,6 +318,7 @@ async def _handle_request(
299318
).isoformat(),
300319
),
301320
context_id=context.context_id,
321+
metadata=final_metadata,
302322
final=True,
303323
)
304324
else:
@@ -312,6 +332,7 @@ async def _handle_request(
312332
message=task_result_aggregator.task_status_message,
313333
),
314334
context_id=context.context_id,
335+
metadata=final_metadata,
315336
final=True,
316337
)
317338

tests/unittests/a2a/executor/test_a2a_agent_executor.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,3 +1072,79 @@ async def mock_run_async(**kwargs):
10721072
assert (
10731073
modified_a2a_event in enqueued_events
10741074
), "The modified event should have been enqueued"
1075+
1076+
@pytest.mark.asyncio
1077+
async def test_handle_request_preserves_metadata_in_final_events(
1078+
self,
1079+
) -> None:
1080+
"""Test that final events preserve invocation_id, author, and event_id in metadata."""
1081+
# Setup context with task_id
1082+
self.mock_context.task_id = "test-task-id"
1083+
self.mock_context.context_id = "test-context-id"
1084+
1085+
# Setup detailed mocks
1086+
self.mock_request_converter.return_value = AgentRunRequest(
1087+
user_id="test-user",
1088+
session_id="test-session",
1089+
new_message=Mock(spec=Content),
1090+
run_config=Mock(spec=RunConfig),
1091+
)
1092+
1093+
# Mock session service
1094+
mock_session = Mock()
1095+
mock_session.id = "test-session"
1096+
self.mock_runner.session_service.get_session = AsyncMock(
1097+
return_value=mock_session
1098+
)
1099+
1100+
# Mock invocation context
1101+
mock_invocation_context = Mock()
1102+
self.mock_runner._new_invocation_context.return_value = (
1103+
mock_invocation_context
1104+
)
1105+
1106+
# Mock ADK event with specific metadata to preserve
1107+
mock_adk_event = Mock(spec=Event)
1108+
mock_adk_event.invocation_id = "test-invocation-id"
1109+
mock_adk_event.author = "test-author"
1110+
mock_adk_event.id = "test-event-id"
1111+
1112+
# Configure run_async to yield our mock ADK event
1113+
async def mock_run_async(**kwargs):
1114+
async for item in self._create_async_generator([mock_adk_event]):
1115+
yield item
1116+
1117+
self.mock_runner.run_async = mock_run_async
1118+
self.mock_event_converter.return_value = [Mock()]
1119+
1120+
with patch(
1121+
"google.adk.a2a.executor.a2a_agent_executor.TaskResultAggregator"
1122+
) as mock_aggregator_class:
1123+
mock_aggregator = Mock()
1124+
mock_aggregator.task_state = TaskState.completed
1125+
mock_aggregator.task_status_message = Mock(spec=Message)
1126+
mock_aggregator_class.return_value = mock_aggregator
1127+
1128+
# Execute
1129+
await self.executor._handle_request(
1130+
self.mock_context, self.mock_event_queue
1131+
)
1132+
1133+
# Verify final status event was published and has correct metadata
1134+
final_events = [
1135+
call[0][0]
1136+
for call in self.mock_event_queue.enqueue_event.call_args_list
1137+
if hasattr(call[0][0], "final") and call[0][0].final == True
1138+
]
1139+
assert len(final_events) >= 1
1140+
final_event = final_events[-1]
1141+
1142+
assert final_event.metadata is not None
1143+
assert (
1144+
final_event.metadata.get("adk_invocation_id") == "test-invocation-id"
1145+
)
1146+
assert final_event.metadata.get("adk_author") == "test-author"
1147+
assert final_event.metadata.get("adk_event_id") == "test-event-id"
1148+
assert final_event.metadata.get("adk_app_name") == "test-app"
1149+
assert final_event.metadata.get("adk_user_id") == "test-user"
1150+
assert final_event.metadata.get("adk_session_id") == "test-session"

0 commit comments

Comments
 (0)