Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit ac07e41

Browse files
authored
Make sure Pub / Sub BIDI consumer doesn't try to join itself. (#4540)
This functionality was removed in #4537, so this puts it back.
1 parent da3242e commit ac07e41

2 files changed

Lines changed: 58 additions & 4 deletions

File tree

google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,8 @@ def _blocking_consume(self):
321321
if recover:
322322
recover = self._stop_request_generator(request_generator)
323323
if not recover:
324-
self.stop_consuming()
324+
self._stop_no_join()
325+
return
325326

326327
def start_consuming(self):
327328
"""Start consuming the stream."""
@@ -336,11 +337,35 @@ def start_consuming(self):
336337
_LOGGER.debug('Started helper thread %s', thread.name)
337338
self._consumer_thread = thread
338339

339-
def stop_consuming(self):
340-
"""Signal the stream to stop and block until it completes."""
340+
def _stop_no_join(self):
341+
"""Signal the request stream to stop.
342+
343+
To actually stop the worker ("consumer thread"), a ``STOP`` is
344+
sent to the request queue.
345+
346+
The ``_consumer_thread`` member is removed from the current instance
347+
and returned.
348+
349+
Returns:
350+
threading.Thread: The worker ("consumer thread") that is being
351+
stopped.
352+
"""
341353
self.active = False
342354
self._exiting.set()
343355
_LOGGER.debug('Stopping helper thread %s', self._consumer_thread.name)
344356
self.send_request(_helper_threads.STOP)
345-
self._consumer_thread.join()
357+
thread = self._consumer_thread
346358
self._consumer_thread = None
359+
return thread
360+
361+
def stop_consuming(self):
362+
"""Signal the stream to stop and block until it completes.
363+
364+
To actually stop the worker ("consumer thread"), a ``STOP`` is
365+
sent to the request queue.
366+
367+
This **assumes** that the caller is not in the same thread
368+
(since a thread cannot ``join()`` itself).
369+
"""
370+
thread = self._stop_no_join()
371+
thread.join()

tests/unit/pubsub_v1/subscriber/test_consumer.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ def test_blocking_consume():
7777
assert on_res.mock_calls[1][1][1] == mock.sentinel.B
7878

7979

80+
@mock.patch.object(_consumer, '_LOGGER')
81+
def test_blocking_consume_when_exiting(_LOGGER):
82+
consumer = create_consumer()
83+
assert consumer._exiting.is_set() is False
84+
consumer._exiting.set()
85+
86+
# Make sure method cleanly exits.
87+
assert consumer._blocking_consume() is None
88+
89+
_LOGGER.debug.assert_called_once_with('Event signalled consumer exit.')
90+
91+
8092
class OnException(object):
8193

8294
def __init__(self, acceptable=None):
@@ -149,6 +161,23 @@ def test_start_consuming():
149161
assert consumer._consumer_thread is Thread.return_value
150162

151163

164+
def test_stop_consuming():
165+
consumer = create_consumer()
166+
consumer.active = True
167+
assert consumer._exiting.is_set() is False
168+
thread = mock.Mock(spec=threading.Thread)
169+
consumer._consumer_thread = thread
170+
171+
assert consumer.stop_consuming() is None
172+
173+
# Make sure state was updated.
174+
assert consumer.active is False
175+
assert consumer._exiting.is_set() is True
176+
assert consumer._consumer_thread is None
177+
# Check mocks.
178+
thread.join.assert_called_once_with()
179+
180+
152181
def basic_queue_generator(queue, received):
153182
while True:
154183
value = queue.get()

0 commit comments

Comments
 (0)