2020import io .opentelemetry .sdk .common .CompletableResultCode ;
2121import io .opentelemetry .sdk .trace .data .SpanData ;
2222import io .opentelemetry .sdk .trace .export .SpanExporter ;
23+ import java .util .ArrayDeque ;
2324import java .util .ArrayList ;
2425import java .util .Collection ;
25- import java .util .Collections ;
26+ import java .util .Deque ;
2627import java .util .HashMap ;
2728import java .util .List ;
2829import java .util .Map ;
29- import java .util .concurrent .ConcurrentHashMap ;
3030import org .slf4j .Logger ;
3131import org .slf4j .LoggerFactory ;
3232
4141public class ApiServerSpanExporter implements SpanExporter {
4242 private static final Logger exporterLog = LoggerFactory .getLogger (ApiServerSpanExporter .class );
4343
44- private final Map <String , Map <String , Object >> eventIdTraceStorage = new ConcurrentHashMap <>();
44+ private final ApiServerSpanExporterConfig config ;
45+
46+ private final Map <String , Integer > eventIdRefCount = new HashMap <>();
47+ private final Map <String , Map <String , Object >> eventIdTraceStorage = new HashMap <>();
4548
4649 // Session ID -> Trace IDs -> Trace Object
47- private final Map <String , List <String >> sessionToTraceIdsMap = new ConcurrentHashMap <>();
50+ private final Map <String , List <String >> sessionToTraceIdsMap = new HashMap <>();
51+
52+ private final Deque <SpanData > allExportedSpans = new ArrayDeque <>();
4853
49- private final List <SpanData > allExportedSpans = Collections .synchronizedList (new ArrayList <>());
54+ public ApiServerSpanExporter () {
55+ this (ApiServerSpanExporterConfig .builder ().build ());
56+ }
5057
51- public ApiServerSpanExporter () {}
58+ public ApiServerSpanExporter (ApiServerSpanExporterConfig config ) {
59+ this .config = config ;
60+ }
5261
5362 public Map <String , Object > getEventTraceAttributes (String eventId ) {
54- return this .eventIdTraceStorage .get (eventId );
63+ synchronized (allExportedSpans ) {
64+ return this .eventIdTraceStorage .get (eventId );
65+ }
5566 }
5667
5768 public Map <String , List <String >> getSessionToTraceIdsMap () {
58- return this .sessionToTraceIdsMap ;
69+ synchronized (allExportedSpans ) {
70+ return new HashMap <>(this .sessionToTraceIdsMap );
71+ }
5972 }
6073
6174 public List <SpanData > getAllExportedSpans () {
62- return this .allExportedSpans ;
75+ synchronized (allExportedSpans ) {
76+ return new ArrayList <>(this .allExportedSpans );
77+ }
6378 }
6479
6580 @ Override
6681 public CompletableResultCode export (Collection <SpanData > spans ) {
6782 exporterLog .debug ("ApiServerSpanExporter received {} spans to export." , spans .size ());
68- List <SpanData > currentBatch = new ArrayList <>(spans );
69- allExportedSpans .addAll (currentBatch );
70-
71- for (SpanData span : currentBatch ) {
72- String spanName = span .getName ();
73- if ("call_llm" .equals (spanName )
74- || "send_data" .equals (spanName )
75- || (spanName != null && spanName .startsWith ("tool_response" ))) {
76- String eventId =
77- span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.event_id" ));
78- if (eventId != null && !eventId .isEmpty ()) {
79- Map <String , Object > attributesMap = new HashMap <>();
80- span .getAttributes ().forEach ((key , value ) -> attributesMap .put (key .getKey (), value ));
81- attributesMap .put ("trace_id" , span .getSpanContext ().getTraceId ());
82- attributesMap .put ("span_id" , span .getSpanContext ().getSpanId ());
83- attributesMap .putIfAbsent ("gcp.vertex.agent.event_id" , eventId );
84- exporterLog .debug ("Storing event-based trace attributes for event_id: {}" , eventId );
85- this .eventIdTraceStorage .put (eventId , attributesMap ); // Use internal storage
86- } else {
87- exporterLog .trace (
88- "Span {} for event-based trace did not have 'gcp.vertex.agent.event_id'"
89- + " attribute or it was empty." ,
90- spanName );
83+
84+ synchronized (allExportedSpans ) {
85+ for (SpanData span : spans ) {
86+ if (config .maxSpansToKeep ().isPresent ()
87+ && allExportedSpans .size () >= config .maxSpansToKeep ().get ()) {
88+ SpanData evicted = allExportedSpans .pollFirst ();
89+ if (evicted != null ) {
90+ handleEviction (evicted );
91+ }
9192 }
93+ allExportedSpans .addLast (span );
94+ handleAddition (span );
9295 }
96+ }
97+ return CompletableResultCode .ofSuccess ();
98+ }
9399
94- if ("call_llm" .equals (spanName )) {
95- String sessionId =
96- span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.session_id" ));
97- if (sessionId != null && !sessionId .isEmpty ()) {
98- String traceId = span .getSpanContext ().getTraceId ();
99- sessionToTraceIdsMap
100- .computeIfAbsent (sessionId , k -> Collections .synchronizedList (new ArrayList <>()))
101- .add (traceId );
102- exporterLog .trace (
103- "Associated trace_id {} with session_id {} for session tracing" , traceId , sessionId );
100+ private void handleAddition (SpanData span ) {
101+ String spanName = span .getName ();
102+ String eventId = span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.event_id" ));
103+ boolean isEventTraceSpan =
104+ "call_llm" .equals (spanName )
105+ || "send_data" .equals (spanName )
106+ || (spanName != null && spanName .startsWith ("tool_response" ));
107+ if (eventId != null && !eventId .isEmpty ()) {
108+ eventIdRefCount .merge (eventId , 1 , Integer ::sum );
109+ if (isEventTraceSpan ) {
110+ Map <String , Object > attributesMap = new HashMap <>();
111+ span .getAttributes ().forEach ((key , value ) -> attributesMap .put (key .getKey (), value ));
112+ attributesMap .put ("trace_id" , span .getSpanContext ().getTraceId ());
113+ attributesMap .put ("span_id" , span .getSpanContext ().getSpanId ());
114+ attributesMap .putIfAbsent ("gcp.vertex.agent.event_id" , eventId );
115+ exporterLog .debug ("Storing event-based trace attributes for event_id: {}" , eventId );
116+ eventIdTraceStorage .put (eventId , attributesMap );
117+ }
118+ } else if (isEventTraceSpan ) {
119+ exporterLog .trace (
120+ "Span {} for event-based trace did not have 'gcp.vertex.agent.event_id'"
121+ + " attribute or it was empty." ,
122+ spanName );
123+ }
124+
125+ if ("call_llm" .equals (spanName )) {
126+ String sessionId =
127+ span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.session_id" ));
128+ if (sessionId != null && !sessionId .isEmpty ()) {
129+ String traceId = span .getSpanContext ().getTraceId ();
130+ sessionToTraceIdsMap .computeIfAbsent (sessionId , k -> new ArrayList <>()).add (traceId );
131+ exporterLog .trace (
132+ "Associated trace_id {} with session_id {} for session tracing" , traceId , sessionId );
133+ } else {
134+ exporterLog .trace (
135+ "Span {} for session trace did not have 'gcp.vertex.agent.session_id' attribute." ,
136+ spanName );
137+ }
138+ }
139+ }
140+
141+ private void handleEviction (SpanData span ) {
142+ String spanName = span .getName ();
143+ String eventId = span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.event_id" ));
144+ if (eventId != null && !eventId .isEmpty ()) {
145+ Integer count = eventIdRefCount .get (eventId );
146+ if (count != null ) {
147+ if (count <= 1 ) {
148+ eventIdRefCount .remove (eventId );
149+ eventIdTraceStorage .remove (eventId );
104150 } else {
105- exporterLog .trace (
106- "Span {} for session trace did not have 'gcp.vertex.agent.session_id' attribute." ,
107- spanName );
151+ eventIdRefCount .put (eventId , count - 1 );
152+ }
153+ }
154+ }
155+
156+ if ("call_llm" .equals (spanName )) {
157+ String sessionId =
158+ span .getAttributes ().get (AttributeKey .stringKey ("gcp.vertex.agent.session_id" ));
159+ if (sessionId != null && !sessionId .isEmpty ()) {
160+ List <String > traceIds = sessionToTraceIdsMap .get (sessionId );
161+ if (traceIds != null ) {
162+ traceIds .remove (span .getSpanContext ().getTraceId ());
163+ if (traceIds .isEmpty ()) {
164+ sessionToTraceIdsMap .remove (sessionId );
165+ }
108166 }
109167 }
110168 }
111- return CompletableResultCode .ofSuccess ();
112169 }
113170
114171 @ Override
@@ -119,7 +176,12 @@ public CompletableResultCode flush() {
119176 @ Override
120177 public CompletableResultCode shutdown () {
121178 exporterLog .debug ("Shutting down ApiServerSpanExporter." );
122- // no need to clear storage on shutdown, as everything is currently stored in memory.
179+ synchronized (allExportedSpans ) {
180+ allExportedSpans .clear ();
181+ eventIdRefCount .clear ();
182+ eventIdTraceStorage .clear ();
183+ sessionToTraceIdsMap .clear ();
184+ }
123185 return CompletableResultCode .ofSuccess ();
124186 }
125187}
0 commit comments