1212
1313if TYPE_CHECKING :
1414 from collections .abc import Callable
15+ from multiprocessing .sharedctypes import Synchronized
16+
17+ # The `spawn` start method is used for all processes in the shared-memory estimation test. Unlike `fork`, it is safe
18+ # to use from the multi-threaded pytest process (forking a multi-threaded process is deprecated since Python 3.12),
19+ # but it requires all process targets to be picklable, module-level functions.
20+ _ctx = get_context ('spawn' )
21+
22+ _EXTRA_MEMORY_SIZE = 1024 * 1024 * 100 # 100 MB
1523
1624
1725def test_get_memory_info_returns_valid_values () -> None :
@@ -26,6 +34,100 @@ def test_get_cpu_info_returns_valid_values() -> None:
2634 assert 0 <= cpu_info .used_ratio <= 1
2735
2836
37+ def _no_extra_memory_child (ready : synchronize .Barrier , measured : synchronize .Barrier ) -> None :
38+ ready .wait ()
39+ measured .wait ()
40+
41+
42+ def _extra_memory_child (ready : synchronize .Barrier , measured : synchronize .Barrier ) -> None :
43+ memory = SharedMemory (size = _EXTRA_MEMORY_SIZE , create = True )
44+ assert memory .buf is not None
45+ memory .buf [:] = bytearray ([255 for _ in range (_EXTRA_MEMORY_SIZE )])
46+ print (f'Using the memory... { memory .buf [- 1 ]} ' )
47+ ready .wait ()
48+ measured .wait ()
49+ memory .close ()
50+ memory .unlink ()
51+
52+
53+ def _shared_extra_memory_child (ready : synchronize .Barrier , measured : synchronize .Barrier , memory : SharedMemory ) -> None :
54+ assert memory .buf is not None
55+ print (f'Using the memory... { memory .buf [- 1 ]} ' )
56+ ready .wait ()
57+ measured .wait ()
58+
59+
60+ def _get_additional_memory_estimation_while_running_processes (
61+ * , target : Callable , count : int = 1 , use_shared_memory : bool = False
62+ ) -> float :
63+ processes = []
64+ ready = _ctx .Barrier (parties = count + 1 )
65+ measured = _ctx .Barrier (parties = count + 1 )
66+ shared_memory : None | SharedMemory = None
67+ memory_before = get_memory_info ().current_size
68+
69+ if use_shared_memory :
70+ shared_memory = SharedMemory (size = _EXTRA_MEMORY_SIZE , create = True )
71+ assert shared_memory .buf is not None
72+ shared_memory .buf [:] = bytearray ([255 for _ in range (_EXTRA_MEMORY_SIZE )])
73+ extra_args = [shared_memory ]
74+ else :
75+ extra_args = []
76+
77+ for _ in range (count ):
78+ p = _ctx .Process (target = target , args = [ready , measured , * extra_args ])
79+ p .start ()
80+ processes .append (p )
81+
82+ ready .wait ()
83+ memory_during = get_memory_info ().current_size
84+ measured .wait ()
85+
86+ for p in processes :
87+ p .join ()
88+
89+ if shared_memory :
90+ shared_memory .close ()
91+ shared_memory .unlink ()
92+
93+ return (memory_during - memory_before ).to_mb () / count
94+
95+
96+ def _parent_process (estimated_memory_expectation : Synchronized ) -> None :
97+ children_count = 4
98+ # Memory calculation is not exact, so allow for some tolerance.
99+ test_tolerance = 0.3
100+
101+ additional_memory_simple_child = _get_additional_memory_estimation_while_running_processes (
102+ target = _no_extra_memory_child , count = children_count
103+ )
104+ additional_memory_extra_memory_child = (
105+ _get_additional_memory_estimation_while_running_processes (target = _extra_memory_child , count = children_count )
106+ - additional_memory_simple_child
107+ )
108+ additional_memory_shared_extra_memory_child = (
109+ _get_additional_memory_estimation_while_running_processes (
110+ target = _shared_extra_memory_child , count = children_count , use_shared_memory = True
111+ )
112+ - additional_memory_simple_child
113+ )
114+
115+ memory_estimation_difference_ratio = (
116+ abs ((additional_memory_shared_extra_memory_child * children_count ) - additional_memory_extra_memory_child )
117+ / additional_memory_extra_memory_child
118+ )
119+
120+ estimated_memory_expectation .value = memory_estimation_difference_ratio < test_tolerance
121+
122+ if not estimated_memory_expectation .value :
123+ print (
124+ f'{ additional_memory_shared_extra_memory_child = } \n '
125+ f'{ children_count = } \n '
126+ f'{ additional_memory_extra_memory_child = } \n '
127+ f'{ memory_estimation_difference_ratio = } '
128+ )
129+
130+
29131@pytest .mark .skipif (sys .platform != 'linux' , reason = 'Improved estimation available only on Linux' )
30132def test_memory_estimation_does_not_overestimate_due_to_shared_memory () -> None :
31133 """Test that memory usage estimation is not overestimating memory usage by counting shared memory multiple times.
@@ -38,103 +140,9 @@ def test_memory_estimation_does_not_overestimate_due_to_shared_memory() -> None:
38140 equal to additional_memory_size_estimate_per_unshared_memory_child where the additional shared memory is exactly
39141 the same as the unshared memory.
40142 """
143+ estimated_memory_expectation = _ctx .Value ('b' , False ) # noqa: FBT003 # Common usage pattern for multiprocessing.Value
41144
42- ctx = get_context ('fork' )
43- estimated_memory_expectation = ctx .Value ('b' , False ) # noqa: FBT003 # Common usage pattern for multiprocessing.Value
44-
45- def parent_process () -> None :
46- extra_memory_size = 1024 * 1024 * 100 # 100 MB
47- children_count = 4
48- # Memory calculation is not exact, so allow for some tolerance.
49- test_tolerance = 0.3
50-
51- def no_extra_memory_child (ready : synchronize .Barrier , measured : synchronize .Barrier ) -> None :
52- ready .wait ()
53- measured .wait ()
54-
55- def extra_memory_child (ready : synchronize .Barrier , measured : synchronize .Barrier ) -> None :
56- memory = SharedMemory (size = extra_memory_size , create = True )
57- assert memory .buf is not None
58- memory .buf [:] = bytearray ([255 for _ in range (extra_memory_size )])
59- print (f'Using the memory... { memory .buf [- 1 ]} ' )
60- ready .wait ()
61- measured .wait ()
62- memory .close ()
63- memory .unlink ()
64-
65- def shared_extra_memory_child (
66- ready : synchronize .Barrier , measured : synchronize .Barrier , memory : SharedMemory
67- ) -> None :
68- assert memory .buf is not None
69- print (f'Using the memory... { memory .buf [- 1 ]} ' )
70- ready .wait ()
71- measured .wait ()
72-
73- def get_additional_memory_estimation_while_running_processes (
74- * , target : Callable , count : int = 1 , use_shared_memory : bool = False
75- ) -> float :
76- processes = []
77- ready = ctx .Barrier (parties = count + 1 )
78- measured = ctx .Barrier (parties = count + 1 )
79- shared_memory : None | SharedMemory = None
80- memory_before = get_memory_info ().current_size
81-
82- if use_shared_memory :
83- shared_memory = SharedMemory (size = extra_memory_size , create = True )
84- assert shared_memory .buf is not None
85- shared_memory .buf [:] = bytearray ([255 for _ in range (extra_memory_size )])
86- extra_args = [shared_memory ]
87- else :
88- extra_args = []
89-
90- for _ in range (count ):
91- p = ctx .Process (target = target , args = [ready , measured , * extra_args ])
92- p .start ()
93- processes .append (p )
94-
95- ready .wait ()
96- memory_during = get_memory_info ().current_size
97- measured .wait ()
98-
99- for p in processes :
100- p .join ()
101-
102- if shared_memory :
103- shared_memory .close ()
104- shared_memory .unlink ()
105-
106- return (memory_during - memory_before ).to_mb () / count
107-
108- additional_memory_simple_child = get_additional_memory_estimation_while_running_processes (
109- target = no_extra_memory_child , count = children_count
110- )
111- additional_memory_extra_memory_child = (
112- get_additional_memory_estimation_while_running_processes (target = extra_memory_child , count = children_count )
113- - additional_memory_simple_child
114- )
115- additional_memory_shared_extra_memory_child = (
116- get_additional_memory_estimation_while_running_processes (
117- target = shared_extra_memory_child , count = children_count , use_shared_memory = True
118- )
119- - additional_memory_simple_child
120- )
121-
122- memory_estimation_difference_ratio = (
123- abs ((additional_memory_shared_extra_memory_child * children_count ) - additional_memory_extra_memory_child )
124- / additional_memory_extra_memory_child
125- )
126-
127- estimated_memory_expectation .value = memory_estimation_difference_ratio < test_tolerance
128-
129- if not estimated_memory_expectation .value :
130- print (
131- f'{ additional_memory_shared_extra_memory_child = } \n '
132- f'{ children_count = } \n '
133- f'{ additional_memory_extra_memory_child = } \n '
134- f'{ memory_estimation_difference_ratio = } '
135- )
136-
137- process = ctx .Process (target = parent_process )
145+ process = _ctx .Process (target = _parent_process , args = (estimated_memory_expectation ,))
138146 process .start ()
139147 process .join ()
140148
0 commit comments