Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions drift/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
load_tusk_config,
)
from .core.logger import LogLevel, get_log_level, set_log_level
from .core.metrics import SDKMetrics, get_sdk_metrics
from .core.resilience import CircuitBreaker, CircuitBreakerConfig, RetryConfig
from .core.tracing.adapters import (
ApiSpanAdapter,
ApiSpanAdapterConfig,
Expand Down Expand Up @@ -59,6 +61,13 @@
"LogLevel",
"set_log_level",
"get_log_level",
# Metrics
"SDKMetrics",
"get_sdk_metrics",
# Resilience
"RetryConfig",
"CircuitBreaker",
"CircuitBreakerConfig",
# Instrumentations
"FlaskInstrumentation",
"FastAPIInstrumentation",
Expand Down
36 changes: 35 additions & 1 deletion drift/core/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
import asyncio
import logging
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import TYPE_CHECKING

from .metrics import get_metrics_collector

if TYPE_CHECKING:
from .tracing.span_exporter import TdSpanExporter
from .types import CleanSpanData
Expand Down Expand Up @@ -63,6 +66,10 @@ def __init__(
self._started = False
self._dropped_spans = 0

# Set up metrics
self._metrics = get_metrics_collector()
self._metrics.set_queue_max_size(self._config.max_queue_size)

def start(self) -> None:
"""Start the background export thread."""
if self._started:
Expand Down Expand Up @@ -110,18 +117,38 @@ def add_span(self, span: CleanSpanData) -> bool:
span: The span to add

Returns:
True if span was added, False if queue is full and span was dropped
True if span was added, False if queue is full or trace is blocked
"""
from .trace_blocking_manager import TraceBlockingManager, should_block_span

# Check blocking conditions outside lock (read-only checks)
is_blocked = should_block_span(span)
is_trace_blocked = TraceBlockingManager.get_instance().is_trace_blocked(span.trace_id)

with self._condition:
# Handle blocked spans (increment counter under lock)
if is_blocked:
self._dropped_spans += 1
self._metrics.record_spans_dropped()
return False

if is_trace_blocked:
logger.debug(f"Skipping span '{span.name}' - trace {span.trace_id} is blocked")
self._dropped_spans += 1
self._metrics.record_spans_dropped()
return False

if len(self._queue) >= self._config.max_queue_size:
self._dropped_spans += 1
self._metrics.record_spans_dropped()
logger.warning(
f"Span queue full ({self._config.max_queue_size}), dropping span. "
f"Total dropped: {self._dropped_spans}"
)
return False

self._queue.append(span)
self._metrics.update_queue_size(len(self._queue))

# Trigger immediate export if batch size reached
if len(self._queue) >= self._config.max_export_batch_size:
Expand Down Expand Up @@ -149,6 +176,7 @@ def _export_batch(self) -> None:
with self._condition:
while self._queue and len(batch) < self._config.max_export_batch_size:
batch.append(self._queue.popleft())
self._metrics.update_queue_size(len(self._queue))

if not batch:
return
Expand All @@ -158,6 +186,7 @@ def _export_batch(self) -> None:

# Export to all adapters
for adapter in adapters:
start_time = time.monotonic()
try:
# Handle async adapters (create new event loop for this thread)
if asyncio.iscoroutinefunction(adapter.export_spans):
Expand All @@ -170,8 +199,13 @@ def _export_batch(self) -> None:
else:
adapter.export_spans(batch) # type: ignore

latency_ms = (time.monotonic() - start_time) * 1000
self._metrics.record_spans_exported(len(batch))
self._metrics.record_export_latency(latency_ms)

except Exception as e:
logger.error(f"Failed to export batch via {adapter.name}: {e}")
self._metrics.record_spans_failed(len(batch))

def _force_flush(self) -> None:
"""Force export all remaining spans in the queue."""
Expand Down
Loading