55import asyncio
66import logging
77import threading
8+ import time
89from collections import deque
910from dataclasses import dataclass
1011from typing import TYPE_CHECKING
1112
13+ from .metrics import get_metrics_collector
14+
1215if TYPE_CHECKING :
1316 from .tracing .span_exporter import TdSpanExporter
1417 from .types import CleanSpanData
@@ -63,6 +66,10 @@ def __init__(
6366 self ._started = False
6467 self ._dropped_spans = 0
6568
69+ # Set up metrics
70+ self ._metrics = get_metrics_collector ()
71+ self ._metrics .set_queue_max_size (self ._config .max_queue_size )
72+
6673 def start (self ) -> None :
6774 """Start the background export thread."""
6875 if self ._started :
@@ -110,18 +117,36 @@ def add_span(self, span: CleanSpanData) -> bool:
110117 span: The span to add
111118
112119 Returns:
113- True if span was added, False if queue is full and span was dropped
120+ True if span was added, False if queue is full or trace is blocked
114121 """
122+ # Check if span should be blocked (size limit or server error)
123+ # Blocks entire trace
124+ from .trace_blocking_manager import TraceBlockingManager , should_block_span
125+
126+ if should_block_span (span ):
127+ self ._dropped_spans += 1
128+ self ._metrics .record_spans_dropped ()
129+ return False
130+
131+ # Check if trace is already blocked
132+ if TraceBlockingManager .get_instance ().is_trace_blocked (span .trace_id ):
133+ logger .debug (f"Skipping span '{ span .name } ' - trace { span .trace_id } is blocked" )
134+ self ._dropped_spans += 1
135+ self ._metrics .record_spans_dropped ()
136+ return False
137+
115138 with self ._condition :
116139 if len (self ._queue ) >= self ._config .max_queue_size :
117140 self ._dropped_spans += 1
141+ self ._metrics .record_spans_dropped ()
118142 logger .warning (
119143 f"Span queue full ({ self ._config .max_queue_size } ), dropping span. "
120144 f"Total dropped: { self ._dropped_spans } "
121145 )
122146 return False
123147
124148 self ._queue .append (span )
149+ self ._metrics .update_queue_size (len (self ._queue ))
125150
126151 # Trigger immediate export if batch size reached
127152 if len (self ._queue ) >= self ._config .max_export_batch_size :
@@ -149,6 +174,7 @@ def _export_batch(self) -> None:
149174 with self ._condition :
150175 while self ._queue and len (batch ) < self ._config .max_export_batch_size :
151176 batch .append (self ._queue .popleft ())
177+ self ._metrics .update_queue_size (len (self ._queue ))
152178
153179 if not batch :
154180 return
@@ -158,6 +184,7 @@ def _export_batch(self) -> None:
158184
159185 # Export to all adapters
160186 for adapter in adapters :
187+ start_time = time .monotonic ()
161188 try :
162189 # Handle async adapters (create new event loop for this thread)
163190 if asyncio .iscoroutinefunction (adapter .export_spans ):
@@ -170,8 +197,13 @@ def _export_batch(self) -> None:
170197 else :
171198 adapter .export_spans (batch ) # type: ignore
172199
200+ latency_ms = (time .monotonic () - start_time ) * 1000
201+ self ._metrics .record_spans_exported (len (batch ))
202+ self ._metrics .record_export_latency (latency_ms )
203+
173204 except Exception as e :
174205 logger .error (f"Failed to export batch via { adapter .name } : { e } " )
206+ self ._metrics .record_spans_failed (len (batch ))
175207
176208 def _force_flush (self ) -> None :
177209 """Force export all remaining spans in the queue."""
0 commit comments