Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit a336079

Browse files
authored
Merge branch 'main' into manually-upgrade-deps
2 parents 8faf882 + 2b7e423 commit a336079

8 files changed

Lines changed: 178 additions & 21 deletions

File tree

.release-please-manifest.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
".": "2.30.0"
2+
".": "2.31.0"
33
}
44

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,18 @@
55
[1]: https://pypi.org/project/google-cloud-pubsub/#history
66

77

8+
## [2.31.0](https://github.com/googleapis/python-pubsub/compare/v2.30.0...v2.31.0) (2025-06-26)
9+
10+
11+
### Features
12+
13+
* Add MessageTransformationFailureReason to IngestionFailureEvent ([#1427](https://github.com/googleapis/python-pubsub/issues/1427)) ([8ab13e1](https://github.com/googleapis/python-pubsub/commit/8ab13e1b71c151f0146548e7224dd38c9d719a88))
14+
15+
16+
### Bug Fixes
17+
18+
* Surface Fatal Stream Errors to Future; Adjust Retryable Error Codes ([#1422](https://github.com/googleapis/python-pubsub/issues/1422)) ([e081beb](https://github.com/googleapis/python-pubsub/commit/e081beb29056035304d365ec9c50fa7ffbac6886))
19+
820
## [2.30.0](https://github.com/googleapis/python-pubsub/compare/v2.29.1...v2.30.0) (2025-06-07)
921

1022

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import collections
1818
import functools
19+
import inspect
1920
import itertools
2021
import logging
2122
import threading
@@ -62,14 +63,22 @@
6263
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
6364
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
6465
_RETRYABLE_STREAM_ERRORS = (
66+
exceptions.Aborted,
6567
exceptions.DeadlineExceeded,
66-
exceptions.ServiceUnavailable,
68+
exceptions.GatewayTimeout,
6769
exceptions.InternalServerError,
70+
exceptions.ResourceExhausted,
71+
exceptions.ServiceUnavailable,
6872
exceptions.Unknown,
69-
exceptions.GatewayTimeout,
70-
exceptions.Aborted,
7173
)
72-
_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,)
74+
_TERMINATING_STREAM_ERRORS = (
75+
exceptions.Cancelled,
76+
exceptions.InvalidArgument,
77+
exceptions.NotFound,
78+
exceptions.PermissionDenied,
79+
exceptions.Unauthenticated,
80+
exceptions.Unauthorized,
81+
)
7382
_MAX_LOAD = 1.0
7483
"""The load threshold above which to pause the incoming message stream."""
7584

@@ -98,6 +107,13 @@
98107
code_pb2.UNAVAILABLE,
99108
}
100109

110+
# `on_fatal_exception` was added in `google-api-core v2.25.1``, which allows us to inform
111+
# callers on unrecoverable errors. We can only pass this arg if it's available in the
112+
# `BackgroundConsumer` spec.
113+
_SHOULD_USE_ON_FATAL_ERROR_CALLBACK = "on_fatal_exception" in inspect.getfullargspec(
114+
bidi.BackgroundConsumer
115+
)
116+
101117

102118
def _wrap_as_exception(maybe_exception: Any) -> BaseException:
103119
"""Wrap an object as a Python exception, if needed.
@@ -876,7 +892,18 @@ def open(
876892
assert self._scheduler is not None
877893
scheduler_queue = self._scheduler.queue
878894
self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)
879-
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
895+
896+
# `on_fatal_exception` is only available in more recent library versions.
897+
# For backwards compatibility reasons, we only pass it when `google-api-core` supports it.
898+
if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
899+
self._consumer = bidi.BackgroundConsumer(
900+
self._rpc,
901+
self._on_response,
902+
on_fatal_exception=self._on_fatal_exception,
903+
)
904+
else:
905+
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
906+
880907
self._leaser = leaser.Leaser(self)
881908
self._heartbeater = heartbeater.Heartbeater(self)
882909

@@ -1247,6 +1274,17 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
12471274

12481275
self.maybe_pause_consumer()
12491276

1277+
def _on_fatal_exception(self, exception: BaseException) -> None:
1278+
"""
1279+
Called whenever `self.consumer` receives a non-retryable exception.
1280+
We close the manager on such non-retryable cases.
1281+
"""
1282+
_LOGGER.exception(
1283+
"Streaming pull terminating after receiving non-recoverable error: %s",
1284+
exception,
1285+
)
1286+
self.close(exception)
1287+
12501288
def _should_recover(self, exception: BaseException) -> bool:
12511289
"""Determine if an error on the RPC stream should be recovered.
12521290
@@ -1283,8 +1321,10 @@ def _should_terminate(self, exception: BaseException) -> bool:
12831321
in a list of terminating exceptions.
12841322
"""
12851323
exception = _wrap_as_exception(exception)
1286-
if isinstance(exception, _TERMINATING_STREAM_ERRORS):
1287-
_LOGGER.debug("Observed terminating stream error %s", exception)
1324+
is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
1325+
# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
1326+
if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
1327+
_LOGGER.error("Observed terminating stream error %s", exception)
12881328
return True
12891329
_LOGGER.debug("Observed non-terminating stream error %s", exception)
12901330
return False

google/pubsub/gapic_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
#
16-
__version__ = "2.30.0" # {x-release-please-version}
16+
__version__ = "2.31.0" # {x-release-please-version}

google/pubsub_v1/gapic_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
#
16-
__version__ = "2.30.0" # {x-release-please-version}
16+
__version__ = "2.31.0" # {x-release-please-version}

google/pubsub_v1/types/pubsub.py

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,12 @@ class SchemaViolationReason(proto.Message):
861861
862862
"""
863863

864+
class MessageTransformationFailureReason(proto.Message):
865+
r"""Set when a Pub/Sub message fails to get published due to a
866+
message transformation error.
867+
868+
"""
869+
864870
class CloudStorageFailure(proto.Message):
865871
r"""Failure when ingesting from a Cloud Storage source.
866872
@@ -897,6 +903,11 @@ class CloudStorageFailure(proto.Message):
897903
Optional. The Pub/Sub message failed schema
898904
validation.
899905
906+
This field is a member of `oneof`_ ``reason``.
907+
message_transformation_failure_reason (google.pubsub_v1.types.IngestionFailureEvent.MessageTransformationFailureReason):
908+
Optional. Failure encountered when applying a
909+
message transformation to the Pub/Sub message.
910+
900911
This field is a member of `oneof`_ ``reason``.
901912
"""
902913

@@ -932,6 +943,12 @@ class CloudStorageFailure(proto.Message):
932943
message="IngestionFailureEvent.SchemaViolationReason",
933944
)
934945
)
946+
message_transformation_failure_reason: "IngestionFailureEvent.MessageTransformationFailureReason" = proto.Field(
947+
proto.MESSAGE,
948+
number=8,
949+
oneof="reason",
950+
message="IngestionFailureEvent.MessageTransformationFailureReason",
951+
)
935952

