55import asyncio
66import logging
77import threading
8+ import time
89from collections import deque
910from dataclasses import dataclass
1011from typing import TYPE_CHECKING
1112
13+ from .metrics import get_metrics_collector
14+
1215if TYPE_CHECKING :
1316 from .tracing .span_exporter import TdSpanExporter
1417 from .types import CleanSpanData
@@ -63,6 +66,10 @@ def __init__(
6366 self ._started = False
6467 self ._dropped_spans = 0
6568
69+ # Set up metrics
70+ self ._metrics = get_metrics_collector ()
71+ self ._metrics .set_queue_max_size (self ._config .max_queue_size )
72+
6673 def start (self ) -> None :
6774 """Start the background export thread."""
6875 if self ._started :
@@ -110,18 +117,38 @@ def add_span(self, span: CleanSpanData) -> bool:
110117 span: The span to add
111118
112119 Returns:
113- True if span was added, False if queue is full and span was dropped
120+ True if span was added, False if queue is full or trace is blocked
114121 """
122+ from .trace_blocking_manager import TraceBlockingManager , should_block_span
123+
124+ # Check blocking conditions outside lock (read-only checks)
125+ is_blocked = should_block_span (span )
126+ is_trace_blocked = TraceBlockingManager .get_instance ().is_trace_blocked (span .trace_id )
127+
115128 with self ._condition :
129+ # Handle blocked spans (increment counter under lock)
130+ if is_blocked :
131+ self ._dropped_spans += 1
132+ self ._metrics .record_spans_dropped ()
133+ return False
134+
135+ if is_trace_blocked :
136+ logger .debug (f"Skipping span '{ span .name } ' - trace { span .trace_id } is blocked" )
137+ self ._dropped_spans += 1
138+ self ._metrics .record_spans_dropped ()
139+ return False
140+
116141 if len (self ._queue ) >= self ._config .max_queue_size :
117142 self ._dropped_spans += 1
143+ self ._metrics .record_spans_dropped ()
118144 logger .warning (
119145 f"Span queue full ({ self ._config .max_queue_size } ), dropping span. "
120146 f"Total dropped: { self ._dropped_spans } "
121147 )
122148 return False
123149
124150 self ._queue .append (span )
151+ self ._metrics .update_queue_size (len (self ._queue ))
125152
126153 # Trigger immediate export if batch size reached
127154 if len (self ._queue ) >= self ._config .max_export_batch_size :
@@ -149,6 +176,7 @@ def _export_batch(self) -> None:
149176 with self ._condition :
150177 while self ._queue and len (batch ) < self ._config .max_export_batch_size :
151178 batch .append (self ._queue .popleft ())
179+ self ._metrics .update_queue_size (len (self ._queue ))
152180
153181 if not batch :
154182 return
@@ -158,6 +186,7 @@ def _export_batch(self) -> None:
158186
159187 # Export to all adapters
160188 for adapter in adapters :
189+ start_time = time .monotonic ()
161190 try :
162191 # Handle async adapters (create new event loop for this thread)
163192 if asyncio .iscoroutinefunction (adapter .export_spans ):
@@ -170,8 +199,13 @@ def _export_batch(self) -> None:
170199 else :
171200 adapter .export_spans (batch ) # type: ignore
172201
202+ latency_ms = (time .monotonic () - start_time ) * 1000
203+ self ._metrics .record_spans_exported (len (batch ))
204+ self ._metrics .record_export_latency (latency_ms )
205+
173206 except Exception as e :
174207 logger .error (f"Failed to export batch via { adapter .name } : { e } " )
208+ self ._metrics .record_spans_failed (len (batch ))
175209
176210 def _force_flush (self ) -> None :
177211 """Force export all remaining spans in the queue."""