Skip to content

Commit b29c4bb

Browse files
authored
fix(logging): Fix deadlock in log batcher (#5684)
### Description

In certain scenarios, the SDK's log batcher might cause a deadlock. This happens if it's currently flushing, and during the flush, something emits a log that we try to capture and add to the (locked) batcher. With this PR, we're adding a re-entry guard to the batcher, preventing it from recursively handling log items during locked code paths like `flush()`.

#### Issues

Closes #5681

#### Reminders

- Please add tests to validate your changes, and lint your code using `tox -e linters`.
- Add GH Issue ID _&_ Linear ID (if applicable)
- PR title should use [conventional commit](https://develop.sentry.dev/engineering-practices/commit-messages/#type) style (`feat:`, `fix:`, `ref:`, `meta:`)
- For external contributors: [CONTRIBUTING.md](https://github.com/getsentry/sentry-python/blob/master/CONTRIBUTING.md), [Sentry SDK development docs](https://develop.sentry.dev/sdk/), [Discord community](https://discord.gg/Ww9hbqr)
1 parent b905cd3 commit b29c4bb

File tree

4 files changed

+101
-27
lines changed

4 files changed

+101
-27
lines changed

requirements-testing.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ tomli;python_version<"3.11" # Only needed for pytest on Python < 3.11
44
pytest-cov
55
pytest-forked
66
pytest-localserver
7+
pytest-timeout
78
pytest-watch
89
jsonschema
910
executing

sentry_sdk/_batcher.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(
3131
self._record_lost_func = record_lost_func
3232
self._running = True
3333
self._lock = threading.Lock()
34+
self._active: "threading.local" = threading.local()
3435

3536
self._flush_event: "threading.Event" = threading.Event()
3637

@@ -70,23 +71,40 @@ def _ensure_thread(self) -> bool:
7071
return True
7172

7273
def _flush_loop(self) -> None:
74+
# Mark the flush-loop thread as active for its entire lifetime so
75+
# that any re-entrant add() triggered by GC warnings during wait(),
76+
# flush(), or Event operations is silently dropped instead of
77+
# deadlocking on internal locks.
78+
self._active.flag = True
7379
while self._running:
7480
self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
7581
self._flush_event.clear()
7682
self._flush()
7783

7884
def add(self, item: "T") -> None:
79-
if not self._ensure_thread() or self._flusher is None:
85+
# Bail out if the current thread is already executing batcher code.
86+
# This prevents deadlocks when code running inside the batcher (e.g.
87+
# _add_to_envelope during flush, or _flush_event.wait/set) triggers
88+
# a GC-emitted warning that routes back through the logging
89+
# integration into add().
90+
if getattr(self._active, "flag", False):
8091
return None
8192

82-
with self._lock:
83-
if len(self._buffer) >= self.MAX_BEFORE_DROP:
84-
self._record_lost(item)
93+
self._active.flag = True
94+
try:
95+
if not self._ensure_thread() or self._flusher is None:
8596
return None
8697

87-
self._buffer.append(item)
88-
if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
89-
self._flush_event.set()
98+
with self._lock:
99+
if len(self._buffer) >= self.MAX_BEFORE_DROP:
100+
self._record_lost(item)
101+
return None
102+
103+
self._buffer.append(item)
104+
if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
105+
self._flush_event.set()
106+
finally:
107+
self._active.flag = False
90108

91109
def kill(self) -> None:
92110
if self._flusher is None:
@@ -97,7 +115,12 @@ def kill(self) -> None:
97115
self._flusher = None
98116

99117
def flush(self) -> None:
100-
self._flush()
118+
was_active = getattr(self._active, "flag", False)
119+
self._active.flag = True
120+
try:
121+
self._flush()
122+
finally:
123+
self._active.flag = was_active
101124

102125
def _add_to_envelope(self, envelope: "Envelope") -> None:
103126
envelope.add_item(

sentry_sdk/_span_batcher.py

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,36 +43,50 @@ def __init__(
4343
self._record_lost_func = record_lost_func
4444
self._running = True
4545
self._lock = threading.Lock()
46+
self._active: "threading.local" = threading.local()
4647

4748
self._flush_event: "threading.Event" = threading.Event()
4849

4950
self._flusher: "Optional[threading.Thread]" = None
5051
self._flusher_pid: "Optional[int]" = None
5152

5253
def add(self, span: "StreamedSpan") -> None:
53-
if not self._ensure_thread() or self._flusher is None:
54+
# Bail out if the current thread is already executing batcher code.
55+
# This prevents deadlocks when code running inside the batcher (e.g.
56+
# _add_to_envelope during flush, or _flush_event.wait/set) triggers
57+
# a GC-emitted warning that routes back through the logging
58+
# integration into add().
59+
if getattr(self._active, "flag", False):
5460
return None
5561

56-
with self._lock:
57-
size = len(self._span_buffer[span.trace_id])
58-
if size >= self.MAX_BEFORE_DROP:
59-
self._record_lost_func(
60-
reason="queue_overflow",
61-
data_category="span",
62-
quantity=1,
63-
)
64-
return None
65-
66-
self._span_buffer[span.trace_id].append(span)
67-
self._running_size[span.trace_id] += self._estimate_size(span)
62+
self._active.flag = True
6863

69-
if size + 1 >= self.MAX_BEFORE_FLUSH:
70-
self._flush_event.set()
71-
return
64+
try:
65+
if not self._ensure_thread() or self._flusher is None:
66+
return None
7267

73-
if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
74-
self._flush_event.set()
75-
return
68+
with self._lock:
69+
size = len(self._span_buffer[span.trace_id])
70+
if size >= self.MAX_BEFORE_DROP:
71+
self._record_lost_func(
72+
reason="queue_overflow",
73+
data_category="span",
74+
quantity=1,
75+
)
76+
return None
77+
78+
self._span_buffer[span.trace_id].append(span)
79+
self._running_size[span.trace_id] += self._estimate_size(span)
80+
81+
if size + 1 >= self.MAX_BEFORE_FLUSH:
82+
self._flush_event.set()
83+
return
84+
85+
if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
86+
self._flush_event.set()
87+
return
88+
finally:
89+
self._active.flag = False
7690

7791
@staticmethod
7892
def _estimate_size(item: "StreamedSpan") -> int:

tests/test_logs.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,3 +783,39 @@ def before_send_log(log, _):
783783
)
784784

785785
get_client().flush()
786+
787+
788+
@minimum_python_37
789+
@pytest.mark.timeout(5)
790+
def test_reentrant_add_does_not_deadlock(sentry_init, capture_envelopes):
791+
"""Adding to the batcher from within a flush must not deadlock.
792+
793+
This covers the scenario where GC emits a ResourceWarning during
794+
_add_to_envelope (or _flush_event.wait/set), and the warning is
795+
routed through the logging integration back into batcher.add().
796+
See https://github.com/getsentry/sentry-python/issues/5681
797+
"""
798+
sentry_init(enable_logs=True)
799+
capture_envelopes()
800+
801+
client = sentry_sdk.get_client()
802+
batcher = client.log_batcher
803+
804+
reentrant_add_called = False
805+
original_add_to_envelope = batcher._add_to_envelope
806+
807+
def add_to_envelope_with_reentrant_add(envelope):
808+
nonlocal reentrant_add_called
809+
# Simulate a GC warning routing back into add() during flush
810+
batcher.add({"fake": "log"})
811+
reentrant_add_called = True
812+
original_add_to_envelope(envelope)
813+
814+
batcher._add_to_envelope = add_to_envelope_with_reentrant_add
815+
816+
sentry_sdk.logger.warning("test log")
817+
client.flush()
818+
819+
assert reentrant_add_called
820+
# If the re-entrancy guard didn't work, this test would hang and it'd
821+
# eventually be timed out by pytest-timeout

0 commit comments

Comments (0)