Skip to content

Commit 60ee58b

Browse files
committed
PYCBC-1774: Add cancellation/close support to the C-extension streaming result
Changes
-------
* rows_queue: add a cancelled flag with cancel()/is_cancelled(); get() returns a null sentinel once cancelled and the queue drains, unblocking a waiting worker (already-queued rows still drain first)
* pycbc_streamed_result: expose cancel()/is_cancelled()
* _finalize: cancel the streamed result on abort only (exc_val is not None), so normal completion can still read trailing metadata
* acouchbase range scan: add _abort() to cancel the in-flight core scan (existing cancel_scan()) and shut down the executor on the error/cancellation paths
* _core.pyi: add cancel()/is_cancelled() stubs (and the missing scan-iterator cancel_scan()/is_cancelled())
* tests: helper wiring tests (cancel on abort, skip on normal completion) + a live query_t test asserting CancelledError propagates and the streamed result is cancelled end-to-end

Change-Id: I0c344ac5daebf4be9c6ec3113b8a82e3bca689f2
Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/246063
Reviewed-by: Dimitris Christodoulou <dimitris.christodoulou@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
1 parent d92f378 commit 60ee58b

10 files changed

Lines changed: 169 additions & 7 deletions

File tree

acouchbase/analytics.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ def _finalize(self, exc_val=None):
115115
teardown and leak the span/meter or orphan the streaming result and its executor thread.
116116
"""
117117
self._process_core_span(exc_val=exc_val)
118+
if exc_val is not None and self._streaming_result is not None:
119+
# Cancellation/error: unblock a worker still waiting on the C++ core so its executor
120+
# thread is released promptly instead of waiting for the whole server-side operation
121+
# to finish. Normal completion (exc_val is None) skips this so trailing metadata can
122+
# still be read from the streaming result.
123+
self._streaming_result.cancel()
118124
self._shutdown_executor()
119125

120126
async def __anext__(self):

acouchbase/kv_range_scan.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,25 @@ def _shutdown_executor(self):
6565
self._executor_shutdown = True
6666
self._tp_executor.shutdown(wait=False)
6767

68+
def _abort(self):
69+
"""
70+
**INTERNAL**
71+
72+
Cancel the in-flight core scan so a worker still waiting on the C++ core unwinds promptly,
73+
then release the executor. Used on the error/cancellation paths only.
74+
"""
75+
if self._scan_iterator is not None and self._scan_iterator.is_cancelled() is False:
76+
self._scan_iterator.cancel_scan()
77+
self._shutdown_executor()
78+
6879
async def __anext__(self):
6980
try:
7081
return await self._loop.run_in_executor(self._tp_executor, self._get_next_row)
7182
# We can stop iterator when we receive RangeScanCompletedException
7283
except asyncio.QueueEmpty:
7384
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
7485
excptn = exc_cls('Unexpected QueueEmpty exception caught when doing N1QL query.')
75-
self._shutdown_executor()
86+
self._abort()
7687
raise excptn
7788
except RangeScanCompletedException:
7889
self._done_streaming = True
@@ -83,16 +94,16 @@ async def __anext__(self):
8394
self._shutdown_executor()
8495
raise
8596
except CouchbaseException as ex:
86-
self._shutdown_executor()
97+
self._abort()
8798
raise ex
8899
except Exception as ex:
89100
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
90101
excptn = exc_cls(str(ex))
91-
self._shutdown_executor()
102+
self._abort()
92103
raise excptn
93104
except BaseException:
94105
# asyncio.CancelledError (and KeyboardInterrupt/SystemExit) derive from BaseException,
95-
# not Exception, so they fall through every handler above. Release the executor so the
96-
# cancelled scan does not orphan its worker thread, then re-raise unchanged.
97-
self._shutdown_executor()
106+
# not Exception, so they fall through every handler above. Cancel the in-flight scan
107+
# and release the executor so the worker thread is not orphaned, then re-raise unchanged.
108+
self._abort()
98109
raise

acouchbase/n1ql.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ def _finalize(self, exc_val=None):
117117
teardown and leak the span/meter or orphan the streaming result and its executor thread.
118118
"""
119119
self._process_core_span(exc_val=exc_val)
120+
if exc_val is not None and self._streaming_result is not None:
121+
# Cancellation/error: unblock a worker still waiting on the C++ core so its executor
122+
# thread is released promptly instead of waiting for the whole server-side operation
123+
# to finish. Normal completion (exc_val is None) skips this so trailing metadata can
124+
# still be read from the streaming result.
125+
self._streaming_result.cancel()
120126
self._shutdown_executor()
121127