936953
class AwsMskFailureReason(proto.Message):
937954
r"""Failure when ingesting from an Amazon MSK source.
@@ -965,6 +982,11 @@ class AwsMskFailureReason(proto.Message):
965982
Optional. The Pub/Sub message failed schema
966983
validation.
967984
985+
This field is a member of `oneof`_ ``reason``.
986+
message_transformation_failure_reason (google.pubsub_v1.types.IngestionFailureEvent.MessageTransformationFailureReason):
987+
Optional. Failure encountered when applying a
988+
message transformation to the Pub/Sub message.
989+
968990
This field is a member of `oneof`_ ``reason``.
969991
"""
970992

@@ -998,6 +1020,12 @@ class AwsMskFailureReason(proto.Message):
9981020
message="IngestionFailureEvent.SchemaViolationReason",
9991021
)
10001022
)
1023+
message_transformation_failure_reason: "IngestionFailureEvent.MessageTransformationFailureReason" = proto.Field(
1024+
proto.MESSAGE,
1025+
number=7,
1026+
oneof="reason",
1027+
message="IngestionFailureEvent.MessageTransformationFailureReason",
1028+
)
10011029

10021030
class AzureEventHubsFailureReason(proto.Message):
10031031
r"""Failure when ingesting from an Azure Event Hubs source.
@@ -1031,6 +1059,11 @@ class AzureEventHubsFailureReason(proto.Message):
10311059
Optional. The Pub/Sub message failed schema
10321060
validation.
10331061
1062+
This field is a member of `oneof`_ ``reason``.
1063+
message_transformation_failure_reason (google.pubsub_v1.types.IngestionFailureEvent.MessageTransformationFailureReason):
1064+
Optional. Failure encountered when applying a
1065+
message transformation to the Pub/Sub message.
1066+
10341067
This field is a member of `oneof`_ ``reason``.
10351068
"""
10361069

