@@ -33,15 +33,20 @@ def test_main(self, ipc_device, nmrs):
3333 device = ipc_device
3434 options = DeviceMemoryResourceOptions (max_size = POOL_SIZE , ipc_enabled = True )
3535 mrs = [DeviceMemoryResource (device , options = options ) for _ in range (nmrs )]
36- buffers = [mr .allocate (NBYTES ) for mr , _ in zip (cycle (mrs ), range (NTASKS ))]
3736
38- with mp . Pool ( NWORKERS ) as pool :
39- pool . map ( self . process_buffer , buffers )
37+ try :
38+ buffers = [ mr . allocate ( NBYTES ) for mr , _ in zip ( cycle ( mrs ), range ( NTASKS ))]
4039
41- pgen = PatternGen (device , NBYTES )
42- for buffer in buffers :
43- pgen .verify_buffer (buffer , seed = True )
44- buffer .close ()
40+ with mp .Pool (NWORKERS ) as pool :
41+ pool .map (self .process_buffer , buffers )
42+
43+ pgen = PatternGen (device , NBYTES )
44+ for buffer in buffers :
45+ pgen .verify_buffer (buffer , seed = True )
46+ buffer .close ()
47+ finally :
48+ for mr in mrs :
49+ mr .close ()
4550
4651 def process_buffer (self , buffer ):
4752 device = Device (buffer .memory_resource .device_id )
@@ -70,18 +75,23 @@ def test_main(self, ipc_device, nmrs):
7075 device = ipc_device
7176 options = DeviceMemoryResourceOptions (max_size = POOL_SIZE , ipc_enabled = True )
7277 mrs = [DeviceMemoryResource (device , options = options ) for _ in range (nmrs )]
73- buffers = [mr .allocate (NBYTES ) for mr , _ in zip (cycle (mrs ), range (NTASKS ))]
7478
75- with mp .Pool (NWORKERS , initializer = self .init_worker , initargs = (mrs ,)) as pool :
76- pool .starmap (
77- self .process_buffer ,
78- [(mrs .index (buffer .memory_resource ), buffer .get_ipc_descriptor ()) for buffer in buffers ],
79- )
79+ try :
80+ buffers = [mr .allocate (NBYTES ) for mr , _ in zip (cycle (mrs ), range (NTASKS ))]
8081
81- pgen = PatternGen (device , NBYTES )
82- for buffer in buffers :
83- pgen .verify_buffer (buffer , seed = True )
84- buffer .close ()
82+ with mp .Pool (NWORKERS , initializer = self .init_worker , initargs = (mrs ,)) as pool :
83+ pool .starmap (
84+ self .process_buffer ,
85+ [(mrs .index (buffer .memory_resource ), buffer .get_ipc_descriptor ()) for buffer in buffers ],
86+ )
87+
88+ pgen = PatternGen (device , NBYTES )
89+ for buffer in buffers :
90+ pgen .verify_buffer (buffer , seed = True )
91+ buffer .close ()
92+ finally :
93+ for mr in mrs :
94+ mr .close ()
8595
8696 def process_buffer (self , mr_idx , buffer_desc ):
8797 mr = self .mrs [mr_idx ]
@@ -115,15 +125,20 @@ def test_main(self, ipc_device, nmrs):
115125 device = ipc_device
116126 options = DeviceMemoryResourceOptions (max_size = POOL_SIZE , ipc_enabled = True )
117127 mrs = [DeviceMemoryResource (device , options = options ) for _ in range (nmrs )]
118- buffers = [mr .allocate (NBYTES ) for mr , _ in zip (cycle (mrs ), range (NTASKS ))]
119128
120- with mp . Pool ( NWORKERS , initializer = self . init_worker , initargs = ( mrs ,)) as pool :
121- pool . starmap ( self . process_buffer , [( device , pickle . dumps ( buffer )) for buffer in buffers ])
129+ try :
130+ buffers = [ mr . allocate ( NBYTES ) for mr , _ in zip ( cycle ( mrs ), range ( NTASKS ))]
122131
123- pgen = PatternGen (device , NBYTES )
124- for buffer in buffers :
125- pgen .verify_buffer (buffer , seed = True )
126- buffer .close ()
132+ with mp .Pool (NWORKERS , initializer = self .init_worker , initargs = (mrs ,)) as pool :
133+ pool .starmap (self .process_buffer , [(device , pickle .dumps (buffer )) for buffer in buffers ])
134+
135+ pgen = PatternGen (device , NBYTES )
136+ for buffer in buffers :
137+ pgen .verify_buffer (buffer , seed = True )
138+ buffer .close ()
139+ finally :
140+ for mr in mrs :
141+ mr .close ()
127142
128143 def process_buffer (self , device , buffer_s ):
129144 device .set_current ()
0 commit comments