Skip to content

Commit c959f75

Browse files
committed
Fix native/subinterpreter handling and strengthen integration coverage
1 parent 6f9df58 commit c959f75

5 files changed

Lines changed: 326 additions & 25 deletions

File tree

src/pystack/_pystack.pyx

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ def _get_process_threads(
638638
if thread.tid in all_tids:
639639
all_tids.remove(thread.tid)
640640
yield thread
641-
head = InterpreterUtils.getNextInterpreter(manager, head);
641+
head = InterpreterUtils.getNextInterpreter(manager, head)
642642

643643
if native_mode == NativeReportingMode.ALL:
644644
yield from _construct_os_threads(manager, pid, all_tids)
@@ -773,14 +773,20 @@ def _get_process_threads_for_core(
773773
774774
all_tids = list(manager.get().Tids())
775775
776-
if head:
777-
native = native_mode in {NativeReportingMode.PYTHON, NativeReportingMode.ALL}
776+
while head:
777+
add_native_traces = native_mode != NativeReportingMode.OFF
778778
for thread in _construct_threads_from_interpreter_state(
779-
manager, head, pymanager.pid, pymanager.python_version, native, locals
779+
manager,
780+
head,
781+
pymanager.pid,
782+
pymanager.python_version,
783+
add_native_traces,
784+
locals,
780785
):
781786
if thread.tid in all_tids:
782787
all_tids.remove(thread.tid)
783788
yield thread
789+
head = InterpreterUtils.getNextInterpreter(manager, head)
784790
785791
if native_mode == NativeReportingMode.ALL:
786792
yield from _construct_os_threads(manager, pymanager.pid, all_tids)

src/pystack/_pystack/cpython/interpreter.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,10 +375,10 @@ struct _gil_runtime_state
375375
int locked;
376376
unsigned long switch_number;
377377
pthread_cond_t cond;
378-
pthread_cond_t mutex;
378+
pthread_mutex_t mutex;
379379
#ifdef FORCE_SWITCHING
380380
pthread_cond_t switch_cond;
381-
pthread_cond_t switch_mutex;
381+
pthread_mutex_t switch_mutex;
382382
#endif
383383
};
384384

src/pystack/traceback_formatter.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ def print_thread(self, thread: PyThread) -> None:
2525
if self.include_subinterpreters:
2626
if thread.interpreter_id != self._current_interpreter_id:
2727
self._print_interpreter_header(thread.interpreter_id)
28-
self._current_interpreter_id = thread.interpreter_id or -1
28+
self._current_interpreter_id = (
29+
thread.interpreter_id
30+
if thread.interpreter_id is not None
31+
else -1
32+
)
2933

3034
# Print the thread with indentation
3135
for line in format_thread(thread, self.native_mode):

tests/integration/test_subinterpreters.py

Lines changed: 273 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,24 @@
11
import io
2+
from collections import Counter
23
from contextlib import redirect_stdout
34
from pathlib import Path
45

6+
import pytest
7+
58
from pystack.engine import NativeReportingMode
9+
from pystack.engine import StackMethod
610
from pystack.engine import get_process_threads
11+
from pystack.engine import get_process_threads_for_core
712
from pystack.traceback_formatter import TracebackPrinter
13+
from pystack.types import NativeFrame
14+
from pystack.types import frame_type
815
from tests.utils import ALL_PYTHONS_THAT_SUPPORT_SUBINTERPRETERS
16+
from tests.utils import generate_core_file
917
from tests.utils import spawn_child_process
1018

1119
NUM_INTERPRETERS = 3
20+
NUM_INTERPRETERS_WITH_THREADS = 2
21+
NUM_THREADS_PER_SUBINTERPRETER = 2
1222

1323
PROGRAM = f"""\
1424
import sys
@@ -34,7 +44,7 @@ def start_interpreter_async(interp, code):
3444
'''
3545
3646
threads = []
37-
for i in range(NUM_INTERPRETERS):
47+
for _ in range(NUM_INTERPRETERS):
3848
interp = interpreters.create()
3949
t = start_interpreter_async(interp, CODE)
4050
threads.append(t)
@@ -51,31 +61,87 @@ def start_interpreter_async(interp, code):
5161
"""
5262

5363

54-
@ALL_PYTHONS_THAT_SUPPORT_SUBINTERPRETERS
55-
def test_subinterpreters(python, tmpdir):
56-
"""Test that pystack can detect and report multiple sub-interpreters."""
64+
PROGRAM_WITH_THREADS = f"""\
65+
import sys
66+
import threading
67+
import time
5768
58-
# GIVEN
59-
_, python_executable = python
69+
from concurrent import interpreters
70+
71+
NUM_INTERPRETERS = {NUM_INTERPRETERS_WITH_THREADS}
72+
73+
74+
def start_interpreter_async(interp, code):
75+
t = threading.Thread(target=interp.exec, args=(code,))
76+
t.daemon = True
77+
t.start()
78+
return t
79+
80+
81+
CODE = '''\\
82+
import threading
83+
import time
84+
85+
NUM_THREADS = {NUM_THREADS_PER_SUBINTERPRETER}
86+
87+
def worker():
88+
while True:
89+
time.sleep(1)
90+
91+
threads = []
92+
for _ in range(NUM_THREADS):
93+
t = threading.Thread(target=worker)
94+
# daemon threads are disabled in isolated subinterpreters
95+
t.start()
96+
threads.append(t)
97+
98+
while True:
99+
time.sleep(1)
100+
'''
101+
102+
threads = []
103+
for _ in range(NUM_INTERPRETERS):
104+
interp = interpreters.create()
105+
t = start_interpreter_async(interp, CODE)
106+
threads.append(t)
107+
108+
# Give sub-interpreters and their internal workers time to start.
109+
time.sleep(2)
110+
111+
fifo = sys.argv[1]
112+
with open(fifo, "w") as f:
113+
f.write("ready")
114+
115+
while True:
116+
time.sleep(1)
117+
"""
118+
119+
120+
def _collect_threads(
121+
python_executable: Path,
122+
tmpdir: Path,
123+
native_mode: NativeReportingMode = NativeReportingMode.OFF,
124+
):
60125
test_file = Path(str(tmpdir)) / "subinterpreters_program.py"
61126
test_file.write_text(PROGRAM)
62127

63-
# WHEN
64128
with spawn_child_process(python_executable, test_file, tmpdir) as child_process:
65-
threads = list(get_process_threads(child_process.pid, stop_process=True))
129+
return list(
130+
get_process_threads(
131+
child_process.pid,
132+
stop_process=True,
133+
native_mode=native_mode,
134+
)
135+
)
66136

67-
# Collect all interpreter IDs from the threads
68-
interpreter_ids = {thread.interpreter_id for thread in threads}
69137

70-
# THEN
71-
72-
# We expect the main interpreter (0) plus NUM_INTERPRETERS sub-interpreters
73-
assert 0 in interpreter_ids
74-
assert len(interpreter_ids) == NUM_INTERPRETERS + 1
75-
76-
# Verify the TracebackPrinter output contains the interpreter headers
138+
def _assert_interpreter_headers(
139+
threads,
140+
native_mode: NativeReportingMode,
141+
interpreter_ids,
142+
) -> str:
77143
printer = TracebackPrinter(
78-
native_mode=NativeReportingMode.OFF,
144+
native_mode=native_mode,
79145
include_subinterpreters=True,
80146
)
81147
output = io.StringIO()
@@ -89,3 +155,192 @@ def test_subinterpreters(python, tmpdir):
89155
if interpreter_id == 0:
90156
continue
91157
assert f"Interpreter-{interpreter_id}" in result
158+
return result
159+
160+
161+
def _count_threads_by_interpreter(threads):
162+
return dict(
163+
Counter(
164+
thread.interpreter_id
165+
for thread in threads
166+
if thread.interpreter_id is not None
167+
)
168+
)
169+
170+
171+
def _interpreter_ids(threads) -> set[int]:
172+
return {
173+
thread.interpreter_id for thread in threads if thread.interpreter_id is not None
174+
}
175+
176+
177+
def _assert_subinterpreter_coverage(threads) -> set[int]:
178+
interpreter_ids = _interpreter_ids(threads)
179+
assert 0 in interpreter_ids
180+
assert len(interpreter_ids) == NUM_INTERPRETERS + 1
181+
return interpreter_ids
182+
183+
184+
def _assert_native_eval_symbols(threads) -> None:
185+
eval_frames = [
186+
frame
187+
for thread in threads
188+
for frame in thread.native_frames
189+
if frame_type(frame, thread.python_version) == NativeFrame.FrameType.EVAL
190+
]
191+
assert eval_frames
192+
assert all("?" not in frame.symbol for frame in eval_frames)
193+
if any(frame.linenumber == 0 for frame in eval_frames): # pragma: no cover
194+
assert all(frame.linenumber == 0 for frame in eval_frames)
195+
assert all(frame.path == "???" for frame in eval_frames)
196+
else: # pragma: no cover
197+
assert all(frame.linenumber != 0 for frame in eval_frames)
198+
assert any(frame.path and "?" not in frame.path for frame in eval_frames)
199+
200+
201+
@ALL_PYTHONS_THAT_SUPPORT_SUBINTERPRETERS
202+
def test_subinterpreters(python, tmpdir):
203+
_, python_executable = python
204+
205+
threads = _collect_threads(
206+
python_executable=python_executable,
207+
tmpdir=tmpdir,
208+
native_mode=NativeReportingMode.OFF,
209+
)
210+
211+
interpreter_ids = _assert_subinterpreter_coverage(threads)
212+
assert all(not thread.native_frames for thread in threads)
213+
_assert_interpreter_headers(
214+
threads=threads,
215+
native_mode=NativeReportingMode.OFF,
216+
interpreter_ids=interpreter_ids,
217+
)
218+
219+
220+
@ALL_PYTHONS_THAT_SUPPORT_SUBINTERPRETERS
221+
@pytest.mark.parametrize(
222+
"native_mode",
223+
[
224+
NativeReportingMode.PYTHON,
225+
NativeReportingMode.LAST,
226+
NativeReportingMode.ALL,
227+
],
228+
ids=["python", "last", "all"],
229+
)
230+
def test_subinterpreters_with_native(python, tmpdir, native_mode):
231+
_, python_executable = python
232+
233+
threads = _collect_threads(
234+
python_executable=python_executable,
235+
tmpdir=tmpdir,
236+
native_mode=native_mode,
237+
)
238+
239+
interpreter_ids = _assert_subinterpreter_coverage(threads)
240+
assert any(thread.native_frames for thread in threads)
241+
_assert_native_eval_symbols(threads)
242+
243+
output = _assert_interpreter_headers(
244+
threads=threads,
245+
native_mode=native_mode,
246+
interpreter_ids=interpreter_ids,
247+
)
248+
assert "(C)" in output or "Unable to merge native stack" in output
249+
250+
251+
@ALL_PYTHONS_THAT_SUPPORT_SUBINTERPRETERS
252+
def test_subinterpreters_many_threads_with_native(python, tmpdir):
253+
_, python_executable = python
254+
255+
test_file = Path(str(tmpdir)) / "subinterpreters_with_threads_program.py"
256+
test_file.write_text(PROGRAM_WITH_THREADS)
257+
258+
with spawn_child_process(python_executable, test_file, tmpdir) as child_process:
259+
threads = list(
260+
get_process_threads(
261+
child_process.pid,
262+
stop_process=True,
263+
native_mode=NativeReportingMode.PYTHON,
264+
method=StackMethod.DEBUG_OFFSETS,
265+
)
266+
)
267+
268+
interpreter_ids = _interpreter_ids(threads)
269+
assert 0 in interpreter_ids
270+
assert len(interpreter_ids) == NUM_INTERPRETERS_WITH_THREADS + 1
271+
272+
counts_by_interpreter = _count_threads_by_interpreter(threads)
273+
assert all(
274+
counts_by_interpreter.get(interpreter_id, 0) >= 1
275+
for interpreter_id in interpreter_ids
276+
)
277+
# At least one sub-interpreter should expose multiple Python threads.
278+
assert any(
279+
count > 1
280+
for interpreter_id, count in counts_by_interpreter.items()
281+
if interpreter_id != 0
282+
)
283+
284+
assert any(thread.native_frames for thread in threads)
285+
_assert_native_eval_symbols(threads)
286+
287+
288+
@ALL_PYTHONS_THAT_SUPPORT_SUBINTERPRETERS
289+
def test_subinterpreters_for_core(python, tmpdir):
290+
_, python_executable = python
291+
292+
test_file = Path(str(tmpdir)) / "subinterpreters_program.py"
293+
test_file.write_text(PROGRAM)
294+
295+
with generate_core_file(python_executable, test_file, tmpdir) as core_file:
296+
threads = list(
297+
get_process_threads_for_core(
298+
core_file,
299+
python_executable,
300+
native_mode=NativeReportingMode.OFF,
301+
)
302+
)
303+
304+
interpreter_ids = _assert_subinterpreter_coverage(threads)
305+
assert all(not thread.native_frames for thread in threads)
306+
_assert_interpreter_headers(
307+
threads=threads,
308+
native_mode=NativeReportingMode.OFF,
309+
interpreter_ids=interpreter_ids,
310+
)
311+
312+
313+
@ALL_PYTHONS_THAT_SUPPORT_SUBINTERPRETERS
314+
@pytest.mark.parametrize(
315+
"native_mode",
316+
[
317+
NativeReportingMode.PYTHON,
318+
NativeReportingMode.LAST,
319+
NativeReportingMode.ALL,
320+
],
321+
ids=["python", "last", "all"],
322+
)
323+
def test_subinterpreters_for_core_with_native(python, tmpdir, native_mode):
324+
_, python_executable = python
325+
326+
test_file = Path(str(tmpdir)) / "subinterpreters_program.py"
327+
test_file.write_text(PROGRAM)
328+
329+
with generate_core_file(python_executable, test_file, tmpdir) as core_file:
330+
threads = list(
331+
get_process_threads_for_core(
332+
core_file,
333+
python_executable,
334+
native_mode=native_mode,
335+
)
336+
)
337+
338+
interpreter_ids = _assert_subinterpreter_coverage(threads)
339+
assert any(thread.native_frames for thread in threads)
340+
_assert_native_eval_symbols(threads)
341+
output = _assert_interpreter_headers(
342+
threads=threads,
343+
native_mode=native_mode,
344+
interpreter_ids=interpreter_ids,
345+
)
346+
assert "(C)" in output or "Unable to merge native stack" in output

0 commit comments

Comments
 (0)