@@ -47,7 +47,7 @@ def __init__(self, maxsize=0, *, ctx):
4747 self ._wlock = ctx .Lock ()
4848 self ._sem = ctx .BoundedSemaphore (maxsize )
4949 self ._is_shutdown = ctx .Value ('B' , False )
50- self ._n_pendings = ctx .Array ('Q' ,[0 , 0 ])
50+ self ._n_pending_processes = ctx .Array ('Q' ,[0 , 0 ])
5151
5252 # For use by concurrent.futures
5353 self ._ignore_epipe = False
@@ -59,12 +59,12 @@ def __getstate__(self):
5959 context .assert_spawning (self )
6060 return (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
6161 self ._rlock , self ._wlock , self ._sem , self ._opid ,
62- self ._is_shutdown , self ._n_pendings )
62+ self ._is_shutdown , self ._n_pending_processes )
6363
6464 def __setstate__ (self , state ):
6565 (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
6666 self ._rlock , self ._wlock , self ._sem , self ._opid ,
67- self ._is_shutdown , self ._n_pendings ) = state
67+ self ._is_shutdown , self ._n_pending_processes ) = state
6868 self ._reset ()
6969
7070 def _after_fork (self ):
@@ -88,15 +88,19 @@ def _reset(self, after_fork=False):
8888
8989 @contextmanager
9090 def _handle_pending_processes (self , get_or_put ):
91- # Count pending processes. Used when queue shutdowns
91+ # Count pending get or put processes in a shared array.
92+ # These two values are only used when queue shutdowns
9293 # to release all pending processes.
93- with self ._n_pendings . get_lock () :
94- self ._n_pendings [get_or_put ] += 1
94+ with self ._n_pending_processes :
95+ self ._n_pending_processes [get_or_put ] += 1
9596 try :
97+ # Wraps calls to _sem.acquire() in put method and,
98+ # calls to _rlock.acquire() or _recv_bytes()
99+ # in get method.
96100 yield
97101 finally :
98- with self ._n_pendings . get_lock () :
99- self ._n_pendings [get_or_put ] -= 1
102+ with self ._n_pending_processes :
103+ self ._n_pending_processes [get_or_put ] -= 1
100104
101105 def put (self , obj , block = True , timeout = None ):
102106 if self ._closed :
@@ -112,7 +116,8 @@ def put(self, obj, block=True, timeout=None):
112116
113117 if self ._is_shutdown .value :
114118 # Released from acquire below.
115- if self ._n_pendings [Queue ._PUTTERS ] > 0 :
119+ if self ._n_pending_processes [Queue ._PUTTERS ] > 0 :
120+ debug ('`put` release next pending putter process -> shutdown' )
116121 self ._sem .release ()
117122 raise ShutDown
118123
@@ -127,6 +132,7 @@ def get(self, block=True, timeout=None):
127132 raise ValueError (f"Queue { self !r} is closed" )
128133 if self ._is_shutdown .value and self .empty ():
129134 raise ShutDown
135+
130136 if block and timeout is None :
131137 with self ._handle_pending_processes (Queue ._GETTERS ):
132138 with self ._rlock :
@@ -156,14 +162,15 @@ def get(self, block=True, timeout=None):
156162 finally :
157163 self ._rlock .release ()
158164
159- # unserialize the data before having released the lock
160- # to check if it's not a dummy item.
161- final_res = _ForkingPickler .loads (res )
162- if isinstance (final_res , _DummyItem ) and self ._is_shutdown .value :
165+ # Unserialize the data before releasing the lock.
166+ # When shutdowns, checks if this is a sentinel item.
167+ item = _ForkingPickler .loads (res )
168+ if self ._is_shutdown .value and isinstance (item , _SentinelShutdown ):
169+ debug ('`get` got _sentinel_shutdown -> shutdown' )
163170 raise ShutDown
164171
165172 self ._sem .release ()
166- return final_res
173+ return item
167174
168175 def qsize (self ):
169176 # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -189,35 +196,35 @@ def _clear(self):
189196 def shutdown (self , immediate = False ):
190197 if self ._closed :
191198 raise ValueError (f"Queue { self !r} is closed" )
192- with self ._is_shutdown . get_lock () :
199+ with self ._is_shutdown :
193200 self ._is_shutdown .value = True
194201
195202 # Unblock all pending getter processes.
196- # Put specific dummy data in the pipe.
197- for _ in range ( self ._n_pendings [ Queue . _GETTERS ]) :
198- with self ._notempty :
199- if self ._thread is None :
200- self ._start_thread ()
201- self ._buffer .append (_dummy )
203+ # Put specific sentinel item in the pipe.
204+ with self ._notempty :
205+ if self ._thread is None :
206+ self ._start_thread ()
207+ for _ in range ( self ._n_pending_processes [ Queue . _GETTERS ]):
208+ self ._buffer .append (_sentinel_shutdown )
202209 self ._notempty .notify ()
203- else :
204- debug (f'on shutdown, { self ._n_pendings [Queue ._GETTERS ]} ' +
205- 'pending getter processes to release' )
210+ else :
211+ debug (f'when shutdown, { self ._n_pending_processes [Queue ._GETTERS ]} ' +
212+ 'pending getter processes to release' )
206213
207214
208215 # Unblock all pending putter processes.
209- if self ._n_pendings [Queue ._PUTTERS ] > 0 :
210- debug (f'on shutdown, { self ._n_pendings [Queue ._PUTTERS ]} ' +
216+ if self ._n_pending_processes [Queue ._PUTTERS ] > 0 :
217+ debug (f'when shutdown, { self ._n_pending_processes [Queue ._PUTTERS ]} ' +
211218 'pending putters processes to release' )
212- # Here we start to release for a first putter process.
213- # When this process is unblock, checks again and
214- # continue to release in cascade
215- # until there is no more putters .
219+ # We start releasing a first putter process.
220+ # In the `put` method, as soon as the target process is
221+ # unblocked, we continue releasing in cascade until
222+ # there are no more putter processes .
216223 self ._sem .release ()
217224
218225 # if there are pending getters processes, queue is empty.
219- if immediate and not self ._n_pendings [Queue ._GETTERS ]:
220- debug (f'on shutdown, clear all items in pipe' )
226+ if immediate and not self ._n_pending_processes [Queue ._GETTERS ]:
227+ debug (f'when shutdown, clear all items in pipe' )
221228 self ._clear ()
222229
223230 def close (self ):
@@ -386,10 +393,10 @@ def _on_queue_feeder_error(e, obj):
386393 __class_getitem__ = classmethod (types .GenericAlias )
387394
388395
389- # dummy data used to release
396+ # sentinel used to release
390397# pending getter processes.
391- class _DummyItem : pass
392- _dummy = _DummyItem ()
398+ class _SentinelShutdown : pass
399+ _sentinel_shutdown = _SentinelShutdown ()
393400
394401
395402_sentinel = object ()
@@ -429,14 +436,14 @@ def put(self, obj, block=True, timeout=None):
429436 raise Full
430437
431438 if self ._is_shutdown .value :
432- if self ._n_pendings [Queue ._PUTTERS ] > 0 :
439+ if self ._n_pending_processes [Queue ._PUTTERS ] > 0 :
433440 self ._sem .release ()
434441 raise ShutDown
435442
436- with self ._notempty : #, self._cond:
443+ with self ._notempty :
437444 # Here it seems to me that `self._cond` is unnecessary in
438445 # the "with" instruction.
439- # So now, this method and its inherited method are identical except
446+ # This method and its inherited method are identical except
440447 # a call to class instructions.
441448 # Here this is a call to `self._unfinished_tasks.release()`.
442449 #
0 commit comments