Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions sentry_sdk/_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
self._record_lost_func = record_lost_func
self._running = True
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()

Expand Down Expand Up @@ -70,23 +71,40 @@
return True

def _flush_loop(self) -> None:
# Mark the flush-loop thread as active for its entire lifetime so
# that any re-entrant add() triggered by GC warnings during wait(),
# flush(), or Event operations is silently dropped instead of
# deadlocking on internal locks.
self._active.flag = True
while self._running:
self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
self._flush_event.clear()
self._flush()

def add(self, item: "T") -> None:
    """Queue *item* for batched sending; may wake the flusher thread.

    Re-entrant calls from a thread that is already executing batcher
    code (e.g. _add_to_envelope during flush, or _flush_event.wait/set
    triggering a GC-emitted warning that routes back through the logging
    integration into add()) are silently dropped to prevent deadlocks
    on internal locks.
    """
    if getattr(self._active, "flag", False):
        return None

    self._active.flag = True
    try:
        # Every early exit stays inside the try block so the finally
        # clause always clears the re-entrancy flag; a bail-out before
        # the try would leave the flag set and silently drop all later
        # items submitted from this thread.
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            if len(self._buffer) >= self.MAX_BEFORE_DROP:
                # Buffer is full: record the loss rather than grow
                # without bound.
                self._record_lost(item)
                return None

            self._buffer.append(item)
            if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
                # Wake the flusher thread early.
                self._flush_event.set()
    finally:
        self._active.flag = False

def kill(self) -> None:
if self._flusher is None:
Expand All @@ -96,8 +114,12 @@
self._flush_event.set()
self._flusher = None

def flush(self) -> None:
    """Force an immediate flush of the buffered items.

    No-op when called re-entrantly from a thread that is already inside
    batcher code: flushing there could deadlock on locks held by the
    outer frame, and unconditionally toggling the guard would clear a
    flag owned by the flush-loop thread (which keeps it set for its
    entire lifetime), defeating the re-entrancy protection.
    """
    if getattr(self._active, "flag", False):
        return
    self._active.flag = True
    try:
        self._flush()
    finally:
        # Always clear the guard, even if _flush() raises.
        self._active.flag = False

Check failure on line 122 in sentry_sdk/_batcher.py

View check run for this annotation

@sentry/warden / warden: code-review

[8TH-B26] Re-entry flag not reset on early return, causing subsequent spans to be silently dropped (additional location)

When `_ensure_thread()` fails or `_flusher` is None, the code returns at line 66 after setting `self._active.flag = True` at line 63, but before entering the `try` block. This bypasses the `finally` clause that resets the flag. Once this path is taken, the flag remains `True` permanently for that thread, causing all subsequent `add()` calls from the same thread to bail out early at line 60-61. This will result in silent span data loss.

Check warning on line 122 in sentry_sdk/_batcher.py

View check run for this annotation

@sentry/warden / warden: find-bugs

flush() method lacks re-entry guard, causing protection to be incorrectly reset

The `flush()` method unconditionally sets `self._active.flag = True` and resets it to `False` in the finally block, without first checking if the flag was already set. If `flush()` is called while `_flush_loop()` is running on the same thread (which permanently sets `_active.flag = True`), the finally block will incorrectly reset the flag to `False`, defeating the deadlock protection. This could allow a subsequent `add()` call to proceed and attempt to acquire locks that are already held.
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

def _add_to_envelope(self, envelope: "Envelope") -> None:
envelope.add_item(
Expand Down
55 changes: 35 additions & 20 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,36 +43,51 @@
self._record_lost_func = record_lost_func
self._running = True
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()

self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None

def add(self, span: "StreamedSpan") -> None:
    """Queue *span* in its trace's buffer; may wake the flusher thread.

    Re-entrant calls from a thread that is already executing batcher
    code (e.g. _add_to_envelope during flush, or _flush_event.wait/set
    triggering a GC-emitted warning that routes back through the logging
    integration into add()) are silently dropped to prevent deadlocks
    on internal locks.
    """
    if getattr(self._active, "flag", False):
        return None

    self._active.flag = True
    try:
        # The thread/flusher check must live INSIDE the try block so the
        # finally clause always clears the re-entrancy flag. Setting the
        # flag and then returning before the try would leave it set
        # permanently for this thread, silently dropping every
        # subsequent span (matches the layout in _batcher.py).
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            size = len(self._span_buffer[span.trace_id])
            if size >= self.MAX_BEFORE_DROP:
                # Per-trace buffer is full: record the loss rather than
                # grow without bound.
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            self._running_size[span.trace_id] += self._estimate_size(span)

            if size + 1 >= self.MAX_BEFORE_FLUSH:
                # Count threshold reached: wake the flusher thread.
                self._flush_event.set()
                return

            if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
                # Byte-size threshold reached: wake the flusher thread.
                self._flush_event.set()
                return
    finally:
        self._active.flag = False

@staticmethod
def _estimate_size(item: "StreamedSpan") -> int:
Expand Down
Loading