Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,23 @@
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
_RETRYABLE_STREAM_ERRORS = (
exceptions.Aborted,
exceptions.Cancelled,
exceptions.DeadlineExceeded,
exceptions.ServiceUnavailable,
exceptions.GatewayTimeout,
exceptions.InternalServerError,
exceptions.ResourceExhausted,
exceptions.ServiceUnavailable,
exceptions.Unknown,
exceptions.GatewayTimeout,
exceptions.Aborted,
)
_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the reasoning for the use of Cancelled as a terminating stream error here is to prevent errors logs for what should be "clean" shutdowns (googleapis/google-cloud-python#8650 (comment)), so I wonder if this might cause exceptions like (googleapis/google-cloud-python#7826) to be thrown. Another worry is that we won't get exceptions for the new TERMINATING_STREAM_ERRORS, though I'm admittedly not certain how the mechanism actually works.

Copy link
Copy Markdown
Collaborator Author

@abbrowne126 abbrowne126 May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I aligned this with the other client libs (ex Java, ex CPP) but this is a fair point - I believe these cancelled would actually be client initiated in this case so I'll keep it in terminating.

As for the exceptions/no exceptions, this will close the thread; this was already WAI for the library. Here's the details for ServiceConsumer (i.e. specific to our implementation):

As for how the behavior works with termination, this is already covered in unit tests. Specific to the future, here's an example of where the error -> client-visible exception is tested

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for spelling out the details here, definitely makes a lot more sense now. This all looks good to me, just want to confirm if you've tested this with local changes and a reproduction of throwing an exception in BackgroundConsumer._thread_main.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - granted, the test involved mocking the ResumableBidiRpc but otherwise it was the same testing process. I saw the error bubbled to the future.

_TERMINATING_STREAM_ERRORS = (
exceptions.InvalidArgument,
exceptions.NotFound,
exceptions.PermissionDenied,
exceptions.PermissionDenied,
Comment thread
abbrowne126 marked this conversation as resolved.
Outdated
exceptions.Unauthenticated,
exceptions.Unauthorized,
)
_MAX_LOAD = 1.0
"""The load threshold above which to pause the incoming message stream."""

Expand Down Expand Up @@ -1283,8 +1292,10 @@ def _should_terminate(self, exception: BaseException) -> bool:
in a list of terminating exceptions.
"""
exception = _wrap_as_exception(exception)
if isinstance(exception, _TERMINATING_STREAM_ERRORS):
_LOGGER.debug("Observed terminating stream error %s", exception)
is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
_LOGGER.error("Observed terminating stream error %s", exception)
return True
_LOGGER.debug("Observed non-terminating stream error %s", exception)
return False
Expand Down
15 changes: 8 additions & 7 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2270,18 +2270,19 @@ def test__should_recover_false():
def test__should_terminate_true():
manager = make_manager()

details = "Cancelled. Go away, before I taunt you a second time."
exc = exceptions.Cancelled(details)

assert manager._should_terminate(exc) is True
for exc in [exceptions.PermissionDenied(""), TypeError(), ValueError()]:
assert manager._should_terminate(exc)


def test__should_terminate_false():
manager = make_manager()

exc = TypeError("wahhhhhh")

assert manager._should_terminate(exc) is False
for exc in [
exceptions.ResourceExhausted(""),
exceptions.ServiceUnavailable(""),
exceptions.Cancelled(""),
]:
assert not manager._should_terminate(exc)


@mock.patch("threading.Thread", autospec=True)
Expand Down