Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit a6f99e4

Browse files
authored
Various hygiene changes to Pub / Sub subscriber. (#4494)
- Using `%`-formatting in all `logging.log()` calls (e.g. `info()`). I am not opposed to using `.format()` but `logging` "prefers" `%`-formatting (and I wanted to be consistent because hobgoblins). - Adding non-public globals for helper thread names, this was especially needed because I accidentally broke this in #4476 when I changed `callback requests worker` to `CallbackRequestsWorker` in one place, not two. - Adding docstring to `QueueCallbackThread` that explains what it does (I'll refactor this class in a follow-up) - Adding a logging statement when a `QueueCallbackThread` exits - Changing indents / using much more vertical space in calls to `request_queue.put()` in `subscriber.message.Message`. (I came across these when trying to understand how `QueueCallbackThread` interacts with `Policy.on_callback_request`) - Changing `GPRC` to `gRPC` in a docstring - Moving "Creating callback requests thread (not starting)." until right before the resource is created - Changing "Spawning" to "Starting" in a log message to match others
1 parent 96c2c3c commit a6f99e4

4 files changed

Lines changed: 79 additions & 36 deletions

File tree

google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@
123123

124124
from google.cloud.pubsub_v1.subscriber import _helper_threads
125125

126+
126127
_LOGGER = logging.getLogger(__name__)
128+
_BIDIRECTIONAL_CONSUMER_NAME = 'ConsumeBidirectionalStream'
127129

128130

129131
class Consumer(object):
@@ -250,7 +252,7 @@ def start_consuming(self):
250252
self.active = True
251253
self._exiting.clear()
252254
self.helper_threads.start(
253-
'ConsumeBidirectionalStream',
255+
_BIDIRECTIONAL_CONSUMER_NAME,
254256
self._request_queue,
255257
self._blocking_consume,
256258
)

google/cloud/pubsub_v1/subscriber/_helper_threads.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def start(self, name, queue, target):
6868

6969
# Keep track of the helper thread, so we are able to stop it.
7070
self._helper_threads[name] = _HelperThread(name, thread, queue)
71-
_LOGGER.debug('Started helper thread {}'.format(name))
71+
_LOGGER.debug('Started helper thread %s', name)
7272
return thread
7373

7474
def stop(self, name):
@@ -86,7 +86,7 @@ def stop(self, name):
8686

8787
# Join the thread if it is still alive.
8888
if helper_thread.thread.is_alive():
89-
_LOGGER.debug('Stopping helper thread {}'.format(name))
89+
_LOGGER.debug('Stopping helper thread %s', name)
9090
helper_thread.queue.put(STOP)
9191
helper_thread.thread.join()
9292

@@ -102,9 +102,25 @@ def stop_all(self):
102102

103103

104104
class QueueCallbackThread(object):
105-
"""A helper thread that executes a callback for every item in
106-
the queue.
105+
"""A helper that executes a callback for every item in the queue.
106+
107+
.. note::
108+
109+
This is not actually a thread, but it is intended to be a target
110+
for a thread.
111+
112+
Calls a blocking ``get()`` on the ``queue`` until it encounters
113+
:attr:`STOP`.
114+
115+
Args:
116+
queue (~queue.Queue): A Queue instance, appropriate for crossing the
117+
concurrency boundary implemented by ``executor``. Items will
118+
be popped off (with a blocking ``get()``) until :attr:`STOP`
119+
is encountered.
120+
callback (Callable): A callback that can process items pulled off
121+
of the queue.
107122
"""
123+
108124
def __init__(self, queue, callback):
109125
self.queue = queue
110126
self._callback = callback
@@ -113,14 +129,12 @@ def __call__(self):
113129
while True:
114130
item = self.queue.get()
115131
if item == STOP:
116-
break
132+
_LOGGER.debug('Exiting the QueueCallbackThread.')
133+
return
117134

118135
# Run the callback. If any exceptions occur, log them and
119136
# continue.
120137
try:
121138
self._callback(item)
122139
except Exception as exc:
123-
_LOGGER.error('{class_}: {message}'.format(
124-
class_=exc.__class__.__name__,
125-
message=str(exc),
126-
))
140+
_LOGGER.error('%s: %s', exc.__class__.__name__, exc)

google/cloud/pubsub_v1/subscriber/message.py

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class Message(object):
3737
publish_time (datetime): The time that this message was originally
3838
published.
3939
"""
40+
4041
def __init__(self, message, ack_id, request_queue):
4142
"""Construct the Message.
4243
@@ -128,11 +129,16 @@ def ack(self):
128129
receive any given message more than once.
129130
"""
130131
time_to_ack = math.ceil(time.time() - self._received_timestamp)
131-
self._request_queue.put(('ack', {
132-
'ack_id': self._ack_id,
133-
'byte_size': self.size,
134-
'time_to_ack': time_to_ack,
135-
}))
132+
self._request_queue.put(
133+
(
134+
'ack',
135+
{
136+
'ack_id': self._ack_id,
137+
'byte_size': self.size,
138+
'time_to_ack': time_to_ack,
139+
},
140+
),
141+
)
136142

137143
def drop(self):
138144
"""Release the message from lease management.
@@ -147,10 +153,15 @@ def drop(self):
147153
both call this one. You probably do not want to call this method
148154
directly.
149155
"""
150-
self._request_queue.put(('drop', {
151-
'ack_id': self._ack_id,
152-
'byte_size': self.size,
153-
}))
156+
self._request_queue.put(
157+
(
158+
'drop',
159+
{
160+
'ack_id': self._ack_id,
161+
'byte_size': self.size,
162+
},
163+
),
164+
)
154165

155166
def lease(self):
156167
"""Inform the policy to lease this message continually.
@@ -159,10 +170,15 @@ def lease(self):
159170
This method is called by the constructor, and you should never
160171
need to call it manually.
161172
"""
162-
self._request_queue.put(('lease', {
163-
'ack_id': self._ack_id,
164-
'byte_size': self.size,
165-
}))
173+
self._request_queue.put(
174+
(
175+
'lease',
176+
{
177+
'ack_id': self._ack_id,
178+
'byte_size': self.size,
179+
},
180+
),
181+
)
166182

167183
def modify_ack_deadline(self, seconds):
168184
"""Set the deadline for acknowledgement to the given value.
@@ -182,17 +198,27 @@ def modify_ack_deadline(self, seconds):
182198
to. This should be between 0 and 600. Due to network latency,
183199
values below 10 are advised against.
184200
"""
185-
self._request_queue.put(('modify_ack_deadline', {
186-
'ack_id': self._ack_id,
187-
'seconds': seconds,
188-
}))
201+
self._request_queue.put(
202+
(
203+
'modify_ack_deadline',
204+
{
205+
'ack_id': self._ack_id,
206+
'seconds': seconds,
207+
},
208+
),
209+
)
189210

190211
def nack(self):
191212
"""Decline to acknowldge the given message.
192213
193214
This will cause the message to be re-delivered to the subscription.
194215
"""
195-
self._request_queue.put(('nack', {
196-
'ack_id': self._ack_id,
197-
'byte_size': self.size,
198-
}))
216+
self._request_queue.put(
217+
(
218+
'nack',
219+
{
220+
'ack_id': self._ack_id,
221+
'byte_size': self.size,
222+
},
223+
),
224+
)

google/cloud/pubsub_v1/subscriber/policy/thread.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030

3131
_LOGGER = logging.getLogger(__name__)
32+
_CALLBACK_WORKER_NAME = 'CallbackRequestsWorker'
3233

3334

3435
def _callback_completed(future):
@@ -104,10 +105,10 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
104105
)
105106

