Skip to content

Commit 384c862

Browse files
committed
refactor: remove first run logic and update schedule fetching behavior
- Removed the `_is_first_run` flag to simplify the schedule fetching logic. - Updated `get_schedules` to fetch past time schedules on every call, ensuring no schedules are missed within the current minute and previous minute. - Enhanced documentation to reflect the new behavior of schedule retrieval.
1 parent 7934072 commit 384c862

1 file changed

Lines changed: 10 additions & 10 deletions

File tree

taskiq_redis/list_schedule_source.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ def __init__(
5454
if serializer is None:
5555
serializer = PickleSerializer()
5656
self._serializer = serializer
57-
self._is_first_run = True
5857
self._previous_schedule_source: ScheduleSource | None = None
5958
self._delete_schedules_after_migration: bool = True
6059
self._skip_past_schedules = skip_past_schedules
@@ -188,8 +187,9 @@ async def _get_previous_time_schedules(
188187
Uses the time index sorted set to look up past time keys
189188
instead of scanning all Redis keys.
190189
191-
This function is called only during the first run to minimize
192-
the number of requests to the Redis server.
190+
Called on every get_schedules invocation so that schedules
191+
added in a past minute (after the previous get_schedules call
192+
but before the minute rolled over) are never missed.
193193
194194
:param current_time: The reference time captured by the caller,
195195
used to derive the cutoff so that the "previous" and "current"
@@ -280,19 +280,19 @@ async def get_schedules(self) -> list["ScheduledTask"]:
280280
Get all schedules.
281281
282282
This function gets all the schedules from the schedule source.
283-
What it does is get all the cron schedules and time schedules
284-
for the current time and return them.
283+
What it does is get all the cron schedules, interval schedules,
284+
past time schedules, and current-minute time schedules and
285+
return them.
285286
286-
If it's the first run, it also gets all the time schedules
287-
that are in the past and haven't been sent yet.
287+
Past time schedules are fetched on every call so that
288+
schedules added after the previous call but before the
289+
minute rolled over are never missed.
288290
"""
289291
schedules = []
290292
current_time = datetime.datetime.now(datetime.timezone.utc)
291293
timed: list[bytes] = []
292-
# Only during first run, we need to get previous time schedules
293-
if not self._skip_past_schedules and self._is_first_run:
294+
if not self._skip_past_schedules:
294295
timed = await self._get_previous_time_schedules(current_time)
295-
self._is_first_run = False
296296
async with Redis(connection_pool=self._connection_pool) as redis:
297297
buffer = []
298298
crons = await redis.lrange(self._get_cron_key(), 0, -1) # type: ignore[misc]

0 commit comments

Comments
 (0)