Skip to content

Commit 609fb33

Browse files
committed
tests: add a variety of tests relating to threading. Tests should pass whether GIL or free threaded. Tested with pytest-run-parallel: pytest --parallel-threads=10 --iterations=5
1 parent 841ae9e commit 609fb33

12 files changed

Lines changed: 953 additions & 0 deletions

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ test = [ # dependencies used for running tests
227227
"pytest",
228228
"pytest-reraise",
229229
"pytest-timeout",
230+
"pytest-run-parallel",
230231
"mypy",
231232
"coverage",
232233
"gcovr; python_version < '3.14'",

tests/conftest.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,3 +336,13 @@ def finalizer():
336336

337337
duckdb.connect(test_dbfarm)
338338
return test_dbfarm
339+
340+
341+
@pytest.fixture(scope="function")
def num_threads_testing():
    """Return the worker-thread count used by the threaded tests.

    The value is 1.5x the CPU count, clamped to the range [4, 12]: enough to
    load the system while still keeping the tests fast.
    """
    import multiprocessing

    cpu_count = multiprocessing.cpu_count()
    # Use 1.5x CPU count, but never fewer than 4 and never more than 12
    # (the upper bound keeps CI runners from being oversubscribed).
    return min(12, max(4, int(cpu_count * 1.5)))

tests/fast/threading/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Tests in this directory are intended to be run with [pytest-run-parallel](https://github.com/Quansight-Labs/pytest-run-parallel) to exercise thread safety.
2+
3+
Example usage: `pytest --parallel-threads=10 --iterations=5 --verbose tests/fast/threading -n 4 --durations=5`
4+
5+
#### Thread Safety and DuckDB
6+
7+
Not all DuckDB operations are thread safe. In particular, cursors are not thread safe, so care must be taken to avoid tests that use the same cursor from several threads at once.
8+
9+
Tests can be marked as single threaded with:
10+
- `pytest.mark.thread_unsafe` or the equivalent `pytest.mark.parallel_threads(1)`
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import gc
2+
import random
3+
import time
4+
import weakref
5+
from threading import get_ident
6+
7+
import uuid
8+
9+
import pytest
10+
11+
import duckdb
12+
13+
14+
def test_basic():
    """Smoke test: run a trivial query and create a type with a connection open."""
    with duckdb.connect(":memory:") as connection:
        row = connection.execute("SELECT 1").fetchone()
        assert row[0] == 1
        integer_type = duckdb.type("INTEGER")
        assert integer_type is not None, "type creation failed"
20+
21+
22+
def test_connection_instance_cache(tmp_path):
    """Open ten uniquely named database files and round-trip two rows in each.

    File and table names embed the calling thread's ident so that concurrent
    runs of this test never touch the same database file.
    """
    ident = get_ident()
    for iteration in range(10):
        db_file = tmp_path / f"{ident}_{uuid.uuid4()}.db"
        table = f"thread_{ident}_data_{iteration}"
        with duckdb.connect(db_file) as conn:
            conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (x BIGINT)")
            conn.execute(f"INSERT INTO {table} VALUES (100), (100)")

            # Short random pause between the write and the read-back.
            time.sleep(random.uniform(0.0001, 0.001))

            count_row = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
            row_count = count_row[0]
            assert row_count == 2, (
                f"Iteration {iteration}: expected 2 rows, got {row_count}"
            )
37+
38+
39+
def test_cleanup():
    """Check that closed connections are released once unreferenced.

    Creates several in-memory connections, closes each one and drops its only
    strong reference, then verifies through weak references that no
    connection object is still alive after garbage collection.

    The original assertion tolerated up to 10 live objects while only 5 were
    ever created, so it could never fail; the check is now strict.
    """
    num_connections = 5
    weak_refs = []

    for i in range(num_connections):
        conn = duckdb.connect(":memory:")
        weak_refs.append(weakref.ref(conn))
        try:
            conn.execute("CREATE TABLE test (x INTEGER)")
            conn.execute("INSERT INTO test VALUES (1), (2), (3)")
            result = conn.execute("SELECT COUNT(*) FROM test").fetchone()
            assert result[0] == 3
        finally:
            conn.close()
            # Drop the last strong reference so only the weakref remains.
            conn = None

        # Interleave a short-lived, context-managed connection now and then.
        if i % 3 == 0:
            with duckdb.connect(":memory:") as new_conn:
                result = new_conn.execute("SELECT 1").fetchone()
                assert result[0] == 1

        # With five iterations this only fires on the first pass.
        if i % 10 == 0:
            gc.collect()
            time.sleep(random.uniform(0.0001, 0.0005))

    # Collect twice with a pause in between to give deferred finalizers a
    # chance to run before the liveness check.
    gc.collect()
    time.sleep(0.1)
    gc.collect()

    alive_refs = [ref for ref in weak_refs if ref() is not None]
    assert not alive_refs, (
        f"{len(alive_refs)} of {num_connections} connections still alive "
        "after close() and gc.collect()"
    )
71+
72+
73+
def test_default_connection():
    """Query through a default connection, then through an explicit in-memory one."""
    cases = (
        ((), 1),  # duckdb.connect() with no arguments
        ((":memory:",), 2),  # explicit in-memory database
    )
    for connect_args, expected in cases:
        with duckdb.connect(*connect_args) as conn:
            value = conn.execute(f"SELECT {expected}").fetchone()[0]
            assert value == expected, f"expected {expected}, got {value}"
81+
82+
83+
def test_type_system():
    """Build scalar, list and struct types repeatedly.

    Every fifth round additionally creates an empty four-column table in a
    fresh in-memory connection and checks that it holds no rows.
    """
    scalar_names = ("INTEGER", "VARCHAR", "DOUBLE", "BOOLEAN")

    for round_no in range(20):
        created = [duckdb.type(name) for name in scalar_names]
        created.append(duckdb.list_type(duckdb.type("INTEGER")))
        created.append(
            duckdb.struct_type(
                {"a": duckdb.type("INTEGER"), "b": duckdb.type("VARCHAR")}
            )
        )

        for made in created:
            assert made is not None, "type creation failed"

        if round_no % 5 != 0:
            continue

        with duckdb.connect(":memory:") as conn:
            conn.execute(
                "CREATE TABLE test (a INTEGER, b VARCHAR, c DOUBLE, d BOOLEAN)"
            )
            count_row = conn.execute("SELECT COUNT(*) FROM test").fetchone()
            assert count_row[0] == 0
106+
107+
108+
def test_import_cache():
    """Exercise the DataFrame and NumPy fetch paths of a connection.

    Both conversions are checked against a known row count so that a fetch
    returning the wrong result set is detected.
    """
    with duckdb.connect(":memory:") as conn:
        conn.execute("CREATE TABLE test AS SELECT range as x FROM range(10)")
        # Query the table explicitly: calling fetchdf() right after the
        # CREATE TABLE AS statement would fetch that statement's own result
        # instead of the ten stored rows.
        frame = conn.execute("SELECT x FROM test").fetchdf()
        assert len(frame) == 10, "fetchdf failed"

        arrays = conn.execute("SELECT range as x FROM range(5)").fetchnumpy()
        assert len(arrays["x"]) == 5, "fetchnumpy failed"

        conn.execute("DROP TABLE test")
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""
2+
Concurrent access tests for DuckDB Python bindings with free threading support.
3+
4+
These tests verify that the DuckDB Python module can handle concurrent access
5+
from multiple threads safely, testing module state isolation, memory management,
6+
and connection handling under various stress conditions.
7+
"""
8+
9+
import gc
10+
import random
11+
import time
12+
import concurrent.futures
13+
14+
import pytest
15+
16+
import duckdb
17+
18+
19+
def test_concurrent_connections():
    """Open a default connection and check that a simple query yields a row."""
    query = "SELECT random() as id, random()*2 as doubled"
    with duckdb.connect() as conn:
        row = conn.execute(query).fetchone()
        assert row is not None
23+
24+
25+
@pytest.mark.parallel_threads(1)
def test_shared_connection_stress(num_threads_testing):
    """Test concurrent operations on shared connection using cursors.

    Each worker thread takes its own cursor from one shared in-memory
    connection, inserts ``iterations`` rows tagged with its thread id, and
    after every insert checks that it can see exactly the rows it has written
    so far. The cursor is closed when the worker finishes, even on failure.
    Finally the total row count is compared against threads * iterations.

    Args:
        num_threads_testing: Fixture giving the number of worker threads.
    """
    iterations = 10

    with duckdb.connect(":memory:") as connection:
        connection.execute(
            "CREATE TABLE stress_test (id INTEGER, thread_id INTEGER, value TEXT)"
        )

        def worker_thread(thread_id: int) -> None:
            # One cursor per thread; cursors must not be shared across threads.
            cursor = connection.cursor()
            try:
                for i in range(iterations):
                    cursor.execute(
                        "INSERT INTO stress_test VALUES (?, ?, ?)",
                        [i, thread_id, f"thread_{thread_id}_value_{i}"],
                    )
                    own_rows = cursor.execute(
                        "SELECT COUNT(*) FROM stress_test WHERE thread_id = ?",
                        [thread_id],
                    ).fetchone()[0]
                    # Only this worker writes rows with this thread_id.
                    assert own_rows == i + 1
                    time.sleep(random.uniform(0.0001, 0.001))
            finally:
                cursor.close()

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_threads_testing
        ) as executor:
            futures = [
                executor.submit(worker_thread, i) for i in range(num_threads_testing)
            ]
            # Wait for all to complete, will raise if any fail
            for future in concurrent.futures.as_completed(futures):
                future.result()

        total_rows = connection.execute(
            "SELECT COUNT(*) FROM stress_test"
        ).fetchone()[0]
        expected_rows = num_threads_testing * iterations
        assert total_rows == expected_rows
62+
63+
64+
@pytest.mark.parallel_threads(1)
def test_module_state_isolation():
    """Check that module-level attributes, connections and types are reachable."""
    with duckdb.connect(":memory:"):
        assert hasattr(duckdb, "__version__")

        with duckdb.connect() as default_conn:
            row = default_conn.execute("SELECT 'default' as type").fetchone()
            assert row[0] == "default"

        # Type construction goes through the module, not a connection.
        for type_name in ("INTEGER", "VARCHAR"):
            assert duckdb.type(type_name) is not None
78+
79+
80+
def test_rapid_connect_disconnect():
    """Test rapid connection creation and destruction.

    Opens and closes a series of in-memory connections back to back, running
    a trivial query on each, and forces a garbage collection on every third
    iteration to increase pressure on object teardown.
    """
    # The docstring above must be the first statement in the function;
    # previously it followed this assignment and was a no-op string.
    connections_count = 10
    for i in range(connections_count):
        conn = duckdb.connect(":memory:")
        try:
            result = conn.execute("SELECT 1").fetchone()[0]
            assert result == 1
        finally:
            conn.close()

        # Sometimes force GC to increase pressure
        if i % 3 == 0:
            gc.collect()
94+
95+
96+
def test_exception_handling():
    """Interleave failing and succeeding queries on a single connection.

    Every third iteration queries a missing table and must raise
    ``duckdb.CatalogException``; the other iterations must still see the
    three rows inserted up front.
    """
    with duckdb.connect(":memory:") as conn:
        conn.execute("CREATE TABLE test (x INTEGER)")
        conn.execute("INSERT INTO test VALUES (1), (2), (3)")

        for attempt in range(10):
            if attempt % 3 != 0:
                row_count = conn.execute("SELECT COUNT(*) FROM test").fetchone()[0]
                assert row_count == 3
                continue

            with pytest.raises(duckdb.CatalogException):
                conn.execute("SELECT * FROM nonexistent_table")
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""
2+
Test connection lifecycle races.
3+
4+
Focused on DuckDBPyConnection constructor and Close
5+
"""
6+
7+
import gc
8+
import concurrent.futures
9+
10+
import pytest
11+
12+
import duckdb
13+
14+
15+
def test_concurrent_connection_creation_destruction():
    """Create a default connection, run one query, and close it again."""
    with duckdb.connect() as conn:
        row = conn.execute("SELECT 1").fetchone()
        assert row[0] == 1
22+
23+
24+
def test_connection_destructor_race():
    """Tear a connection down via ``del`` plus a collection, without close()."""
    connection = duckdb.connect()
    row = connection.execute("SELECT COUNT(*) FROM range(1)").fetchone()
    assert row[0] == 1

    # No explicit close(): the connection is torn down when the object is freed.
    del connection
    gc.collect()
31+
32+
33+
@pytest.mark.parallel_threads(1)
def test_concurrent_close_operations(num_threads_testing):
    """Query and then close one cursor per worker thread.

    The cursors are created on the calling thread from a shared in-memory
    connection and handed to the pool; each worker runs a query on its cursor
    and closes it.
    """
    with duckdb.connect(":memory:") as conn:
        conn.execute("CREATE TABLE shared_table (id INTEGER, data VARCHAR)")
        conn.execute("INSERT INTO shared_table VALUES (1, 'test')")

        def query_then_close(cur, _worker_index):
            cur.execute("SELECT COUNT(*) FROM shared_table").fetchone()
            cur.close()
            return True

        pool = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads_testing)
        with pool as executor:
            pending = []
            for worker_index in range(num_threads_testing):
                # conn.cursor() is evaluated here, on the submitting thread.
                pending.append(
                    executor.submit(query_then_close, conn.cursor(), worker_index)
                )
            outcomes = [
                done.result() for done in concurrent.futures.as_completed(pending)
            ]

        assert all(outcomes)
58+
59+
60+
@pytest.mark.parallel_threads(1)
def test_cursor_operations_race(num_threads_testing):
    """Read disjoint slices of one table through per-thread cursors.

    The table holds ids 0..99. Worker ``t`` selects the rows whose id is
    congruent to ``t`` modulo the thread count and reports whether it got
    exactly those ids back. Each worker closes its cursor when done.

    Args:
        num_threads_testing: Fixture giving the number of worker threads.
    """
    row_total = 100
    conn = duckdb.connect(":memory:")
    try:
        conn.execute("CREATE TABLE cursor_test (id INTEGER, name VARCHAR)")
        conn.execute(
            "INSERT INTO cursor_test SELECT i, 'name_' || i FROM range(100) t(i)"
        )

        def cursor_operations(thread_id):
            """Fetch this worker's slice and verify the returned ids."""
            cursor = conn.cursor()
            try:
                rows = cursor.execute(
                    "SELECT * FROM cursor_test "
                    f"WHERE id % {num_threads_testing} = {thread_id}"
                ).fetchall()
            finally:
                cursor.close()

            expected_ids = list(range(thread_id, row_total, num_threads_testing))
            # No ORDER BY in the query, so compare in sorted order.
            return sorted(row[0] for row in rows) == expected_ids

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_threads_testing
        ) as executor:
            futures = [
                executor.submit(cursor_operations, i)
                for i in range(num_threads_testing)
            ]
            # result() re-raises any exception raised inside a worker.
            results = [
                future.result() for future in concurrent.futures.as_completed(futures)
            ]

        assert all(results)
    finally:
        conn.close()
94+
95+
96+
def test_rapid_connection_cycling():
    """Open, query and close a fresh in-memory connection several times in a row."""
    for cycle in range(5):
        expected = 1 + cycle
        with duckdb.connect(":memory:") as conn:
            row = conn.execute(f"SELECT 1 + {cycle}").fetchone()
            assert row[0] == expected

0 commit comments

Comments
 (0)