Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit df5a803

Browse files
authored
Actually **stopping** the Pub / Sub consumer on an exception. (#4498)
Fixes #4463.
1 parent a6f99e4 commit df5a803

5 files changed

Lines changed: 51 additions & 18 deletions

File tree

google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,10 @@ def _blocking_consume(self):
242242
# case, break out of the while loop and exit this thread.
243243
_LOGGER.debug('Clean RPC loop exit signalled consumer exit.')
244244
break
245-
except KeyboardInterrupt:
246-
self.stop_consuming()
247245
except Exception as exc:
248-
self.active = self._policy.on_exception(exc)
246+
recover = self._policy.on_exception(exc)
247+
if not recover:
248+
self.stop_consuming()
249249

250250
def start_consuming(self):
251251
"""Start consuming the stream."""

google/cloud/pubsub_v1/subscriber/_helper_threads.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@
4040
STOP = uuid.uuid4()
4141

4242

43+
def _current_thread():
44+
"""Get the currently active thread.
45+
46+
This is provided as a test helper so that it can be mocked easily.
47+
Mocking ``threading.current_thread()`` directly may have unintended
48+
consequences on code that relies on it.
49+
50+
Returns:
51+
threading.Thread: The current thread.
52+
"""
53+
return threading.current_thread()
54+
55+
4356
class HelperThreadRegistry(object):
4457
def __init__(self):
4558
self._helper_threads = {}
@@ -84,6 +97,16 @@ def stop(self, name):
8497
if helper_thread is None:
8598
return
8699

100+
if helper_thread.thread is _current_thread():
101+
# The current thread cannot ``join()`` itself but it can
102+
# still send a signal to stop.
103+
_LOGGER.debug('Cannot stop current thread %s', name)
104+
helper_thread.queue.put(STOP)
105+
# We return and stop short of ``pop()``-ing so that the
106+
# thread that invoked the current helper can properly stop
107+
# it.
108+
return
109+
87110
# Join the thread if it is still alive.
88111
if helper_thread.thread.is_alive():
89112
_LOGGER.debug('Stopping helper thread %s', name)

google/cloud/pubsub_v1/subscriber/policy/thread.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,12 @@ def on_exception(self, exception):
180180
"""Handle the exception.
181181
182182
If the exception is one of the retryable exceptions, this will signal
183-
to the consumer thread that it should remain active.
183+
to the consumer thread that it should "recover" from the failure.
184184
185185
This will cause the stream to exit when it returns :data:`False`.
186186
187187
Returns:
188-
bool: Indicates if the caller should remain active or shut down.
188+
bool: Indicates if the caller should recover or shut down.
189189
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
190190
in a list of retryable / idempotent exceptions.
191191
"""

tests/unit/pubsub_v1/subscriber/test_consumer.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,19 +76,6 @@ def test_blocking_consume():
7676
assert on_res.mock_calls[1][1][1] == mock.sentinel.B
7777

7878

79-
def test_blocking_consume_keyboard_interrupt():
80-
consumer = create_consumer()
81-
Policy = type(consumer._policy)
82-
83-
# Establish that we get responses until we are sent the exiting event.
84-
with mock.patch.object(Policy, 'call_rpc', autospec=True) as call_rpc:
85-
call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
86-
with mock.patch.object(Policy, 'on_response', autospec=True) as on_res:
87-
on_res.side_effect = KeyboardInterrupt
88-
consumer._blocking_consume()
89-
on_res.assert_called_once_with(consumer._policy, mock.sentinel.A)
90-
91-
9279
class OnException(object):
9380

9481
def __init__(self, exiting_event, acceptable=None):

tests/unit/pubsub_v1/subscriber/test_helper_threads.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,29 @@ def test_stop_noop():
3636
assert len(registry._helper_threads) == 0
3737

3838

39+
@mock.patch.object(
40+
_helper_threads, '_current_thread', return_value=mock.sentinel.thread)
41+
def test_stop_current_thread(_current_thread):
42+
registry = _helper_threads.HelperThreadRegistry()
43+
queue_ = mock.Mock(spec=('put',))
44+
45+
name = 'here'
46+
registry._helper_threads[name] = _helper_threads._HelperThread(
47+
name=name,
48+
queue=queue_,
49+
thread=_current_thread.return_value,
50+
)
51+
assert list(registry._helper_threads.keys()) == [name]
52+
registry.stop(name)
53+
# Make sure it hasn't been removed from the registry ...
54+
assert list(registry._helper_threads.keys()) == [name]
55+
# ... but it did receive the STOP signal.
56+
queue_.put.assert_called_once_with(_helper_threads.STOP)
57+
58+
# Verify that our mock was only called once.
59+
_current_thread.assert_called_once_with()
60+
61+
3962
def test_stop_dead_thread():
4063
registry = _helper_threads.HelperThreadRegistry()
4164
registry._helper_threads['foo'] = _helper_threads._HelperThread(

0 commit comments

Comments
 (0)