Skip to content

Commit c65dd55

Browse files
GWealecopybara-github
authored andcommitted
feat: Add OpenTelemetry tracing for event compaction
This change introduces a new helper function `_summarize_events_with_trace` to wrap the event summarization process. This function creates a trace span and records metadata about the compaction trigger, summarizer type, event count, and configuration parameters both before and after the summarization occurs. The existing compaction logic is updated to use this new traced function. Co-authored-by: George Weale <gweale@google.com> PiperOrigin-RevId: 905302520
1 parent b0b8b31 commit c65dd55

3 files changed

Lines changed: 287 additions & 4 deletions

File tree

src/google/adk/apps/compaction.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,47 @@
2222
from ..events.event import Event
2323
from ..sessions.base_session_service import BaseSessionService
2424
from ..sessions.session import Session
25+
from ..telemetry.tracing import _build_compaction_attributes
26+
from ..telemetry.tracing import _build_compaction_result_attributes
27+
from ..telemetry.tracing import tracer
2528
from .app import App
2629
from .app import EventsCompactionConfig
2730
from .llm_event_summarizer import LlmEventSummarizer
2831

2932
logger = logging.getLogger('google_adk.' + __name__)
3033

3134

35+
async def _summarize_events_with_trace(
36+
*,
37+
session: Session,
38+
config: EventsCompactionConfig,
39+
events_to_compact: list[Event],
40+
trigger: str,
41+
) -> Event | None:
42+
"""Summarizes events within a trace span labeled for compaction."""
43+
if config.summarizer is None:
44+
return None
45+
46+
attributes = _build_compaction_attributes(
47+
session_id=session.id,
48+
trigger=trigger,
49+
summarizer_type=type(config.summarizer).__name__,
50+
event_count=len(events_to_compact),
51+
token_threshold=config.token_threshold,
52+
event_retention_size=config.event_retention_size,
53+
compaction_interval=config.compaction_interval,
54+
overlap_size=config.overlap_size,
55+
)
56+
57+
with tracer.start_as_current_span(f'compact_events {trigger}') as span:
58+
span.set_attributes(attributes)
59+
compaction_event = await config.summarizer.maybe_summarize_events(
60+
events=events_to_compact
61+
)
62+
span.set_attributes(_build_compaction_result_attributes(compaction_event))
63+
return compaction_event
64+
65+
3266
def _count_text_chars_in_content(content: types.Content | None) -> int:
3367
"""Returns the number of text characters in a content object."""
3468
total_chars = 0
@@ -383,8 +417,11 @@ async def _run_compaction_for_token_threshold_config(
383417
if config.summarizer is None:
384418
return False
385419

386-
compaction_event = await config.summarizer.maybe_summarize_events(
387-
events=events_to_compact
420+
compaction_event = await _summarize_events_with_trace(
421+
session=session,
422+
config=config,
423+
events_to_compact=events_to_compact,
424+
trigger='token_threshold',
388425
)
389426
if compaction_event:
390427
await session_service.append_event(session=session, event=compaction_event)
@@ -602,8 +639,11 @@ async def _run_compaction_for_sliding_window(
602639
if config.summarizer is None:
603640
return None
604641

605-
compaction_event = await config.summarizer.maybe_summarize_events(
606-
events=events_to_compact
642+
compaction_event = await _summarize_events_with_trace(
643+
session=session,
644+
config=config,
645+
events_to_compact=events_to_compact,
646+
trigger='sliding_window',
607647
)
608648
if compaction_event:
609649
await session_service.append_event(session=session, event=compaction_event)

src/google/adk/telemetry/tracing.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,58 @@ def trace_send_data(
445445
span.set_attribute('gcp.vertex.agent.data', '{}')
446446

447447

448+
def _build_compaction_attributes(
449+
*,
450+
session_id: str,
451+
trigger: str,
452+
summarizer_type: str,
453+
event_count: int,
454+
token_threshold: int | None = None,
455+
event_retention_size: int | None = None,
456+
compaction_interval: int | None = None,
457+
overlap_size: int | None = None,
458+
) -> dict[str, AttributeValue]:
459+
"""Builds span attributes for event compaction tracing."""
460+
attributes: dict[str, AttributeValue] = {
461+
GEN_AI_SYSTEM: _guess_gemini_system_name(),
462+
GEN_AI_OPERATION_NAME: 'compact_events',
463+
GEN_AI_CONVERSATION_ID: session_id,
464+
'gen_ai.compaction.trigger': trigger,
465+
'gen_ai.compaction.summarizer_type': summarizer_type,
466+
'gen_ai.compaction.event_count': event_count,
467+
}
468+
if token_threshold is not None:
469+
attributes['gen_ai.compaction.token_threshold'] = token_threshold
470+
if event_retention_size is not None:
471+
attributes['gen_ai.compaction.event_retention_size'] = event_retention_size
472+
if compaction_interval is not None:
473+
attributes['gen_ai.compaction.compaction_interval'] = compaction_interval
474+
if overlap_size is not None:
475+
attributes['gen_ai.compaction.overlap_size'] = overlap_size
476+
return attributes
477+
478+
479+
def _build_compaction_result_attributes(
480+
compacted_event: Event | None,
481+
) -> dict[str, AttributeValue]:
482+
"""Builds span attributes for compaction result."""
483+
if (
484+
compacted_event is None
485+
or compacted_event.actions is None
486+
or compacted_event.actions.compaction is None
487+
):
488+
return {}
489+
490+
attributes: dict[str, AttributeValue] = {}
491+
compaction = compacted_event.actions.compaction
492+
attributes['gen_ai.compaction.result_event_id'] = compacted_event.id
493+
if compaction.start_timestamp is not None:
494+
attributes['gen_ai.compaction.start_timestamp'] = compaction.start_timestamp
495+
if compaction.end_timestamp is not None:
496+
attributes['gen_ai.compaction.end_timestamp'] = compaction.end_timestamp
497+
return attributes
498+
499+
448500
def _build_llm_request_for_trace(llm_request: LlmRequest) -> dict[str, Any]:
449501
"""Builds a dictionary representation of the LLM request for tracing.
450502

tests/unittests/apps/test_compaction.py

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from google.adk.agents.base_agent import BaseAgent
2020
from google.adk.apps.app import App
2121
from google.adk.apps.app import EventsCompactionConfig
22+
from google.adk.apps.base_events_summarizer import BaseEventsSummarizer
2223
from google.adk.apps.compaction import _run_compaction_for_sliding_window
2324
import google.adk.apps.compaction as compaction_module
2425
from google.adk.apps.llm_event_summarizer import LlmEventSummarizer
@@ -31,10 +32,78 @@
3132
from google.genai import types
3233
from google.genai.types import Content
3334
from google.genai.types import Part
35+
from opentelemetry.sdk.trace import TracerProvider
36+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
37+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
3438
from pydantic import ValidationError
3539
import pytest
3640

3741

42+
class _StubSummarizer(BaseEventsSummarizer):
43+
44+
def __init__(self, compacted_event: Event | None):
45+
self._compacted_event = compacted_event
46+
47+
async def maybe_summarize_events(
48+
self, *, events: list[Event]
49+
) -> Event | None:
50+
del events
51+
return self._compacted_event
52+
53+
54+
def _create_trace_test_event(
55+
*,
56+
timestamp: float,
57+
invocation_id: str,
58+
text: str,
59+
prompt_token_count: int | None = None,
60+
) -> Event:
61+
usage_metadata = None
62+
if prompt_token_count is not None:
63+
usage_metadata = types.GenerateContentResponseUsageMetadata(
64+
prompt_token_count=prompt_token_count
65+
)
66+
return Event(
67+
timestamp=timestamp,
68+
invocation_id=invocation_id,
69+
author='user',
70+
content=Content(role='user', parts=[Part(text=text)]),
71+
usage_metadata=usage_metadata,
72+
)
73+
74+
75+
def _create_trace_compacted_event(
76+
*, start_ts: float, end_ts: float, summary_text: str
77+
) -> Event:
78+
compaction = EventCompaction(
79+
start_timestamp=start_ts,
80+
end_timestamp=end_ts,
81+
compacted_content=Content(role='model', parts=[Part(text=summary_text)]),
82+
)
83+
return Event(
84+
id='compacted-event-id',
85+
timestamp=end_ts,
86+
author='compactor',
87+
content=compaction.compacted_content,
88+
actions=EventActions(compaction=compaction),
89+
invocation_id='compacted-invocation-id',
90+
)
91+
92+
93+
@pytest.fixture
94+
def span_exporter(monkeypatch: pytest.MonkeyPatch) -> InMemorySpanExporter:
95+
tracer_provider = TracerProvider()
96+
span_exporter = InMemorySpanExporter()
97+
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter))
98+
real_tracer = tracer_provider.get_tracer(__name__)
99+
monkeypatch.setattr(
100+
compaction_module.tracer,
101+
'start_as_current_span',
102+
real_tracer.start_as_current_span,
103+
)
104+
return span_exporter
105+
106+
38107
@pytest.mark.parametrize(
39108
'env_variables', ['GOOGLE_AI', 'VERTEX'], indirect=True
40109
)
@@ -923,6 +992,128 @@ def test_get_contents_compaction_at_beginning(self):
923992
actual_texts = [c.parts[0].text for c in result_contents]
924993
self.assertEqual(actual_texts, expected_texts)
925994

995+
996+
@pytest.mark.asyncio
997+
async def test_run_compaction_for_token_threshold_adds_summary_trace(
998+
span_exporter: InMemorySpanExporter,
999+
):
1000+
session = Session(
1001+
app_name='app',
1002+
user_id='user',
1003+
id='session-id',
1004+
events=[
1005+
_create_trace_test_event(
1006+
timestamp=1.0, invocation_id='inv1', text='e1'
1007+
),
1008+
_create_trace_test_event(
1009+
timestamp=2.0, invocation_id='inv2', text='e2'
1010+
),
1011+
_create_trace_test_event(
1012+
timestamp=3.0,
1013+
invocation_id='inv3',
1014+
text='e3',
1015+
prompt_token_count=100,
1016+
),
1017+
],
1018+
)
1019+
session_service = AsyncMock(spec=BaseSessionService)
1020+
compacted_event = _create_trace_compacted_event(
1021+
start_ts=1.0, end_ts=2.0, summary_text='summary'
1022+
)
1023+
summarizer = _StubSummarizer(compacted_event)
1024+
config = EventsCompactionConfig(
1025+
summarizer=summarizer,
1026+
compaction_interval=999,
1027+
overlap_size=0,
1028+
token_threshold=50,
1029+
event_retention_size=1,
1030+
)
1031+
1032+
compacted = (
1033+
await (
1034+
compaction_module._run_compaction_for_token_threshold_config(
1035+
config=config,
1036+
session=session,
1037+
session_service=session_service,
1038+
agent=Mock(spec=BaseAgent),
1039+
)
1040+
)
1041+
)
1042+
1043+
assert compacted is True
1044+
spans = span_exporter.get_finished_spans()
1045+
summary_span = next(
1046+
span for span in spans if span.name == 'compact_events token_threshold'
1047+
)
1048+
assert summary_span.attributes['gen_ai.conversation.id'] == 'session-id'
1049+
assert (
1050+
summary_span.attributes['gen_ai.compaction.trigger'] == 'token_threshold'
1051+
)
1052+
assert summary_span.attributes['gen_ai.compaction.event_count'] == 2
1053+
assert summary_span.attributes['gen_ai.compaction.token_threshold'] == 50
1054+
assert summary_span.attributes['gen_ai.compaction.event_retention_size'] == 1
1055+
assert (
1056+
summary_span.attributes['gen_ai.compaction.result_event_id']
1057+
== 'compacted-event-id'
1058+
)
1059+
1060+
1061+
@pytest.mark.asyncio
1062+
async def test_run_compaction_for_sliding_window_adds_summary_trace(
1063+
span_exporter: InMemorySpanExporter,
1064+
):
1065+
compacted_event = _create_trace_compacted_event(
1066+
start_ts=1.0, end_ts=4.0, summary_text='summary'
1067+
)
1068+
summarizer = _StubSummarizer(compacted_event)
1069+
app = App(
1070+
name='test',
1071+
root_agent=Mock(spec=BaseAgent),
1072+
events_compaction_config=EventsCompactionConfig(
1073+
summarizer=summarizer,
1074+
compaction_interval=2,
1075+
overlap_size=1,
1076+
),
1077+
)
1078+
session = Session(
1079+
app_name='test',
1080+
user_id='u1',
1081+
id='session-id',
1082+
events=[
1083+
_create_trace_test_event(
1084+
timestamp=1.0, invocation_id='inv1', text='e1'
1085+
),
1086+
_create_trace_test_event(
1087+
timestamp=2.0, invocation_id='inv2', text='e2'
1088+
),
1089+
_create_trace_test_event(
1090+
timestamp=3.0, invocation_id='inv3', text='e3'
1091+
),
1092+
_create_trace_test_event(
1093+
timestamp=4.0, invocation_id='inv4', text='e4'
1094+
),
1095+
],
1096+
)
1097+
session_service = AsyncMock(spec=BaseSessionService)
1098+
1099+
await _run_compaction_for_sliding_window(app, session, session_service)
1100+
1101+
spans = span_exporter.get_finished_spans()
1102+
summary_span = next(
1103+
span for span in spans if span.name == 'compact_events sliding_window'
1104+
)
1105+
assert summary_span.attributes['gen_ai.conversation.id'] == 'session-id'
1106+
assert (
1107+
summary_span.attributes['gen_ai.compaction.trigger'] == 'sliding_window'
1108+
)
1109+
assert summary_span.attributes['gen_ai.compaction.event_count'] == 4
1110+
assert summary_span.attributes['gen_ai.compaction.compaction_interval'] == 2
1111+
assert summary_span.attributes['gen_ai.compaction.overlap_size'] == 1
1112+
assert (
1113+
summary_span.attributes['gen_ai.compaction.result_event_id']
1114+
== 'compacted-event-id'
1115+
)
1116+
9261117
async def test_sliding_window_excludes_pending_function_call_events(self):
9271118
"""Sliding-window compaction stops before pending function calls."""
9281119
app = App(

0 commit comments

Comments
 (0)