From b27f9665548b10d9a3e329fe2b25720f84e9d2fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sindri=20Gu=C3=B0mundsson?= Date: Thu, 2 Jan 2020 12:31:22 +0000 Subject: [PATCH 1/2] bpo-39190: Fix deadlock when callback raises --- Lib/multiprocessing/pool.py | 46 +++++++++++++++++++------------ Lib/test/_test_multiprocessing.py | 34 ++++++++++++++++++++++- 2 files changed, 62 insertions(+), 18 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index bbe05a550c349c..436662acaf5f01 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -592,6 +592,9 @@ def _handle_results(outqueue, get, cache): cache[job]._set(i, obj) except KeyError: pass + except Exception: + # Even if we raised we still want to handle callbacks + traceback.print_exc() task = job = obj = None while cache and thread._state != TERMINATE: @@ -609,6 +612,9 @@ def _handle_results(outqueue, get, cache): cache[job]._set(i, obj) except KeyError: pass + except Exception: + # Even if we raised we still want to handle callbacks + traceback.print_exc() task = job = obj = None if hasattr(outqueue, '_reader'): @@ -772,13 +778,15 @@ def get(self, timeout=None): def _set(self, i, obj): self._success, self._value = obj - if self._callback and self._success: - self._callback(self._value) - if self._error_callback and not self._success: - self._error_callback(self._value) - self._event.set() - del self._cache[self._job] - self._pool = None + try: + if self._callback and self._success: + self._callback(self._value) + if self._error_callback and not self._success: + self._error_callback(self._value) + finally: + self._event.set() + del self._cache[self._job] + self._pool = None __class_getitem__ = classmethod(types.GenericAlias) @@ -809,11 +817,13 @@ def _set(self, i, success_result): if success and self._success: self._value[i*self._chunksize:(i+1)*self._chunksize] = result if self._number_left == 0: - if self._callback: - self._callback(self._value) - del self._cache[self._job] - self._event.set() - self._pool = None + try: + if self._callback: + self._callback(self._value) + finally: + del self._cache[self._job] + self._event.set() + self._pool = None else: if not success and self._success: # only store first exception @@ -821,11 +831,13 @@ def _set(self, i, success_result): self._value = result if self._number_left == 0: # only consider the result ready once all jobs are done - if self._error_callback: - self._error_callback(self._value) - del self._cache[self._job] - self._event.set() - self._pool = None + try: + if self._error_callback: + self._error_callback(self._value) + finally: + del self._cache[self._job] + self._event.set() + self._pool = None # # Class whose instances are returned by `Pool.imap()` diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index fd3b4303f034c1..dbaf251fd61de7 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2741,6 +2741,39 @@ def errback(exc): p.close() p.join() +class _TestPoolResultHandlerErrors(BaseTestCase): + ALLOWED_TYPES = ('processes', ) + + def test_apply_async_callback_raises_exception(self): + p = multiprocessing.Pool(1) + + def job(): + return 1 + + def callback(value): + raise Exception() + + p.apply_async(job, callback=callback) + + self.assertTrue(p._result_handler.is_alive()) + p.close() + p.join() + + def test_map_async_callback_raises_exception(self): + p = multiprocessing.Pool(1) + + def job(value): + return value + + def callback(value): + raise Exception() + + p.map_async(job, [1], callback=callback) + + self.assertTrue(p._result_handler.is_alive()) + p.close() + p.join() + class _TestPoolWorkerLifetime(BaseTestCase): ALLOWED_TYPES = ('processes', ) @@ -5740,7 +5773,6 @@ def install_tests_in_module_dict(remote_globs, start_method): __module__ = remote_globs['__name__'] local_globs = globals() ALL_TYPES = {'processes', 'threads', 'manager'} - for name, base in local_globs.items(): if not isinstance(base, type): continue From ca50674022a0f716bdc1d2a50124fd02cdc80621 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Thu, 3 Sep 2020 09:01:36 +0000 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NEWS.d/next/Library/2020-09-03-09-01-34.bpo-39190.oU-ejc.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2020-09-03-09-01-34.bpo-39190.oU-ejc.rst diff --git a/Misc/NEWS.d/next/Library/2020-09-03-09-01-34.bpo-39190.oU-ejc.rst b/Misc/NEWS.d/next/Library/2020-09-03-09-01-34.bpo-39190.oU-ejc.rst new file mode 100644 index 00000000000000..891aa976882268 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-09-03-09-01-34.bpo-39190.oU-ejc.rst @@ -0,0 +1 @@ +Fix deadlock when pool.apply_async's and pool.map_async's callbacks raise exception \ No newline at end of file