Skip to content

Commit d92f378

Browse files
committed
PYCBC-1773: Streaming operations do not handle asyncio.CancelledError / BaseException
Motivation ---------- Streaming row iterators only ran span/meter teardown from except branches catching Exception, so asyncio.CancelledError (a BaseException) bypassed teardown which would leak to the span/meter and orphan the C++ core streaming result and its executor thread. Changes ------- * End the span/meter and release the executor on BaseException (CancelledError/KeyboardInterrupt/SystemExit), then re-raise unchanged * Extract the shared __anext__/__next__ handling into stream_anext/stream_next; all 12 query iterators delegate * acouchbase: shut down the per-request executor on every terminal path; default num_workers to 1 (matches sequential consumption) * Add cluster-free unit tests for both helpers Change-Id: I02841b24958df60fb36235d57636d02672bd7ca0 Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/246059 Reviewed-by: Dimitris Christodoulou <dimitris.christodoulou@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent 7d0b99a commit d92f378

17 files changed

Lines changed: 539 additions & 266 deletions

acouchbase/analytics.py

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from couchbase.logic.analytics import AnalyticsQuery # noqa: F401
2626
from couchbase.logic.analytics import AnalyticsRequestLogic
2727
from couchbase.logic.pycbc_core import pycbc_exception as PycbcCoreException
28+
from couchbase.logic.streaming import stream_anext
2829

2930

3031
class AsyncAnalyticsRequest(AnalyticsRequestLogic):
@@ -35,10 +36,11 @@ def __init__(self,
3536
row_factory=lambda x: x,
3637
**kwargs
3738
):
38-
num_workers = kwargs.pop('num_workers', 2)
39+
num_workers = kwargs.pop('num_workers', None) or 1
3940
super().__init__(connection, query_params, row_factory=row_factory, **kwargs)
4041
self._loop = loop
4142
self._tp_executor = ThreadPoolExecutor(num_workers)
43+
self._executor_shutdown = False
4244

4345
@property
4446
def loop(self):
@@ -93,30 +95,27 @@ def _get_next_row(self):
9395
# this should allow the event loop to pick up something else
9496
return self.serializer.deserialize(row)
9597

98+
def _shutdown_executor(self):
99+
"""
100+
**INTERNAL**
101+
102+
Release the per-request streaming executor. Safe to call multiple times.
103+
"""
104+
if not self._executor_shutdown:
105+
self._executor_shutdown = True
106+
self._tp_executor.shutdown(wait=False)
107+
108+
def _finalize(self, exc_val=None):
109+
"""
110+
**INTERNAL**
111+
112+
Terminal cleanup for the streaming op: end the observability span/meter and release the
113+
executor. Invoked from every terminal branch of ``__anext__`` -- including cancellation --
114+
so that ``asyncio.CancelledError`` (a ``BaseException``, not an ``Exception``) cannot bypass
115+
teardown and leak the span/meter or orphan the streaming result and its executor thread.
116+
"""
117+
self._process_core_span(exc_val=exc_val)
118+
self._shutdown_executor()
119+
96120
async def __anext__(self):
97-
try:
98-
row = await self._loop.run_in_executor(self._tp_executor, self._get_next_row)
99-
# We want to end the streaming op span once we have a response from the C++ core.
100-
# Unfortunately right now, that means we need to wait until we have the first row (or we have an error).
101-
# As this method is idempotent, it is safe to call for each row (it will only do work for the first call).
102-
self._process_core_span()
103-
return row
104-
except asyncio.QueueEmpty:
105-
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
106-
excptn = exc_cls('Unexpected QueueEmpty exception caught when doing Analytics query.')
107-
self._process_core_span(exc_val=excptn)
108-
raise excptn
109-
except StopAsyncIteration:
110-
self._done_streaming = True
111-
if self._processed_core_span is False:
112-
self._process_core_span()
113-
self._get_metadata()
114-
raise
115-
except CouchbaseException as ex:
116-
self._process_core_span(exc_val=ex)
117-
raise ex
118-
except Exception as ex:
119-
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
120-
excptn = exc_cls(str(ex))
121-
self._process_core_span(exc_val=excptn)
122-
raise excptn
121+
return await stream_anext(self, 'Analytics')

