|
3 | 3 | import sys |
4 | 4 | from datetime import datetime, timedelta, timezone |
5 | 5 | from logging import basicConfig, getLogger |
6 | | -from typing import Any, Dict, List, Optional, Union |
| 6 | +from typing import Any, Dict, List, Optional, Tuple, Union |
7 | 7 | from zoneinfo import ZoneInfo |
8 | 8 |
|
9 | 9 | import pycron |
@@ -59,23 +59,22 @@ async def get_schedules(source: ScheduleSource) -> List[ScheduledTask]: |
59 | 59 |
|
60 | 60 | async def get_all_schedules( |
61 | 61 | scheduler: TaskiqScheduler, |
62 | | -) -> Dict[ScheduleSource, List[ScheduledTask]]: |
| 62 | +) -> List[Tuple[ScheduleSource, List[ScheduledTask]]]: |
63 | 63 | """ |
64 | 64 | Task to update all schedules. |
65 | 65 |
|
66 | 66 | This function updates all schedules |
67 | | - from all sources and returns a dict |
68 | | - with source as a key and list of |
69 | | - scheduled tasks as a value. |
| 67 | + from all sources and returns a list |
| 68 | + of (source, tasks) pairs. |
70 | 69 |
|
71 | 70 | :param scheduler: current scheduler. |
72 | | - :return: dict with source as a key and list of scheduled tasks as a value. |
| 71 | + :return: list of (source, tasks) pairs. |
73 | 72 | """ |
74 | 73 | logger.debug("Started schedule update.") |
75 | 74 | schedules: List[List[ScheduledTask]] = await asyncio.gather( |
76 | 75 | *[get_schedules(source) for source in scheduler.sources], |
77 | 76 | ) |
78 | | - return dict(zip(scheduler.sources, schedules)) |
| 77 | + return list(zip(scheduler.sources, schedules)) |
79 | 78 |
|
80 | 79 |
|
81 | 80 | class CronValueError(Exception): |
@@ -197,15 +196,15 @@ def __init__( |
197 | 196 | self.interval_tasks_last_run: dict[ScheduleId, datetime] = {} |
198 | 197 | self.time_tasks_last_run: dict[ScheduleId, datetime] = {} |
199 | 198 |
|
200 | | - self.scheduled_tasks: Dict[ScheduleSource, List[ScheduledTask]] = {} |
| 199 | + self.scheduled_tasks: List[Tuple[ScheduleSource, List[ScheduledTask]]] = [] |
201 | 200 | self.scheduled_tasks_updated_at: Optional[datetime] = None |
202 | 201 | self._update_schedules_task_future: Optional[asyncio.Task[Any]] = None |
203 | 202 |
|
204 | 203 | def _update_schedules_task_future_callback(self, task_: asyncio.Task[Any]) -> None: |
205 | 204 | self.scheduled_tasks = task_.result() |
206 | 205 |
|
207 | 206 | new_schedules_ids: set[ScheduleId] = set() |
208 | | - for source, task_list in self.scheduled_tasks.items(): |
| 207 | + for source, task_list in self.scheduled_tasks: |
209 | 208 | logger.debug("Got %d schedules from source %s.", len(task_list), source) |
210 | 209 | new_schedules_ids.update({t.schedule_id for t in task_list}) |
211 | 210 |
|
@@ -237,7 +236,7 @@ async def _update_scheduled_tasks(self) -> None: |
237 | 236 |
|
238 | 237 | def _mark_cron_tasks_as_already_run(self) -> None: |
239 | 238 | current_minute = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0) |
240 | | - for _, task_list in self.scheduled_tasks.items(): |
| 239 | + for _, task_list in self.scheduled_tasks: |
241 | 240 | for task in task_list: |
242 | 241 | if task.cron is not None: |
243 | 242 | self.cron_tasks_last_run[task.schedule_id] = current_minute |
@@ -328,7 +327,7 @@ async def run( |
328 | 327 | await self._update_scheduled_tasks() |
329 | 328 | self.scheduled_tasks_updated_at = now |
330 | 329 |
|
331 | | - for source, task_list in self.scheduled_tasks.items(): |
| 330 | + for source, task_list in self.scheduled_tasks: |
332 | 331 | for task in task_list: |
333 | 332 | is_ready_to_send: bool = self._is_schedule_ready_to_send( |
334 | 333 | task=task, |
|
0 commit comments