Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions drift/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
load_tusk_config,
)
from .core.logger import LogLevel, get_log_level, set_log_level
from .core.metrics import SDKMetrics, get_sdk_metrics
from .core.resilience import CircuitBreaker, CircuitBreakerConfig, RetryConfig
from .core.tracing.adapters import (
ApiSpanAdapter,
ApiSpanAdapterConfig,
Expand Down Expand Up @@ -59,6 +61,13 @@
"LogLevel",
"set_log_level",
"get_log_level",
# Metrics
"SDKMetrics",
"get_sdk_metrics",
# Resilience
"RetryConfig",
"CircuitBreaker",
"CircuitBreakerConfig",
# Instrumentations
"FlaskInstrumentation",
"FastAPIInstrumentation",
Expand Down
34 changes: 33 additions & 1 deletion drift/core/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
import asyncio
import logging
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import TYPE_CHECKING

from .metrics import get_metrics_collector

if TYPE_CHECKING:
from .tracing.span_exporter import TdSpanExporter
from .types import CleanSpanData
Expand Down Expand Up @@ -63,6 +66,10 @@ def __init__(
self._started = False
self._dropped_spans = 0

# Set up metrics
self._metrics = get_metrics_collector()
self._metrics.set_queue_max_size(self._config.max_queue_size)

def start(self) -> None:
"""Start the background export thread."""
if self._started:
Expand Down Expand Up @@ -110,18 +117,36 @@ def add_span(self, span: CleanSpanData) -> bool:
span: The span to add

Returns:
True if span was added, False if queue is full and span was dropped
True if span was added, False if queue is full or trace is blocked
"""
# Check if span should be blocked (size limit or server error)
# Blocks entire trace
from .trace_blocking_manager import TraceBlockingManager, should_block_span

if should_block_span(span):
self._dropped_spans += 1
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
self._metrics.record_spans_dropped()
return False

# Check if trace is already blocked
if TraceBlockingManager.get_instance().is_trace_blocked(span.trace_id):
logger.debug(f"Skipping span '{span.name}' - trace {span.trace_id} is blocked")
self._dropped_spans += 1
self._metrics.record_spans_dropped()
return False

with self._condition:
if len(self._queue) >= self._config.max_queue_size:
self._dropped_spans += 1
self._metrics.record_spans_dropped()
logger.warning(
f"Span queue full ({self._config.max_queue_size}), dropping span. "
f"Total dropped: {self._dropped_spans}"
)
return False

self._queue.append(span)
self._metrics.update_queue_size(len(self._queue))

# Trigger immediate export if batch size reached
if len(self._queue) >= self._config.max_export_batch_size:
Expand Down Expand Up @@ -149,6 +174,7 @@ def _export_batch(self) -> None:
with self._condition:
while self._queue and len(batch) < self._config.max_export_batch_size:
batch.append(self._queue.popleft())
self._metrics.update_queue_size(len(self._queue))

if not batch:
return
Expand All @@ -158,6 +184,7 @@ def _export_batch(self) -> None:

# Export to all adapters
for adapter in adapters:
start_time = time.monotonic()
try:
# Handle async adapters (create new event loop for this thread)
if asyncio.iscoroutinefunction(adapter.export_spans):
Expand All @@ -170,8 +197,13 @@ def _export_batch(self) -> None:
else:
adapter.export_spans(batch) # type: ignore

latency_ms = (time.monotonic() - start_time) * 1000
self._metrics.record_spans_exported(len(batch))
self._metrics.record_export_latency(latency_ms)

except Exception as e:
logger.error(f"Failed to export batch via {adapter.name}: {e}")
self._metrics.record_spans_failed(len(batch))

def _force_flush(self) -> None:
"""Force export all remaining spans in the queue."""
Expand Down
Loading