@@ -1064,6 +1097,12 @@ class AzureEventHubsFailureReason(proto.Message):
10641097
message="IngestionFailureEvent.SchemaViolationReason",
10651098
)
10661099
)
1100+
message_transformation_failure_reason: "IngestionFailureEvent.MessageTransformationFailureReason" = proto.Field(
1101+
proto.MESSAGE,
1102+
number=7,
1103+
oneof="reason",
1104+
message="IngestionFailureEvent.MessageTransformationFailureReason",
1105+
)
10671106

10681107
class ConfluentCloudFailureReason(proto.Message):
10691108
r"""Failure when ingesting from a Confluent Cloud source.
@@ -1097,6 +1136,11 @@ class ConfluentCloudFailureReason(proto.Message):
10971136
Optional. The Pub/Sub message failed schema
10981137
validation.
10991138
1139+
This field is a member of `oneof`_ ``reason``.
1140+
message_transformation_failure_reason (google.pubsub_v1.types.IngestionFailureEvent.MessageTransformationFailureReason):
1141+
Optional. Failure encountered when applying a
1142+
message transformation to the Pub/Sub message.
1143+
11001144
This field is a member of `oneof`_ ``reason``.
11011145
"""
11021146

@@ -1130,10 +1174,21 @@ class ConfluentCloudFailureReason(proto.Message):
11301174
message="IngestionFailureEvent.SchemaViolationReason",
11311175
)
11321176
)
1177+
message_transformation_failure_reason: "IngestionFailureEvent.MessageTransformationFailureReason" = proto.Field(
1178+
proto.MESSAGE,
1179+
number=7,
1180+
oneof="reason",
1181+
message="IngestionFailureEvent.MessageTransformationFailureReason",
1182+
)
11331183

11341184
class AwsKinesisFailureReason(proto.Message):
11351185
r"""Failure when ingesting from an AWS Kinesis source.
11361186
1187+
This message has `oneof`_ fields (mutually exclusive fields).
1188+
For each oneof, at most one member field can be set at the same time.
1189+
Setting any member of the oneof automatically clears all other
1190+
members.
1191+
11371192
.. _oneof: https://proto-plus-python.readthedocs.io/en/stable/fields.html#oneofs-mutually-exclusive-fields
11381193
11391194
Attributes:
@@ -1150,6 +1205,11 @@ class AwsKinesisFailureReason(proto.Message):
11501205
Optional. The Pub/Sub message failed schema
11511206
validation.
11521207
1208+
This field is a member of `oneof`_ ``reason``.
1209+
message_transformation_failure_reason (google.pubsub_v1.types.IngestionFailureEvent.MessageTransformationFailureReason):
1210+
Optional. Failure encountered when applying a
1211+
message transformation to the Pub/Sub message.
1212+
11531213
This field is a member of `oneof`_ ``reason``.
11541214
"""
11551215

@@ -1173,6 +1233,12 @@ class AwsKinesisFailureReason(proto.Message):
11731233
message="IngestionFailureEvent.SchemaViolationReason",
11741234
)
11751235
)
1236+
message_transformation_failure_reason: "IngestionFailureEvent.MessageTransformationFailureReason" = proto.Field(
1237+
proto.MESSAGE,
1238+
number=5,
1239+
oneof="reason",
1240+
message="IngestionFailureEvent.MessageTransformationFailureReason",
1241+
)
11761242

