Skip to content

Commit 51a8310

Browse files
committed
Revert all commits after "Use threading.excepthook instead of sys.excepthook"
This reverts this branch back to just calling `threading.excepthook`, instead of trying to raise the exception when the user gets the result or exits the context manager. I would like to get the immediate bug fixed, and I think this is a simple, low-risk fix that does that. From what I understand it isn't possible to prevent the reference cycle which is causing all those test failures. We need to store the exception to raise it later, which implicitly creates one. We can always add raising the exception as an additional improvement later, but in the meantime if any user code wants to handle the exceptions it can do so by installing a default exception handler.
1 parent b7f18ed commit 51a8310

3 files changed

Lines changed: 18 additions & 168 deletions

File tree

Lib/multiprocessing/managers.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1234,7 +1234,7 @@ def __isub__(self, value):
12341234

12351235
BasePoolProxy = MakeProxyType('PoolProxy', (
12361236
'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1237-
'map', 'map_async', 'starmap', 'starmap_async', 'terminate', '_check_error'
1237+
'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
12381238
))
12391239
BasePoolProxy._method_to_typeid_ = {
12401240
'apply_async': 'AsyncResult',
@@ -1248,7 +1248,6 @@ def __enter__(self):
12481248
return self
12491249
def __exit__(self, exc_type, exc_val, exc_tb):
12501250
self.terminate()
1251-
self._check_error(exc_val)
12521251

12531252
#
12541253
# Definition of SyncManager

Lib/multiprocessing/pool.py

Lines changed: 15 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
# Licensed to PSF under a Contributor Agreement.
88
#
99

10-
__all__ = ['BrokenPoolError', 'CallbackError', 'Pool', 'ThreadPool']
10+
__all__ = ['Pool', 'ThreadPool']
1111

1212
#
1313
# Imports
@@ -69,14 +69,6 @@ def __init__(self, exc, tb):
6969
def __reduce__(self):
7070
return rebuild_exc, (self.exc, self.tb)
7171

72-
class BrokenPoolError(ExceptionGroup):
73-
def __init__(self, msg, exc):
74-
super().__init__(msg, exc)
75-
76-
class CallbackError(ExceptionGroup):
77-
def __init__(self, msg, exc):
78-
super().__init__(msg, exc)
79-
8072
def rebuild_exc(exc, tb):
8173
exc.__cause__ = RemoteTraceback(tb)
8274
return exc
@@ -206,7 +198,6 @@ def __init__(self, processes=None, initializer=None, initargs=(),
206198
self._maxtasksperchild = maxtasksperchild
207199
self._initializer = initializer
208200
self._initargs = initargs
209-
self._errors = []
210201

211202
if processes is None:
212203
processes = os.process_cpu_count() or 1
@@ -358,18 +349,9 @@ def _setup_queues(self):
358349
self._quick_get = self._outqueue._reader.recv
359350

360351
def _check_running(self):
361-
self._check_error()
362352
if self._state != RUN:
363353
raise ValueError("Pool not running")
364354

365-
def _check_error(self, exc=None):
366-
if self._errors:
367-
errs = [CallbackError("callback raised", errs) for errs in self._errors]
368-
if exc is not None and not isinstance(exc, CallbackError) \
369-
and not isinstance(exc, BrokenPoolError):
370-
errs.append(exc)
371-
raise BrokenPoolError("Callback(s) failed", errs) from None
372-
373355
def apply(self, func, args=(), kwds={}):
374356
'''
375357
Equivalent of `func(*args, **kwds)`.
@@ -755,11 +737,6 @@ def __enter__(self):
755737

756738
def __exit__(self, exc_type, exc_val, exc_tb):
757739
self.terminate()
758-
self._check_error(exc_val)
759-
760-
def _error(self, error):
761-
util.debug('callback error: %s', error)
762-
self._errors.append(error)
763740

764741
#
765742
# Class whose instances are returned by `Pool.apply_async()`
@@ -774,7 +751,6 @@ def __init__(self, pool, callback, error_callback):
774751
self._cache = pool._cache
775752
self._callback = callback
776753
self._error_callback = error_callback
777-
self._cb_errors = []
778754
self._cache[self._job] = self
779755

780756
def ready(self):
@@ -792,31 +768,30 @@ def get(self, timeout=None):
792768
self.wait(timeout)
793769
if not self.ready():
794770
raise TimeoutError
795-
if self._cb_errors:
796-
raise CallbackError("callback raised", self._cb_errors)
797-
elif self._success:
771+
if self._success:
798772
return self._value
799773
else:
800774
raise self._value
801775

802776
def _set(self, i, obj):
803777
self._success, self._value = obj
804778
if self._callback and self._success:
805-
try:
806-
self._callback(self._value)
807-
except Exception as e:
808-
self._cb_errors = (e,)
809-
self._pool._error(self._cb_errors)
779+
self._handle_exceptions(self._callback, self._value)
810780
if self._error_callback and not self._success:
811-
try:
812-
self._error_callback(self._value)
813-
except Exception as e:
814-
self._cb_errors = (e, self._value)
815-
self._pool._error(self._cb_errors)
781+
self._handle_exceptions(self._error_callback, self._value)
816782
self._event.set()
817783
del self._cache[self._job]
818784
self._pool = None
819785

786+
@staticmethod
787+
def _handle_exceptions(callback, args):
788+
try:
789+
return callback(args)
790+
except Exception as e:
791+
args = threading.ExceptHookArgs([type(e), e, e.__traceback__, None])
792+
threading.excepthook(args)
793+
del args
794+
820795
__class_getitem__ = classmethod(types.GenericAlias)
821796

822797
AsyncResult = ApplyResult # create alias -- see #17805
@@ -847,11 +822,7 @@ def _set(self, i, success_result):
847822
self._value[i*self._chunksize:(i+1)*self._chunksize] = result
848823
if self._number_left == 0:
849824
if self._callback:
850-
try:
851-
self._callback(self._value)
852-
except Exception as e:
853-
self._cb_errors = (e,)
854-
self._pool._error(self._cb_errors)
825+
self._handle_exceptions(self._callback, self._value)
855826
del self._cache[self._job]
856827
self._event.set()
857828
self._pool = None
@@ -863,11 +834,7 @@ def _set(self, i, success_result):
863834
if self._number_left == 0:
864835
# only consider the result ready once all jobs are done
865836
if self._error_callback:
866-
try:
867-
self._error_callback(self._value)
868-
except Exception as e:
869-
self._cb_errors = (e, self._value)
870-
self._pool._error(self._cb_errors)
837+
self._handle_exceptions(self._error_callback, self._value)
871838
del self._cache[self._job]
872839
self._event.set()
873840
self._pool = None

Lib/test/_test_multiprocessing.py

Lines changed: 2 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import multiprocessing.pool
5555
import multiprocessing.queues
5656
from multiprocessing.connection import wait
57-
from multiprocessing.pool import BrokenPoolError, CallbackError
5857

5958
from multiprocessing import util
6059

@@ -3173,123 +3172,9 @@ def test_resource_warning(self):
31733172
pool = None
31743173
support.gc_collect()
31753174

3176-
def test_callback_errors(self):
3177-
def _apply(pool, target, **kwargs):
3178-
return pool.apply_async(target, **kwargs)
3179-
3180-
def _map(pool, target, **kwargs):
3181-
return pool.map_async(target, range(5), **kwargs)
3182-
3183-
for func in [_apply, _map]:
3184-
with self.subTest(func=func):
3185-
3186-
# Fail upon trying to reuse a broken pool after callback failure:
3187-
# - BrokenPoolError containing:
3188-
# - CallbackError containing:
3189-
# - Error thrown from the callback
3190-
with self.assertRaises(BrokenPoolError) as pool_ctx:
3191-
with self.Pool(1) as pool:
3192-
res = func(pool, noop, callback=raising)
3193-
with self.assertRaises(CallbackError) as res_ctx:
3194-
res.get()
3195-
self._check_subexceptions(res_ctx.exception, [KeyError])
3196-
pool.apply_async(noop)
3197-
self._check_subexceptions(pool_ctx.exception, [CallbackError])
3198-
self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError])
3199-
3200-
# Fail upon trying to reuse a broken pool after error callback failures:
3201-
# - BrokenPoolError containing:
3202-
# - CallbackError containing:
3203-
# - Error thrown from the callback
3204-
# - Original error
3205-
with self.assertRaises(BrokenPoolError) as pool_ctx:
3206-
with self.Pool(1) as pool:
3207-
res = func(pool, raising2, error_callback=raising)
3208-
with self.assertRaises(CallbackError) as res_ctx:
3209-
res.get()
3210-
self._check_subexceptions(res_ctx.exception,
3211-
[KeyError, IndexError])
3212-
pool.apply_async(noop)
3213-
self._check_subexceptions(pool_ctx.exception, [CallbackError])
3214-
self._check_subexceptions(pool_ctx.exception.exceptions[0],
3215-
[KeyError, IndexError])
3216-
3217-
# Exiting the context manager with a "normal" error and a failed callback
3218-
# - BrokenPoolError containing:
3219-
# - CallbackError containing:
3220-
# - Error thrown from the callback
3221-
# - Exception that caused the context manager to exit
3222-
with self.assertRaises(BrokenPoolError) as pool_ctx:
3223-
with self.Pool(1) as pool:
3224-
res = func(pool, noop, callback=raising)
3225-
with self.assertRaises(CallbackError) as res_ctx:
3226-
res.get()
3227-
raise IndexError()
3228-
self._check_subexceptions(pool_ctx.exception,
3229-
[CallbackError, IndexError])
3230-
3231-
# Exiting the context manager directly with a callback failure error
3232-
# - BrokenPoolError containing:
3233-
# - CallbackError instance containing:
3234-
# - Error thrown from the callback
3235-
# Note that only one instance of the error is present: it was
3236-
# *not* added again as it was above, since it is a CallbackError
3237-
with self.assertRaises(BrokenPoolError) as pool_ctx:
3238-
with self.Pool(1) as pool:
3239-
func(pool, noop, callback=raising).get()
3240-
self._check_subexceptions(pool_ctx.exception, [CallbackError])
3241-
self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError])
3242-
3243-
# Exiting the context manager directly with a broken pool error
3244-
# - BrokenPoolError containing:
3245-
# - CallbackError instance containing:
3246-
# - Error thrown from the callback
3247-
# Note that only one instance of the error is present: it was
3248-
# *not* added again as it was above, since it is a BrokenPoolError
3249-
with self.assertRaises(BrokenPoolError) as pool_ctx:
3250-
with self.Pool(1) as pool:
3251-
try:
3252-
func(pool, noop, callback=raising).get()
3253-
except CallbackError:
3254-
pass
3255-
func(pool, noop, callback=raising)
3256-
self._check_subexceptions(pool_ctx.exception, [CallbackError])
3257-
self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError])
3258-
3259-
# Skip this test for process-based parallelism as sharing the barrier will fail
3260-
if self.TYPE != 'processes':
3261-
with self.subTest(name="Multiple callback failures"):
3262-
# Fail with 3x callback failure:
3263-
# - BrokenPoolError containing:
3264-
# - 3x CallbackError containing:
3265-
# - Error thrown from the callback
3266-
with self.assertRaises(BrokenPoolError) as pool_ctx:
3267-
kwds = {'barrier': self.Barrier(3)}
3268-
with self.Pool(3) as pool:
3269-
res = [pool.apply_async(noop, kwds=kwds, callback=raising)
3270-
for _ in range(3)]
3271-
for r in res:
3272-
with self.assertRaises(CallbackError) as res_ctx:
3273-
r.get()
3274-
self._check_subexceptions(pool_ctx.exception, [CallbackError] * 3)
3275-
for se in pool_ctx.exception.exceptions:
3276-
self._check_subexceptions(se, [KeyError])
3277-
3278-
def _check_subexceptions(self, group, sub_types):
3279-
self.assertEqual(len(group.exceptions), len(sub_types))
3280-
for sub_exc, sub_type in zip(group.exceptions, sub_types):
3281-
self.assertIsInstance(sub_exc, sub_type)
3282-
3283-
def noop(*args, barrier=None):
3284-
if barrier:
3285-
barrier.wait()
3286-
3287-
def raising(*args):
3175+
def raising():
32883176
raise KeyError("key")
32893177

3290-
def raising2(*args):
3291-
raise IndexError()
3292-
32933178
def unpickleable_result():
32943179
return lambda: 42
32953180

@@ -7080,8 +6965,7 @@ def tearDownClass(cls):
70806965
f"{multiprocessing.active_children()} "
70816966
f"active children after {dt:.1f} seconds")
70826967

7083-
# Garbage collect to ensure otherwise unreferenced cycles are cleaned up
7084-
support.gc_collect()
6968+
gc.collect() # do garbage collection
70856969
if cls.manager._number_of_objects() != 0:
70866970
# This is not really an error since some tests do not
70876971
# ensure that all processes which hold a reference to a

0 commit comments

Comments
 (0)