Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 4259094

Browse files
committed
fix: only pass on_fatal_exception if google-api-core client library version supports it
1 parent ee511c9 commit 4259094

6 files changed

Lines changed: 40 additions & 18 deletions

File tree

black

Whitespace-only changes.

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import collections
1818
import functools
19+
import inspect
1920
import itertools
2021
import logging
2122
import threading
@@ -106,6 +107,13 @@
106107
code_pb2.UNAVAILABLE,
107108
}
108109

110+
# `on_fatal_exception` added in `google-api-core v2.25.1``, which allows us to inform
111+
# callers on unrecoverable errors. We can only pass this arg if it's available in the
112+
# `BackgroundConsumer` spec.
113+
_SHOULD_USE_ON_FATAL_ERROR_CALLBACK = "on_fatal_exception" in inspect.getfullargspec(
114+
bidi.BackgroundConsumer
115+
)
116+
109117

110118
def _wrap_as_exception(maybe_exception: Any) -> BaseException:
111119
"""Wrap an object as a Python exception, if needed.
@@ -884,9 +892,18 @@ def open(
884892
assert self._scheduler is not None
885893
scheduler_queue = self._scheduler.queue
886894
self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)
887-
self._consumer = bidi.BackgroundConsumer(
888-
self._rpc, self._on_response, on_fatal_exception=self._on_fatal_exception
889-
)
895+
896+
# `on_fatal_exception` is only available in more recent library versions.
897+
# For backwards compatibility reasons, we only pass it when `google-api-core` supports it.
898+
if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
899+
self._consumer = bidi.BackgroundConsumer(
900+
self._rpc,
901+
self._on_response,
902+
on_fatal_exception=self._on_fatal_exception,
903+
)
904+
else:
905+
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
906+
890907
self._leaser = leaser.Leaser(self)
891908
self._heartbeater = heartbeater.Heartbeater(self)
892909

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
"grpcio >= 1.51.3, < 2.0.0", # https://github.com/googleapis/python-pubsub/issues/609
4040
# google-api-core >= 1.34.0 is allowed in order to support google-api-core 1.x
4141
"google-auth >= 2.14.1, <3.0.0",
42-
"google-api-core[grpc] >= 2.25.1, <3.0.0,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,!=2.10.*",
42+
"google-api-core[grpc] >= 1.34.0, <3.0.0,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,!=2.10.*",
4343
"proto-plus >= 1.22.0, <2.0.0",
4444
"proto-plus >= 1.22.2, <2.0.0; python_version>='3.11'",
4545
"proto-plus >= 1.25.0, < 2.0.0; python_version >= '3.13'",

testing/constraints-3.7.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# Pin the version to the lower bound.
55
# e.g., if setup.py has "google-cloud-foo >= 1.14.0, < 2.0.0dev",
66
# Then this file should have google-cloud-foo==1.14.0
7-
google-api-core==2.25.1
7+
google-api-core==1.34.0
88
google-auth==2.14.1
99
proto-plus==1.22.3
1010
protobuf==3.20.2

testing/constraints-3.8.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
# This constraints file is required for unit tests.
33
# List all library dependencies and extras in this file.
4-
google-api-core==2.25.1
4+
google-api-core==1.34.0
55
proto-plus
66
protobuf
77
grpc-google-iam-v1

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,9 +1333,13 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
13331333
leaser.return_value.start.assert_called_once()
13341334
assert manager.leaser == leaser.return_value
13351335

1336-
background_consumer.assert_called_once_with(
1337-
manager._rpc, manager._on_response, manager._on_fatal_exception
1338-
)
1336+
if streaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
1337+
background_consumer.assert_called_once_with(
1338+
manager._rpc, manager._on_response, manager._on_fatal_exception
1339+
)
1340+
else:
1341+
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
1342+
13391343
background_consumer.return_value.start.assert_called_once()
13401344
assert manager._consumer == background_consumer.return_value
13411345

@@ -1444,18 +1448,19 @@ def test_closes_on_fatal_consumer_error():
14441448
scheduler,
14451449
) = make_running_manager()
14461450

1447-
error = ValueError("some fatal exception")
1448-
manager._on_fatal_exception(error)
1451+
if streaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
1452+
error = ValueError("some fatal exception")
1453+
manager._on_fatal_exception(error)
14491454

1450-
await_manager_shutdown(manager, timeout=3)
1455+
await_manager_shutdown(manager, timeout=3)
14511456

1452-
consumer.stop.assert_called_once()
1453-
leaser.stop.assert_called_once()
1454-
dispatcher.stop.assert_called_once()
1455-
heartbeater.stop.assert_called_once()
1456-
scheduler.shutdown.assert_called_once()
1457+
consumer.stop.assert_called_once()
1458+
leaser.stop.assert_called_once()
1459+
dispatcher.stop.assert_called_once()
1460+
heartbeater.stop.assert_called_once()
1461+
scheduler.shutdown.assert_called_once()
14571462

1458-
assert manager.is_active is False
1463+
assert manager.is_active is False
14591464

14601465

14611466
def test_close_inactive_consumer():

0 commit comments

Comments
 (0)