Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit ca86dd8

Browse files
authored
Break Policy<->Consumer reference cycle. (#4552)
Make `Policy` the parent of `Consumer` and explicitly require passing a `policy` into `Consumer.start_consuming()` (and its helpers).
1 parent 012400f commit ca86dd8

7 files changed

Lines changed: 69 additions & 57 deletions

File tree

google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -176,13 +176,7 @@ class Consumer(object):
176176
low. The Consumer and end-user can configure any sort of executor they want
177177
for the actual processing of the responses, which may be CPU intensive.
178178
"""
179-
def __init__(self, policy):
180-
"""
181-
Args:
182-
policy (Consumer): The consumer policy, which defines how
183-
requests and responses are handled.
184-
"""
185-
self._policy = policy
179+
def __init__(self):
186180
self._request_queue = queue.Queue()
187181
self.stopped = threading.Event()
188182
self._put_lock = threading.Lock()
@@ -197,18 +191,24 @@ def send_request(self, request):
197191
with self._put_lock:
198192
self._request_queue.put(request)
199193

200-
def _request_generator_thread(self):
194+
def _request_generator_thread(self, policy):
201195
"""Generate requests for the stream.
202196
203197
This blocks for new requests on the request queue and yields them to
204198
gRPC.
205199
200+
Args:
201+
policy (~.pubsub_v1.subscriber.policy.base.BasePolicy): The policy
202+
that owns this consumer. A policy is used to create the
203+
initial request used to open the streaming pull bidirectional
204+
stream.
205+
206206
Yields:
207207
google.cloud.pubsub_v1.types.StreamingPullRequest: Requests
208208
"""
209209
# First, yield the initial request. This occurs on every new
210210
# connection, fundamentally including a resumed connection.
211-
initial_request = self._policy.get_initial_request(ack_queue=True)
211+
initial_request = policy.get_initial_request(ack_queue=True)
212212
_LOGGER.debug('Sending initial request:\n%r', initial_request)
213213
yield initial_request
214214

@@ -290,8 +290,13 @@ def _stop_request_generator(self, request_generator):
290290
_LOGGER.debug('Successfully closed request generator.')
291291
return True
292292

293-
def _blocking_consume(self):
294-
"""Consume the stream indefinitely."""
293+
def _blocking_consume(self, policy):
294+
"""Consume the stream indefinitely.
295+
296+
Args:
297+
policy (~.pubsub_v1.subscriber.policy.base.BasePolicy): The policy,
298+
which defines how requests and responses are handled.
299+
"""
295300
while True:
296301
# It is possible that a timeout can cause the stream to not
297302
# exit cleanly when the user has called stop_consuming(). This
@@ -301,12 +306,12 @@ def _blocking_consume(self):
301306
_LOGGER.debug('Event signalled consumer exit.')
302307
break
303308

304-
request_generator = self._request_generator_thread()
305-
response_generator = self._policy.call_rpc(request_generator)
309+
request_generator = self._request_generator_thread(policy)
310+
response_generator = policy.call_rpc(request_generator)
306311
try:
307312
for response in response_generator:
308313
_LOGGER.debug('Received response:\n%r', response)
309-
self._policy.on_response(response)
314+
policy.on_response(response)
310315

311316
# If the loop above exits without an exception, then the
312317
# request stream terminated cleanly, which should only happen
@@ -315,19 +320,29 @@ def _blocking_consume(self):
315320
_LOGGER.debug('Clean RPC loop exit signalled consumer exit.')
316321
break
317322
except Exception as exc:
318-
recover = self._policy.on_exception(exc)
323+
recover = policy.on_exception(exc)
319324
if recover:
320325
recover = self._stop_request_generator(request_generator)
321326
if not recover:
322327
self._stop_no_join()
323328
return
324329

325-
def start_consuming(self):
326-
"""Start consuming the stream."""
330+
def start_consuming(self, policy):
331+
"""Start consuming the stream.
332+
333+
Sets the ``_consumer_thread`` member on the current consumer with
334+
a newly started thread.
335+
336+
Args:
337+
policy (~.pubsub_v1.subscriber.policy.base.BasePolicy): The policy
338+
that owns this consumer. A policy defines how requests and
339+
responses are handled.
340+
"""
327341
self.stopped.clear()
328342
thread = threading.Thread(
329343
name=_BIDIRECTIONAL_CONSUMER_NAME,
330344
target=self._blocking_consume,
345+
args=(policy,),
331346
)
332347
thread.daemon = True
333348
thread.start()

google/cloud/pubsub_v1/subscriber/_histogram.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class Histogram(object):
2323
2424
The default implementation uses the 99th percentile of previous ack
2525
times to implicitly lease messages; however, custom
26-
:class:`~.pubsub_v1.subscriber.consumer.base.BaseConsumer` subclasses
26+
:class:`~.pubsub_v1.subscriber._consumer.Consumer` subclasses
2727
are free to use a different formula.
2828
2929
The precision of data stored is to the nearest integer. Additionally,

google/cloud/pubsub_v1/subscriber/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def subscribe(self, subscription, callback=None, flow_control=()):
9797
"""Return a representation of an individual subscription.
9898
9999
This method creates and returns a ``Consumer`` object (that is, a
100-
:class:`~.pubsub_v1.subscriber.consumer.base.BaseConsumer`)
100+
:class:`~.pubsub_v1.subscriber._consumer.Consumer`)
101101
subclass) bound to the topic. It does `not` create the subcription
102102
on the backend (or do any API call at all); it simply returns an
103103
object capable of doing these things.
@@ -122,7 +122,7 @@ def subscribe(self, subscription, callback=None, flow_control=()):
122122
inundated with too many messages at once.
123123
124124
Returns:
125-
~.pubsub_v1.subscriber.consumer.base.BaseConsumer: An instance
125+
~.pubsub_v1.subscriber._consumer.Consumer: An instance
126126
of the defined ``consumer_class`` on the client.
127127
128128
Raises:

