Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 8139e53

Browse files
committed
fix: align stream retries and closure with canonical API behavior
1 parent ead0f41 commit 8139e53

File tree

2 files changed

+21
-13
lines changed

2 files changed

+21
-13
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,23 @@
6262
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
6363
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
6464
_RETRYABLE_STREAM_ERRORS = (
65+
exceptions.Aborted,
66+
exceptions.Cancelled,
6567
exceptions.DeadlineExceeded,
66-
exceptions.ServiceUnavailable,
68+
exceptions.GatewayTimeout,
6769
exceptions.InternalServerError,
70+
exceptions.ResourceExhausted,
71+
exceptions.ServiceUnavailable,
6872
exceptions.Unknown,
69-
exceptions.GatewayTimeout,
70-
exceptions.Aborted,
7173
)
72-
_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,)
74+
_TERMINATING_STREAM_ERRORS = (
75+
exceptions.InvalidArgument,
76+
exceptions.NotFound,
77+
exceptions.PermissionDenied,
78+
exceptions.PermissionDenied,
79+
exceptions.Unauthenticated,
80+
exceptions.Unauthorized,
81+
)
7382
_MAX_LOAD = 1.0
7483
"""The load threshold above which to pause the incoming message stream."""
7584

@@ -1283,8 +1292,10 @@ def _should_terminate(self, exception: BaseException) -> bool:
12831292
in a list of terminating exceptions.
12841293
"""
12851294
exception = _wrap_as_exception(exception)
1286-
if isinstance(exception, _TERMINATING_STREAM_ERRORS):
1287-
_LOGGER.debug("Observed terminating stream error %s", exception)
1295+
is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
1296+
# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
1297+
if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
1298+
_LOGGER.error("Observed terminating stream error %s", exception)
12881299
return True
12891300
_LOGGER.debug("Observed non-terminating stream error %s", exception)
12901301
return False

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2270,18 +2270,15 @@ def test__should_recover_false():
22702270
def test__should_terminate_true():
22712271
manager = make_manager()
22722272

2273-
details = "Cancelled. Go away, before I taunt you a second time."
2274-
exc = exceptions.Cancelled(details)
2275-
2276-
assert manager._should_terminate(exc) is True
2273+
for exc in [exceptions.PermissionDenied(""), TypeError(), ValueError()]:
2274+
assert manager._should_terminate(exc)
22772275

22782276

22792277
def test__should_terminate_false():
22802278
manager = make_manager()
22812279

2282-
exc = TypeError("wahhhhhh")
2283-
2284-
assert manager._should_terminate(exc) is False
2280+
for exc in [exceptions.ResourceExhausted(""), exceptions.ServiceUnavailable(""), exceptions.Cancelled("")]:
2281+
assert not manager._should_terminate(exc)
22852282

22862283

22872284
@mock.patch("threading.Thread", autospec=True)

0 commit comments

Comments
 (0)