acouchbase/kv_range_scan.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@
3232

3333
class AsyncRangeScanRequest(RangeScanRequestLogic):
3434
def __init__(self, connection: pycbc_connection, loop: asyncio.AbstractEventLoop, **kwargs: Any) -> None:
35-
num_workers = kwargs.pop('num_workers', 2)
35+
num_workers = kwargs.pop('num_workers', None) or 1
3636
super().__init__(connection, **kwargs)
3737
self._loop = loop
3838
self._result_ftr = None
3939
self._tp_executor = ThreadPoolExecutor(num_workers)
40+
self._executor_shutdown = False
4041

4142
@property
4243
def loop(self) -> asyncio.AbstractEventLoop:
@@ -54,23 +55,44 @@ def __aiter__(self):
5455

5556
return self
5657

58+
def _shutdown_executor(self):
59+
"""
60+
**INTERNAL**
61+
62+
Release the per-request streaming executor. Safe to call multiple times.
63+
"""
64+
if not self._executor_shutdown:
65+
self._executor_shutdown = True
66+
self._tp_executor.shutdown(wait=False)
67+
5768
async def __anext__(self):
5869
try:
5970
return await self._loop.run_in_executor(self._tp_executor, self._get_next_row)
6071
# We can stop iterator when we receive RangeScanCompletedException
6172
except asyncio.QueueEmpty:
6273
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
6374
excptn = exc_cls('Unexpected QueueEmpty exception caught when doing N1QL query.')
75+
self._shutdown_executor()
6476
raise excptn
6577
except RangeScanCompletedException:
6678
self._done_streaming = True
79+
self._shutdown_executor()
6780
raise StopAsyncIteration
6881
except StopAsyncIteration:
6982
self._done_streaming = True
83+
self._shutdown_executor()
7084
raise
7185
except CouchbaseException as ex:
86+
self._shutdown_executor()
7287
raise ex
7388
except Exception as ex:
7489
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
7590
excptn = exc_cls(str(ex))
91+
self._shutdown_executor()
7692
raise excptn
93+
except BaseException:
94+
# asyncio.CancelledError (and KeyboardInterrupt/SystemExit) derive from BaseException,
95+
# not Exception, so they fall through every handler above. Release the executor so the
96+
# cancelled scan does not orphan its worker thread, then re-raise unchanged.
97+
self._shutdown_executor()
98+
raise

acouchbase/n1ql.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from couchbase.logic.n1ql import N1QLQuery # noqa: F401
2727
from couchbase.logic.n1ql import QueryRequestLogic
2828
from couchbase.logic.pycbc_core import pycbc_exception as PycbcCoreException
29+
from couchbase.logic.streaming import stream_anext
2930

3031
logger = logging.getLogger(__name__)
3132

@@ -38,10 +39,11 @@ def __init__(self,
3839
row_factory=lambda x: x,
3940
**kwargs
4041
):
41-
num_workers = kwargs.pop('num_workers', 2)
42+
num_workers = kwargs.pop('num_workers', None) or 1
4243
super().__init__(connection, query_params, row_factory=row_factory, **kwargs)
4344
self._loop = loop
4445
self._tp_executor = ThreadPoolExecutor(num_workers)
46+
self._executor_shutdown = False
4547

4648
@property
4749
def loop(self):
@@ -95,29 +97,27 @@ def _get_next_row(self):
9597

9698
return self.serializer.deserialize(row)
9799

