Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 1f06eb7

Browse files
authored
Dropping usage of HelperThreadRegistry in Pub / Sub policy. (#4536)
1 parent 66f13c6 commit 1f06eb7

4 files changed

Lines changed: 105 additions & 37 deletions

File tree

google/cloud/pubsub_v1/subscriber/_helper_threads.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,6 @@ class HelperThreadRegistry(object):
5757
def __init__(self):
5858
self._helper_threads = {}
5959

60-
def __contains__(self, needle):
61-
return needle in self._helper_threads
62-
6360
def start(self, name, queue_put, target):
6461
"""Create and start a helper thread.
6562

google/cloud/pubsub_v1/subscriber/policy/thread.py

Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131

3232
_LOGGER = logging.getLogger(__name__)
33-
_CALLBACK_WORKER_NAME = 'CallbackRequestsWorker'
33+
_CALLBACK_WORKER_NAME = 'Thread-Consumer-CallbackRequestsWorker'
3434

3535

3636
def _callback_completed(future):
@@ -98,6 +98,9 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
9898
self._request_queue = self._get_queue(queue)
9999
# Also maintain an executor.
100100
self._executor = self._get_executor(executor)
101+
# The threads created in ``.open()``.
102+
self._dispatch_thread = None
103+
self._leases_thread = None
101104

102105
@staticmethod
103106
def _get_queue(queue):
@@ -146,8 +149,12 @@ def _get_executor(executor):
146149
def close(self):
147150
"""Close the existing connection."""
148151
# Stop consuming messages.
149-
self._consumer.helper_threads.stop(_CALLBACK_WORKER_NAME)
152+
self._request_queue.put(_helper_threads.STOP)
153+
self._dispatch_thread.join() # Wait until stopped.
154+
self._dispatch_thread = None
150155
self._consumer.stop_consuming()
156+
self._leases_thread.join()
157+
self._leases_thread = None
151158
self._executor.shutdown()
152159

153160
# The subscription is closing cleanly; resolve the future if it is not
@@ -156,6 +163,53 @@ def close(self):
156163
self._future.set_result(None)
157164
self._future = None
158165

166+
def _start_dispatch(self):
167+
"""Start a thread to dispatch requests queued up by callbacks.
168+
169+
.. note::
170+
171+
This assumes, but does not check, that ``_dispatch_thread``
172+
is :data:`None`.
173+
174+
Spawns a thread to run :meth:`dispatch_callback` and sets the
175+
"dispatch thread" member on the current policy.
176+
"""
177+
_LOGGER.debug('Starting callback requests worker.')
178+
dispatch_worker = _helper_threads.QueueCallbackWorker(
179+
self._request_queue,
180+
self.dispatch_callback,
181+
)
182+
# Create and start the helper thread.
183+
thread = threading.Thread(
184+
name=_CALLBACK_WORKER_NAME,
185+
target=dispatch_worker,
186+
)
187+
thread.daemon = True
188+
thread.start()
189+
_LOGGER.debug('Started helper thread %s', thread.name)
190+
self._dispatch_thread = thread
191+
192+
def _start_lease_worker(self):
193+
"""Spawn a helper thread that maintains all of leases for this policy.
194+
195+
.. note::
196+
197+
This assumes, but does not check, that ``_leases_thread`` is
198+
:data:`None`.
199+
200+
Spawns a thread to run :meth:`maintain_leases` and sets the
201+
"leases thread" member on the current policy.
202+
"""
203+
_LOGGER.debug('Starting lease maintenance worker.')
204+
thread = threading.Thread(
205+
name='Thread-LeaseMaintenance',
206+
target=self.maintain_leases,
207+
)
208+
thread.daemon = True
209+
thread.start()
210+
211+
self._leases_thread = thread
212+
159213
def open(self, callback):
160214
"""Open a streaming pull connection and begin receiving messages.
161215
@@ -177,30 +231,11 @@ def open(self, callback):
177231
self._future = Future(policy=self)
178232

179233
# Start the thread to pass the requests.
180-
_LOGGER.debug('Starting callback requests worker.')
181234
self._callback = callback
182-
dispatch_worker = _helper_threads.QueueCallbackWorker(
183-
self._request_queue,
184-
self.dispatch_callback,
185-
)
186-
self._consumer.helper_threads.start(
187-
_CALLBACK_WORKER_NAME,
188-
self._request_queue.put,
189-
dispatch_worker,
190-
)
191-
235+
self._start_dispatch()
192236
# Actually start consuming messages.
193237
self._consumer.start_consuming()
194-
195-
# Spawn a helper thread that maintains all of the leases for
196-
# this policy.
197-
_LOGGER.debug('Starting lease maintenance worker.')
198-
self._leaser = threading.Thread(
199-
name='Thread-LeaseMaintenance',
200-
target=self.maintain_leases,
201-
)
202-
self._leaser.daemon = True
203-
self._leaser.start()
238+
self._start_lease_worker()
204239

205240
# Return the future.
206241
return self._future

tests/unit/pubsub_v1/subscriber/test_policy_base.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,10 @@ def test_load():
180180
assert policy._load == 0.2
181181

182182
# Returning a number above 100% is fine.
183-
policy.lease(ack_id='three', byte_size=1000)
184-
assert policy._load == 1.16
183+
with mock.patch.object(policy, 'close') as close:
184+
policy.lease(ack_id='three', byte_size=1000)
185+
assert policy._load == 1.16
186+
close.assert_called_once_with()
185187

186188

187189
def test_modify_ack_deadline():

tests/unit/pubsub_v1/subscriber/test_policy_thread.py

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from google.auth import credentials
2222
import mock
2323
import pytest
24+
import six
2425
from six.moves import queue
2526

2627
from google.cloud.pubsub_v1 import subscriber
@@ -49,36 +50,69 @@ def test_init_with_executor():
4950

5051

5152
def test_close():
53+
dispatch_thread = mock.Mock(spec=threading.Thread)
54+
leases_thread = mock.Mock(spec=threading.Thread)
55+
5256
policy = create_policy()
57+
policy._dispatch_thread = dispatch_thread
58+
policy._leases_thread = leases_thread
5359
consumer = policy._consumer
5460
with mock.patch.object(consumer, 'stop_consuming') as stop_consuming:
5561
policy.close()
5662
stop_consuming.assert_called_once_with()
57-
assert 'callback request worker' not in policy._consumer.helper_threads
63+
64+
assert policy._dispatch_thread is None
65+
dispatch_thread.join.assert_called_once_with()
66+
assert policy._leases_thread is None
67+
leases_thread.join.assert_called_once_with()
5868

5969

6070
def test_close_with_future():
71+
dispatch_thread = mock.Mock(spec=threading.Thread)
72+
leases_thread = mock.Mock(spec=threading.Thread)
73+
6174
policy = create_policy()
75+
policy._dispatch_thread = dispatch_thread
76+
policy._leases_thread = leases_thread
6277
policy._future = Future(policy=policy)
6378
consumer = policy._consumer
6479
with mock.patch.object(consumer, 'stop_consuming') as stop_consuming:
6580
future = policy.future
6681
policy.close()
6782
stop_consuming.assert_called_once_with()
83+
84+
assert policy._dispatch_thread is None
85+
dispatch_thread.join.assert_called_once_with()
86+
assert policy._leases_thread is None
87+
leases_thread.join.assert_called_once_with()
6888
assert policy.future != future
6989
assert future.result() is None
7090

7191

72-
@mock.patch.object(_helper_threads.HelperThreadRegistry, 'start')
73-
@mock.patch.object(threading.Thread, 'start')
74-
def test_open(thread_start, htr_start):
92+
def test_open():
7593
policy = create_policy()
76-
with mock.patch.object(policy._consumer, 'start_consuming') as consuming:
94+
consumer = policy._consumer
95+
threads = (
96+
mock.Mock(spec=('name', 'start')),
97+
mock.Mock(spec=('name', 'start')),
98+
mock.Mock(spec=('name', 'start')),
99+
)
100+
with mock.patch.object(threading, 'Thread', side_effect=threads):
77101
policy.open(mock.sentinel.CALLBACK)
78-
assert policy._callback is mock.sentinel.CALLBACK
79-
consuming.assert_called_once_with()
80-
htr_start.assert_called()
81-
thread_start.assert_called()
102+
103+
assert policy._callback is mock.sentinel.CALLBACK
104+
105+
assert policy._dispatch_thread is threads[0]
106+
threads[0].start.assert_called_once_with()
107+
108+
threads_dict = consumer.helper_threads._helper_threads
109+
assert len(threads_dict) == 1
110+
helper_thread = next(six.itervalues(threads_dict))
111+
assert helper_thread.thread is threads[1]
112+
threads[1].start.assert_called_once_with()
113+
114+
assert policy._leases_thread is threads[2]
115+
threads[2].start.assert_called_once_with()
82116

83117

84118
def test_dispatch_callback_valid_actions():

0 commit comments

Comments
 (0)