Skip to content

Commit 3be0986

Browse files
committed
Add subcursor() for interleaved streaming + DML within a transaction
Adds subcursor() method that creates a cursor sharing the same underlying Connection and transaction as the parent. This enables interleaved streaming SELECT + UPDATE/INSERT/DELETE within an explicit transaction without destroying the streaming result. Requires the companion duckdb core PR that adds the enable_suspended_queries setting and suspend/resume mechanics. Key changes: - ConnectionGuard: unique_ptr<Connection> to shared_ptr<Connection> - Add ShareConnection/GetSharedConnection accessors - Add Subcursor() method that shares the parent's Connection - Add is_subcursor flag with transaction management guard - Subcursors cannot call begin/commit/rollback - Closing a subcursor cleans up any suspended query state
1 parent fb10ef5 commit 3be0986

File tree

3 files changed

+290
-6
lines changed

3 files changed

+290
-6
lines changed

src/duckdb_py/include/duckdb_python/pyconnection/pyconnection.hpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ struct ConnectionGuard {
131131
void SetConnection(unique_ptr<Connection> con) {
132132
connection = std::move(con);
133133
}
134+
void ShareConnection(shared_ptr<Connection> con) {
135+
connection = std::move(con);
136+
}
137+
shared_ptr<Connection> GetSharedConnection() {
138+
return connection;
139+
}
134140
void SetResult(unique_ptr<DuckDBPyRelation> res) {
135141
result = std::move(res);
136142
}
@@ -142,7 +148,7 @@ struct ConnectionGuard {
142148

143149
private:
144150
shared_ptr<DuckDB> database;
145-
unique_ptr<Connection> connection;
151+
shared_ptr<Connection> connection;
146152
unique_ptr<DuckDBPyRelation> result;
147153
};
148154

@@ -165,7 +171,11 @@ struct DuckDBPyConnection : public enable_shared_from_this<DuckDBPyConnection> {
165171
public:
166172
ConnectionGuard con;
167173
Cursors cursors;
168-
std::mutex py_connection_lock;
174+
std::mutex owned_py_connection_lock;
175+
//! Points to owned_py_connection_lock by default, or to parent's lock for subcursors
176+
std::mutex *py_connection_lock = &owned_py_connection_lock;
177+
//! Whether this is a subcursor (shares connection with parent)
178+
bool is_subcursor = false;
169179
//! MemoryFileSystem used to temporarily store file-like objects for reading
170180
shared_ptr<ModifiedMemoryFileSystem> internal_object_filesystem;
171181
case_insensitive_map_t<unique_ptr<ExternalDependency>> registered_functions;
@@ -303,6 +313,10 @@ struct DuckDBPyConnection : public enable_shared_from_this<DuckDBPyConnection> {
303313
// cursor() is stupid
304314
shared_ptr<DuckDBPyConnection> Cursor();
305315

316+
//! Create a subcursor that shares the same connection and transaction.
317+
//! This enables interleaved streaming + DML within a single transaction.
318+
shared_ptr<DuckDBPyConnection> Subcursor();
319+
306320
Optional<py::list> GetDescription();
307321

308322
int GetRowcount();

src/duckdb_py/pyconnection.cpp

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,8 @@ void DuckDBPyConnection::Initialize(py::handle &m) {
483483
connection_module.def("__del__", &DuckDBPyConnection::Close);
484484

485485
InitializeConnectionMethods(connection_module);
486+
connection_module.def("subcursor", &DuckDBPyConnection::Subcursor,
487+
"Create a cursor sharing the same connection and transaction");
486488
connection_module.def_property_readonly("description", &DuckDBPyConnection::GetDescription,
487489
"Get result set attributes, mainly column names");
488490
connection_module.def_property_readonly("rowcount", &DuckDBPyConnection::GetRowcount, "Get result set row count");
@@ -623,7 +625,7 @@ unique_ptr<PreparedStatement> DuckDBPyConnection::PrepareQuery(unique_ptr<SQLSta
623625
{
624626
D_ASSERT(py::gil_check());
625627
py::gil_scoped_release release;
626-
unique_lock<mutex> lock(py_connection_lock);
628+
unique_lock<mutex> lock(*py_connection_lock);
627629

628630
prep = connection.Prepare(std::move(statement));
629631
if (prep->HasError()) {
@@ -644,7 +646,7 @@ unique_ptr<QueryResult> DuckDBPyConnection::ExecuteInternal(PreparedStatement &p
644646
{
645647
D_ASSERT(py::gil_check());
646648
py::gil_scoped_release release;
647-
unique_lock<std::mutex> lock(py_connection_lock);
649+
unique_lock<std::mutex> lock(*py_connection_lock);
648650

649651
auto pending_query = prep.PendingQuery(named_values);
650652
if (pending_query->HasError()) {
@@ -671,7 +673,7 @@ unique_ptr<QueryResult> DuckDBPyConnection::PrepareAndExecuteInternal(unique_ptr
671673
{
672674
D_ASSERT(py::gil_check());
673675
py::gil_scoped_release release;
674-
unique_lock<std::mutex> lock(py_connection_lock);
676+
unique_lock<std::mutex> lock(*py_connection_lock);
675677

676678
auto pending_query = con.GetConnection().PendingQuery(std::move(statement), named_values, true);
677679

@@ -1855,11 +1857,17 @@ shared_ptr<DuckDBPyConnection> DuckDBPyConnection::UnregisterPythonObject(const
18551857
}
18561858

18571859
shared_ptr<DuckDBPyConnection> DuckDBPyConnection::Begin() {
1860+
if (is_subcursor) {
1861+
throw InvalidInputException("Cannot manage transactions from a subcursor — use the parent connection");
1862+
}
18581863
ExecuteFromString("BEGIN TRANSACTION");
18591864
return shared_from_this();
18601865
}
18611866

18621867
shared_ptr<DuckDBPyConnection> DuckDBPyConnection::Commit() {
1868+
if (is_subcursor) {
1869+
throw InvalidInputException("Cannot manage transactions from a subcursor — use the parent connection");
1870+
}
18631871
auto &connection = con.GetConnection();
18641872
if (connection.context->transaction.IsAutoCommit()) {
18651873
return shared_from_this();
@@ -1869,6 +1877,9 @@ shared_ptr<DuckDBPyConnection> DuckDBPyConnection::Commit() {
18691877
}
18701878

18711879
shared_ptr<DuckDBPyConnection> DuckDBPyConnection::Rollback() {
1880+
if (is_subcursor) {
1881+
throw InvalidInputException("Cannot manage transactions from a subcursor — use the parent connection");
1882+
}
18721883
ExecuteFromString("ROLLBACK");
18731884
return shared_from_this();
18741885
}
@@ -2024,6 +2035,17 @@ shared_ptr<DuckDBPyConnection> DuckDBPyConnection::Cursor() {
20242035
return res;
20252036
}
20262037

2038+
shared_ptr<DuckDBPyConnection> DuckDBPyConnection::Subcursor() {
2039+
auto res = make_shared_ptr<DuckDBPyConnection>();
2040+
res->con.SetDatabase(con);
2041+
res->con.ShareConnection(con.GetSharedConnection());
2042+
res->is_subcursor = true;
2043+
// Share the parent's py_connection_lock so concurrent subcursor access is serialized
2044+
res->py_connection_lock = py_connection_lock;
2045+
cursors.AddCursor(res);
2046+
return res;
2047+
}
2048+
20272049
// these should be functions on the result but well
20282050
Optional<py::tuple> DuckDBPyConnection::FetchOne() {
20292051
if (!con.HasResult()) {
@@ -2185,7 +2207,7 @@ static shared_ptr<DuckDBPyConnection> FetchOrCreateInstance(const string &databa
21852207
{
21862208
D_ASSERT(py::gil_check());
21872209
py::gil_scoped_release release;
2188-
unique_lock<mutex> lock(res->py_connection_lock);
2210+
unique_lock<mutex> lock(*res->py_connection_lock);
21892211
auto database =
21902212
instance_cache.GetOrCreateInstance(database_path, config, cache_instance, InstantiateNewInstance);
21912213
res->con.SetDatabase(std::move(database));

tests/fast/api/test_subcursor.py

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
"""Tests for subcursor() — a cursor that shares the same connection and transaction."""
2+
3+
import pytest
4+
import duckdb
5+
6+
7+
class TestSubcursor:
8+
def test_subcursor_basic_interleaved(self):
9+
"""Single scan + update pattern with subcursor."""
10+
con = duckdb.connect(":memory:")
11+
con.execute("CREATE TABLE t AS SELECT i AS id, false AS processed FROM range(100000) tbl(i)")
12+
con.execute("SET enable_suspended_queries = true")
13+
con.execute("BEGIN TRANSACTION")
14+
15+
scanner = con.subcursor()
16+
updater = con.subcursor()
17+
18+
result = scanner.execute("SELECT id FROM t WHERE NOT processed ORDER BY id")
19+
batch = result.fetchmany(1000)
20+
assert len(batch) == 1000
21+
assert batch[0][0] == 0
22+
23+
# Update some rows via the subcursor
24+
ids = ",".join(str(r[0]) for r in batch[:10])
25+
updater.execute(f"UPDATE t SET processed = true WHERE id IN ({ids})")
26+
27+
# Resume fetching from the scanner — should still work
28+
batch2 = result.fetchmany(1000)
29+
assert len(batch2) > 0
30+
31+
con.execute("COMMIT")
32+
33+
# Verify updates persisted
34+
res = con.execute("SELECT COUNT(*) FROM t WHERE processed").fetchone()
35+
assert res[0] == 10
36+
37+
def test_subcursor_multi_table(self):
38+
"""Multi-table scan pattern with subcursors."""
39+
con = duckdb.connect(":memory:")
40+
con.execute("CREATE TABLE t1 AS SELECT i AS id FROM range(100000) tbl(i)")
41+
con.execute("CREATE TABLE t2 AS SELECT i AS id FROM range(100000) tbl(i)")
42+
con.execute("SET enable_suspended_queries = true")
43+
con.execute("BEGIN TRANSACTION")
44+
45+
scan_a = con.subcursor()
46+
scan_b = con.subcursor()
47+
updater = con.subcursor()
48+
49+
result_a = scan_a.execute("SELECT id FROM t1 ORDER BY id")
50+
result_b = scan_b.execute("SELECT id FROM t2 ORDER BY id")
51+
52+
# Interleave fetches from both tables
53+
batch_a = result_a.fetchmany(100)
54+
assert len(batch_a) == 100
55+
56+
batch_b = result_b.fetchmany(100)
57+
assert len(batch_b) == 100
58+
59+
# Update via subcursor
60+
updater.execute("UPDATE t1 SET id = id + 1000000 WHERE id < 5")
61+
62+
# Resume both scans
63+
batch_a2 = result_a.fetchmany(100)
64+
assert len(batch_a2) > 0
65+
66+
batch_b2 = result_b.fetchmany(100)
67+
assert len(batch_b2) > 0
68+
69+
con.execute("COMMIT")
70+
71+
def test_subcursor_transaction_guard(self):
72+
"""Subcursor cannot manage transactions."""
73+
con = duckdb.connect(":memory:")
74+
sub = con.subcursor()
75+
76+
with pytest.raises(duckdb.InvalidInputException, match="subcursor"):
77+
sub.begin()
78+
79+
with pytest.raises(duckdb.InvalidInputException, match="subcursor"):
80+
sub.commit()
81+
82+
with pytest.raises(duckdb.InvalidInputException, match="subcursor"):
83+
sub.rollback()
84+
85+
def test_subcursor_shares_transaction(self):
86+
"""Subcursor sees data from the same transaction."""
87+
con = duckdb.connect(":memory:")
88+
con.execute("CREATE TABLE t (id INTEGER)")
89+
con.execute("BEGIN TRANSACTION")
90+
con.execute("INSERT INTO t VALUES (1), (2), (3)")
91+
92+
sub = con.subcursor()
93+
# Subcursor should see uncommitted data from the parent's transaction
94+
res = sub.execute("SELECT COUNT(*) FROM t").fetchone()
95+
assert res[0] == 3
96+
97+
con.execute("COMMIT")
98+
99+
def test_subcursor_close(self):
100+
"""Subcursor close doesn't destroy parent connection."""
101+
con = duckdb.connect(":memory:")
102+
con.execute("CREATE TABLE t AS SELECT 1 AS id")
103+
104+
sub = con.subcursor()
105+
sub.execute("SELECT * FROM t")
106+
sub.close()
107+
108+
# Parent connection should still work
109+
res = con.execute("SELECT * FROM t").fetchone()
110+
assert res[0] == 1
111+
112+
def test_subcursor_full_consumption(self):
113+
"""Verify stream can be fully consumed after suspend/resume."""
114+
con = duckdb.connect(":memory:")
115+
con.execute("CREATE TABLE t AS SELECT i AS id FROM range(100000) tbl(i)")
116+
con.execute("SET enable_suspended_queries = true")
117+
con.execute("BEGIN TRANSACTION")
118+
119+
scanner = con.subcursor()
120+
updater = con.subcursor()
121+
122+
result = scanner.execute("SELECT id FROM t ORDER BY id")
123+
total = 0
124+
125+
while True:
126+
batch = result.fetchmany(2048)
127+
if not batch:
128+
break
129+
total += len(batch)
130+
# Interleave an update every few batches
131+
if total % 8192 == 0:
132+
updater.execute(f"UPDATE t SET id = id WHERE id = {total}")
133+
134+
assert total == 100000
135+
con.execute("COMMIT")
136+
137+
def test_subcursor_requires_explicit_transaction(self):
138+
"""Subcursor interleaving requires an explicit transaction (BEGIN)."""
139+
con = duckdb.connect(":memory:")
140+
con.execute("CREATE TABLE t AS SELECT i AS id FROM range(100000) tbl(i)")
141+
142+
# With explicit transaction and setting enabled, interleaving works
143+
con.execute("SET enable_suspended_queries = true")
144+
con.execute("BEGIN TRANSACTION")
145+
scanner = con.subcursor()
146+
updater = con.subcursor()
147+
148+
result = scanner.execute("SELECT id FROM t ORDER BY id")
149+
batch = result.fetchmany(1000)
150+
assert len(batch) == 1000
151+
152+
updater.execute("UPDATE t SET id = id WHERE id < 5")
153+
154+
# Stream survives the update in explicit transaction mode
155+
batch2 = result.fetchmany(1000)
156+
assert len(batch2) > 0
157+
158+
con.execute("COMMIT")
159+
160+
def test_subcursor_close_cancels_suspended_stream(self):
161+
"""Closing a subcursor cancels its suspended stream and frees resources."""
162+
con = duckdb.connect(":memory:")
163+
con.execute("CREATE TABLE t AS SELECT i AS id FROM range(100000) tbl(i)")
164+
con.execute("SET enable_suspended_queries = true")
165+
con.execute("BEGIN TRANSACTION")
166+
167+
scanner = con.subcursor()
168+
updater = con.subcursor()
169+
170+
result = scanner.execute("SELECT id FROM t ORDER BY id")
171+
batch = result.fetchmany(1000)
172+
assert len(batch) == 1000
173+
174+
# Suspend the stream by executing on the updater
175+
updater.execute("UPDATE t SET id = id WHERE id < 5")
176+
177+
# Close the scanner — should cancel the suspended stream
178+
scanner.close()
179+
180+
# The connection should still work normally
181+
updater.execute("SELECT COUNT(*) FROM t")
182+
count = updater.fetchone()[0]
183+
assert count == 100000
184+
185+
con.execute("COMMIT")
186+
187+
def test_subcursor_close_one_of_multiple(self):
188+
"""Closing one subcursor doesn't affect other suspended streams."""
189+
con = duckdb.connect(":memory:")
190+
con.execute("CREATE TABLE t1 AS SELECT i AS id FROM range(100000) tbl(i)")
191+
con.execute("CREATE TABLE t2 AS SELECT i AS id FROM range(100000) tbl(i)")
192+
con.execute("SET enable_suspended_queries = true")
193+
con.execute("BEGIN TRANSACTION")
194+
195+
scan1 = con.subcursor()
196+
scan2 = con.subcursor()
197+
198+
result1 = scan1.execute("SELECT id FROM t1 ORDER BY id")
199+
result2 = scan2.execute("SELECT id FROM t2 ORDER BY id")
200+
201+
# Fetch from both
202+
batch1 = result1.fetchmany(100)
203+
assert len(batch1) == 100
204+
batch2 = result2.fetchmany(100)
205+
assert len(batch2) == 100
206+
207+
# Close scan1 — scan2 should still work
208+
scan1.close()
209+
210+
# scan2 should still be able to fetch
211+
total2 = len(batch2)
212+
while True:
213+
batch = result2.fetchmany(2048)
214+
if not batch:
215+
break
216+
total2 += len(batch)
217+
assert total2 == 100000
218+
219+
# scan1 should be unusable
220+
with pytest.raises(duckdb.ConnectionException):
221+
scan1.execute("SELECT 1")
222+
223+
con.execute("COMMIT")
224+
225+
def test_subcursor_abandoned_without_close(self):
226+
"""Subcursor that goes out of scope should clean up its suspended stream."""
227+
con = duckdb.connect(":memory:")
228+
con.execute("CREATE TABLE t AS SELECT i AS id FROM range(100000) tbl(i)")
229+
con.execute("SET enable_suspended_queries = true")
230+
con.execute("BEGIN TRANSACTION")
231+
232+
def start_and_abandon():
233+
scanner = con.subcursor()
234+
result = scanner.execute("SELECT id FROM t ORDER BY id")
235+
result.fetchmany(100)
236+
# Suspend by running something else
237+
con.execute("SELECT 1")
238+
# scanner goes out of scope here — destructor should clean up
239+
240+
start_and_abandon()
241+
import gc
242+
gc.collect()
243+
244+
# Connection should still work — no leaked suspended state blocking it
245+
result = con.execute("SELECT COUNT(*) FROM t").fetchone()
246+
assert result[0] == 100000
247+
248+
con.execute("COMMIT")

0 commit comments

Comments
 (0)