Skip to content

Commit 4204653

Browse files
committed
PYCBC-1779: Remove asyncio.QueueEmpty exception blocks
Changes ------- * Removed dead `except asyncio.QueueEmpty` branch from `stream_anext` and `AsyncRangeScanRequest.__anext__` (left over from PYCBC-1471 work) - Dropped `op_name` param from `stream_anext` - Removed `test_queue_empty_converted_with_op_name` from streaming_helpers_t.py Change-Id: Idb3c0414d12da83005c702569609d5496f511a68 Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/246875 Reviewed-by: Anirudh Lakhotia <anirudh.lakhotia@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent 289fd60 commit 4204653

7 files changed

Lines changed: 27 additions & 49 deletions

File tree

acouchbase/analytics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2016-2022. Couchbase, Inc.
1+
# Copyright 2016-2026. Couchbase, Inc.
22
# All Rights Reserved.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License")
@@ -124,4 +124,4 @@ def _finalize(self, exc_val=None):
124124
self._shutdown_executor()
125125

126126
async def __anext__(self):
127-
return await stream_anext(self, 'Analytics')
127+
return await stream_anext(self)

acouchbase/kv_range_scan.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
# Copyright 2021, Couchbase, Inc.
2-
# All Rights Reserved
1+
# Copyright 2016-2026. Couchbase, Inc.
2+
# All Rights Reserved.
33
#
4-
# Licensed under the Apache License, Version 2.0 (the "License")
5-
# you may not use this file except in compliance with the License.
6-
# You may obtain a copy of the License at
4+
# Licensed under the Apache License, Version 2.0 (the "License")
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
77
#
8-
# http://www.apache.org/licenses/LICENSE-2.0
8+
# http://www.apache.org/licenses/LICENSE-2.0
99
#
10-
# Unless required by applicable law or agreed to in writing, software
11-
# distributed under the License is distributed on an "AS IS" BASIS,
12-
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
# See the License for the specific language governing permissions and
14-
# limitations under the License.
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
1515

1616
from __future__ import annotations
1717

@@ -80,11 +80,6 @@ async def __anext__(self):
8080
try:
8181
return await self._loop.run_in_executor(self._tp_executor, self._get_next_row)
8282
# We can stop iterator when we receive RangeScanCompletedException
83-
except asyncio.QueueEmpty:
84-
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
85-
excptn = exc_cls('Unexpected QueueEmpty exception caught when doing N1QL query.')
86-
self._abort()
87-
raise excptn
8883
except RangeScanCompletedException:
8984
self._done_streaming = True
9085
self._shutdown_executor()

acouchbase/n1ql.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2016-2022. Couchbase, Inc.
1+
# Copyright 2016-2026. Couchbase, Inc.
22
# All Rights Reserved.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License")
@@ -126,4 +126,4 @@ def _finalize(self, exc_val=None):
126126
self._shutdown_executor()
127127

128128
async def __anext__(self):
129-
return await stream_anext(self, 'N1QL')
129+
return await stream_anext(self)

acouchbase/search.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2016-2022. Couchbase, Inc.
1+
# Copyright 2016-2026. Couchbase, Inc.
22
# All Rights Reserved.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License")
@@ -122,4 +122,4 @@ def _finalize(self, exc_val=None):
122122
self._shutdown_executor()
123123

124124
async def __anext__(self):
125-
return await stream_anext(self, 'Search')
125+
return await stream_anext(self)

acouchbase/tests/streaming_helpers_t.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ class StreamingAnextTestSuite:
8484
'test_stop_async_iteration_finalizes_and_reads_metadata',
8585
'test_couchbase_exception_propagates_unchanged',
8686
'test_generic_exception_converted_to_internal',
87-
'test_queue_empty_converted_with_op_name',
8887
'test_keyboard_interrupt_propagates_unconverted',
8988
'test_cancellation_finalizes_and_propagates',
9089
'test_finalize_cancels_streaming_result_on_error',
@@ -95,7 +94,7 @@ class StreamingAnextTestSuite:
9594
async def test_returns_row_and_ends_span(self):
9695
req = _FakeAsyncStreamingRequest(asyncio.get_running_loop(), rows=['row-1'])
9796
try:
98-
row = await stream_anext(req, 'N1QL')
97+
row = await stream_anext(req)
9998
assert row == 'row-1'
10099
# span ended on the (first) row, no error, and the op is NOT finalized mid-stream
101100
assert req.process_core_span_calls == [None]
@@ -108,7 +107,7 @@ async def test_returns_row_and_ends_span(self):
108107
async def test_stop_async_iteration_finalizes_and_reads_metadata(self):
109108
req = _FakeAsyncStreamingRequest(asyncio.get_running_loop(), rows=[])
110109
with pytest.raises(StopAsyncIteration):
111-
await stream_anext(req, 'N1QL')
110+
await stream_anext(req)
112111
assert req._done_streaming is True
113112
assert req.finalize_calls == [None]
114113
assert req.get_metadata_calls == 1
@@ -119,7 +118,7 @@ async def test_couchbase_exception_propagates_unchanged(self):
119118
err = CouchbaseException('boom')
120119
req = _FakeAsyncStreamingRequest(asyncio.get_running_loop(), raise_exc=err)
121120
with pytest.raises(CouchbaseException) as exc_info:
122-
await stream_anext(req, 'N1QL')
121+
await stream_anext(req)
123122
assert exc_info.value is err
124123
assert req.finalize_calls == [err]
125124
assert req.executor_shutdown is True
@@ -128,25 +127,16 @@ async def test_couchbase_exception_propagates_unchanged(self):
128127
async def test_generic_exception_converted_to_internal(self):
129128
req = _FakeAsyncStreamingRequest(asyncio.get_running_loop(), raise_exc=ValueError('bad'))
130129
with pytest.raises(InternalSDKException):
131-
await stream_anext(req, 'N1QL')
130+
await stream_anext(req)
132131
assert len(req.finalize_calls) == 1
133132
assert isinstance(req.finalize_calls[0], InternalSDKException)
134133
assert req.executor_shutdown is True
135134

