Skip to content

Commit 0ed6d06

Browse files
committed
docs: update README and tests for time index cleanup logic
- Expanded README to include details on interval tasks and the new `{prefix}:time_index` sorted set for tracking schedules. - Updated cleanup logic in `ListRedisScheduleSource` to remove stale entries older than 5 minutes instead of 1 hour. - Modified tests to reflect the new 5-minute threshold for cleanup, ensuring accurate verification of the time index behavior.
1 parent 384c862 commit 0ed6d06

3 files changed

Lines changed: 46 additions & 28 deletions

File tree

README.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,26 @@ This is very ineficent and should not be used for high-volume schedules. Because
154154
This source holds values in lists.
155155
156156
* For cron tasks it uses key `{prefix}:cron`.
157+
* For interval tasks it uses key `{prefix}:interval`.
157158
* For timed schedules it uses key `{prefix}:time:{time}` where `{time}` is actually time where schedules should run.
159+
* A sorted set at `{prefix}:time_index` tracks all time keys with their unix timestamps as scores, so that past time schedules can be discovered via `ZRANGEBYSCORE` instead of scanning all Redis keys. Stale entries (older than 5 minutes with empty time key lists) are cleaned up automatically.
158160
159-
The main advantage of this approach is that we only fetch tasks we need to run at a given time and do not perform any excesive calls to redis.
161+
The main advantage of this approach is that we only fetch tasks we need to run at a given time and do not perform any excessive calls to redis.
162+
163+
#### `populate_time_index`
164+
165+
If you are upgrading from an older version that did not maintain the `{prefix}:time_index` sorted set, existing time keys will not be present in the index. Set `populate_time_index=True` once on startup to backfill the index via a one-time `SCAN`, then set it back to `False` for subsequent runs:
166+
167+
```python
168+
# First run after upgrading — backfills the time index
169+
source = ListRedisScheduleSource(
170+
"redis://localhost/1",
171+
populate_time_index=True,
172+
)
173+
174+
# All subsequent runs — no SCAN, uses the time index
175+
source = ListRedisScheduleSource("redis://localhost/1")
176+
```
160177
161178
162179
### Migration from one source to another

