Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 4374149

Browse files
authored
PubSub: Policy.on_exception actually used to make consumer go inactive. (#4472)
1 parent aa49154 commit 4374149

6 files changed

Lines changed: 84 additions & 28 deletions

File tree

google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,7 @@ def _blocking_consume(self):
243243
except KeyboardInterrupt:
244244
self.stop_consuming()
245245
except Exception as exc:
246-
try:
247-
self._policy.on_exception(exc)
248-
except:
249-
self.active = False
250-
raise
246+
self.active = self._policy.on_exception(exc)
251247

252248
def start_consuming(self):
253249
"""Start consuming the stream."""

google/cloud/pubsub_v1/subscriber/futures.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,19 @@ def running(self):
3939
4040
.. note::
4141
42-
A ``False`` value here does not necessarily mean that the
42+
A :data:`False` value here does not necessarily mean that the
4343
subscription is closed; it merely means that _this_ future is
4444
not the future applicable to it.
4545
4646
Since futures have a single result (or exception) and there is
47-
not a concept of resetting them, a closing re-opening of a
47+
not a concept of resetting them, a closing / re-opening of a
4848
subscription will therefore return a new future.
4949
5050
Returns:
51-
bool: ``True`` if this subscription is opened with this future,
52-
``False`` otherwise.
51+
bool: :data:`True` if this subscription is opened with this
52+
future, :data:`False` otherwise.
5353
"""
54-
return self._policy.future is self
54+
if self._policy.future is not self:
55+
return False
56+
57+
return super(Future, self).running()

google/cloud/pubsub_v1/subscriber/policy/base.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,11 @@ def on_exception(self, exception):
376376
"""Called when a gRPC exception occurs.
377377
378378
If this method does nothing, then the stream is re-started. If this
379-
raises an exception, it will stop the consumer thread.
380-
This is executed on the response consumer helper thread.
379+
raises an exception, it will stop the consumer thread. This is
380+
executed on the response consumer helper thread.
381+
382+
Implementations should return :data:`True` if they want the consumer
383+
thread to remain active, otherwise they should return :data:`False`.
381384
382385
Args:
383386
exception (Exception): The exception raised by the RPC.

google/cloud/pubsub_v1/subscriber/policy/thread.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,17 +176,26 @@ def on_callback_request(self, callback_request):
176176
getattr(self, action)(**kwargs)
177177

178178
def on_exception(self, exception):
179-
"""Bubble the exception.
179+
"""Handle the exception.
180180
181-
This will cause the stream to exit loudly.
181+
If the exception is one of the retryable exceptions, this will signal
182+
to the consumer thread that it should remain active.
183+
184+
This will cause the stream to exit when it returns :data:`False`.
185+
186+
Returns:
187+
bool: Indicates if the caller should remain active or shut down.
188+
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
189+
in a list of retryable / idempotent exceptions.
182190
"""
183191
# If this is in the list of idempotent exceptions, then we want to
184192
# retry. That entails just returning None.
185193
if isinstance(exception, self._RETRYABLE_STREAM_ERRORS):
186-
return
194+
return True
187195

188196
# Set any other exception on the future.
189197
self._future.set_exception(exception)
198+
return False
190199

191200
def on_response(self, response):
192201
"""Process all received Pub/Sub messages.

tests/unit/pubsub_v1/subscriber/test_consumer.py

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import types as base_types
16+
1517
from google.auth import credentials
1618
import mock
1719
import pytest
@@ -87,18 +89,61 @@ def test_blocking_consume_keyboard_interrupt():
8789
on_res.assert_called_once_with(consumer._policy, mock.sentinel.A)
8890

8991

90-
@mock.patch.object(thread.Policy, 'call_rpc', autospec=True)
91-
@mock.patch.object(thread.Policy, 'on_response', autospec=True)
92-
@mock.patch.object(thread.Policy, 'on_exception', autospec=True)
93-
def test_blocking_consume_exception_reraise(on_exc, on_res, call_rpc):
94-
consumer = create_consumer()
92+
class OnException(object):
93+
94+
def __init__(self, exiting_event, acceptable=None):
95+
self.exiting_event = exiting_event
96+
self.acceptable = acceptable
97+
98+
def __call__(self, exception):
99+
if exception is self.acceptable:
100+
return True
101+
else:
102+
self.exiting_event.set()
103+
return False
104+
105+
106+
def test_blocking_consume_on_exception():
107+
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
108+
policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
109+
exc = TypeError('Bad things!')
110+
policy.on_response.side_effect = exc
111+
112+
consumer = _consumer.Consumer(policy=policy)
113+
policy.on_exception.side_effect = OnException(consumer._exiting)
114+
115+
# Establish that we get responses until we are sent the exiting event.
116+
consumer._blocking_consume()
117+
118+
# Check mocks.
119+
policy.call_rpc.assert_called_once()
120+
policy.on_response.assert_called_once_with(mock.sentinel.A)
121+
policy.on_exception.assert_called_once_with(exc)
122+
123+
124+
def test_blocking_consume_two_exceptions():
125+
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
126+
policy.call_rpc.side_effect = (
127+
(mock.sentinel.A,),
128+
(mock.sentinel.B,),
129+
)
130+
exc1 = NameError('Oh noes.')
131+
exc2 = ValueError('Something grumble.')
132+
policy.on_response.side_effect = (exc1, exc2)
133+
134+
consumer = _consumer.Consumer(policy=policy)
135+
policy.on_exception.side_effect = OnException(
136+
consumer._exiting, acceptable=exc1)
95137

96138
# Establish that we get responses until we are sent the exiting event.
97-
call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
98-
on_res.side_effect = TypeError('Bad things!')
99-
on_exc.side_effect = on_res.side_effect
100-
with pytest.raises(TypeError):
101-
consumer._blocking_consume()
139+
consumer._blocking_consume()
140+
141+
# Check mocks.
142+
assert policy.call_rpc.call_count == 2
143+
policy.on_response.assert_has_calls(
144+
[mock.call(mock.sentinel.A), mock.call(mock.sentinel.B)])
145+
policy.on_exception.assert_has_calls(
146+
[mock.call(exc1), mock.call(exc2)])
102147

103148

104149
def test_start_consuming():

tests/unit/pubsub_v1/subscriber/test_policy_thread.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def test_on_exception_deadline_exceeded():
9494
details = 'Bad thing happened. Time out, go sit in the corner.'
9595
exc = exceptions.DeadlineExceeded(details)
9696

97-
assert policy.on_exception(exc) is None
97+
assert policy.on_exception(exc) is True
9898

9999

100100
def test_on_exception_unavailable():
@@ -103,14 +103,14 @@ def test_on_exception_unavailable():
103103
details = 'UNAVAILABLE. Service taking nap.'
104104
exc = exceptions.ServiceUnavailable(details)
105105

106-
assert policy.on_exception(exc) is None
106+
assert policy.on_exception(exc) is True
107107

108108

109109
def test_on_exception_other():
110110
policy = create_policy()
111111
policy._future = Future(policy=policy)
112112
exc = TypeError('wahhhhhh')
113-
assert policy.on_exception(exc) is None
113+
assert policy.on_exception(exc) is False
114114
with pytest.raises(TypeError):
115115
policy.future.result()
116116

0 commit comments

Comments
 (0)