136-
@pytest.mark.asyncio
137-
async def test_queue_empty_converted_with_op_name(self):
138-
req = _FakeAsyncStreamingRequest(asyncio.get_running_loop(), raise_exc=asyncio.QueueEmpty())
139-
with pytest.raises(InternalSDKException) as exc_info:
140-
await stream_anext(req, 'Analytics')
141-
assert 'Analytics' in str(exc_info.value)
142-
assert len(req.finalize_calls) == 1
143-
assert req.executor_shutdown is True
144-
145135
@pytest.mark.asyncio
146136
async def test_keyboard_interrupt_propagates_unconverted(self):
147137
req = _FakeAsyncStreamingRequest(asyncio.get_running_loop(), raise_exc=KeyboardInterrupt())
148138
with pytest.raises(KeyboardInterrupt):
149-
await stream_anext(req, 'N1QL')
139+
await stream_anext(req)
150140
# BaseException must be finalized for cleanup but NOT converted to a CouchbaseException
151141
assert len(req.finalize_calls) == 1
152142
assert isinstance(req.finalize_calls[0], KeyboardInterrupt)
@@ -157,7 +147,7 @@ async def test_cancellation_finalizes_and_propagates(self):
157147
# The core scenario: a task cancelled while blocked fetching the next row.
158148
block = threading.Event()
159149
req = _FakeAsyncStreamingRequest(asyncio.get_running_loop(), rows=['late-row'], block_event=block)
160-
task = asyncio.ensure_future(stream_anext(req, 'N1QL'))
150+
task = asyncio.ensure_future(stream_anext(req))
161151
# let the task reach the run_in_executor await (worker is now blocked on the event)
162152
await asyncio.sleep(0.05)
163153
task.cancel()

acouchbase/views.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2016-2022. Couchbase, Inc.
1+
# Copyright 2016-2026. Couchbase, Inc.
22
# All Rights Reserved.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License")
@@ -130,4 +130,4 @@ def _finalize(self, exc_val=None):
130130
self._shutdown_executor()
131131

132132
async def __anext__(self):
133-
return await stream_anext(self, 'View')
133+
return await stream_anext(self)

couchbase/logic/streaming.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2016-2022. Couchbase, Inc.
1+
# Copyright 2016-2026. Couchbase, Inc.
22
# All Rights Reserved.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License")
@@ -13,8 +13,6 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
import asyncio
17-
1816
from couchbase.exceptions import (PYCBC_ERROR_MAP,
1917
CouchbaseException,
2018
ExceptionMap)
@@ -44,12 +42,11 @@ def _internal_exception(message):
4442
return exc_cls(message)
4543

4644

47-
async def stream_anext(req, op_name):
45+
async def stream_anext(req):
4846
"""
4947
**INTERNAL**
5048
5149
Drive a single async streaming iteration for ``req`` and apply uniform teardown/error handling.
52-
``op_name`` is the human-readable operation name used in diagnostic messages (e.g. ``'N1QL'``).
5350
"""
5451
try:
5552
# this is a blocking operation, so it is offloaded to the request's executor
@@ -60,10 +57,6 @@ async def stream_anext(req, op_name):
6057
# work for the first call).
6158
req._process_core_span()
6259
return row
63-
except asyncio.QueueEmpty:
64-
excptn = _internal_exception(f'Unexpected QueueEmpty exception caught when doing {op_name} query.')
65-
req._finalize(exc_val=excptn)
66-
raise excptn
6760
except StopAsyncIteration:
6861
req._done_streaming = True
6962
req._finalize()

0 commit comments

Comments
 (0)