From a9df6de179506efde1fb34bad82598cd2d47e1a3 Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 20 May 2025 17:59:43 -0400 Subject: [PATCH 1/6] change owner to abbrowne126 --- .github/blunderbuss.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/blunderbuss.yml b/.github/blunderbuss.yml index 3408b580a..ac5c87339 100644 --- a/.github/blunderbuss.yml +++ b/.github/blunderbuss.yml @@ -4,14 +4,14 @@ # Note: This file is autogenerated. To make changes to the assignee # team, please update `codeowner_team` in `.repo-metadata.json`. assign_issues: - - mukund-ananthu + - abbrowne126 assign_issues_by: - labels: - "samples" to: - googleapis/python-samples-reviewers - - mukund-ananthu + - abbrowne126 assign_prs: - - mukund-ananthu + - abbrowne126 From 5d7dea1abc1ad79fa9fdbf44ec392eff41d85466 Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 20 May 2025 17:59:43 -0400 Subject: [PATCH 2/6] chore: change assignees for issues and PRs to abbrowne126 --- .github/blunderbuss.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/blunderbuss.yml b/.github/blunderbuss.yml index 3408b580a..ac5c87339 100644 --- a/.github/blunderbuss.yml +++ b/.github/blunderbuss.yml @@ -4,14 +4,14 @@ # Note: This file is autogenerated. To make changes to the assignee # team, please update `codeowner_team` in `.repo-metadata.json`. assign_issues: - - mukund-ananthu + - abbrowne126 assign_issues_by: - labels: - "samples" to: - googleapis/python-samples-reviewers - - mukund-ananthu + - abbrowne126 assign_prs: - - mukund-ananthu + - abbrowne126 From 8139e53c26471a730f583d1f6039d290c747eb9b Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 27 May 2025 12:25:41 -0400 Subject: [PATCH 3/6] fix: align stream retries and closure with canonical API behavior --- .../_protocol/streaming_pull_manager.py | 23 ++++++++++++++----- .../subscriber/test_streaming_pull_manager.py | 11 ++++----- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 4c9e1c20e..4f5bbcdcc 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -62,14 +62,23 @@ _REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown" _RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated" _RETRYABLE_STREAM_ERRORS = ( + exceptions.Aborted, + exceptions.Cancelled, exceptions.DeadlineExceeded, - exceptions.ServiceUnavailable, + exceptions.GatewayTimeout, exceptions.InternalServerError, + exceptions.ResourceExhausted, + exceptions.ServiceUnavailable, exceptions.Unknown, - exceptions.GatewayTimeout, - exceptions.Aborted, ) -_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,) +_TERMINATING_STREAM_ERRORS = ( + exceptions.InvalidArgument, + exceptions.NotFound, + exceptions.PermissionDenied, + exceptions.PermissionDenied, + exceptions.Unauthenticated, + exceptions.Unauthorized, +) _MAX_LOAD = 1.0 """The load threshold above which to pause the incoming message stream.""" @@ -1283,8 +1292,10 @@ def _should_terminate(self, exception: BaseException) -> bool: in a list of terminating exceptions. """ exception = _wrap_as_exception(exception) - if isinstance(exception, _TERMINATING_STREAM_ERRORS): - _LOGGER.debug("Observed terminating stream error %s", exception) + is_api_error = isinstance(exception, exceptions.GoogleAPICallError) + # Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.) + if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS): + _LOGGER.error("Observed terminating stream error %s", exception) return True _LOGGER.debug("Observed non-terminating stream error %s", exception) return False diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index f4ceedaf0..0ace44e8e 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -2270,18 +2270,15 @@ def test__should_recover_false(): def test__should_terminate_true(): manager = make_manager() - details = "Cancelled. Go away, before I taunt you a second time." - exc = exceptions.Cancelled(details) - - assert manager._should_terminate(exc) is True + for exc in [exceptions.PermissionDenied(""), TypeError(), ValueError()]: + assert manager._should_terminate(exc) def test__should_terminate_false(): manager = make_manager() - exc = TypeError("wahhhhhh") - - assert manager._should_terminate(exc) is False + for exc in [exceptions.ResourceExhausted(""), exceptions.ServiceUnavailable(""), exceptions.Cancelled("")]: + assert not manager._should_terminate(exc) @mock.patch("threading.Thread", autospec=True) From 11755ed1d90bf909d837b7e3c6c3cd46fc93f22f Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 27 May 2025 12:32:52 -0400 Subject: [PATCH 4/6] Update test_streaming_pull_manager.py --- .../pubsub_v1/subscriber/test_streaming_pull_manager.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 0ace44e8e..84fa304e7 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -2277,7 +2277,11 @@ def test__should_terminate_true(): def test__should_terminate_false(): manager = make_manager() - for exc in [exceptions.ResourceExhausted(""), exceptions.ServiceUnavailable(""), exceptions.Cancelled("")]: + for exc in [ + exceptions.ResourceExhausted(""), + exceptions.ServiceUnavailable(""), + exceptions.Cancelled(""), + ]: assert not manager._should_terminate(exc) From f1f705db451ddf5319ea87ef62db718056c6c909 Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 27 May 2025 14:36:37 -0400 Subject: [PATCH 5/6] fix: align stream errors with canonical retry behavior --- .../pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 4f5bbcdcc..4bb729a46 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -63,7 +63,6 @@ _RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated" _RETRYABLE_STREAM_ERRORS = ( exceptions.Aborted, - exceptions.Cancelled, exceptions.DeadlineExceeded, exceptions.GatewayTimeout, exceptions.InternalServerError, @@ -72,10 +71,10 @@ exceptions.Unknown, ) _TERMINATING_STREAM_ERRORS = ( + exceptions.Cancelled, exceptions.InvalidArgument, exceptions.NotFound, exceptions.PermissionDenied, - exceptions.PermissionDenied, exceptions.Unauthenticated, exceptions.Unauthorized, ) From 0ae034055e5f374f1d20080f8eed1bc52e11ce68 Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 27 May 2025 14:39:41 -0400 Subject: [PATCH 6/6] Update test_streaming_pull_manager.py --- .../pubsub_v1/subscriber/test_streaming_pull_manager.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 84fa304e7..331d067db 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -2270,7 +2270,12 @@ def test__should_recover_false(): def test__should_terminate_true(): manager = make_manager() - for exc in [exceptions.PermissionDenied(""), TypeError(), ValueError()]: + for exc in [ + exceptions.Cancelled(""), + exceptions.PermissionDenied(""), + TypeError(), + ValueError(), + ]: assert manager._should_terminate(exc) @@ -2280,7 +2285,7 @@ def test__should_terminate_false(): for exc in [ exceptions.ResourceExhausted(""), exceptions.ServiceUnavailable(""), - exceptions.Cancelled(""), + exceptions.DeadlineExceeded(""), ]: assert not manager._should_terminate(exc)