google/cloud/pubsub_v1/subscriber/message.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class Message(object):
2626
them in callbacks on subscriptions; most users should never have a need
2727
to instantiate them by hand. (The exception to this is if you are
2828
implementing a custom subclass to
29-
:class:`~.pubsub_v1.subscriber.consumer.BaseConsumer`.)
29+
:class:`~.pubsub_v1.subscriber._consumer.Consumer`.)
3030
3131
Attributes:
3232
message_id (str): The message ID. In general, you should not need
@@ -186,7 +186,7 @@ def modify_ack_deadline(self, seconds):
186186
The default implementation handles this for you; you should not need
187187
to manually deal with setting ack deadlines. The exception case is
188188
if you are implementing your own custom subclass of
189-
:class:`~.pubsub_v1.subcriber.consumer.BaseConsumer`.
189+
:class:`~.pubsub_v1.subcriber._consumer.Consumer`.
190190
191191
.. note::
192192
This is not an extension; it *sets* the deadline to the given

google/cloud/pubsub_v1/subscriber/policy/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def __init__(self, client, subscription,
7575
flow_control=types.FlowControl(), histogram_data=None):
7676
self._client = client
7777
self._subscription = subscription
78-
self._consumer = _consumer.Consumer(self)
78+
self._consumer = _consumer.Consumer()
7979
self._ack_deadline = 10
8080
self._last_histogram_size = 0
8181
self._future = None

google/cloud/pubsub_v1/subscriber/policy/thread.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ def open(self, callback):
234234
self._callback = callback
235235
self._start_dispatch()
236236
# Actually start consuming messages.
237-
self._consumer.start_consuming()
237+
self._consumer.start_consuming(self)
238238
self._start_lease_worker()
239239

240240
# Return the future.

tests/unit/pubsub_v1/subscriber/test_consumer.py

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,24 +27,20 @@
2727
from google.cloud.pubsub_v1.subscriber.policy import thread
2828

2929

30-
def create_consumer():
31-
creds = mock.Mock(spec=credentials.Credentials)
32-
client = subscriber.Client(credentials=creds)
33-
subscription = client.subscribe('sub_name_e')
34-
return _consumer.Consumer(policy=subscription)
35-
36-
3730
def test_send_request():
38-
consumer = create_consumer()
31+
consumer = _consumer.Consumer()
3932
request = types.StreamingPullRequest(subscription='foo')
4033
with mock.patch.object(queue.Queue, 'put') as put:
4134
consumer.send_request(request)
4235
put.assert_called_once_with(request)
4336

4437

4538
def test_request_generator_thread():
46-
consumer = create_consumer()
47-
generator = consumer._request_generator_thread()
39+
consumer = _consumer.Consumer()
40+
creds = mock.Mock(spec=credentials.Credentials)
41+
client = subscriber.Client(credentials=creds)
42+
policy = client.subscribe('sub_name_e')
43+
generator = consumer._request_generator_thread(policy)
4844

4945
# The first request that comes from the request generator thread
5046
# should always be the initial request.
@@ -64,27 +60,24 @@ def test_request_generator_thread():
6460

6561

6662
def test_blocking_consume():
67-
consumer = create_consumer()
68-
Policy = type(consumer._policy)
63+
policy = mock.Mock(spec=('call_rpc', 'on_response'))
64+
policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
6965

70-
# Establish that we get responses until we run out of them.
71-
with mock.patch.object(Policy, 'call_rpc', autospec=True) as call_rpc:
72-
call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
73-
with mock.patch.object(Policy, 'on_response', autospec=True) as on_res:
74-
consumer._blocking_consume()
75-
assert on_res.call_count == 2
76-
assert on_res.mock_calls[0][1][1] == mock.sentinel.A
77-
assert on_res.mock_calls[1][1][1] == mock.sentinel.B
66+
consumer = _consumer.Consumer()
67+
assert consumer._blocking_consume(policy) is None
68+
policy.call_rpc.assert_called_once()
69+
policy.on_response.assert_has_calls(
70+
[mock.call(mock.sentinel.A), mock.call(mock.sentinel.B)])
7871