106107
# Also maintain a request queue and an executor.
107-
_LOGGER.debug('Creating callback requests thread (not starting).')
108108
if executor is None:
109109
executor = futures.ThreadPoolExecutor(max_workers=10)
110110
self._executor = executor
111+
_LOGGER.debug('Creating callback requests thread (not starting).')
111112
self._callback_requests = _helper_threads.QueueCallbackThread(
112113
self._request_queue,
113114
self.on_callback_request,
@@ -116,7 +117,7 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
116117
def close(self):
117118
"""Close the existing connection."""
118119
# Stop consuming messages.
119-
self._consumer.helper_threads.stop('callback requests worker')
120+
self._consumer.helper_threads.stop(_CALLBACK_WORKER_NAME)
120121
self._consumer.stop_consuming()
121122

122123
# The subscription is closing cleanly; resolve the future if it is not
@@ -149,7 +150,7 @@ def open(self, callback):
149150
_LOGGER.debug('Starting callback requests worker.')
150151
self._callback = callback
151152
self._consumer.helper_threads.start(
152-
'CallbackRequestsWorker',
153+
_CALLBACK_WORKER_NAME,
153154
self._request_queue,
154155
self._callback_requests,
155156
)
@@ -159,7 +160,7 @@ def open(self, callback):
159160

160161
# Spawn a helper thread that maintains all of the leases for
161162
# this policy.
162-
_LOGGER.debug('Spawning lease maintenance worker.')
163+
_LOGGER.debug('Starting lease maintenance worker.')
163164
self._leaser = threading.Thread(
164165
name='Thread-LeaseMaintenance',
165166
target=self.maintain_leases,
@@ -171,7 +172,7 @@ def open(self, callback):
171172
return self._future
172173

173174
def on_callback_request(self, callback_request):
174-
"""Map the callback request to the appropriate GRPC request."""
175+
"""Map the callback request to the appropriate gRPC request."""
175176
action, kwargs = callback_request[0], callback_request[1]
176177
getattr(self, action)(**kwargs)
177178

0 commit comments

Comments
 (0)