122128
async def __anext__(self):

acouchbase/search.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ def _finalize(self, exc_val=None):
113113
teardown and leak the span/meter or orphan the streaming result and its executor thread.
114114
"""
115115
self._process_core_span(exc_val=exc_val)
116+
if exc_val is not None and self._streaming_result is not None:
117+
# Cancellation/error: unblock a worker still waiting on the C++ core so its executor
118+
# thread is released promptly instead of waiting for the whole server-side operation
119+
# to finish. Normal completion (exc_val is None) skips this so trailing metadata can
120+
# still be read from the streaming result.
121+
self._streaming_result.cancel()
116122
self._shutdown_executor()
117123

118124
async def __anext__(self):

acouchbase/tests/query_t.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ class QueryTestSuite:
187187
'test_mixed_positional_parameters',
188188
'test_non_blocking',
189189
'test_preserve_expiry',
190+
'test_query_cancellation',
190191
'test_query_error_context',
191192
'test_query_metadata',
192193
'test_query_raw_options',
@@ -216,6 +217,33 @@ async def test_bad_query(self, cb_env):
216217
with pytest.raises(ParsingFailedException):
217218
await cb_env.cluster.query("I'm not N1QL!").execute()
218219

220+
@pytest.mark.asyncio
async def test_query_cancellation(self, cb_env):
    """PYCBC-1774: cancelling an in-flight streaming query.

    The CancelledError must reach the awaiting caller unchanged (neither swallowed nor
    converted), and the underlying C++ streamed result must end up cancelled so the executor
    worker is released promptly rather than waiting for the server-side operation to finish.
    """
    result = cb_env.cluster.query(f"SELECT * FROM `{cb_env.bucket.name}` LIMIT 100")

    async def drain():
        # iterate the full result; the test cancels this before it can complete
        return [row async for row in result]

    pending = asyncio.ensure_future(drain())
    # Yield to the loop until the query has been submitted (the streaming result exists) or the
    # task has already finished for some other reason.
    while True:
        if result._request._streaming_result is not None or pending.done():
            break
        await asyncio.sleep(0)

    # Cancel while the worker is still blocked on the C++ core waiting for the first row.
    pending.cancel()
    with pytest.raises(asyncio.CancelledError):
        await pending

    # The cancellation must have been forwarded down to the C++ streamed result.
    assert result._request._streaming_result is not None
    assert result._request._streaming_result.is_cancelled() is True
246+
219247
@pytest.mark.asyncio
220248
async def test_mixed_named_parameters(self, cb_env):
221249
batch_id = cb_env.get_batch_id()

acouchbase/tests/streaming_helpers_t.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,21 @@
1919

2020
import pytest
2121

22+
from acouchbase.n1ql import AsyncN1QLRequest
2223
from couchbase.exceptions import CouchbaseException, InternalSDKException
2324
from couchbase.logic.streaming import stream_anext
2425

2526

27+
class _FakeStreamingResult:
28+
"""Stand-in for the C-extension ``pycbc_streamed_result`` exposing the new cancel hook."""
29+
30+
def __init__(self):
31+
self.cancel_calls = 0
32+
33+
def cancel(self):
34+
self.cancel_calls += 1
35+
36+
2637
class _FakeAsyncStreamingRequest:
2738
"""Minimal stand-in implementing the contract ``stream_anext`` relies on, so the streaming
2839
iterator teardown/error handling can be exercised without a live cluster. Records the
@@ -76,6 +87,8 @@ class StreamingAnextTestSuite:
7687
'test_queue_empty_converted_with_op_name',
7788
'test_keyboard_interrupt_propagates_unconverted',
7889
'test_cancellation_finalizes_and_propagates',
90+
'test_finalize_cancels_streaming_result_on_error',
91+
'test_finalize_skips_cancel_on_normal_completion',
7992
]
8093

