forked from python/cpython
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_pool.py
More file actions
44 lines (36 loc) · 1.55 KB
/
test_pool.py
File metadata and controls
44 lines (36 loc) · 1.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Manager
import time
import unittest
from .util import BaseTestCase, setup_module
class PoolExecutorTest(BaseTestCase):
def test_map_buffersize(self):
manager = Manager()
for ExecutorType in (ThreadPoolExecutor, ProcessPoolExecutor):
with ExecutorType(max_workers=1) as pool:
with self.assertRaisesRegex(
ValueError, "buffersize must be None or >= 1."
):
pool.map(bool, [], buffersize=0)
for buffersize, iterable_size in [
(1, 5),
(5, 5),
(10, 5),
]:
iterable = range(iterable_size)
processed_elements = manager.list()
with ExecutorType(max_workers=1) as pool:
iterator = pool.map(
processed_elements.append, iterable, buffersize=buffersize
)
time.sleep(0.2) # wait for buffered futures to finish
self.assertSetEqual(set(processed_elements), set(range(min(buffersize, iterable_size))))
next(iterator)
time.sleep(0.1) # wait for the created future to finish
self.assertSetEqual(
set(processed_elements), set(range(min(buffersize + 1, iterable_size)))
)
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()