100+
def _shutdown_executor(self):
101+
"""
102+
**INTERNAL**
103+
104+
Release the per-request streaming executor. Safe to call multiple times.
105+
"""
106+
if not self._executor_shutdown:
107+
self._executor_shutdown = True
108+
self._tp_executor.shutdown(wait=False)
109+
110+
def _finalize(self, exc_val=None):
111+
"""
112+
**INTERNAL**
113+
114+
Terminal cleanup for the streaming op: end the observability span/meter and release the
115+
executor. Invoked from every terminal branch of ``__anext__`` -- including cancellation --
116+
so that ``asyncio.CancelledError`` (a ``BaseException``, not an ``Exception``) cannot bypass
117+
teardown and leak the span/meter or orphan the streaming result and its executor thread.
118+
"""
119+
self._process_core_span(exc_val=exc_val)
120+
self._shutdown_executor()
121+
98122
async def __anext__(self):
99-
try:
100-
row = await self._loop.run_in_executor(self._tp_executor, self._get_next_row)
101-
# We want to end the streaming op span once we have a response from the C++ core.
102-
# Unfortunately right now, that means we need to wait until we have the first row (or we have an error).
103-
# As this method is idempotent, it is safe to call for each row (it will only do work for the first call).
104-
self._process_core_span()
105-
return row
106-
except asyncio.QueueEmpty:
107-
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
108-
excptn = exc_cls('Unexpected QueueEmpty exception caught when doing N1QL query.')
109-
self._process_core_span(exc_val=excptn)
110-
raise excptn
111-
except StopAsyncIteration:
112-
self._done_streaming = True
113-
self._process_core_span()
114-
self._get_metadata()
115-
raise
116-
except CouchbaseException as ex:
117-
self._process_core_span(exc_val=ex)
118-
raise ex
119-
except Exception as ex:
120-
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
121-
excptn = exc_cls(str(ex))
122-
self._process_core_span(exc_val=excptn)
123-
raise excptn
123+
return await stream_anext(self, 'N1QL')

acouchbase/search.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from couchbase.logic.pycbc_core import pycbc_exception as PycbcCoreException
2626
from couchbase.logic.search import SearchQueryBuilder # noqa: F401
2727
from couchbase.logic.search import FullTextSearchRequestLogic
28+
from couchbase.logic.streaming import stream_anext
2829

2930

3031
class AsyncFullTextSearchRequest(FullTextSearchRequestLogic):
@@ -34,10 +35,11 @@ def __init__(self,
3435
encoded_query,
3536
**kwargs
3637
):
37-
num_workers = kwargs.pop('num_workers', 2)
38+
num_workers = kwargs.pop('num_workers', None) or 1
3839
super().__init__(connection, encoded_query, **kwargs)
3940
self._loop = loop
4041
self._tp_executor = ThreadPoolExecutor(num_workers)
42+
self._executor_shutdown = False
4143

4244
@property
4345
def loop(self):
@@ -91,29 +93,27 @@ def _get_next_row(self):
9193

9294
return self._deserialize_row(row)
9395

96+
def _shutdown_executor(self):
97+
"""
98+
**INTERNAL**
99+
100+
Release the per-request streaming executor. Safe to call multiple times.
101+
"""
102+
if not self._executor_shutdown:
103+
self._executor_shutdown = True
104+
self._tp_executor.shutdown(wait=False)
105+
106+
def _finalize(self, exc_val=None):
107+
"""
108+
**INTERNAL**
109+
110+
Terminal cleanup for the streaming op: end the observability span/meter and release the
111+
executor. Invoked from every terminal branch of ``__anext__`` -- including cancellation --
112+
so that ``asyncio.CancelledError`` (a ``BaseException``, not an ``Exception``) cannot bypass
113+
teardown and leak the span/meter or orphan the streaming result and its executor thread.
114+
"""
115+
self._process_core_span(exc_val=exc_val)
116+
self._shutdown_executor()
117+
94118
async def __anext__(self):
95-
try:
96-
row = await self._loop.run_in_executor(self._tp_executor, self._get_next_row)
97-
# We want to end the streaming op span once we have a response from the C++ core.
98-
# Unfortunately right now, that means we need to wait until we have the first row (or we have an error).
99-
# As this method is idempotent, it is safe to call for each row (it will only do work for the first call).
100-
self._process_core_span()
101-
return row
102-
except asyncio.QueueEmpty:
103-
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
104-
excptn = exc_cls('Unexpected QueueEmpty exception caught when doing Search query.')
105-
self._process_core_span(exc_val=excptn)
106-
raise excptn
107-
except StopAsyncIteration:
108-
self._done_streaming = True
109-
self._process_core_span()
110-
self._get_metadata()
111-
raise
112-
except CouchbaseException as ex:
113-
self._process_core_span(exc_val=ex)
114-
raise ex
115-
except Exception as ex:
116-
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
117-
excptn = exc_cls(str(ex))
118-
self._process_core_span(exc_val=excptn)
119-
raise excptn
119+
return await stream_anext(self, 'Search')

0 commit comments

Comments
 (0)