-
Notifications
You must be signed in to change notification settings - Fork 762
Expand file tree
/
Copy pathrecurring_task.py
More file actions
64 lines (50 loc) · 2.08 KB
/
Copy pathrecurring_task.py
File metadata and controls
64 lines (50 loc) · 2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from __future__ import annotations

import asyncio
import inspect
from logging import getLogger
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from collections.abc import Callable
    from datetime import timedelta
    from types import TracebackType

    from typing_extensions import Self
logger = getLogger(__name__)
class RecurringTask:
    """Run a callable repeatedly in a background asyncio task.

    Each cycle calls `func` with no arguments, awaits its result when that result is awaitable, and then sleeps
    for `delay` before the next cycle. The loop runs until `stop` is called. The class can also be used as an
    asynchronous context manager, which starts the task on entry and stops it on exit.

    Attributes:
        func: The callable to be executed repeatedly.
        delay: The pause between the end of one call and the start of the next, as a `timedelta`.
        task: The underlying asyncio task, or `None` if `start` has not been called yet.
    """

    def __init__(self, func: Callable, delay: timedelta) -> None:
        """Store the callable and the delay; the task itself is created by `start`.

        Args:
            func: The callable to run on every cycle. It is invoked without arguments.
            delay: How long to sleep after each call.
        """
        logger.debug(f'Calling RecurringTask.__init__(func={self._get_func_name(func)}, delay={delay})...')
        self.func = func
        self.delay = delay
        self.task: asyncio.Task | None = None

    async def __aenter__(self) -> Self:
        """Start the recurring task and return this instance."""
        self.start()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        exc_traceback: TracebackType | None,
    ) -> None:
        """Stop the recurring task; an exception raised inside the `async with` body is not suppressed."""
        await self.stop()

    @staticmethod
    def _get_func_name(func: Callable) -> str:
        """Return a printable name for `func`.

        Callables such as `functools.partial` objects or instances defining `__call__` have no `__name__`,
        so the name of their type is used instead of raising `AttributeError`.
        """
        return getattr(func, '__name__', None) or type(func).__name__

    async def _wrapper(self) -> None:
        """Continuously execute the provided function with the specified delay.

        Run the function in a loop, waiting for the configured delay between executions.
        Supports both synchronous and asynchronous functions.
        """
        # The delay is read once, when the loop starts.
        sleep_time_secs = self.delay.total_seconds()

        while True:
            result = self.func()
            # Inspect the returned value rather than the callable itself: this also covers lambdas, partials
            # and callable objects that hand back a coroutine, which would otherwise never be awaited.
            if inspect.isawaitable(result):
                await result
            await asyncio.sleep(sleep_time_secs)

    def start(self) -> None:
        """Start the recurring task execution.

        Does nothing if a previously started task is still running, so that the running task is never
        replaced by a new one and left without a reference that `stop` could cancel.
        """
        if self.task is not None and not self.task.done():
            return

        self.task = asyncio.create_task(
            self._wrapper(),
            name=f'Task-recurring-{self._get_func_name(self.func)}',
        )

    async def stop(self) -> None:
        """Stop the recurring task execution.

        Does nothing if the task was never started. The finished task stays available in `task`.
        """
        if self.task is None:
            return

        self.task.cancel()
        # Ensure the task has a chance to properly handle the cancellation and any potential exceptions.
        # With `return_exceptions=True` the cancellation (or an error raised by `func`) is returned by
        # `gather` instead of being raised here.
        await asyncio.gather(self.task, return_exceptions=True)