1717from __future__ import annotations
1818
1919import asyncio
20- import gc
2120import sys
2221import threading
2322import time
2625
2726from test .asynchronous import AsyncUnitTest , unittest
2827
29- from pymongo import periodic_executor
30- from pymongo .periodic_executor import (
31- AsyncPeriodicExecutor ,
32- _register_executor ,
33- _shutdown_executors ,
34- )
28+ from pymongo .periodic_executor import AsyncPeriodicExecutor
3529
3630_IS_SYNC = False
3731
@@ -49,69 +43,28 @@ async def target():
4943
5044class AsyncPeriodicExecutorTestBase (AsyncUnitTest ):
5145 async def asyncSetUp (self ):
52- self .executor = _make_executor ()
46+ self .executor = None
5347
5448 async def asyncTearDown (self ):
55- self .executor .close ()
56- await self .executor .join (timeout = 2 )
49+ if self .executor is not None :
50+ self .executor .close ()
51+ await self .executor .join (timeout = 2 )
5752
5853
59- class TestAsyncPeriodicExecutorRepr ( AsyncUnitTest ):
54+ class TestAsyncPeriodicExecutor ( AsyncPeriodicExecutorTestBase ):
6055 async def test_repr_contains_class_and_name (self ):
6156 executor = _make_executor (name = "exec" )
6257 executor_repr = repr (executor )
6358 self .assertIn ("AsyncPeriodicExecutor" , executor_repr )
6459 self .assertIn ("exec" , executor_repr )
6560
66-
67- class TestAsyncPeriodicExecutorBasic (AsyncPeriodicExecutorTestBase ):
68- async def test_wake_sets_event (self ):
69- self .assertFalse (self .executor ._event )
70- self .executor .wake ()
71- self .assertTrue (self .executor ._event )
72-
73- async def test_update_interval (self ):
74- self .executor .update_interval (60 )
75- self .assertEqual (self .executor ._interval , 60 )
76-
77- async def test_skip_sleep (self ):
78- self .assertFalse (self .executor ._skip_sleep )
79- self .executor .skip_sleep ()
80- self .assertTrue (self .executor ._skip_sleep )
81-
82-
83- class TestAsyncPeriodicExecutorLifecycle (AsyncPeriodicExecutorTestBase ):
84- async def test_open_starts_worker (self ):
85- self .executor .open ()
86- if _IS_SYNC :
87- self .assertIsNotNone (self .executor ._thread )
88- self .assertTrue (self .executor ._thread .is_alive ())
89- else :
90- self .assertIsNotNone (self .executor ._task )
91-
92- async def test_close_sets_stopped (self ):
93- self .executor .open ()
94- self .executor .close ()
95- self .assertTrue (self .executor ._stopped )
96- await self .executor .join (timeout = 1 )
97-
9861 async def test_join_without_open_is_safe (self ):
99- await self .executor .join (timeout = 0.01 )
100-
101- async def test_multiple_open_calls_have_no_effect (self ):
102- self .executor .open ()
103- if _IS_SYNC :
104- worker_id = id (self .executor ._thread )
105- else :
106- worker_id = id (self .executor ._task )
107- self .executor .open ()
108- if _IS_SYNC :
109- self .assertEqual (worker_id , id (self .executor ._thread ))
110- else :
111- self .assertEqual (worker_id , id (self .executor ._task ))
112-
62+ self .executor = _make_executor ()
63+ try :
64+ await self .executor .join (timeout = 0.01 )
65+ except Exception as e :
66+ self .fail (f"join() raised unexpected Exception { e } " )
11367
114- class TestAsyncPeriodicExecutorTarget (AsyncPeriodicExecutorTestBase ):
11568 async def test_target_returning_false_stops_executor (self ):
11669 if _IS_SYNC :
11770 ran = threading .Event ()
@@ -124,12 +77,8 @@ async def target():
12477
12578 self .executor = _make_executor (target = target )
12679 self .executor .open ()
127- if _IS_SYNC :
128- self .assertTrue (ran .wait (timeout = 2 ), "target never ran" )
129- else :
130- await asyncio .wait_for (ran .wait (), timeout = 2 )
13180 await self .executor .join (timeout = 2 )
132- self .assertTrue (self . executor . _stopped )
81+ self .assertTrue (ran . is_set (), "target never ran" )
13382
13483 async def test_target_exception_stops_executor (self ):
13584 if _IS_SYNC :
@@ -149,33 +98,30 @@ def target():
14998
15099 self .executor = _make_executor (target = target )
151100 self .executor .open ()
152- self .assertTrue (ran .wait (timeout = 2 ), "target never ran" )
153101 self .executor .join (timeout = 2 )
102+ self .assertTrue (ran .is_set (), "target never ran" )
154103 finally :
155104 threading .excepthook = orig_excepthook
156- self .assertTrue (self .executor ._stopped )
157105 self .assertEqual (len (captured_exc ), 1 )
158106 self .assertIsInstance (captured_exc [0 ], RuntimeError )
159107 else :
160- ran = asyncio . Event ()
108+ call_count = 0
161109
162110 async def target ():
163- ran .set ()
111+ nonlocal call_count
112+ call_count += 1
164113 raise RuntimeError ("error" )
165114
166115 self .executor = _make_executor (target = target )
167116 self .executor .open ()
168- await asyncio .wait_for (ran .wait (), timeout = 2 )
169117 await self .executor .join (timeout = 2 )
170- self .assertTrue (self .executor ._stopped )
171- if self .executor ._task is not None and self .executor ._task .done ():
172- self .executor ._task .exception ()
118+ self .assertEqual (call_count , 1 , "target should stop after exception" )
173119
174120 async def test_skip_sleep_flag_skips_interval (self ):
175121 call_times = []
176122
177123 async def target ():
178- call_times .append (time .monotonic () if _IS_SYNC else asyncio . get_running_loop (). time () )
124+ call_times .append (time .monotonic ())
179125 if len (call_times ) >= 2 :
180126 return False
181127 return True
@@ -188,19 +134,18 @@ async def target():
188134 self .assertLess (call_times [1 ] - call_times [0 ], 5.0 )
189135
190136 async def test_wake_causes_early_run (self ):
191- call_count = [ 0 ]
137+ call_count = 0
192138 if _IS_SYNC :
193139 woken = threading .Event ()
194140 else :
195141 woken = asyncio .Event ()
196142
197143 async def target ():
198- call_count [0 ] += 1
199- if call_count [0 ] == 1 :
144+ nonlocal call_count
145+ call_count += 1
146+ if call_count == 1 :
200147 woken .set ()
201- if call_count [0 ] >= 2 :
202- return False
203- return True
148+ return call_count < 2
204149
205150 self .executor = _make_executor (interval = 30.0 , min_interval = 0.01 , target = target )
206151 self .executor .open ()
@@ -210,80 +155,22 @@ async def target():
210155 await asyncio .wait_for (woken .wait (), timeout = 2 )
211156 self .executor .wake ()
212157 await self .executor .join (timeout = 3 )
213- self .assertGreaterEqual (call_count [ 0 ] , 2 )
158+ self .assertGreaterEqual (call_count , 2 )
214159
215160 async def test_open_after_target_returns_false (self ):
216- called = [ 0 ]
161+ called = 0
217162
218163 async def target ():
219- called [0 ] += 1
164+ nonlocal called
165+ called += 1
220166 return False
221167
222168 self .executor = _make_executor (target = target )
223169 self .executor .open ()
224170 await self .executor .join (timeout = 2 )
225- self .assertTrue (self .executor ._stopped )
226- if not _IS_SYNC :
227- first_task = self .executor ._task
228171 self .executor .open ()
229172 await self .executor .join (timeout = 2 )
230- self .assertGreaterEqual (called [0 ], 2 )
231- if not _IS_SYNC :
232- self .assertIsNot (self .executor ._task , first_task )
233-
234-
235- class TestShouldStop (AsyncUnitTest ):
236- if _IS_SYNC :
237-
238- def test_returns_false_when_not_stopped (self ):
239- executor = _make_executor ()
240- self .assertFalse (executor ._should_stop ())
241- self .assertFalse (executor ._thread_will_exit )
242-
243- def test_returns_true_and_sets_thread_will_exit (self ):
244- executor = _make_executor ()
245- executor ._stopped = True
246- self .assertTrue (executor ._should_stop ())
247- self .assertTrue (executor ._thread_will_exit )
248-
249-
250- class TestRegisterExecutor (AsyncUnitTest ):
251- if _IS_SYNC :
252-
253- def setUp (self ):
254- self ._orig = set (periodic_executor ._EXECUTORS )
255-
256- def tearDown (self ):
257- periodic_executor ._EXECUTORS .clear ()
258- periodic_executor ._EXECUTORS .update (self ._orig )
259-
260- def test_register_adds_weakref (self ):
261- executor = _make_executor ()
262- before = len (periodic_executor ._EXECUTORS )
263- _register_executor (executor )
264- self .assertEqual (len (periodic_executor ._EXECUTORS ), before + 1 )
265- ref = next (r for r in periodic_executor ._EXECUTORS if r () is executor )
266- del executor
267- gc .collect ()
268- self .assertNotIn (ref , periodic_executor ._EXECUTORS )
269-
270- def test_shutdown_executors_stops_running_executors (self ):
271- ran = threading .Event ()
272-
273- def target ():
274- ran .set ()
275- return True
276-
277- executor = _make_executor (target = target )
278- executor .open ()
279- self .assertTrue (ran .wait (timeout = 2 ), "target never ran" )
280- _shutdown_executors ()
281- executor .join (timeout = 2 )
282- self .assertTrue (executor ._stopped )
283-
284- def test_shutdown_executors_safe_when_empty (self ):
285- periodic_executor ._EXECUTORS .clear ()
286- _shutdown_executors ()
173+ self .assertGreaterEqual (called , 2 )
287174
288175
289176if __name__ == "__main__" :
0 commit comments