Skip to content

Commit 80043ff

Browse files
committed
gh-80961: Add daemon parameter to ThreadPoolExecutor
Add `daemon=False` keyword-only parameter to ThreadPoolExecutor. When True, worker threads are created as daemon threads and are not registered in the global _threads_queues, allowing the interpreter to exit without waiting for them to finish.
1 parent 1d091a3 commit 80043ff

File tree

5 files changed

+92
-5
lines changed

5 files changed

+92
-5
lines changed

Doc/library/concurrent.futures.rst

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ Executor Objects
9595
return immediately and the resources associated with the executor will be
9696
freed when all pending futures are done executing. Regardless of the
9797
value of *wait*, the entire Python program will not exit until all
98-
pending futures are done executing.
98+
pending futures are done executing (see the *daemon* parameter of
99+
:class:`ThreadPoolExecutor` for an exception to this rule).
99100

100101
If *cancel_futures* is ``True``, this method will cancel all pending
101102
futures that the executor has not started running. Any futures that
@@ -159,7 +160,7 @@ And::
159160
executor.submit(wait_on_future)
160161

161162

162-
.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
163+
.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), *, daemon=False)
163164

164165
An :class:`Executor` subclass that uses a pool of at most *max_workers*
165166
threads to execute calls asynchronously.
@@ -177,6 +178,13 @@ And::
177178
pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
178179
as well as any attempt to submit more jobs to the pool.
179180

181+
If *daemon* is ``True``, worker threads are created as
182+
:ref:`daemon threads <thread-objects>`, allowing the interpreter to exit
183+
without waiting for them to finish. This is useful when tasks may be stuck
184+
indefinitely and would otherwise prevent the process from exiting.
185+
``shutdown(wait=True)`` still joins the threads explicitly. Daemon threads
186+
are not supported in subinterpreters and will raise :exc:`RuntimeError`.
187+
180188
.. versionchanged:: 3.5
181189
If *max_workers* is ``None`` or
182190
not given, it will default to the number of processors on the machine,
@@ -206,6 +214,9 @@ And::
206214
Default value of *max_workers* is changed to
207215
``min(32, (os.process_cpu_count() or 1) + 4)``.
208216

217+
.. versionchanged:: 3.15
218+
Added the *daemon* parameter.
219+
209220

210221
.. _threadpoolexecutor-example:
211222

Doc/whatsnew/3.15.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,11 @@ concurrent.futures
702702
terminated process.
703703
(Contributed by Jonathan Berg in :gh:`139486`.)
704704

705+
* Added *daemon* parameter to :class:`concurrent.futures.ThreadPoolExecutor`.
706+
When set to ``True``, worker threads are created as daemon threads, allowing
707+
the interpreter to exit without waiting for them to finish.
708+
(Contributed by Hrvoje Nikšić in :gh:`80961`.)
709+
705710

706711
contextlib
707712
----------

Lib/concurrent/futures/thread.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ def prepare_context(cls, initializer, initargs):
159159
return WorkerContext.prepare(initializer, initargs)
160160

161161
def __init__(self, max_workers=None, thread_name_prefix='',
162-
initializer=None, initargs=(), **ctxkwargs):
162+
initializer=None, initargs=(), *, daemon=False,
163+
**ctxkwargs):
163164
"""Initializes a new ThreadPoolExecutor instance.
164165
165166
Args:
@@ -168,8 +169,15 @@ def __init__(self, max_workers=None, thread_name_prefix='',
168169
thread_name_prefix: An optional name prefix to give our threads.
169170
initializer: A callable used to initialize worker threads.
170171
initargs: A tuple of arguments to pass to the initializer.
172+
daemon: If True, worker threads are created as daemon threads.
171173
ctxkwargs: Additional arguments to cls.prepare_context().
172174
"""
175+
if daemon:
176+
if not threading._daemon_threads_allowed():
177+
raise RuntimeError(
178+
'daemon threads are disabled in this (sub)interpreter')
179+
self._daemon = daemon
180+
173181
if max_workers is None:
174182
# ThreadPoolExecutor is often used to:
175183
# * CPU bound task which releases GIL
@@ -233,10 +241,12 @@ def weakref_cb(_, q=self._work_queue):
233241
t = threading.Thread(name=thread_name, target=_worker,
234242
args=(weakref.ref(self, weakref_cb),
235243
self._create_worker_context(),
236-
self._work_queue))
244+
self._work_queue),
245+
daemon=True if self._daemon else None)
237246
t.start()
238247
self._threads.add(t)
239-
_threads_queues[t] = self._work_queue
248+
if not self._daemon:
249+
_threads_queues[t] = self._work_queue
240250

