@@ -67,60 +67,81 @@ class Policy(base.BasePolicy):
6767
6868 This consumer handles the connection to the Pub/Sub service and all of
6969 the concurrency needs.
70+
71+ Args:
72+ client (~.pubsub_v1.subscriber.client): The subscriber client used
73+ to create this instance.
74+ subscription (str): The name of the subscription. The canonical
75+ format for this is
76+ ``projects/{project}/subscriptions/{subscription}``.
77+ flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow
78+ control settings.
79+ executor (~concurrent.futures.ThreadPoolExecutor): (Optional.) A
80+ ThreadPoolExecutor instance, or anything duck-type compatible
81+ with it.
82+ queue (~queue.Queue): (Optional.) A Queue instance, appropriate
83+ for crossing the concurrency boundary implemented by
84+ ``executor``.
7085 """
86+
7187 def __init__ (self , client , subscription , flow_control = types .FlowControl (),
7288 executor = None , queue = None ):
73- """Instantiate the policy.
89+ super (Policy , self ).__init__ (
90+ client = client ,
91+ flow_control = flow_control ,
92+ subscription = subscription ,
93+ )
94+ # Default the callback to a no-op; the **actual** callback is
95+ # provided by ``.open()``.
96+ self ._callback = _do_nothing_callback
97+ # Create a queue for keeping track of shared state.
98+ self ._request_queue = self ._get_queue (queue )
99+ # Also maintain an executor.
100+ self ._executor = self ._get_executor (executor )
101+
102+ @staticmethod
103+ def _get_queue (queue ):
104+ """Gets a queue for the constructor.
74105
75106 Args:
76- client (~.pubsub_v1.subscriber.client): The subscriber client used
77- to create this instance.
78- subscription (str): The name of the subscription. The canonical
79- format for this is
80- ``projects/{project}/subscriptions/{subscription}``.
81- flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow
82- control settings.
83- executor (~concurrent.futures.ThreadPoolExecutor): (Optional.) A
84- ThreadPoolExecutor instance, or anything duck-type compatible
85- with it.
86- queue (~queue.Queue): (Optional.) A Queue instance, appropriate
107+ queue (Optional[~queue.Queue]): A Queue instance, appropriate
87108 for crossing the concurrency boundary implemented by
88109 ``executor``.
89- """
90- # Default the callback to a no-op; it is provided by `.open`.
91- self ._callback = _do_nothing_callback
92110
93- # Default the future to None; it is provided by `.open`.
94- self . _future = None
95-
96- # Create a queue for keeping track of shared state.
111+ Returns:
112+ ~queue.Queue: Either ``queue`` if not :data:` None` or a default
113+ queue.
114+ """
97115 if queue is None :
98- queue = queue_mod .Queue ()
99- self ._request_queue = queue
116+ return queue_mod .Queue ()
117+ else :
118+ return queue
100119
101- # Call the superclass constructor.
102- super (Policy , self ).__init__ (
103- client = client ,
104- flow_control = flow_control ,
105- subscription = subscription ,
106- )
120+ @staticmethod
121+ def _get_executor (executor ):
122+ """Gets an executor for the constructor.
123+
124+ Args:
125+ executor (Optional[~concurrent.futures.ThreadPoolExecutor]): A
126+ ThreadPoolExecutor instance, or anything duck-type compatible
127+ with it.
107128
108- # Also maintain a request queue and an executor.
129+ Returns:
130+ ~concurrent.futures.ThreadPoolExecutor: Either ``executor`` if not
131+ :data:`None` or a default thread pool executor with 10 workers
132+ and a prefix (if supported).
133+ """
109134 if executor is None :
110135 executor_kwargs = {}
111136 if sys .version_info [:2 ] == (2 , 7 ) or sys .version_info >= (3 , 6 ):
112137 executor_kwargs ['thread_name_prefix' ] = (
113138 'ThreadPoolExecutor-SubscriberPolicy' )
114- executor = futures .ThreadPoolExecutor (
139+ return futures .ThreadPoolExecutor (
115140 max_workers = 10 ,
116141 ** executor_kwargs
117142 )
118- self ._executor = executor
119- _LOGGER .debug ('Creating callback requests thread (not starting).' )
120- self ._callback_requests = _helper_threads .QueueCallbackWorker (
121- self ._request_queue ,
122- self .dispatch_callback ,
123- )
143+ else :
144+ return executor
124145
125146 def close (self ):
126147 """Close the existing connection."""
@@ -158,10 +179,14 @@ def open(self, callback):
158179 # Start the thread to pass the requests.
159180 _LOGGER .debug ('Starting callback requests worker.' )
160181 self ._callback = callback
182+ dispatch_worker = _helper_threads .QueueCallbackWorker (
183+ self ._request_queue ,
184+ self .dispatch_callback ,
185+ )
161186 self ._consumer .helper_threads .start (
162187 _CALLBACK_WORKER_NAME ,
163188 self ._request_queue .put ,
164- self . _callback_requests ,
189+ dispatch_worker ,
165190 )
166191
167192 # Actually start consuming messages.
0 commit comments