Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions sentry_sdk/_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
self._record_lost_func = record_lost_func
self._running = True
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()

Expand Down Expand Up @@ -70,23 +71,40 @@
return True

def _flush_loop(self) -> None:
    """Background loop: periodically drain the buffer until kill() stops us."""
    # The flush thread is marked as active for its entire lifetime so that
    # any re-entrant add() (e.g. triggered by a GC-emitted warning during
    # wait(), flush(), or Event operations) is silently dropped instead of
    # deadlocking on the batcher's internal locks.
    self._active.flag = True
    while self._running:
        # random.random() adds up to 1s of jitter to the wait interval —
        # presumably to stagger flushes across batchers; confirm intent.
        timeout = self.FLUSH_WAIT_TIME + random.random()
        self._flush_event.wait(timeout)
        self._flush_event.clear()
        self._flush()

def add(self, item: "T") -> None:
    """Queue *item* for batched sending, possibly waking the flush thread.

    The item is silently dropped when this thread is already executing
    batcher code (re-entrancy guard) or when the flusher thread is not
    available; when the buffer is at capacity the loss is recorded via
    ``self._record_lost`` instead.
    """
    # Bail out if the current thread is already executing batcher code.
    # This prevents deadlocks when code running inside the batcher (e.g.
    # _add_to_envelope during flush, or _flush_event.wait/set) triggers
    # a GC-emitted warning that routes back through the logging
    # integration into add().
    if getattr(self._active, "flag", False):
        return None

    self._active.flag = True
    try:
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            if len(self._buffer) >= self.MAX_BEFORE_DROP:
                # Buffer is full: account for the dropped item rather
                # than growing without bound.
                self._record_lost(item)
                return None

            self._buffer.append(item)
            if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
                # Wake the flush loop early instead of waiting out the timer.
                self._flush_event.set()
    finally:
        # Always clear the guard so later calls on this thread proceed.
        self._active.flag = False

def kill(self) -> None:
if self._flusher is None:
Expand All @@ -96,8 +114,12 @@
self._flush_event.set()
self._flusher = None

def flush(self) -> None:
    """Synchronously flush the buffer, with the same re-entry guard as add().

    Without the guard, a nested flush() (e.g. a GC-emitted warning routed
    back through the logging integration while _flush() runs) would
    execute, and its ``finally`` would clear ``_active.flag`` prematurely,
    leaving subsequent re-entrant calls unprotected.
    """
    # Mirror add(): bail out if this thread is already inside batcher code.
    if getattr(self._active, "flag", False):
        return None

    self._active.flag = True
    try:
        self._flush()
    finally:
        self._active.flag = False

Check warning on line 122 in sentry_sdk/_batcher.py

View check run for this annotation

@sentry/warden / warden: find-bugs

flush() missing re-entry guard check

The `flush()` method sets `_active.flag = True` without first checking if it's already set. Unlike `add()` which guards with `if getattr(self._active, "flag", False): return None`, `flush()` unconditionally sets the flag and proceeds. If `_flush()` triggers a GC warning that routes back through the logging integration to `flush()` (instead of `add()`), the nested `flush()` will execute, and its finally block will set `_active.flag = False` prematurely, leaving subsequent re-entrant calls unprotected.
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

def _add_to_envelope(self, envelope: "Envelope") -> None:
envelope.add_item(
Expand Down
52 changes: 33 additions & 19 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,36 +43,50 @@ def __init__(
self._record_lost_func = record_lost_func
self._running = True
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()

self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None

def add(self, span: "StreamedSpan") -> None:
    """Queue *span* in its trace's buffer, possibly waking the flush thread.

    The span is silently dropped when this thread is already executing
    batcher code (re-entrancy guard) or when the flusher thread is not
    available; when the per-trace buffer is at capacity the loss is
    reported via ``self._record_lost_func``.
    """
    # Bail out if the current thread is already executing batcher code.
    # This prevents deadlocks when code running inside the batcher (e.g.
    # _add_to_envelope during flush, or _flush_event.wait/set) triggers
    # a GC-emitted warning that routes back through the logging
    # integration into add().
    if getattr(self._active, "flag", False):
        return None

    self._active.flag = True
    try:
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            size = len(self._span_buffer[span.trace_id])
            if size >= self.MAX_BEFORE_DROP:
                # Per-trace buffer is full: account for the dropped span
                # rather than growing without bound.
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            self._running_size[span.trace_id] += self._estimate_size(span)

            # Wake the flush loop early on either threshold: span count...
            if size + 1 >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()
                return

            # ...or estimated payload size for this trace.
            if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
                self._flush_event.set()
                return
    finally:
        # Always clear the guard so later calls on this thread proceed.
        self._active.flag = False

@staticmethod
def _estimate_size(item: "StreamedSpan") -> int:
Expand Down
Loading