Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion drift/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
from .instrumentation.flask import FlaskInstrumentation
from .instrumentation.requests import RequestsInstrumentation
from .instrumentation.urllib3 import Urllib3Instrumentation
from .version import SDK_VERSION

__version__ = "0.1.0"
__version__ = SDK_VERSION

__all__ = [
# Core
Expand Down
21 changes: 13 additions & 8 deletions drift/core/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(
self._config = config or BatchSpanProcessorConfig()
self._queue: deque[CleanSpanData] = deque(maxlen=self._config.max_queue_size)
self._lock = threading.Lock()
self._condition = threading.Condition(self._lock)
self._shutdown_event = threading.Event()
self._export_thread: threading.Thread | None = None
self._started = False
Expand Down Expand Up @@ -88,6 +89,9 @@ def stop(self, timeout: float | None = None) -> None:
return

self._shutdown_event.set()
# Wake up the export thread so it can see the shutdown event
with self._condition:
self._condition.notify_all()

if self._export_thread is not None:
self._export_thread.join(timeout=timeout or self._config.export_timeout_seconds)
Expand All @@ -108,7 +112,7 @@ def add_span(self, span: CleanSpanData) -> bool:
Returns:
True if span was added, False if queue is full and span was dropped
"""
with self._lock:
with self._condition:
if len(self._queue) >= self._config.max_queue_size:
self._dropped_spans += 1
logger.warning(
Expand All @@ -121,16 +125,17 @@ def add_span(self, span: CleanSpanData) -> bool:

# Trigger immediate export if batch size reached
if len(self._queue) >= self._config.max_export_batch_size:
# Signal export thread to wake up (if using condition variable)
pass
self._condition.notify()

return True

def _export_loop(self) -> None:
"""Background thread that periodically exports spans."""
while not self._shutdown_event.is_set():
# Wait for scheduled delay or shutdown
self._shutdown_event.wait(timeout=self._config.scheduled_delay_seconds)
# Wait for either: batch size reached, scheduled delay, or shutdown
with self._condition:
# Wait until batch is ready or timeout
self._condition.wait(timeout=self._config.scheduled_delay_seconds)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: _export_loop always sleeps even when the queue already meets the export threshold, so condition notifications delivered while the thread wasn’t waiting are lost and batches aren’t exported immediately.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At drift/core/batch_processor.py, line 138:

<comment>`_export_loop` always sleeps even when the queue already meets the export threshold, so condition notifications delivered while the thread wasn’t waiting are lost and batches aren’t exported immediately.</comment>

<file context>
@@ -121,16 +125,17 @@ def add_span(self, span: CleanSpanData) -> bool:
+            # Wait for either: batch size reached, scheduled delay, or shutdown
+            with self._condition:
+                # Wait until batch is ready or timeout
+                self._condition.wait(timeout=self._config.scheduled_delay_seconds)
 
             if self._shutdown_event.is_set():
</file context>
Suggested change
self._condition.wait(timeout=self._config.scheduled_delay_seconds)
if (
not self._shutdown_event.is_set()
and len(self._queue) < self._config.max_export_batch_size
):
self._condition.wait(timeout=self._config.scheduled_delay_seconds)


if self._shutdown_event.is_set():
break
Expand All @@ -141,7 +146,7 @@ def _export_batch(self) -> None:
"""Export a batch of spans from the queue."""
# Get batch of spans
batch: list[CleanSpanData] = []
with self._lock:
with self._condition:
while self._queue and len(batch) < self._config.max_export_batch_size:
batch.append(self._queue.popleft())

Expand Down Expand Up @@ -171,7 +176,7 @@ def _export_batch(self) -> None:
def _force_flush(self) -> None:
"""Force export all remaining spans in the queue."""
while True:
with self._lock:
with self._condition:
if not self._queue:
break

Expand All @@ -180,7 +185,7 @@ def _force_flush(self) -> None:
@property
def queue_size(self) -> int:
"""Get the current queue size."""
with self._lock:
with self._condition:
return len(self._queue)

@property
Expand Down
21 changes: 6 additions & 15 deletions drift/core/communication/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ def connect_sync(
ConnectionError: If connection fails
TimeoutError: If connection times out
"""
logger.info("[CONNECT_SYNC] Starting synchronous connection")
# Determine address
if connection_info:
if "socketPath" in connection_info:
Expand Down Expand Up @@ -319,8 +318,6 @@ def connect_sync(
if response.success:
logger.debug("CLI acknowledged connection successfully")
self._connected = True
logger.info(f"[CONNECT_SYNC] Connection successful! Socket is: {self._socket}")
logger.info(f"[CONNECT_SYNC] _connected={self._connected}, is_connected={self.is_connected}")
else:
error_msg = response.error or "Unknown error"
raise ConnectionError(f"CLI rejected connection: {error_msg}")
Expand All @@ -336,8 +333,6 @@ def connect_sync(
finally:
calling_library_context.reset(context_token)

logger.info(f"[CONNECT_SYNC] Exiting connect_sync(). Socket still open: {self._socket is not None}")

async def disconnect(self) -> None:
"""Disconnect from CLI."""
self._cleanup()
Expand Down Expand Up @@ -509,8 +504,9 @@ async def send_unpatched_dependency_alert(

try:
await self._send_protobuf_message(sdk_message)
except Exception:
pass # Alerts are non-critical
except Exception as e:
# Alerts are non-critical, just log at debug level
logger.debug(f"Failed to send unpatched dependency alert: {e}")

async def _send_protobuf_message(self, message: SdkMessage) -> None:
"""Send a protobuf message to CLI."""
Expand Down Expand Up @@ -765,21 +761,16 @@ def _clean_span(self, data: Any) -> Any:

def _cleanup(self) -> None:
"""Clean up resources."""
import traceback

logger.warning("[CLEANUP] _cleanup() called! Stack trace:")
logger.warning("".join(traceback.format_stack()))

self._connected = False
self._session_id = None
self._incoming_buffer.clear()

if self._socket:
try:
logger.warning("[CLEANUP] Closing socket")
self._socket.close()
except Exception:
pass
except OSError as e:
# Socket may already be closed, which is fine
logger.debug(f"Error closing socket during cleanup: {e}")
self._socket = None

self._pending_requests.clear()
11 changes: 10 additions & 1 deletion drift/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,17 @@ def _parse_service_config(data: dict[str, Any]) -> ServiceConfig:

def _parse_recording_config(data: dict[str, Any]) -> RecordingConfig:
"""Parse recording configuration from raw dict."""
# Validate sampling_rate type
sampling_rate = data.get("sampling_rate")
if sampling_rate is not None and not isinstance(sampling_rate, (int, float)):
logger.warning(
f"Invalid 'sampling_rate' in config: expected number, got {type(sampling_rate).__name__}. "
"This value will be ignored."
)
sampling_rate = None

return RecordingConfig(
sampling_rate=data.get("sampling_rate"),
sampling_rate=sampling_rate,
export_spans=data.get("export_spans"),
enable_env_var_recording=data.get("enable_env_var_recording"),
enable_analytics=data.get("enable_analytics"),
Expand Down
18 changes: 4 additions & 14 deletions drift/core/drift_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,6 @@ def _init_communicator_for_replay(self) -> None:
self._is_connected_with_cli = False
# Don't raise - allow Django to start even if CLI isn't ready yet

logger.info(f"[INIT_COMPLETE] SDK initialization finished. Connected to CLI: {self._is_connected_with_cli}")
if self.communicator and self.communicator.is_connected:
logger.info("[INIT_COMPLETE] Communicator reports connected: True")
else:
logger.info("[INIT_COMPLETE] Communicator reports connected: False")

def _init_auto_instrumentations(self) -> None:
"""Initialize instrumentations."""
try:
Expand Down Expand Up @@ -645,10 +639,6 @@ def request_mock_sync(self, mock_request: MockRequestInput) -> MockResponseOutpu
logger.error("Requesting sync mock but CLI is not ready yet")
return MockResponseOutput(found=False, error="CLI not connected yet")

if not self._is_connected_with_cli:
logger.error("Requesting sync mock but CLI is not ready yet")
raise RuntimeError("Requesting sync mock but CLI is not ready yet")

try:
logger.debug(f"Sending protobuf request to CLI (sync) {mock_request.test_id}")
response = self.communicator.request_mock_sync(mock_request)
Expand Down Expand Up @@ -685,8 +675,8 @@ async def send_instrumentation_version_mismatch_alert(
await self.communicator.send_instrumentation_version_mismatch_alert(
module_name, requested_version, supported_versions
)
except Exception:
pass
except Exception as e:
logger.debug(f"Failed to send version mismatch alert: {e}")

async def send_unpatched_dependency_alert(
self,
Expand All @@ -699,8 +689,8 @@ async def send_unpatched_dependency_alert(

try:
await self.communicator.send_unpatched_dependency_alert(stack_trace, trace_test_server_span_id)
except Exception:
pass
except Exception as e:
logger.debug(f"Failed to send unpatched dependency alert: {e}")

def get_sampling_rate(self) -> float:
"""Get the current sampling rate."""
Expand Down
9 changes: 4 additions & 5 deletions drift/core/sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

from __future__ import annotations

import logging
import random

logger = logging.getLogger(__name__)


def should_sample(sampling_rate: float, is_app_ready: bool) -> bool:
"""
Expand Down Expand Up @@ -40,12 +43,8 @@ def validate_sampling_rate(rate: float | None, source: str = "config") -> float
if rate is None:
return None

if not isinstance(rate, (int, float)):
print(f"Warning: Invalid sampling rate from {source}: not a number. Ignoring.")
return None

if rate < 0.0 or rate > 1.0:
print(f"Warning: Invalid sampling rate from {source}: {rate}. Must be between 0.0 and 1.0. Ignoring.")
logger.warning(f"Invalid sampling rate from {source}: {rate}. Must be between 0.0 and 1.0. Ignoring.")
return None

return float(rate)
8 changes: 2 additions & 6 deletions drift/instrumentation/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import sys
from collections.abc import Callable, Sequence
from types import ModuleType
from typing import override
from typing import TypeVar, override

PatchFn = Callable[[ModuleType], None]
T = TypeVar("T")

_registry: dict[str, PatchFn] = {}
_installed = False
Expand Down Expand Up @@ -91,11 +92,6 @@ def _apply_patch(module: ModuleType, patch_fn: PatchFn) -> None:
module.__drift_patched__ = True # type: ignore[attr-defined]


from typing import TypeVar

T = TypeVar("T")


def patch_instances_via_gc[T](class_type: type, patch_instance_fn: Callable[[T], None]) -> None:
"""Use gc to patch instances created before SDK initialization"""
for obj in gc.get_objects(): # pyright: ignore[reportAny]
Expand Down
8 changes: 6 additions & 2 deletions drift/version.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""Version information for the Drift Python SDK."""

# SDK version - should match package version
SDK_VERSION = "1.0.0"
import importlib.metadata

try:
SDK_VERSION = importlib.metadata.version("tusk-drift-python-sdk")
except importlib.metadata.PackageNotFoundError:
SDK_VERSION = "0.0.0.dev"

# Minimum CLI version required for this SDK
MIN_CLI_VERSION = "0.1.0"
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ dev = [
[project.urls]
Homepage = "https://usetusk.ai"
Documentation = "https://docs.usetusk.ai"
Repository = "https://github.com/Use-Tusk/drift-node-sdk"
Issues = "https://github.com/Use-Tusk/drift-node-sdk/issues"
Repository = "https://github.com/Use-Tusk/drift-python-sdk"
Issues = "https://github.com/Use-Tusk/drift-python-sdk/issues"

[tool.setuptools.packages.find]
where = ["."]
Expand Down