77import time
88import weakref
99from concurrent .futures import CancelledError , TimeoutError
10+ from itertools import islice
1011from unittest import mock
1112
1213import pytest
@@ -175,6 +176,9 @@ def test_map(executor):
175176 results = list (executor .map (lambda x : x + 1 , range (10 )))
176177 assert results == [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ]
177178
179+ results = list (executor .map (lambda x , y : x + y , range (10 ), range (9 )))
180+ assert results == [0 , 2 , 4 , 6 , 8 , 10 , 12 , 14 , 16 ]
181+
178182
179183def test_map_timeout (executor ):
180184 """Test that map with timeout raises TimeoutError and cancels futures"""
@@ -200,6 +204,56 @@ def func(x):
200204 assert set (results ) != {0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 }
201205
202206
207+ def test_map_error (executor ):
208+ """Test that map with an exception will raise, and remaining tasks are cancelled"""
209+ results = []
210+
211+ def func (x ):
212+ nonlocal results
213+ time .sleep (0.05 )
214+ if len (results ) == 5 :
215+ raise ValueError ("Test error" )
216+ results .append (x )
217+ return x
218+
219+ with pytest .raises (ValueError ):
220+ list (executor .map (func , range (15 )))
221+
222+ executor .shutdown (wait = True , cancel_futures = False )
223+ assert len (results ) <= 10 , "Final 5 at least should have been cancelled"
224+
225+
226+ @pytest .mark .parametrize ("cancel" , [True , False ])
227+ def test_map_shutdown (executor , cancel ):
228+ results = []
229+
230+ def func (x ):
231+ nonlocal results
232+ time .sleep (0.05 )
233+ results .append (x )
234+ return x
235+
236+ # Get the first few results.
237+ # Keep the iterator alive so that it isn't closed when its reference is dropped.
238+ m = executor .map (func , range (15 ))
239+ values = list (islice (m , 5 ))
240+ assert values == [0 , 1 , 2 , 3 , 4 ]
241+
242+ executor .shutdown (wait = True , cancel_futures = cancel )
243+ if cancel :
244+ assert len (results ) < 15 , "Some tasks should have been cancelled"
245+ else :
246+ assert len (results ) == 15 , "All tasks should have been completed"
247+
248+
249+ def test_map_start (executor ):
250+ """Test that map starts tasks immediately, before iterating"""
251+ e = threading .Event ()
252+ m = executor .map (lambda x : (e .set (), x ), range (1 ))
253+ e .wait (timeout = 0.1 )
254+ assert list (m ) == [(None , 0 )]
255+
256+
203257def test_closing (executor ):
204258 """Test that closing context manager works as expected"""
205259 # mock the shutdown method of the executor
0 commit comments