Skip to content

Commit 7934072

Browse files
committed
feat: enhance _get_previous_time_schedules to accept current_time parameter
- Updated the _get_previous_time_schedules method to take current_time as an argument, allowing for more precise cutoff calculations. - Adjusted the logic to use the provided current_time for determining previous schedules, ensuring no overlap with the current window. - Modified the call to _get_previous_time_schedules in the first run logic to pass the current_time parameter.
1 parent da282d3 commit 7934072

1 file changed

Lines changed: 12 additions & 5 deletions

File tree

taskiq_redis/list_schedule_source.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,10 @@ async def _cleanup_time_index(self, redis: Redis) -> None: # type: ignore[type-
174174
if await redis.llen(key) == 0:
175175
await redis.zrem(self._get_time_index_key(), key)
176176

177-
async def _get_previous_time_schedules(self) -> list[bytes]:
177+
async def _get_previous_time_schedules(
178+
self,
179+
current_time: datetime.datetime,
180+
) -> list[bytes]:
178181
"""
179182
Function that gets all timed schedules that are in the past.
180183
@@ -187,11 +190,15 @@ async def _get_previous_time_schedules(self) -> list[bytes]:
187190
188191
This function is called only during the first run to minimize
189192
the number of requests to the Redis server.
193+
194+
:param current_time: The reference time captured by the caller,
195+
used to derive the cutoff so that the "previous" and "current"
196+
windows never overlap.
190197
"""
191198
logger.info("Getting previous time schedules")
192-
minute_before = datetime.datetime.now(
193-
datetime.timezone.utc,
194-
).replace(second=0, microsecond=0) - datetime.timedelta(
199+
minute_before = current_time.replace(
200+
second=0, microsecond=0,
201+
) - datetime.timedelta(
195202
minutes=1,
196203
)
197204
schedules = []
@@ -284,7 +291,7 @@ async def get_schedules(self) -> list["ScheduledTask"]:
284291
timed: list[bytes] = []
285292
# Only during first run, we need to get previous time schedules
286293
if not self._skip_past_schedules and self._is_first_run:
287-
timed = await self._get_previous_time_schedules()
294+
timed = await self._get_previous_time_schedules(current_time)
288295
self._is_first_run = False
289296
async with Redis(connection_pool=self._connection_pool) as redis:
290297
buffer = []

0 commit comments

Comments
 (0)