11771243
topic: str = proto.Field(
11781244
proto.STRING,
@@ -3091,10 +3157,12 @@ class StreamingPullResponse(proto.Message):
30913157
will not be empty.
30923158
acknowledge_confirmation (google.pubsub_v1.types.StreamingPullResponse.AcknowledgeConfirmation):
30933159
Optional. This field will only be set if
3094-
``enable_exactly_once_delivery`` is set to ``true``.
3160+
``enable_exactly_once_delivery`` is set to ``true`` and is
3161+
not guaranteed to be populated.
30953162
modify_ack_deadline_confirmation (google.pubsub_v1.types.StreamingPullResponse.ModifyAckDeadlineConfirmation):
30963163
Optional. This field will only be set if
3097-
``enable_exactly_once_delivery`` is set to ``true``.
3164+
``enable_exactly_once_delivery`` is set to ``true`` and is
3165+
not guaranteed to be populated.
30983166
subscription_properties (google.pubsub_v1.types.StreamingPullResponse.SubscriptionProperties):
30993167
Optional. Properties associated with this
31003168
subscription.

samples/generated_samples/snippet_metadata_google.pubsub.v1.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
],
99
"language": "PYTHON",
1010
"name": "google-cloud-pubsub",
11-
"version": "2.30.0"
11+
"version": "2.31.0"
1212
},
1313
"snippets": [
1414
{

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,7 +1333,13 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
13331333
leaser.return_value.start.assert_called_once()
13341334
assert manager.leaser == leaser.return_value
13351335

1336-
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
1336+
if streaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
1337+
background_consumer.assert_called_once_with(
1338+
manager._rpc, manager._on_response, manager._on_fatal_exception
1339+
)
1340+
else:
1341+
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
1342+
13371343
background_consumer.return_value.start.assert_called_once()
13381344
assert manager._consumer == background_consumer.return_value
13391345

@@ -1432,6 +1438,31 @@ def test_close():
14321438
assert manager.is_active is False
14331439

14341440

1441+
def test_closes_on_fatal_consumer_error():
1442+
(
1443+
manager,
1444+
consumer,
1445+
dispatcher,
1446+
leaser,
1447+
heartbeater,
1448+
scheduler,
1449+
) = make_running_manager()
1450+
1451+
if streaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
1452+
error = ValueError("some fatal exception")
1453+
manager._on_fatal_exception(error)
1454+
1455+
await_manager_shutdown(manager, timeout=3)
1456+
1457+
consumer.stop.assert_called_once()
1458+
leaser.stop.assert_called_once()
1459+
dispatcher.stop.assert_called_once()
1460+
heartbeater.stop.assert_called_once()
1461+
scheduler.shutdown.assert_called_once()
1462+
1463+
assert manager.is_active is False
1464+
1465+
14351466
def test_close_inactive_consumer():
14361467
(
14371468
manager,
@@ -2270,18 +2301,24 @@ def test__should_recover_false():
22702301
def test__should_terminate_true():
22712302
manager = make_manager()
22722303

2273-
details = "Cancelled. Go away, before I taunt you a second time."
2274-
exc = exceptions.Cancelled(details)
2275-
2276-
assert manager._should_terminate(exc) is True
2304+
for exc in [
2305+
exceptions.Cancelled(""),
2306+
exceptions.PermissionDenied(""),
2307+
TypeError(),
2308+
ValueError(),
2309+
]:
2310+
assert manager._should_terminate(exc)
22772311

22782312

22792313
def test__should_terminate_false():
22802314
manager = make_manager()
22812315

2282-
exc = TypeError("wahhhhhh")
2283-
2284-
assert manager._should_terminate(exc) is False
2316+
for exc in [
2317+
exceptions.ResourceExhausted(""),
2318+
exceptions.ServiceUnavailable(""),
2319+
exceptions.DeadlineExceeded(""),
2320+
]:
2321+
assert not manager._should_terminate(exc)
22852322

22862323

22872324
@mock.patch("threading.Thread", autospec=True)

0 commit comments

Comments
 (0)