Skip to content

Commit ed1ddb6

Browse files
authored
PYTHON-5784 Coverage increase for periodic_executor.py (#2771)
1 parent 98c002a commit ed1ddb6

4 files changed

Lines changed: 371 additions & 2 deletions

File tree

pymongo/periodic_executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def wake(self) -> None:
8787
"""Execute the target function soon."""
8888
self._event = True
8989

90-
def update_interval(self, new_interval: int) -> None:
90+
def update_interval(self, new_interval: float) -> None:
9191
self._interval = new_interval
9292

9393
def skip_sleep(self) -> None:
@@ -217,7 +217,7 @@ def wake(self) -> None:
217217
"""Execute the target function soon."""
218218
self._event = True
219219

220-
def update_interval(self, new_interval: int) -> None:
220+
def update_interval(self, new_interval: float) -> None:
221221
self._interval = new_interval
222222

223223
def skip_sleep(self) -> None:
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
# Copyright 2026-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Unit tests for periodic_executor.py."""
16+
17+
from __future__ import annotations
18+
19+
import asyncio
20+
import sys
21+
import threading
22+
import time
23+
24+
sys.path[0:0] = [""]
25+
26+
from test.asynchronous import AsyncUnitTest, unittest
27+
28+
from pymongo.periodic_executor import AsyncPeriodicExecutor
29+
30+
_IS_SYNC = False
31+
32+
33+
class TestAsyncPeriodicExecutor(AsyncUnitTest):
34+
def _make_executor(self, interval=30.0, min_interval=0.01, target=None, name="test"):
35+
if target is None:
36+
37+
async def target():
38+
return True
39+
40+
executor = AsyncPeriodicExecutor(
41+
interval=interval, min_interval=min_interval, target=target, name=name
42+
)
43+
self.addAsyncCleanup(self._close_executor, executor)
44+
return executor
45+
46+
async def _close_executor(self, executor):
47+
executor.close()
48+
await executor.join(timeout=2)
49+
50+
async def test_join_without_open_is_safe(self):
51+
executor = self._make_executor()
52+
try:
53+
await executor.join(timeout=0.01)
54+
except Exception as e:
55+
self.fail(f"join() raised unexpected Exception {e}")
56+
57+
async def test_target_returning_false_stops_executor(self):
58+
if _IS_SYNC:
59+
ran = threading.Event()
60+
else:
61+
ran = asyncio.Event()
62+
63+
async def target():
64+
ran.set()
65+
return False
66+
67+
executor = self._make_executor(target=target)
68+
executor.open()
69+
await executor.join(timeout=2)
70+
self.assertTrue(ran.is_set(), "target never ran")
71+
72+
async def test_skip_sleep_flag_skips_interval(self):
73+
call_times = []
74+
75+
async def target():
76+
nonlocal call_times
77+
call_times.append(time.monotonic())
78+
if len(call_times) >= 2:
79+
return False
80+
return True
81+
82+
executor = self._make_executor(interval=30.0, min_interval=0.001, target=target)
83+
executor.skip_sleep()
84+
executor.open()
85+
await executor.join(timeout=3)
86+
self.assertGreaterEqual(len(call_times), 2)
87+
self.assertLess(call_times[1] - call_times[0], 5.0)
88+
89+
async def test_wake_causes_early_run(self):
90+
call_count = 0
91+
if _IS_SYNC:
92+
woken = threading.Event()
93+
else:
94+
woken = asyncio.Event()
95+
96+
async def target():
97+
nonlocal call_count
98+
call_count += 1
99+
if call_count == 1:
100+
woken.set()
101+
return call_count < 2
102+
103+
executor = self._make_executor(interval=30.0, min_interval=0.01, target=target)
104+
executor.open()
105+
if _IS_SYNC:
106+
woken.wait(timeout=2)
107+
else:
108+
assert isinstance(woken, asyncio.Event)
109+
await asyncio.wait_for(woken.wait(), timeout=2)
110+
executor.wake()
111+
await executor.join(timeout=3)
112+
self.assertGreaterEqual(call_count, 2)
113+
114+
async def test_update_interval_changes_next_wait(self):
115+
call_times = []
116+
117+
async def target():
118+
nonlocal call_times
119+
call_times.append(time.monotonic())
120+
if len(call_times) == 1:
121+
# Shorten the interval from 30s so the next run happens promptly.
122+
executor.update_interval(0.05)
123+
return True
124+
return False
125+
126+
executor = self._make_executor(interval=30.0, min_interval=0.01, target=target)
127+
executor.open()
128+
await executor.join(timeout=3)
129+
self.assertGreaterEqual(len(call_times), 2)
130+
self.assertLess(call_times[1] - call_times[0], 5.0)
131+
132+
async def test_open_after_target_returns_false(self):
133+
called = 0
134+
135+
async def target():
136+
nonlocal called
137+
called += 1
138+
return False
139+
140+
executor = self._make_executor(target=target)
141+
executor.open()
142+
await executor.join(timeout=2)
143+
executor.open()
144+
await executor.join(timeout=2)
145+
self.assertGreaterEqual(called, 2)
146+
147+
async def test_target_exception_stops_executor(self):
148+
call_count = 0
149+
150+
async def target():
151+
nonlocal call_count
152+
call_count += 1
153+
raise RuntimeError("error")
154+
155+
executor = self._make_executor(target=target)
156+
157+
if _IS_SYNC:
158+
# The exception re-raises on the executor's background thread,
159+
# which would otherwise trigger threading.excepthook and print a
160+
# noisy traceback. Swap it for a no-op for the duration of the test.
161+
original_excepthook = threading.excepthook
162+
threading.excepthook = lambda args: None
163+
self.addCleanup(setattr, threading, "excepthook", original_excepthook)
164+
165+
executor.open()
166+
await executor.join(timeout=2)
167+
if not _IS_SYNC and executor._task is not None and executor._task.done():
168+
# Retrieve the exception to avoid "Task exception was never
169+
# retrieved" warnings when the task is garbage collected.
170+
executor._task.exception()
171+
self.assertEqual(call_count, 1, "target should stop after raising")
172+
173+
# Re-opening after an exception restarts the executor. For the threaded
174+
# PeriodicExecutor this also exercises the _thread_will_exit join path
175+
# in open().
176+
executor.open()
177+
await executor.join(timeout=2)
178+
if not _IS_SYNC and executor._task is not None and executor._task.done():
179+
executor._task.exception()
180+
self.assertEqual(call_count, 2, "executor should run again after re-open")
181+
182+
183+
if __name__ == "__main__":
184+
unittest.main()

test/test_periodic_executor.py

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
# Copyright 2026-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Unit tests for periodic_executor.py."""
16+
17+
from __future__ import annotations
18+
19+
import asyncio
20+
import sys
21+
import threading
22+
import time
23+
24+
sys.path[0:0] = [""]
25+
26+
from test import UnitTest, unittest
27+
28+
from pymongo.periodic_executor import PeriodicExecutor
29+
30+
_IS_SYNC = True
31+
32+
33+
class TestPeriodicExecutor(UnitTest):
34+
def _make_executor(self, interval=30.0, min_interval=0.01, target=None, name="test"):
35+
if target is None:
36+
37+
def target():
38+
return True
39+
40+
executor = PeriodicExecutor(
41+
interval=interval, min_interval=min_interval, target=target, name=name
42+
)
43+
self.addCleanup(self._close_executor, executor)
44+
return executor
45+
46+
def _close_executor(self, executor):
47+
executor.close()
48+
executor.join(timeout=2)
49+
50+
def test_join_without_open_is_safe(self):
51+
executor = self._make_executor()
52+
try:
53+
executor.join(timeout=0.01)
54+
except Exception as e:
55+
self.fail(f"join() raised unexpected Exception {e}")
56+
57+
def test_target_returning_false_stops_executor(self):
58+
if _IS_SYNC:
59+
ran = threading.Event()
60+
else:
61+
ran = asyncio.Event()
62+
63+
def target():
64+
ran.set()
65+
return False
66+
67+
executor = self._make_executor(target=target)
68+
executor.open()
69+
executor.join(timeout=2)
70+
self.assertTrue(ran.is_set(), "target never ran")
71+
72+
def test_skip_sleep_flag_skips_interval(self):
73+
call_times = []
74+
75+
def target():
76+
nonlocal call_times
77+
call_times.append(time.monotonic())
78+
if len(call_times) >= 2:
79+
return False
80+
return True
81+
82+
executor = self._make_executor(interval=30.0, min_interval=0.001, target=target)
83+
executor.skip_sleep()
84+
executor.open()
85+
executor.join(timeout=3)
86+
self.assertGreaterEqual(len(call_times), 2)
87+
self.assertLess(call_times[1] - call_times[0], 5.0)
88+
89+
def test_wake_causes_early_run(self):
90+
call_count = 0
91+
if _IS_SYNC:
92+
woken = threading.Event()
93+
else:
94+
woken = asyncio.Event()
95+
96+
def target():
97+
nonlocal call_count
98+
call_count += 1
99+
if call_count == 1:
100+
woken.set()
101+
return call_count < 2
102+
103+
executor = self._make_executor(interval=30.0, min_interval=0.01, target=target)
104+
executor.open()
105+
if _IS_SYNC:
106+
woken.wait(timeout=2)
107+
else:
108+
assert isinstance(woken, asyncio.Event)
109+
asyncio.wait_for(woken.wait(), timeout=2)
110+
executor.wake()
111+
executor.join(timeout=3)
112+
self.assertGreaterEqual(call_count, 2)
113+
114+
def test_update_interval_changes_next_wait(self):
115+
call_times = []
116+
117+
def target():
118+
nonlocal call_times
119+
call_times.append(time.monotonic())
120+
if len(call_times) == 1:
121+
# Shorten the interval from 30s so the next run happens promptly.
122+
executor.update_interval(0.05)
123+
return True
124+
return False
125+
126+
executor = self._make_executor(interval=30.0, min_interval=0.01, target=target)
127+
executor.open()
128+
executor.join(timeout=3)
129+
self.assertGreaterEqual(len(call_times), 2)
130+
self.assertLess(call_times[1] - call_times[0], 5.0)
131+
132+
def test_open_after_target_returns_false(self):
133+
called = 0
134+
135+
def target():
136+
nonlocal called
137+
called += 1
138+
return False
139+
140+
executor = self._make_executor(target=target)
141+
executor.open()
142+
executor.join(timeout=2)
143+
executor.open()
144+
executor.join(timeout=2)
145+
self.assertGreaterEqual(called, 2)
146+
147+
def test_target_exception_stops_executor(self):
148+
call_count = 0
149+
150+
def target():
151+
nonlocal call_count
152+
call_count += 1
153+
raise RuntimeError("error")
154+
155+
executor = self._make_executor(target=target)
156+
157+
if _IS_SYNC:
158+
# The exception re-raises on the executor's background thread,
159+
# which would otherwise trigger threading.excepthook and print a
160+
# noisy traceback. Swap it for a no-op for the duration of the test.
161+
original_excepthook = threading.excepthook
162+
threading.excepthook = lambda args: None
163+
self.addCleanup(setattr, threading, "excepthook", original_excepthook)
164+
165+
executor.open()
166+
executor.join(timeout=2)
167+
if not _IS_SYNC and executor._task is not None and executor._task.done():
168+
# Retrieve the exception to avoid "Task exception was never
169+
# retrieved" warnings when the task is garbage collected.
170+
executor._task.exception()
171+
self.assertEqual(call_count, 1, "target should stop after raising")
172+
173+
# Re-opening after an exception restarts the executor. For the threaded
174+
# PeriodicExecutor this also exercises the _thread_will_exit join path
175+
# in open().
176+
executor.open()
177+
executor.join(timeout=2)
178+
if not _IS_SYNC and executor._task is not None and executor._task.done():
179+
executor._task.exception()
180+
self.assertEqual(call_count, 2, "executor should run again after re-open")
181+
182+
183+
if __name__ == "__main__":
184+
unittest.main()

tools/synchro.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ def async_only_test(f: str) -> bool:
252252
"test_monitoring.py",
253253
"test_mongos_load_balancing.py",
254254
"test_on_demand_csfle.py",
255+
"test_periodic_executor.py",
255256
"test_pooling.py",
256257
"test_raw_bson.py",
257258
"test_read_concern.py",

0 commit comments

Comments
 (0)