Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit d768d4f

Browse files
authored
PubSub: Remove deprecated methods and settings (#8836)
* Remove Message lease() method and autolease param These two have been deprecated in 0.44.0 and it's time to remove them. * Remove FlowControl.resume_threshold setting * Remove FlowControl.max_requests setting * Remove FlowControl.max_request_batch_size setting * Remove FlowControl.max_request_batch_latency * Promote hardcoded values to module constants
1 parent cd14b5a commit d768d4f

5 files changed

Lines changed: 37 additions & 135 deletions

File tree

google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@
2727
_CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher"
2828

2929

30+
_MAX_BATCH_SIZE = 100
31+
"""The maximum number of requests to process and dispatch at a time."""
32+
33+
_MAX_BATCH_LATENCY = 0.01
34+
"""The maximum amount of time in seconds to wait for additional request items
35+
before processing the next batch of requests."""
36+
37+
3038
class Dispatcher(object):
3139
def __init__(self, manager, queue):
3240
self._manager = manager
@@ -42,12 +50,11 @@ def start(self):
4250
if self._thread is not None:
4351
raise ValueError("Dispatcher is already running.")
4452

45-
flow_control = self._manager.flow_control
4653
worker = helper_threads.QueueCallbackWorker(
4754
self._queue,
4855
self.dispatch_callback,
49-
max_items=flow_control.max_request_batch_size,
50-
max_latency=flow_control.max_request_batch_latency,
56+
max_items=_MAX_BATCH_SIZE,
57+
max_latency=_MAX_BATCH_LATENCY,
5158
)
5259
# Create and start the helper thread.
5360
thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker)

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@
4444
exceptions.GatewayTimeout,
4545
exceptions.Aborted,
4646
)
47+
_MAX_LOAD = 1.0
48+
"""The load threshold above which to pause the incoming message stream."""
49+
50+
_RESUME_THRESHOLD = 0.8
51+
"""The load threshold below which to resume the incoming message stream."""
4752

4853

4954
def _maybe_wrap_exception(exception):
@@ -223,7 +228,7 @@ def add_close_callback(self, callback):
223228
def maybe_pause_consumer(self):
224229
"""Check the current load and pause the consumer if needed."""
225230
with self._pause_resume_lock:
226-
if self.load >= 1.0:
231+
if self.load >= _MAX_LOAD:
227232
if self._consumer is not None and not self._consumer.is_paused:
228233
_LOGGER.debug(
229234
"Message backlog over load at %.2f, pausing.", self.load
@@ -252,7 +257,7 @@ def maybe_resume_consumer(self):
252257
# currently on hold, if the current load allows for it.
253258
self._maybe_release_messages()
254259

255-
if self.load < self.flow_control.resume_threshold:
260+
if self.load < _RESUME_THRESHOLD:
256261
_LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
257262
self._consumer.resume()
258263
else:
@@ -271,7 +276,7 @@ def _maybe_release_messages(self):
271276
The method assumes the caller has acquired the ``_pause_resume_lock``.
272277
"""
273278
while True:
274-
if self.load >= 1.0:
279+
if self.load >= _MAX_LOAD:
275280
break # already overloaded
276281

277282
try:
@@ -518,12 +523,9 @@ def _on_response(self, response):
518523

519524
for received_message in response.received_messages:
520525
message = google.cloud.pubsub_v1.subscriber.message.Message(
521-
received_message.message,
522-
received_message.ack_id,
523-
self._scheduler.queue,
524-
autolease=False,
526+
received_message.message, received_message.ack_id, self._scheduler.queue
525527
)
526-
if self.load < 1.0:
528+
if self.load < _MAX_LOAD:
527529
req = requests.LeaseRequest(
528530
ack_id=message.ack_id, byte_size=message.size
529531
)

google/cloud/pubsub_v1/subscriber/message.py

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import json
1919
import math
2020
import time
21-
import warnings
2221

2322
from google.api_core import datetime_helpers
2423
from google.cloud.pubsub_v1.subscriber._protocol import requests
@@ -71,7 +70,7 @@ class Message(object):
7170
published.
7271
"""
7372

74-
def __init__(self, message, ack_id, request_queue, autolease=True):
73+
def __init__(self, message, ack_id, request_queue):
7574
"""Construct the Message.
7675
7776
.. note::
@@ -86,13 +85,6 @@ def __init__(self, message, ack_id, request_queue, autolease=True):
8685
request_queue (queue.Queue): A queue provided by the policy that
8786
can accept requests; the policy is responsible for handling
8887
those requests.
89-
autolease (bool): An optional flag determining whether a new Message
90-
instance should automatically lease itself upon creation.
91-
Defaults to :data:`True`.
92-
93-
.. note::
94-
.. deprecated:: 0.44.0
95-
Parameter will be removed in future versions.
9688
"""
9789
self._message = message
9890
self._ack_id = ack_id
@@ -104,11 +96,6 @@ def __init__(self, message, ack_id, request_queue, autolease=True):
10496
# the default lease deadline.
10597
self._received_timestamp = time.time()
10698

107-
# The policy should lease this message, telling PubSub that it has
108-
# it until it is acked or otherwise dropped.
109-
if autolease:
110-
self.lease()
111-
11299
def __repr__(self):
113100
# Get an abbreviated version of the data.
114101
abbv_data = self._message.data
@@ -213,26 +200,6 @@ def drop(self):
213200
requests.DropRequest(ack_id=self._ack_id, byte_size=self.size)
214201
)
215202

