diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index f979890170b1a1..b5d17746e21a1e 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -595,6 +595,9 @@ def _handle_results(outqueue, get, cache): cache[job]._set(i, obj) except KeyError: pass + except Exception: + # Even if we raised we still want to handle callbacks + traceback.print_exc() task = job = obj = None while cache and thread._state != TERMINATE: @@ -612,6 +615,9 @@ def _handle_results(outqueue, get, cache): cache[job]._set(i, obj) except KeyError: pass + except Exception: + # Even if we raised we still want to handle callbacks + traceback.print_exc() task = job = obj = None if hasattr(outqueue, '_reader'): @@ -775,13 +781,15 @@ def get(self, timeout=None): def _set(self, i, obj): self._success, self._value = obj - if self._callback and self._success: - self._callback(self._value) - if self._error_callback and not self._success: - self._error_callback(self._value) - self._event.set() - del self._cache[self._job] - self._pool = None + try: + if self._callback and self._success: + self._callback(self._value) + if self._error_callback and not self._success: + self._error_callback(self._value) + finally: + self._event.set() + del self._cache[self._job] + self._pool = None __class_getitem__ = classmethod(types.GenericAlias) @@ -812,11 +820,13 @@ def _set(self, i, success_result): if success and self._success: self._value[i*self._chunksize:(i+1)*self._chunksize] = result if self._number_left == 0: - if self._callback: - self._callback(self._value) - del self._cache[self._job] - self._event.set() - self._pool = None + try: + if self._callback: + self._callback(self._value) + finally: + del self._cache[self._job] + self._event.set() + self._pool = None else: if not success and self._success: # only store first exception @@ -824,11 +834,13 @@ def _set(self, i, success_result): self._value = result if self._number_left == 0: # only consider the result ready once all jobs are done - if self._error_callback: - self._error_callback(self._value) - del self._cache[self._job] - self._event.set() - self._pool = None + try: + if self._error_callback: + self._error_callback(self._value) + finally: + del self._cache[self._job] + self._event.set() + self._pool = None # # Class whose instances are returned by `Pool.imap()` diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 69174cff699115..8eafc9611f563b 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3336,6 +3336,39 @@ def errback(exc): p.close() p.join() +class _TestPoolResultHandlerErrors(BaseTestCase): + ALLOWED_TYPES = ('processes', ) + + def test_apply_async_callback_raises_exception(self): + p = multiprocessing.Pool(1) + + def job(): + return 1 + + def callback(value): + raise Exception() + + p.apply_async(job, callback=callback) + + self.assertTrue(p._result_handler.is_alive()) + p.close() + p.join() + + def test_map_async_callback_raises_exception(self): + p = multiprocessing.Pool(1) + + def job(value): + return value + + def callback(value): + raise Exception() + + p.map_async(job, [1], callback=callback) + + self.assertTrue(p._result_handler.is_alive()) + p.close() + p.join() + class _TestPoolWorkerLifetime(BaseTestCase): ALLOWED_TYPES = ('processes', ) @@ -7346,7 +7379,6 @@ def install_tests_in_module_dict(remote_globs, start_method, __module__ = remote_globs['__name__'] local_globs = globals() ALL_TYPES = {'processes', 'threads', 'manager'} - for name, base in local_globs.items(): if not isinstance(base, type): continue diff --git a/Misc/NEWS.d/next/Library/2020-09-03-09-01-34.bpo-39190.oU-ejc.rst b/Misc/NEWS.d/next/Library/2020-09-03-09-01-34.bpo-39190.oU-ejc.rst new file mode 100644 index 00000000000000..891aa976882268 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-09-03-09-01-34.bpo-39190.oU-ejc.rst @@ -0,0 +1 @@ +Fix deadlock when pool.apply_async's and pool.map_async's callbacks raise exception \ No newline at end of file