7972

8073
@mock.patch.object(_consumer, '_LOGGER')
8174
def test_blocking_consume_when_exiting(_LOGGER):
82-
consumer = create_consumer()
75+
consumer = _consumer.Consumer()
8376
assert consumer.stopped.is_set() is False
8477
consumer.stopped.set()
8578

8679
# Make sure method cleanly exits.
87-
assert consumer._blocking_consume() is None
80+
assert consumer._blocking_consume(None) is None
8881

8982
_LOGGER.debug.assert_called_once_with('Event signalled consumer exit.')
9083

@@ -107,12 +100,12 @@ def test_blocking_consume_on_exception():
107100
exc = TypeError('Bad things!')
108101
policy.on_response.side_effect = exc
109102

110-
consumer = _consumer.Consumer(policy=policy)
103+
consumer = _consumer.Consumer()
111104
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
112105
policy.on_exception.side_effect = OnException()
113106

114107
# Establish that we get responses until we are sent the exiting event.
115-
consumer._blocking_consume()
108+
consumer._blocking_consume(policy)
116109
assert consumer._consumer_thread is None
117110

118111
# Check mocks.
@@ -131,12 +124,12 @@ def test_blocking_consume_two_exceptions():
131124
exc2 = ValueError('Something grumble.')
132125
policy.on_response.side_effect = (exc1, exc2)
133126

134-
consumer = _consumer.Consumer(policy=policy)
127+
consumer = _consumer.Consumer()
135128
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
136129
policy.on_exception.side_effect = OnException(acceptable=exc1)
137130

138131
# Establish that we get responses until we are sent the exiting event.
139-
consumer._blocking_consume()
132+
consumer._blocking_consume(policy)
140133
assert consumer._consumer_thread is None
141134

142135
# Check mocks.
@@ -148,20 +141,24 @@ def test_blocking_consume_two_exceptions():
148141

149142

150143
def test_start_consuming():
151-
consumer = create_consumer()
144+
creds = mock.Mock(spec=credentials.Credentials)
145+
client = subscriber.Client(credentials=creds)
146+
policy = client.subscribe('sub_name_e')
147+
consumer = _consumer.Consumer()
152148
with mock.patch.object(threading, 'Thread', autospec=True) as Thread:
153-
consumer.start_consuming()
149+
consumer.start_consuming(policy)
154150

155151
assert consumer.stopped.is_set() is False
156152
Thread.assert_called_once_with(
157153
name=_consumer._BIDIRECTIONAL_CONSUMER_NAME,
158154
target=consumer._blocking_consume,
155+
args=(policy,),
159156
)
160157
assert consumer._consumer_thread is Thread.return_value
161158

162159

163160
def test_stop_consuming():
164-
consumer = create_consumer()
161+
consumer = _consumer.Consumer()
165162
assert consumer.stopped.is_set() is False
166163
thread = mock.Mock(spec=threading.Thread)
167164
consumer._consumer_thread = thread
@@ -188,7 +185,7 @@ def test_stop_request_generator_not_running():
188185
# - The request queue **is not** empty
189186
# Expected result:
190187
# - ``_stop_request_generator()`` successfully calls ``.close()``
191-
consumer = create_consumer()
188+
consumer = _consumer.Consumer()
192189
queue_ = consumer._request_queue
193190
received = queue.Queue()
194191
request_generator = basic_queue_generator(queue_, received)
@@ -227,7 +224,7 @@ def test_stop_request_generator_close_failure():
227224
# Expected result:
228225
# - ``_stop_request_generator()`` falls through to the ``LOGGER.error``
229226
# case and returns ``False``
230-
consumer = create_consumer()
227+
consumer = _consumer.Consumer()
231228

232229
request_generator = mock.Mock(spec=('close',))
233230
request_generator.close.side_effect = TypeError('Really, not a generator')
@@ -247,7 +244,7 @@ def test_stop_request_generator_queue_non_empty():
247244
# - ``_stop_request_generator()`` can't call ``.close()`` (since
248245
# the generator is running) but then returns with ``False`` because
249246
# the queue **is not** empty
250-
consumer = create_consumer()
247+
consumer = _consumer.Consumer()
251248
# Attach a "fake" queue to the request generator so the generator can
252249
# block on an empty queue while the consumer's queue is not empty.
253250
queue_ = queue.Queue()
@@ -292,7 +289,7 @@ def test_stop_request_generator_running():
292289
# the generator is running) but then verifies that the queue is
293290
# empty and sends ``STOP`` into the queue to successfully stop
294291
# the generator
295-
consumer = create_consumer()
292+
consumer = _consumer.Consumer()
296293
queue_ = consumer._request_queue
297294
received = queue.Queue()
298295
request_generator = basic_queue_generator(queue_, received)

0 commit comments

Comments
 (0)