Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 730e244

Browse files
authored
PubSub: Making thread.Policy.on_exception more robust. (#4444)
- Adding special handling for API core exceptions - Retrying on both types of idempotent error Towards #4234.
1 parent 354d73c commit 730e244

4 files changed

Lines changed: 44 additions & 8 deletions

File tree

google/cloud/pubsub_v1/subscriber/policy/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import random
2222
import time
2323

24+
from google.api_core import exceptions
2425
import six
2526

2627
from google.cloud.pubsub_v1 import types
@@ -65,6 +66,10 @@ class BasePolicy(object):
6566
"""
6667

6768
_managed_ack_ids = None
69+
_RETRYABLE_STREAM_ERRORS = (
70+
exceptions.DeadlineExceeded,
71+
exceptions.ServiceUnavailable,
72+
)
6873

6974
def __init__(self, client, subscription,
7075
flow_control=types.FlowControl(), histogram_data=None):

google/cloud/pubsub_v1/subscriber/policy/thread.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,9 @@ def on_exception(self, exception):
153153
154154
This will cause the stream to exit loudly.
155155
"""
156-
# If this is DEADLINE_EXCEEDED, then we want to retry.
157-
# That entails just returning None.
158-
deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED
159-
if getattr(exception, 'code', lambda: None)() == deadline_exceeded:
156+
# If this is in the list of idempotent exceptions, then we want to
157+
# retry. That entails just returning None.
158+
if isinstance(exception, self._RETRYABLE_STREAM_ERRORS):
160159
return
161160

162161
# Set any other exception on the future.

tests/unit/pubsub_v1/subscriber/test_policy_base.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,15 @@
1414

1515
import time
1616

17+
from google.api_core import exceptions
18+
from google.auth import credentials
19+
import grpc
1720
import mock
1821

19-
from google.auth import credentials
2022
from google.cloud.pubsub_v1 import subscriber
2123
from google.cloud.pubsub_v1 import types
24+
from google.cloud.pubsub_v1.gapic import subscriber_client_config
25+
from google.cloud.pubsub_v1.subscriber.policy import base
2226
from google.cloud.pubsub_v1.subscriber.policy import thread
2327

2428

@@ -28,6 +32,23 @@ def create_policy(flow_control=types.FlowControl()):
2832
return thread.Policy(client, 'sub_name_d', flow_control=flow_control)
2933

3034

35+
def test_idempotent_retry_codes():
36+
# Make sure the config matches our hard-coded tuple of exceptions.
37+
interfaces = subscriber_client_config.config['interfaces']
38+
retry_codes = interfaces['google.pubsub.v1.Subscriber']['retry_codes']
39+
idempotent = retry_codes['idempotent']
40+
41+
status_codes = tuple(
42+
getattr(grpc.StatusCode, name, None)
43+
for name in idempotent
44+
)
45+
expected = tuple(
46+
exceptions.exception_class_for_grpc_status(status_code)
47+
for status_code in status_codes
48+
)
49+
assert base.BasePolicy._RETRYABLE_STREAM_ERRORS == expected
50+
51+
3152
def test_ack_deadline():
3253
policy = create_policy()
3354
assert policy.ack_deadline == 10

tests/unit/pubsub_v1/subscriber/test_policy_thread.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
from concurrent import futures
1818
import threading
1919

20+
from google.api_core import exceptions
2021
from google.auth import credentials
21-
import grpc
2222
import mock
2323
import pytest
2424
from six.moves import queue
@@ -90,8 +90,19 @@ def test_on_callback_request():
9090

9191
def test_on_exception_deadline_exceeded():
9292
policy = create_policy()
93-
exc = mock.Mock(spec=('code',))
94-
exc.code.return_value = grpc.StatusCode.DEADLINE_EXCEEDED
93+
94+
details = 'Bad thing happened. Time out, go sit in the corner.'
95+
exc = exceptions.DeadlineExceeded(details)
96+
97+
assert policy.on_exception(exc) is None
98+
99+
100+
def test_on_exception_unavailable():
101+
policy = create_policy()
102+
103+
details = 'UNAVAILABLE. Service taking nap.'
104+
exc = exceptions.ServiceUnavailable(details)
105+
95106
assert policy.on_exception(exc) is None
96107

97108

0 commit comments

Comments
 (0)