8194
@pytest.mark.asyncio
@@ -158,6 +171,30 @@ async def test_cancellation_finalizes_and_propagates(self):
158171
assert isinstance(req.finalize_calls[0], asyncio.CancelledError)
159172
assert req.executor_shutdown is True
160173

174+
def test_finalize_cancels_streaming_result_on_error(self):
    """Abort path: ``_finalize`` with an exception cancels the streaming result.

    Runs ``_finalize`` on a real ``AsyncN1QLRequest`` whose streaming result is replaced by a
    counting fake, to verify the cancel() wiring.
    """
    event_loop = asyncio.new_event_loop()
    try:
        request = AsyncN1QLRequest(None, event_loop, {}, obs_handler=None)
        request._streaming_result = _FakeStreamingResult()
        failure = CouchbaseException('boom')
        request._finalize(exc_val=failure)
        # the streamed result is cancelled exactly once so a blocked worker can unwind
        assert request._streaming_result.cancel_calls == 1
        assert request._executor_shutdown is True
    finally:
        event_loop.close()
186+
187+
def test_finalize_skips_cancel_on_normal_completion(self):
    """Normal completion: ``_finalize`` without an exception must not cancel.

    With ``exc_val`` left as None the streaming result is left untouched (trailing metadata
    may still be read from it), while the executor is still shut down.
    """
    event_loop = asyncio.new_event_loop()
    try:
        request = AsyncN1QLRequest(None, event_loop, {}, obs_handler=None)
        request._streaming_result = _FakeStreamingResult()
        request._finalize()
        assert request._streaming_result.cancel_calls == 0
        assert request._executor_shutdown is True
    finally:
        event_loop.close()
197+
161198

162199
class AsyncStreamingAnextTests(StreamingAnextTestSuite):
163200
@pytest.fixture(scope='class', autouse=True)

