1616
1717package com .google .adk .web .service ;
1818
19+ import com .google .errorprone .annotations .CanIgnoreReturnValue ;
1920import io .opentelemetry .api .common .AttributeKey ;
2021import io .opentelemetry .sdk .common .CompletableResultCode ;
2122import io .opentelemetry .sdk .trace .data .SpanData ;
2223import io .opentelemetry .sdk .trace .export .SpanExporter ;
24+ import java .util .ArrayDeque ;
2325import java .util .ArrayList ;
2426import java .util .Collection ;
25- import java .util .Collections ;
27+ import java .util .Deque ;
2628import java .util .HashMap ;
2729import java .util .List ;
2830import java .util .Map ;
29- import java .util .concurrent .ConcurrentHashMap ;
3031import org .slf4j .Logger ;
3132import org .slf4j .LoggerFactory ;
3233
4142public class ApiServerSpanExporter implements SpanExporter {
4243 private static final Logger exporterLog = LoggerFactory .getLogger (ApiServerSpanExporter .class );
4344
44- private final Map <String , Map <String , Object >> eventIdTraceStorage = new ConcurrentHashMap <>();
45+ private final ApiServerSpanExporterConfig config ;
46+
47+ private final Map <String , Integer > eventIdRefCount = new HashMap <>();
48+ private final Map <String , Map <String , Object >> eventIdTraceStorage = new HashMap <>();
4549
4650 // Session ID -> Trace IDs -> Trace Object
47- private final Map <String , List <String >> sessionToTraceIdsMap = new ConcurrentHashMap <>();
51+ private final Map <String , List <String >> sessionToTraceIdsMap = new HashMap <>();
52+
53+ private final Deque <SpanData > allExportedSpans = new ArrayDeque <>();
4854
49- private final List <SpanData > allExportedSpans = Collections .synchronizedList (new ArrayList <>());
55+ public ApiServerSpanExporter () {
56+ this (ApiServerSpanExporterConfig .builder ().build ());
57+ }
5058
51- public ApiServerSpanExporter () {}
59+ public ApiServerSpanExporter (ApiServerSpanExporterConfig config ) {
60+ this .config = config ;
61+ }
5262
5363 public Map <String , Object > getEventTraceAttributes (String eventId ) {
54- return this .eventIdTraceStorage .get (eventId );
64+ synchronized (allExportedSpans ) {
65+ return this .eventIdTraceStorage .get (eventId );
66+ }
5567 }
5668
5769 public Map <String , List <String >> getSessionToTraceIdsMap () {
58- return this .sessionToTraceIdsMap ;
70+ synchronized (allExportedSpans ) {
71+ return new HashMap <>(this .sessionToTraceIdsMap );
72+ }
5973 }
6074
6175 public List <SpanData > getAllExportedSpans () {
62- return this .allExportedSpans ;
76+ synchronized (allExportedSpans ) {
77+ return new ArrayList <>(this .allExportedSpans );
78+ }
6379 }
6480
6581 @ Override
82+ @ CanIgnoreReturnValue
6683 public CompletableResultCode export (Collection <SpanData > spans ) {
6784 exporterLog .debug ("ApiServerSpanExporter received {} spans to export." , spans .size ());
68- List <SpanData > currentBatch = new ArrayList <>(spans );
69- allExportedSpans .addAll (currentBatch );
7085
71- for (SpanData span : currentBatch ) {
72- String spanName = span .getName ();
86+ synchronized (allExportedSpans ) {
87+ for (SpanData span : spans ) {
88+ if (config .maxSpansToKeep ().isPresent ()
89+ && allExportedSpans .size () >= config .maxSpansToKeep ().get ()) {
90+ SpanData evicted = allExportedSpans .pollFirst ();
91+ if (evicted != null ) {
92+ handleEviction (evicted );
93+ }
94+ }
95+ allExportedSpans .addLast (span );
96+ handleAddition (span );
97+ }
98+ }
99+ return CompletableResultCode .ofSuccess ();
100+ }
101+
102+ private void handleAddition (SpanData span ) {
103+ String spanName = span .getName ();
104+ String eventId = span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.event_id" ));
105+ if (eventId != null && !eventId .isEmpty ()) {
106+ eventIdRefCount .merge (eventId , 1 , Integer ::sum );
73107 if ("call_llm" .equals (spanName )
74108 || "send_data" .equals (spanName )
75109 || (spanName != null && spanName .startsWith ("tool_response" ))) {
76- String eventId =
77- span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.event_id" ));
78- if (eventId != null && !eventId .isEmpty ()) {
79- Map <String , Object > attributesMap = new HashMap <>();
80- span .getAttributes ().forEach ((key , value ) -> attributesMap .put (key .getKey (), value ));
81- attributesMap .put ("trace_id" , span .getSpanContext ().getTraceId ());
82- attributesMap .put ("span_id" , span .getSpanContext ().getSpanId ());
83- attributesMap .putIfAbsent ("gcp.vertex.agent.event_id" , eventId );
84- exporterLog .debug ("Storing event-based trace attributes for event_id: {}" , eventId );
85- this .eventIdTraceStorage .put (eventId , attributesMap ); // Use internal storage
110+ Map <String , Object > attributesMap = new HashMap <>();
111+ span .getAttributes ().forEach ((key , value ) -> attributesMap .put (key .getKey (), value ));
112+ attributesMap .put ("trace_id" , span .getSpanContext ().getTraceId ());
113+ attributesMap .put ("span_id" , span .getSpanContext ().getSpanId ());
114+ attributesMap .putIfAbsent ("gcp.vertex.agent.event_id" , eventId );
115+ eventIdTraceStorage .put (eventId , attributesMap );
116+ }
117+ }
118+
119+ if ("call_llm" .equals (spanName )) {
120+ String sessionId =
121+ span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.session_id" ));
122+ if (sessionId != null && !sessionId .isEmpty ()) {
123+ sessionToTraceIdsMap
124+ .computeIfAbsent (sessionId , k -> new ArrayList <>())
125+ .add (span .getSpanContext ().getTraceId ());
126+ }
127+ }
128+ }
129+
130+ private void handleEviction (SpanData span ) {
131+ String spanName = span .getName ();
132+ String eventId = span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.event_id" ));
133+ if (eventId != null && !eventId .isEmpty ()) {
134+ Integer count = eventIdRefCount .get (eventId );
135+ if (count != null ) {
136+ if (count <= 1 ) {
137+ eventIdRefCount .remove (eventId );
138+ eventIdTraceStorage .remove (eventId );
86139 } else {
87- exporterLog .trace (
88- "Span {} for event-based trace did not have 'gcp.vertex.agent.event_id'"
89- + " attribute or it was empty." ,
90- spanName );
140+ eventIdRefCount .put (eventId , count - 1 );
91141 }
92142 }
143+ }
93144
94- if ("call_llm" .equals (spanName )) {
95- String sessionId =
96- span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.session_id" ));
97- if (sessionId != null && !sessionId .isEmpty ()) {
98- String traceId = span .getSpanContext ().getTraceId ();
99- sessionToTraceIdsMap
100- .computeIfAbsent (sessionId , k -> Collections .synchronizedList (new ArrayList <>()))
101- .add (traceId );
102- exporterLog .trace (
103- "Associated trace_id {} with session_id {} for session tracing" , traceId , sessionId );
104- } else {
105- exporterLog .trace (
106- "Span {} for session trace did not have 'gcp.vertex.agent.session_id' attribute." ,
107- spanName );
145+ if ("call_llm" .equals (spanName )) {
146+ String sessionId =
147+ span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.session_id" ));
148+ if (sessionId != null && !sessionId .isEmpty ()) {
149+ List <String > traceIds = sessionToTraceIdsMap .get (sessionId );
150+ if (traceIds != null ) {
151+ traceIds .remove (span .getSpanContext ().getTraceId ());
152+ if (traceIds .isEmpty ()) {
153+ sessionToTraceIdsMap .remove (sessionId );
154+ }
108155 }
109156 }
110157 }
111- return CompletableResultCode .ofSuccess ();
112158 }
113159
114160 @ Override
@@ -117,9 +163,15 @@ public CompletableResultCode flush() {
117163 }
118164
119165 @ Override
166+ @ CanIgnoreReturnValue
120167 public CompletableResultCode shutdown () {
121168 exporterLog .debug ("Shutting down ApiServerSpanExporter." );
122- // no need to clear storage on shutdown, as everything is currently stored in memory.
169+ synchronized (allExportedSpans ) {
170+ allExportedSpans .clear ();
171+ eventIdRefCount .clear ();
172+ eventIdTraceStorage .clear ();
173+ sessionToTraceIdsMap .clear ();
174+ }
123175 return CompletableResultCode .ofSuccess ();
124176 }
125177}
0 commit comments