216-
def lease(self):
217-
"""Inform the policy to lease this message continually.
218-
219-
.. note::
220-
By default this method is called by the constructor, and you should
221-
never need to call it manually, unless the
222-
:class:`~.pubsub_v1.subscriber.message.Message` instance was
223-
created with ``autolease=False``.
224-
225-
.. deprecated:: 0.44.0
226-
Will be removed in future versions.
227-
"""
228-
warnings.warn(
229-
"lease() is deprecated since 0.44.0, and will be removed in future versions.",
230-
category=DeprecationWarning,
231-
)
232-
self._request_queue.put(
233-
requests.LeaseRequest(ack_id=self._ack_id, byte_size=self.size)
234-
)
235-
236203
def modify_ack_deadline(self, seconds):
237204
"""Resets the deadline for acknowledgement.
238205

google/cloud/pubsub_v1/types.py

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from __future__ import absolute_import
1616
import collections
1717
import sys
18-
import textwrap
1918

2019
from google.api import http_pb2
2120
from google.iam.v1 import iam_policy_pb2
@@ -67,24 +66,11 @@
6766
# these settings can be altered to tweak Pub/Sub behavior.
6867
# The defaults should be fine for most use cases.
6968
FlowControl = collections.namedtuple(
70-
"FlowControl",
71-
[
72-
"max_bytes",
73-
"max_messages",
74-
"resume_threshold",
75-
"max_requests",
76-
"max_request_batch_size",
77-
"max_request_batch_latency",
78-
"max_lease_duration",
79-
],
69+
"FlowControl", ["max_bytes", "max_messages", "max_lease_duration"]
8070
)
8171
FlowControl.__new__.__defaults__ = (
8272
100 * 1024 * 1024, # max_bytes: 100mb
8373
100, # max_messages: 100
84-
0.8, # resume_threshold: 80%
85-
100, # max_requests: 100
86-
100, # max_request_batch_size: 100
87-
0.01, # max_request_batch_latency: 0.01s
8874
2 * 60 * 60, # max_lease_duration: 2 hours.
8975
)
9076

@@ -101,42 +87,6 @@
10187
"The maximum number of received - but not yet processed - messages before "
10288
"pausing the message stream."
10389
)
104-
FlowControl.resume_threshold.__doc__ = textwrap.dedent(
105-
"""
106-
The relative threshold of the ``max_bytes`` and ``max_messages`` limits
107-
below which to resume the message stream. Must be a positive number not
108-
greater than ``1.0``.
109-
110-
.. note::
111-
.. deprecated:: 0.44.0
112-
Will be removed in future versions."""
113-
)
114-
FlowControl.max_requests.__doc__ = textwrap.dedent(
115-
"""
116-
Currently not in use.
117-
118-
.. note::
119-
.. deprecated:: 0.44.0
120-
Will be removed in future versions."""
121-
)
122-
FlowControl.max_request_batch_size.__doc__ = textwrap.dedent(
123-
"""
124-
The maximum number of requests scheduled by callbacks to process and
125-
dispatch at a time.
126-
127-
.. note::
128-
.. deprecated:: 0.44.0
129-
Will be removed in future versions."""
130-
)
131-
FlowControl.max_request_batch_latency.__doc__ = textwrap.dedent(
132-
"""
133-
The maximum amount of time in seconds to wait for additional request
134-
items before processing the next batch of requests.
135-
136-
.. note::
137-
.. deprecated:: 0.44.0
138-
Will be removed in future versions."""
139-
)
14090
FlowControl.max_lease_duration.__doc__ = (
14191
"The maximum amount of time in seconds to hold a lease on a message "
14292
"before dropping it from the lease management."

tests/unit/pubsub_v1/subscriber/test_message.py

Lines changed: 15 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import time
1717

1818
import mock
19-
import pytest
2019
import pytz
2120
from six.moves import queue
2221
from google.protobuf import timestamp_pb2
@@ -34,28 +33,22 @@
3433
PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000
3534

3635

37-
def create_message(data, ack_id="ACKID", autolease=True, **attrs):
38-
with mock.patch.object(message.Message, "lease") as lease:
39-
with mock.patch.object(time, "time") as time_:
40-
time_.return_value = RECEIVED_SECONDS
41-
msg = message.Message(
42-
types.PubsubMessage(
43-
attributes=attrs,
44-
data=data,
45-
message_id="message_id",
46-
publish_time=timestamp_pb2.Timestamp(
47-
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
48-
),
36+
def create_message(data, ack_id="ACKID", **attrs):
37+
with mock.patch.object(time, "time") as time_:
38+
time_.return_value = RECEIVED_SECONDS
39+
msg = message.Message(
40+
types.PubsubMessage(
41+
attributes=attrs,
42+
data=data,
43+
message_id="message_id",
44+
publish_time=timestamp_pb2.Timestamp(
45+
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
4946
),
50-
ack_id,
51-
queue.Queue(),
52-
autolease=autolease,
53-
)
54-
if autolease:
55-
lease.assert_called_once_with()
56-
else:
57-
lease.assert_not_called()
58-
return msg
47+
),
48+
ack_id,
49+
queue.Queue(),
50+
)
51+
return msg
5952

6053

6154
def test_attributes():
@@ -84,11 +77,6 @@ def test_publish_time():
8477
assert msg.publish_time == PUBLISHED
8578

8679

87-
def test_disable_autolease_on_creation():
88-
# the create_message() helper does the actual assertion
89-
create_message(b"foo", autolease=False)
90-
91-
9280
def check_call_types(mock, *args, **kwargs):
9381
"""Checks a mock's call types.
9482
@@ -134,18 +122,6 @@ def test_drop():
134122
check_call_types(put, requests.DropRequest)
135123

136124

137-
def test_lease():
138-
msg = create_message(b"foo", ack_id="bogus_ack_id")
139-
140-
pytest_warns = pytest.warns(DeprecationWarning)
141-
with pytest_warns, mock.patch.object(msg._request_queue, "put") as put:
142-
msg.lease()
143-
put.assert_called_once_with(
144-
requests.LeaseRequest(ack_id="bogus_ack_id", byte_size=30)
145-
)
146-
check_call_types(put, requests.LeaseRequest)
147-
148-
149125
def test_modify_ack_deadline():
150126
msg = create_message(b"foo", ack_id="bogus_ack_id")
151127
with mock.patch.object(msg._request_queue, "put") as put:

0 commit comments

Comments
 (0)