taskiq_redis/list_schedule_source.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,20 +154,20 @@ async def _cleanup_time_index(self, redis: Redis) -> None: # type: ignore[type-
154154
"""
155155
Remove stale entries from the time index sorted set.
156156
157-
Only removes entries that are older than 1 hour AND whose
157+
Only removes entries that are older than 5 minutes AND whose
158158
corresponding time key list is empty (or no longer exists).
159159
This avoids a race condition where an eager cleanup in
160160
delete_schedule could remove an index entry right as
161161
add_schedule is creating a new schedule at the same minute.
162162
"""
163-
one_hour_ago = (
163+
five_minutes_ago = (
164164
datetime.datetime.now(datetime.timezone.utc)
165-
- datetime.timedelta(hours=1)
165+
- datetime.timedelta(minutes=5)
166166
).timestamp()
167167
stale_keys: list[bytes] = await redis.zrangebyscore(
168168
self._get_time_index_key(),
169169
"-inf",
170-
one_hour_ago,
170+
five_minutes_ago,
171171
)
172172
for key in stale_keys:
173173
if await redis.llen(key) == 0:

tests/test_list_schedule_source.py

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,13 @@ async def test_time_index_not_eagerly_cleaned_on_delete(redis_url: str) -> None:
237237
@pytest.mark.anyio
238238
async def test_cleanup_removes_old_empty_entries(redis_url: str) -> None:
239239
"""Test that _cleanup_time_index removes index entries that are
240-
older than 1 hour and whose time key lists are empty."""
240+
older than 5 minutes and whose time key lists are empty."""
241241
prefix = uuid.uuid4().hex
242-
with freeze_time("2025-01-01 00:00:00"):
242+
with freeze_time("2025-01-01 00:10:00"):
243243
source = ListRedisScheduleSource(redis_url, prefix=prefix)
244+
# 10 minutes before "now" — well past the 5-minute threshold.
244245
old_time = datetime.datetime(
245-
2024, 12, 31, 22, 0, tzinfo=datetime.timezone.utc,
246+
2025, 1, 1, 0, 0, tzinfo=datetime.timezone.utc,
246247
)
247248
schedule = ScheduledTask(
248249
task_name="test_task",
@@ -263,8 +264,8 @@ async def test_cleanup_removes_old_empty_entries(redis_url: str) -> None:
263264
async with Redis(connection_pool=source._connection_pool) as redis:
264265
assert await redis.zcard(source._get_time_index_key()) == 1
265266

266-
# Run cleanup directly — entry is > 1 hour old and empty.
267-
with freeze_time("2025-01-01 00:00:00"):
267+
# Run cleanup directly — entry is > 5 minutes old and empty.
268+
with freeze_time("2025-01-01 00:10:00"):
268269
async with Redis(connection_pool=source._connection_pool) as redis:
269270
await source._cleanup_time_index(redis)
270271

@@ -276,12 +277,12 @@ async def test_cleanup_removes_old_empty_entries(redis_url: str) -> None:
276277
@pytest.mark.anyio
277278
async def test_cleanup_keeps_non_empty_entries(redis_url: str) -> None:
278279
"""Test that _cleanup_time_index does NOT remove index entries whose
279-
time key lists still have schedules, even if older than 1 hour."""
280+
time key lists still have schedules, even if older than 5 minutes."""
280281
prefix = uuid.uuid4().hex
281-
with freeze_time("2025-01-01 00:00:00"):
282+
with freeze_time("2025-01-01 00:10:00"):
282283
source = ListRedisScheduleSource(redis_url, prefix=prefix)
283284
old_time = datetime.datetime(
284-
2024, 12, 31, 22, 0, tzinfo=datetime.timezone.utc,
285+
2025, 1, 1, 0, 0, tzinfo=datetime.timezone.utc,
285286
)
286287
schedule = ScheduledTask(
287288
task_name="test_task",
@@ -292,8 +293,8 @@ async def test_cleanup_keeps_non_empty_entries(redis_url: str) -> None:
292293
)
293294
await source.add_schedule(schedule)
294295

295-
# Run cleanup — entry is > 1 hour old but list is NOT empty.
296-
with freeze_time("2025-01-01 00:00:00"):
296+
# Run cleanup — entry is > 5 minutes old but list is NOT empty.
297+
with freeze_time("2025-01-01 00:10:00"):
297298
async with Redis(connection_pool=source._connection_pool) as redis:
298299
await source._cleanup_time_index(redis)
299300

@@ -305,13 +306,13 @@ async def test_cleanup_keeps_non_empty_entries(redis_url: str) -> None:
305306
@pytest.mark.anyio
306307
async def test_cleanup_keeps_recent_empty_entries(redis_url: str) -> None:
307308
"""Test that _cleanup_time_index does NOT remove index entries that
308-
are less than 1 hour old, even if their time key lists are empty."""
309+
are less than 5 minutes old, even if their time key lists are empty."""
309310
prefix = uuid.uuid4().hex
310-
with freeze_time("2025-01-01 00:00:00"):
311+
with freeze_time("2025-01-01 00:04:00"):
311312
source = ListRedisScheduleSource(redis_url, prefix=prefix)
312-
# 30 minutes ago — within the 1-hour safety window.
313+
# 2 minutes ago — within the 5-minute safety window.
313314
recent_time = datetime.datetime(
314-
2024, 12, 31, 23, 30, tzinfo=datetime.timezone.utc,
315+
2025, 1, 1, 0, 2, tzinfo=datetime.timezone.utc,
315316
)
316317
schedule = ScheduledTask(
317318
task_name="test_task",
@@ -323,8 +324,8 @@ async def test_cleanup_keeps_recent_empty_entries(redis_url: str) -> None:
323324
await source.add_schedule(schedule)
324325
await source.delete_schedule(schedule.schedule_id)
325326

326-
# Run cleanup — entry is empty but only 30 min old.
327-
with freeze_time("2025-01-01 00:00:00"):
327+
# Run cleanup — entry is empty but only 2 minutes old.
328+
with freeze_time("2025-01-01 00:04:00"):
328329
async with Redis(connection_pool=source._connection_pool) as redis:
329330
await source._cleanup_time_index(redis)
330331

@@ -403,18 +404,18 @@ async def test_populate_time_index_from_existing_keys(redis_url: str) -> None:
403404
async def test_post_send_triggers_cleanup(redis_url: str) -> None:
404405
"""Test the full lifecycle: add schedule, get it, post_send it,
405406
then verify cleanup (triggered from delete_schedule) removes
406-
the stale index entry when it's > 1 hour old."""
407+
the stale index entry when it's > 5 minutes old."""
407408
prefix = uuid.uuid4().hex
408409

409-
with freeze_time("2025-01-01 02:00:00"):
410+
with freeze_time("2025-01-01 00:10:00"):
410411
source = ListRedisScheduleSource(redis_url, prefix=prefix)
411412
schedule = ScheduledTask(
412413
task_name="test_task",
413414
labels={},
414415
args=[],
415416
kwargs={},
416417
time=datetime.datetime(
417-
2025, 1, 1, 0, 30, tzinfo=datetime.timezone.utc,
418+
2025, 1, 1, 0, 0, tzinfo=datetime.timezone.utc,
418419
),
419420
)
420421
await source.add_schedule(schedule)
@@ -424,7 +425,7 @@ async def test_post_send_triggers_cleanup(redis_url: str) -> None:
424425
assert schedules == [schedule]
425426

426427
# post_send -> delete_schedule -> _maybe_cleanup_time_index.
427-
# The entry is > 1 hour old and the list becomes empty,
428+
# The entry is > 5 minutes old and the list becomes empty,
428429
# so cleanup should remove it.
429430
for s in schedules:
430431
await source.post_send(s)
@@ -433,7 +434,7 @@ async def test_post_send_triggers_cleanup(redis_url: str) -> None:
433434
assert await redis.zcard(source._get_time_index_key()) == 0
434435

435436
# Second run should return nothing.
436-
with freeze_time("2025-01-01 02:01:00"):
437+
with freeze_time("2025-01-01 00:11:00"):
437438
schedules = await source.get_schedules()
438439
assert schedules == []
439440

@@ -443,10 +444,10 @@ async def test_cleanup_rate_limited(redis_url: str) -> None:
443444
"""Test that _maybe_cleanup_time_index only runs once per minute."""
444445
prefix = uuid.uuid4().hex
445446

446-
with freeze_time("2025-01-01 02:00:00"):
447+
with freeze_time("2025-01-01 00:10:00"):
447448
source = ListRedisScheduleSource(redis_url, prefix=prefix)
448449
old_time = datetime.datetime(
449-
2025, 1, 1, 0, 30, tzinfo=datetime.timezone.utc,
450+
2025, 1, 1, 0, 0, tzinfo=datetime.timezone.utc,
450451
)
451452
sched1 = ScheduledTask(
452453
task_name="task1",

0 commit comments

Comments
 (0)