Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions sentry_sdk/_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
self._record_lost_func = record_lost_func
self._running = True
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()

Expand Down Expand Up @@ -70,23 +71,40 @@
return True

def _flush_loop(self) -> None:
# Mark the flush-loop thread as active for its entire lifetime so
# that any re-entrant add() triggered by GC warnings during wait(),
# flush(), or Event operations is silently dropped instead of
# deadlocking on internal locks.
self._active.flag = True
while self._running:
self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
self._flush_event.clear()
self._flush()

def add(self, item: "T") -> None:
    """Queue *item* for the next flush, starting the flusher thread if needed.

    Re-entrant calls on the same thread are silently dropped: code running
    inside the batcher (e.g. ``_add_to_envelope`` during flush, or
    ``_flush_event.wait``/``set``) can trigger a GC-emitted warning that
    routes back through the logging integration into ``add()``, which would
    otherwise deadlock on the batcher's internal locks.
    """
    # Bail out if the current thread is already executing batcher code.
    if getattr(self._active, "flag", False):
        return None

    # The guard above guarantees the flag was False on entry, so
    # unconditionally resetting it to False in the finally block is safe.
    self._active.flag = True
    try:
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            if len(self._buffer) >= self.MAX_BEFORE_DROP:
                self._record_lost(item)
                return None

            self._buffer.append(item)
            if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()
    finally:
        self._active.flag = False

def kill(self) -> None:
if self._flusher is None:
Expand All @@ -96,8 +114,12 @@
self._flush_event.set()
self._flusher = None

def flush(self) -> None:
    """Synchronously flush the buffer, guarded against re-entrant add().

    The previous guard state is saved and restored (rather than reset to
    False unconditionally): if flush() is invoked while an outer frame on
    this thread already holds the re-entrancy guard, clearing it here
    would let a subsequent re-entrant add() slip past the outer frame's
    protection.
    """
    previous = getattr(self._active, "flag", False)
    self._active.flag = True
    try:
        self._flush()
    finally:
        self._active.flag = previous

Check warning on line 122 in sentry_sdk/_batcher.py

View check run for this annotation

@sentry/warden / warden: find-bugs

flush() clobbers re-entrancy guard without checking existing state

The `flush()` method unconditionally sets `_active.flag = True` and resets it to `False` in the finally block, without first checking if the flag was already set. If `flush()` is called while another method (like `add()`) has already set the flag on the same thread, the finally block will incorrectly clear the flag, defeating the re-entrancy protection for the remaining code in the outer method. This could allow a subsequent re-entrant `add()` call to pass the guard check when it should be blocked.
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

def _add_to_envelope(self, envelope: "Envelope") -> None:
envelope.add_item(
Expand Down
55 changes: 35 additions & 20 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,36 +43,51 @@
self._record_lost_func = record_lost_func
self._running = True
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()

self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None

def add(self, span: "StreamedSpan") -> None:
    """Queue *span* for the next flush, starting the flusher thread if needed.

    Re-entrant calls on the same thread are silently dropped: code running
    inside the batcher (e.g. ``_add_to_envelope`` during flush, or
    ``_flush_event.wait``/``set``) can trigger a GC-emitted warning that
    routes back through the logging integration into ``add()``, which would
    otherwise deadlock on the batcher's internal locks.
    """
    # Bail out if the current thread is already executing batcher code.
    if getattr(self._active, "flag", False):
        return None

    # The guard above guarantees the flag was False on entry, so
    # unconditionally resetting it to False in the finally block is safe.
    self._active.flag = True
    try:
        # This check lives INSIDE the try block (matching Batcher.add) so
        # the early return cannot leave the flag permanently set, which
        # would silently drop every subsequent add() on this thread.
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            size = len(self._span_buffer[span.trace_id])
            if size >= self.MAX_BEFORE_DROP:
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            self._running_size[span.trace_id] += self._estimate_size(span)

            if size + 1 >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()
                return

            if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
                self._flush_event.set()
                return
    finally:
        self._active.flag = False

@staticmethod
def _estimate_size(item: "StreamedSpan") -> int:
Expand Down
Loading