Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit ca10706

Browse files
authored
Adding pause/resume to Pub / Sub consumer. (#4558)
Using these (rather then open/close on the subscription Policy) when the flow control signals the message load is too great.
1 parent 2761cd0 commit ca10706

4 files changed

Lines changed: 145 additions & 39 deletions

File tree

google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ class Consumer(object):
179179
def __init__(self):
180180
self._request_queue = queue.Queue()
181181
self.stopped = threading.Event()
182+
self._can_consume = threading.Event()
182183
self._put_lock = threading.Lock()
183184
self._consumer_thread = None
184185

@@ -319,8 +320,10 @@ def _blocking_consume(self, policy):
319320

320321
request_generator = self._request_generator_thread(policy)
321322
response_generator = policy.call_rpc(request_generator)
323+
responses = _pausable_iterator(
324+
response_generator, self._can_consume)
322325
try:
323-
for response in response_generator:
326+
for response in responses:
324327
_LOGGER.debug('Received response:\n%r', response)
325328
policy.on_response(response)
326329

@@ -339,6 +342,34 @@ def _blocking_consume(self, policy):
339342
self._stop_no_join()
340343
return
341344

345+
def pause(self):
346+
"""Pause the current consumer.
347+
348+
This method is idempotent by design.
349+
350+
This will clear the ``_can_consume`` event which is checked
351+
every time :meth:`_blocking_consume` consumes a response from the
352+
bidirectional streaming pull.
353+
354+
Complement to :meth:`resume`.
355+
"""
356+
_LOGGER.debug('Pausing consumer')
357+
self._can_consume.clear()
358+
359+
def resume(self):
360+
"""Resume the current consumer.
361+
362+
This method is idempotent by design.
363+
364+
This will set the ``_can_consume`` event which is checked
365+
every time :meth:`_blocking_consume` consumes a response from the
366+
bidirectional streaming pull.
367+
368+
Complement to :meth:`pause`.
369+
"""
370+
_LOGGER.debug('Resuming consumer')
371+
self._can_consume.set()
372+
342373
def start_consuming(self, policy):
343374
"""Start consuming the stream.
344375
@@ -351,6 +382,7 @@ def start_consuming(self, policy):
351382
responses are handled.
352383
"""
353384
self.stopped.clear()
385+
self.resume() # Make sure we aren't paused.
354386
thread = threading.Thread(
355387
name=_BIDIRECTIONAL_CONSUMER_NAME,
356388
target=self._blocking_consume,
@@ -374,6 +406,7 @@ def _stop_no_join(self):
374406
threading.Thread: The worker ("consumer thread") that is being
375407
stopped.
376408
"""
409+
self.resume() # Make sure we aren't paused.
377410
self.stopped.set()
378411
_LOGGER.debug('Stopping helper thread %s', self._consumer_thread.name)
379412
self.send_request(_helper_threads.STOP)
@@ -392,3 +425,23 @@ def stop_consuming(self):
392425
"""
393426
thread = self._stop_no_join()
394427
thread.join()
428+
429+
430+
def _pausable_iterator(iterator, can_continue):
431+
"""Converts a standard iterator into one that can be paused.
432+
433+
The ``can_continue`` event can be used by an independent, concurrent
434+
worker to pause and resume the iteration over ``iterator``.
435+
436+
Args:
437+
iterator (Iterator): Any iterator to be iterated over.
438+
can_continue (threading.Event): An event which determines if we
439+
can advance to the next iteration. Will be ``wait()``-ed on
440+
before
441+
442+
Yields:
443+
Any: The items from ``iterator``.
444+
"""
445+
while True:
446+
can_continue.wait()
447+
yield next(iterator)

google/cloud/pubsub_v1/subscriber/policy/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ def drop(self, ack_id, byte_size):
225225
# before restarting.
226226
if self._paused and self._load < self.flow_control.resume_threshold:
227227
self._paused = False
228-
self.open(self._callback)
228+
self._consumer.resume()
229229

230230
def get_initial_request(self, ack_queue=False):
231231
"""Return the initial request.
@@ -291,7 +291,7 @@ def lease(self, ack_id, byte_size):
291291
# If we do, we need to stop the stream.
292292
if self._load >= 1.0:
293293
self._paused = True
294-
self.close()
294+
self._consumer.pause()
295295

296296
def maintain_leases(self):
297297
"""Maintain all of the leases being managed by the policy.

tests/unit/pubsub_v1/subscriber/test_consumer.py

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,11 @@ def test_request_generator_thread():
6161

6262
def test_blocking_consume():
6363
policy = mock.Mock(spec=('call_rpc', 'on_response'))
64-
policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
64+
policy.call_rpc.return_value = iter((mock.sentinel.A, mock.sentinel.B))
6565

6666
consumer = _consumer.Consumer()
67+
consumer.resume()
68+
6769
assert consumer._blocking_consume(policy) is None
6870
policy.call_rpc.assert_called_once()
6971
policy.on_response.assert_has_calls(
@@ -96,11 +98,12 @@ def __call__(self, exception):
9698

9799
def test_blocking_consume_on_exception():
98100
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
99-
policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
101+
policy.call_rpc.return_value = iter((mock.sentinel.A, mock.sentinel.B))
100102
exc = TypeError('Bad things!')
101103
policy.on_response.side_effect = exc
102104

103105
consumer = _consumer.Consumer()
106+
consumer.resume()
104107
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
105108
policy.on_exception.side_effect = OnException()
106109

@@ -114,37 +117,77 @@ def test_blocking_consume_on_exception():
114117
policy.on_exception.assert_called_once_with(exc)
115118

116119

120+
class RaisingResponseGenerator(object):
121+
# NOTE: This is needed because defining `.next` on an **instance**
122+
# rather than the **class** will not be iterable in Python 2.
123+
# This is problematic since a `Mock` just sets members.
124+
125+
def __init__(self, exception):
126+
self.exception = exception
127+
self.done_calls = 0
128+
self.next_calls = 0
129+
130+
def done(self):
131+
self.done_calls += 1
132+
return True
133+
134+
def __next__(self):
135+
self.next_calls += 1
136+
raise self.exception
137+
138+
def next(self):
139+
return self.__next__() # Python 2
140+
141+
117142
def test_blocking_consume_two_exceptions():
118143
policy = mock.Mock(spec=('call_rpc', 'on_exception'))
119144

120145
exc1 = NameError('Oh noes.')
121146
exc2 = ValueError('Something grumble.')
122147
policy.on_exception.side_effect = OnException(acceptable=exc1)
123148

124-
response_generator1 = mock.MagicMock(spec=('__iter__', 'done'))
125-
response_generator1.__iter__.side_effect = exc1
126-
response_generator1.done.return_value = True
127-
response_generator2 = mock.MagicMock(spec=('__iter__', 'done'))
128-
response_generator2.__iter__.side_effect = exc2
149+
response_generator1 = RaisingResponseGenerator(exc1)
150+
response_generator2 = RaisingResponseGenerator(exc2)
129151
policy.call_rpc.side_effect = (response_generator1, response_generator2)
130152

131153
consumer = _consumer.Consumer()
154+
consumer.resume()
132155
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
133156

134157
# Establish that we get responses until we are sent the exiting event.
135-
consumer._blocking_consume(policy)
158+
assert consumer._blocking_consume(policy) is None
136159
assert consumer._consumer_thread is None
137160

138161
# Check mocks.
139162
assert policy.call_rpc.call_count == 2
140-
response_generator1.__iter__.assert_called_once_with()
141-
response_generator1.done.assert_called_once_with()
142-
response_generator2.__iter__.assert_called_once_with()
143-
response_generator2.done.assert_not_called()
163+
assert response_generator1.next_calls == 1
164+
assert response_generator1.done_calls == 1
165+
assert response_generator2.next_calls == 1
166+
assert response_generator2.done_calls == 0
144167
policy.on_exception.assert_has_calls(
145168
[mock.call(exc1), mock.call(exc2)])
146169

147170

171+
@mock.patch.object(_consumer, '_LOGGER')
172+
def test_pause(_LOGGER):
173+
consumer = _consumer.Consumer()
174+
consumer._can_consume.set()
175+
176+
assert consumer.pause() is None
177+
assert not consumer._can_consume.is_set()
178+
_LOGGER.debug.assert_called_once_with('Pausing consumer')
179+
180+
181+
@mock.patch.object(_consumer, '_LOGGER')
182+
def test_resume(_LOGGER):
183+
consumer = _consumer.Consumer()
184+
consumer._can_consume.clear()
185+
186+
assert consumer.resume() is None
187+
assert consumer._can_consume.is_set()
188+
_LOGGER.debug.assert_called_once_with('Resuming consumer')
189+
190+
148191
def test_start_consuming():
149192
creds = mock.Mock(spec=credentials.Credentials)
150193
client = subscriber.Client(credentials=creds)

tests/unit/pubsub_v1/subscriber/test_policy_base.py

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,13 @@ def test_ack_no_time():
109109
def test_ack_paused():
110110
policy = create_policy()
111111
policy._paused = True
112-
policy._consumer.stopped.clear()
113-
with mock.patch.object(policy, 'open') as open_:
112+
consumer = policy._consumer
113+
114+
with mock.patch.object(consumer, 'resume') as resume:
114115
policy.ack('ack_id_string')
115-
open_.assert_called()
116+
resume.assert_called_once_with()
117+
118+
assert policy._paused is False
116119
assert 'ack_id_string' in policy._ack_on_resume
117120

118121

@@ -157,33 +160,38 @@ def test_drop_below_threshold():
157160
"""
158161
policy = create_policy()
159162
policy.managed_ack_ids.add('ack_id_string')
160-
policy._bytes = 20
163+
num_bytes = 20
164+
policy._bytes = num_bytes
161165
policy._paused = True
162-
with mock.patch.object(policy, 'open') as open_:
163-
policy.drop(ack_id='ack_id_string', byte_size=20)
164-
open_.assert_called_once_with(policy._callback)
166+
consumer = policy._consumer
167+
168+
with mock.patch.object(consumer, 'resume') as resume:
169+
policy.drop(ack_id='ack_id_string', byte_size=num_bytes)
170+
resume.assert_called_once_with()
171+
165172
assert policy._paused is False
166173

167174

168175
def test_load():
169176
flow_control = types.FlowControl(max_messages=10, max_bytes=1000)
170177
policy = create_policy(flow_control=flow_control)
171-
172-
# This should mean that our messages count is at 10%, and our bytes
173-
# are at 15%; the ._load property should return the higher (0.15).
174-
policy.lease(ack_id='one', byte_size=150)
175-
assert policy._load == 0.15
176-
177-
# After this message is added, the messages should be higher at 20%
178-
# (versus 16% for bytes).
179-
policy.lease(ack_id='two', byte_size=10)
180-
assert policy._load == 0.2
181-
182-
# Returning a number above 100% is fine.
183-
with mock.patch.object(policy, 'close') as close:
178+
consumer = policy._consumer
179+
180+
with mock.patch.object(consumer, 'pause') as pause:
181+
# This should mean that our messages count is at 10%, and our bytes
182+
# are at 15%; the ._load property should return the higher (0.15).
183+
policy.lease(ack_id='one', byte_size=150)
184+
assert policy._load == 0.15
185+
pause.assert_not_called()
186+
# After this message is added, the messages should be higher at 20%
187+
# (versus 16% for bytes).
188+
policy.lease(ack_id='two', byte_size=10)
189+
assert policy._load == 0.2
190+
pause.assert_not_called()
191+
# Returning a number above 100% is fine.
184192
policy.lease(ack_id='three', byte_size=1000)
185193
assert policy._load == 1.16
186-
close.assert_called_once_with()
194+
pause.assert_called_once_with()
187195

188196

189197
def test_modify_ack_deadline():
@@ -251,11 +259,13 @@ def test_lease():
251259
def test_lease_above_threshold():
252260
flow_control = types.FlowControl(max_messages=2)
253261
policy = create_policy(flow_control=flow_control)
254-
with mock.patch.object(policy, 'close') as close:
262+
consumer = policy._consumer
263+
264+
with mock.patch.object(consumer, 'pause') as pause:
255265
policy.lease(ack_id='first_ack_id', byte_size=20)
256-
assert close.call_count == 0
266+
pause.assert_not_called()
257267
policy.lease(ack_id='second_ack_id', byte_size=25)
258-
close.assert_called_once_with()
268+
pause.assert_called_once_with()
259269

260270

261271
def test_nack():

0 commit comments

Comments
 (0)