2020import io .opentelemetry .sdk .common .CompletableResultCode ;
2121import io .opentelemetry .sdk .trace .data .SpanData ;
2222import io .opentelemetry .sdk .trace .export .SpanExporter ;
23+ import java .util .ArrayDeque ;
2324import java .util .ArrayList ;
2425import java .util .Collection ;
25- import java .util .Collections ;
26+ import java .util .Deque ;
2627import java .util .HashMap ;
2728import java .util .List ;
2829import java .util .Map ;
29- import java .util .concurrent .ConcurrentHashMap ;
3030import org .slf4j .Logger ;
3131import org .slf4j .LoggerFactory ;
3232
4141public class ApiServerSpanExporter implements SpanExporter {
4242 private static final Logger exporterLog = LoggerFactory .getLogger (ApiServerSpanExporter .class );
4343
44- private final Map <String , Map <String , Object >> eventIdTraceStorage = new ConcurrentHashMap <>();
44+ private final ApiServerSpanExporterConfig config ;
45+
46+ private final Map <String , Integer > eventIdRefCount = new HashMap <>();
47+ private final Map <String , Map <String , Object >> eventIdTraceStorage = new HashMap <>();
4548
4649 // Session ID -> Trace IDs -> Trace Object
47- private final Map <String , List <String >> sessionToTraceIdsMap = new ConcurrentHashMap <>();
50+ private final Map <String , List <String >> sessionToTraceIdsMap = new HashMap <>();
51+
52+ private final Deque <SpanData > allExportedSpans = new ArrayDeque <>();
4853
49- private final List <SpanData > allExportedSpans = Collections .synchronizedList (new ArrayList <>());
54+ public ApiServerSpanExporter () {
55+ this (ApiServerSpanExporterConfig .builder ().build ());
56+ }
5057
51- public ApiServerSpanExporter () {}
58+ public ApiServerSpanExporter (ApiServerSpanExporterConfig config ) {
59+ this .config = config ;
60+ }
5261
5362 public Map <String , Object > getEventTraceAttributes (String eventId ) {
54- return this .eventIdTraceStorage .get (eventId );
63+ synchronized (allExportedSpans ) {
64+ return this .eventIdTraceStorage .get (eventId );
65+ }
5566 }
5667
5768 public Map <String , List <String >> getSessionToTraceIdsMap () {
58- return this .sessionToTraceIdsMap ;
69+ synchronized (allExportedSpans ) {
70+ return new HashMap <>(this .sessionToTraceIdsMap );
71+ }
5972 }
6073
6174 public List <SpanData > getAllExportedSpans () {
62- return this .allExportedSpans ;
75+ synchronized (allExportedSpans ) {
76+ return new ArrayList <>(this .allExportedSpans );
77+ }
6378 }
6479
6580 @ Override
6681 public CompletableResultCode export (Collection <SpanData > spans ) {
6782 exporterLog .debug ("ApiServerSpanExporter received {} spans to export." , spans .size ());
68- List <SpanData > currentBatch = new ArrayList <>(spans );
69- allExportedSpans .addAll (currentBatch );
7083
71- for (SpanData span : currentBatch ) {
72- String spanName = span .getName ();
84+ synchronized (allExportedSpans ) {
85+ for (SpanData span : spans ) {
86+ if (allExportedSpans .size () >= config .maxSpansToKeep ()) {
87+ SpanData evicted = allExportedSpans .pollFirst ();
88+ if (evicted != null ) {
89+ handleEviction (evicted );
90+ }
91+ }
92+ allExportedSpans .addLast (span );
93+ handleAddition (span );
94+ }
95+ }
96+ return CompletableResultCode .ofSuccess ();
97+ }
98+
99+ private void handleAddition (SpanData span ) {
100+ String spanName = span .getName ();
101+ String eventId = span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.event_id" ));
102+ if (eventId != null && !eventId .isEmpty ()) {
103+ eventIdRefCount .merge (eventId , 1 , Integer ::sum );
73104 if ("call_llm" .equals (spanName )
74105 || "send_data" .equals (spanName )
75106 || (spanName != null && spanName .startsWith ("tool_response" ))) {
76- String eventId =
77- span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.event_id" ));
78- if (eventId != null && !eventId .isEmpty ()) {
79- Map <String , Object > attributesMap = new HashMap <>();
80- span .getAttributes ().forEach ((key , value ) -> attributesMap .put (key .getKey (), value ));
81- attributesMap .put ("trace_id" , span .getSpanContext ().getTraceId ());
82- attributesMap .put ("span_id" , span .getSpanContext ().getSpanId ());
83- attributesMap .putIfAbsent ("gcp.vertex.agent.event_id" , eventId );
84- exporterLog .debug ("Storing event-based trace attributes for event_id: {}" , eventId );
85- this .eventIdTraceStorage .put (eventId , attributesMap ); // Use internal storage
107+ Map <String , Object > attributesMap = new HashMap <>();
108+ span .getAttributes ().forEach ((key , value ) -> attributesMap .put (key .getKey (), value ));
109+ attributesMap .put ("trace_id" , span .getSpanContext ().getTraceId ());
110+ attributesMap .put ("span_id" , span .getSpanContext ().getSpanId ());
111+ attributesMap .putIfAbsent ("gcp.vertex.agent.event_id" , eventId );
112+ eventIdTraceStorage .put (eventId , attributesMap );
113+ }
114+ }
115+
116+ if ("call_llm" .equals (spanName )) {
117+ String sessionId =
118+ span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.session_id" ));
119+ if (sessionId != null && !sessionId .isEmpty ()) {
120+ sessionToTraceIdsMap
121+ .computeIfAbsent (sessionId , k -> new ArrayList <>())
122+ .add (span .getSpanContext ().getTraceId ());
123+ }
124+ }
125+ }
126+
127+ private void handleEviction (SpanData span ) {
128+ String spanName = span .getName ();
129+ String eventId = span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.event_id" ));
130+ if (eventId != null && !eventId .isEmpty ()) {
131+ Integer count = eventIdRefCount .get (eventId );
132+ if (count != null ) {
133+ if (count <= 1 ) {
134+ eventIdRefCount .remove (eventId );
135+ eventIdTraceStorage .remove (eventId );
86136 } else {
87- exporterLog .trace (
88- "Span {} for event-based trace did not have 'gcp.vertex.agent.event_id'"
89- + " attribute or it was empty." ,
90- spanName );
137+ eventIdRefCount .put (eventId , count - 1 );
91138 }
92139 }
140+ }
93141
94- if ("call_llm" .equals (spanName )) {
95- String sessionId =
96- span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.session_id" ));
97- if (sessionId != null && !sessionId .isEmpty ()) {
98- String traceId = span .getSpanContext ().getTraceId ();
99- sessionToTraceIdsMap
100- .computeIfAbsent (sessionId , k -> Collections .synchronizedList (new ArrayList <>()))
101- .add (traceId );
102- exporterLog .trace (
103- "Associated trace_id {} with session_id {} for session tracing" , traceId , sessionId );
104- } else {
105- exporterLog .trace (
106- "Span {} for session trace did not have 'gcp.vertex.agent.session_id' attribute." ,
107- spanName );
142+ if ("call_llm" .equals (spanName )) {
143+ String sessionId =
144+ span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.session_id" ));
145+ if (sessionId != null && !sessionId .isEmpty ()) {
146+ List <String > traceIds = sessionToTraceIdsMap .get (sessionId );
147+ if (traceIds != null ) {
148+ traceIds .remove (span .getSpanContext ().getTraceId ());
149+ if (traceIds .isEmpty ()) {
150+ sessionToTraceIdsMap .remove (sessionId );
151+ }
108152 }
109153 }
110154 }
111- return CompletableResultCode .ofSuccess ();
112155 }
113156
114157 @ Override
@@ -119,7 +162,12 @@ public CompletableResultCode flush() {
119162 @ Override
120163 public CompletableResultCode shutdown () {
121164 exporterLog .debug ("Shutting down ApiServerSpanExporter." );
122- // no need to clear storage on shutdown, as everything is currently stored in memory.
165+ synchronized (allExportedSpans ) {
166+ allExportedSpans .clear ();
167+ eventIdRefCount .clear ();
168+ eventIdTraceStorage .clear ();
169+ sessionToTraceIdsMap .clear ();
170+ }
123171 return CompletableResultCode .ofSuccess ();
124172 }
125173}
0 commit comments