acouchbase/views.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ def _finalize(self, exc_val=None):
121121
teardown and leak the span/meter or orphan the streaming result and its executor thread.
122122
"""
123123
self._process_core_span(exc_val=exc_val)
124+
if exc_val is not None and self._streaming_result is not None:
125+
# Cancellation/error: unblock a worker still waiting on the C++ core so its executor
126+
# thread is released promptly instead of waiting for the whole server-side operation
127+
# to finish. Normal completion (exc_val is None) skips this so trailing metadata can
128+
# still be read from the streaming result.
129+
self._streaming_result.cancel()
124130
self._shutdown_executor()
125131

126132
async def __anext__(self):

couchbase/logic/pycbc_core/_core.pyi

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,21 @@ class pycbc_streamed_result(Generic[T]):
8282

8383
def __next__(self) -> Union[T, pycbc_exception, None]: ...
8484

85+
# Cancel the streaming operation so a blocked iterator can unwind.
def cancel(self) -> None: ...
86+
87+
# Check whether the streaming operation has been cancelled.
def is_cancelled(self) -> bool: ...
88+
8589

8690
class pycbc_scan_iterator(Generic[T]):
8791

8892
def __iter__(self) -> pycbc_scan_iterator[T]: ...
8993

9094
def __next__(self) -> Union[T, pycbc_exception]: ...
9195

96+
# Cancel the in-flight core range scan.
def cancel_scan(self) -> None: ...
97+
98+
# Report whether the scan has been cancelled.
# NOTE(review): presumably becomes True after cancel_scan(); confirm against the C extension.
def is_cancelled(self) -> bool: ...
99+
92100

93101
class pycbc_exception:
94102

src/result.cxx

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,36 @@ static PyMemberDef pycbc_streamed_result_members[] = {
214214
{ nullptr } // Sentinel
215215
};
216216

217+
// Python-callable cancel() for pycbc_streamed_result.
// Forwards to the row queue's cancel() when the result has a row queue; otherwise it is a
// no-op. Always returns None. `args` is unused (the method is registered as METH_NOARGS).
static PyObject*
pycbc_streamed_result__cancel__(pycbc_streamed_result* self, PyObject* args)
{
    if (!self->rows) {
        // no row queue attached: nothing to cancel
        Py_RETURN_NONE;
    }
    self->rows->cancel();
    Py_RETURN_NONE;
}
225+
226+
// Python-callable is_cancelled() for pycbc_streamed_result.
// Returns True only when the result has a row queue and that queue reports it has been
// cancelled; returns False otherwise. `args` is unused (registered as METH_NOARGS).
static PyObject*
pycbc_streamed_result__is_cancelled__(pycbc_streamed_result* self, PyObject* args)
{
    if (!self->rows || !self->rows->is_cancelled()) {
        Py_RETURN_FALSE;
    }
    Py_RETURN_TRUE;
}
234+
235+
// Method table for pycbc_streamed_result: exposes cancel() and is_cancelled() to Python.
// Both entries take no arguments (METH_NOARGS); the table ends with an all-null sentinel.
static PyMethodDef pycbc_streamed_result_methods[] = {
    { "cancel",
      reinterpret_cast<PyCFunction>(pycbc_streamed_result__cancel__),
      METH_NOARGS,
      PyDoc_STR("Cancel the streaming operation so a blocked iterator can unwind") },
    { "is_cancelled",
      reinterpret_cast<PyCFunction>(pycbc_streamed_result__is_cancelled__),
      METH_NOARGS,
      PyDoc_STR("Check if the streaming operation has been cancelled") },
    { nullptr, nullptr, 0, nullptr } // Sentinel
};
246+
217247
static PyTypeObject pycbc_streamed_result_type = {
218248
PyVarObject_HEAD_INIT(nullptr, 0) "pycbc_core.pycbc_streamed_result", // tp_name
219249
sizeof(pycbc_streamed_result), // tp_basicsize
@@ -241,7 +271,7 @@ static PyTypeObject pycbc_streamed_result_type = {
241271
0, // tp_weaklistoffset
242272
pycbc_streamed_result__iter__, // tp_iter
243273
pycbc_streamed_result__iternext__, // tp_iternext
244-
nullptr, // tp_methods
274+
pycbc_streamed_result_methods, // tp_methods
245275
pycbc_streamed_result_members, // tp_members
246276
nullptr, // tp_getset
247277
nullptr, // tp_base

src/result.hxx

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public:
5151
: rows_()
5252
, mut_()
5353
, cv_()
54+
, cancelled_(false)
5455
{
5556
}
5657

@@ -72,11 +73,33 @@ public:
7273
cv_.notify_one();
7374
}
7475

76+
// Unblock a waiting get(): once cancelled, get() stops waiting for new rows and returns a
77+
// null sentinel as soon as the queue drains. Used to release a streaming iterator's worker
78+
// thread when the operation is cancelled/errored, rather than waiting for the entire
79+
// server-side operation to complete. Already-queued rows are still drained first.
80+
void cancel()
81+
{
82+
std::lock_guard<std::mutex> lock(mut_);
83+
cancelled_ = true;
84+
cv_.notify_all();
85+
}
86+
87+
bool is_cancelled()
88+
{
89+
std::lock_guard<std::mutex> lock(mut_);
90+
return cancelled_;
91+
}
92+
7593
T get(std::chrono::milliseconds timeout_ms)
7694
{
7795
std::unique_lock<std::mutex> lock(mut_);
7896

7997
while (rows_.empty()) {
98+
if (cancelled_) {
99+
// Cancelled with no rows remaining: stop waiting and signal end-of-iteration
100+
// (a null PyObject* surfaces as StopIteration to the Python iterator).
101+
return nullptr;
102+
}
80103
if (cv_.wait_for(lock, timeout_ms) == std::cv_status::timeout) {
81104
// This timeout (e.g. timeout_ms) is the same timeout we pass to the C++ core.
82105
// If we timeout on the Python side this means:
@@ -111,6 +134,7 @@ private:
111134
std::queue<T> rows_;
112135
std::mutex mut_;
113136
std::condition_variable cv_;
137+
bool cancelled_;
114138
};
115139

116140
struct pycbc_result {

0 commit comments

Comments
 (0)