241251
def _initializer_failed(self):
242252
with self._shutdown_lock:

Lib/test/test_concurrent_futures/test_shutdown.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,64 @@ def test_cancel_futures_wait_false(self):
261261
self.assertIn(out.strip(), [b"apple", b""])
262262

263263

264+
class ThreadPoolDaemonTest(BaseTestCase):
265+
def test_daemon_worker_threads(self):
266+
executor = futures.ThreadPoolExecutor(max_workers=2, daemon=True)
267+
executor.submit(time.sleep, 0)
268+
executor.shutdown(wait=True)
269+
for t in executor._threads:
270+
self.assertTrue(t.daemon)
271+
272+
def test_default_non_daemon_workers(self):
273+
executor = futures.ThreadPoolExecutor(max_workers=2)
274+
executor.submit(time.sleep, 0)
275+
executor.shutdown(wait=True)
276+
for t in executor._threads:
277+
self.assertFalse(t.daemon)
278+
279+
def test_daemon_workers_untracked(self):
280+
from concurrent.futures.thread import _threads_queues
281+
executor = futures.ThreadPoolExecutor(max_workers=2, daemon=True)
282+
executor.submit(time.sleep, 0)
283+
executor.shutdown(wait=True)
284+
for t in executor._threads:
285+
self.assertNotIn(t, _threads_queues)
286+
287+
def test_daemon_explicit_shutdown_wait(self):
288+
# shutdown(wait=True) should still wait for task completion
289+
results = []
290+
def append_after_sleep():
291+
time.sleep(0.1)
292+
results.append(42)
293+
executor = futures.ThreadPoolExecutor(max_workers=2, daemon=True)
294+
executor.submit(append_after_sleep)
295+
executor.shutdown(wait=True)
296+
self.assertEqual(results, [42])
297+
298+
def test_daemon_exit_no_block(self):
299+
# Interpreter should exit promptly with daemon=True and
300+
# shutdown(wait=False), even if a task is still running.
301+
rc, out, err = assert_python_ok('-c', """if True:
302+
from concurrent.futures import ThreadPoolExecutor
303+
import time
304+
t = ThreadPoolExecutor(max_workers=1, daemon=True)
305+
t.submit(time.sleep, 60)
306+
t.shutdown(wait=False)
307+
""")
308+
self.assertFalse(err)
309+
310+
def test_daemon_context_manager_waits(self):
311+
# Context manager calls shutdown(wait=True), so it should
312+
# still wait for tasks to finish.
313+
results = []
314+
def append_after_sleep():
315+
time.sleep(0.1)
316+
results.append(42)
317+
with futures.ThreadPoolExecutor(max_workers=2, daemon=True) as e:
318+
e.submit(append_after_sleep)
319+
self.assertEqual(results, [42])
320+
321+
264322
class ProcessPoolShutdownTest(ExecutorShutdownTest):
265323
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
266324
def test_processes_terminate(self):
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Added *daemon* parameter to :class:`concurrent.futures.ThreadPoolExecutor`.
2+
When set to ``True``, worker threads are created as daemon threads, allowing
3+
the interpreter to exit without waiting for them to finish.

0 commit comments

Comments
 (0)