Skip to content

Commit da282d3

Browse files
committed
feat: implement lazy cleanup for time index in ListRedisScheduleSource
- Added `_maybe_cleanup_time_index` method to run time index cleanup at most once per minute.
- Introduced `_cleanup_time_index` method to remove stale index entries that are older than one hour and whose time key lists are empty (or no longer exist).
- Updated `delete_schedule` to call `_maybe_cleanup_time_index` instead of eagerly removing the index entry, so cleanup is lazy and rate-limited.
- Enhanced tests to verify the behavior of the new cleanup methods, ensuring proper handling of stale and recent entries.
1 parent fad3046 commit da282d3

2 files changed

Lines changed: 203 additions & 69 deletions

File tree

taskiq_redis/list_schedule_source.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import datetime
2+
import time as _time
23
from logging import getLogger
34
from typing import Any
45

@@ -58,6 +59,7 @@ def __init__(
5859
self._delete_schedules_after_migration: bool = True
5960
self._skip_past_schedules = skip_past_schedules
6061
self._populate_time_index = populate_time_index
62+
self._last_cleanup_time: float = 0
6163

6264
async def startup(self) -> None:
6365
"""
@@ -136,6 +138,42 @@ def _parse_time_key(self, key: str) -> datetime.datetime | None:
136138
logger.debug("Failed to parse time key %s", key)
137139
return None
138140

141+
async def _maybe_cleanup_time_index(self, redis: Redis) -> None: # type: ignore[type-arg]
142+
"""
143+
Run time index cleanup at most once per minute.
144+
145+
Called from delete_schedule after removing a time-based schedule,
146+
since that's the path where time key lists become empty.
147+
"""
148+
now = _time.monotonic()
149+
if now - self._last_cleanup_time < 60:
150+
return
151+
self._last_cleanup_time = now
152+
await self._cleanup_time_index(redis)
153+
154+
async def _cleanup_time_index(self, redis: Redis) -> None: # type: ignore[type-arg]
155+
"""
156+
Remove stale entries from the time index sorted set.
157+
158+
Only removes entries that are older than 1 hour AND whose
159+
corresponding time key list is empty (or no longer exists).
160+
This avoids a race condition where an eager cleanup in
161+
delete_schedule could remove an index entry right as
162+
add_schedule is creating a new schedule at the same minute.
163+
"""
164+
one_hour_ago = (
165+
datetime.datetime.now(datetime.timezone.utc)
166+
- datetime.timedelta(hours=1)
167+
).timestamp()
168+
stale_keys: list[bytes] = await redis.zrangebyscore(
169+
self._get_time_index_key(),
170+
"-inf",
171+
one_hour_ago,
172+
)
173+
for key in stale_keys:
174+
if await redis.llen(key) == 0:
175+
await redis.zrem(self._get_time_index_key(), key)
176+
139177
async def _get_previous_time_schedules(self) -> list[bytes]:
140178
"""
141179
Function that gets all timed schedules that are in the past.
@@ -185,14 +223,7 @@ async def delete_schedule(self, schedule_id: str) -> None:
185223
elif schedule.time is not None:
186224
time_key = self._get_time_key(schedule.time)
187225
await redis.lrem(time_key, 0, schedule_id) # type: ignore[misc]
188-
# If the time key list is now empty, clean up both
189-
# the list key and its entry in the time index.
190-
if await redis.llen(time_key) == 0:
191-
await redis.delete(time_key)
192-
await redis.zrem(
193-
self._get_time_index_key(),
194-
time_key,
195-
)
226+
await self._maybe_cleanup_time_index(redis)
196227
elif schedule.interval:
197228
await redis.lrem(self._get_interval_key(), 0, schedule_id) # type: ignore[misc]
198229

tests/test_list_schedule_source.py

Lines changed: 164 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,10 @@ async def test_time_index_populated_on_add(redis_url: str) -> None:
207207

208208
@pytest.mark.anyio
209209
@freeze_time("2025-01-01 00:00:00")
210-
async def test_time_index_cleaned_on_delete(redis_url: str) -> None:
211-
"""Test that deleting last schedule from a time key cleans the index."""
210+
async def test_time_index_not_eagerly_cleaned_on_delete(redis_url: str) -> None:
211+
"""Test that delete_schedule does NOT eagerly remove the index entry.
212+
This avoids a race condition where a concurrent add_schedule at the
213+
same minute could lose its index entry."""
212214
prefix = uuid.uuid4().hex
213215
source = ListRedisScheduleSource(redis_url, prefix=prefix)
214216
schedule = ScheduledTask(
@@ -227,55 +229,110 @@ async def test_time_index_cleaned_on_delete(redis_url: str) -> None:
227229

228230
await source.delete_schedule(schedule.schedule_id)
229231

230-
# After deletion, the index should be empty.
232+
# Index entry is still present (lazy cleanup handles it later).
231233
async with Redis(connection_pool=source._connection_pool) as redis:
232-
assert await redis.zcard(source._get_time_index_key()) == 0
233-
# The time key list itself should also be deleted.
234-
assert not await redis.exists(source._get_time_key(schedule.time))
234+
assert await redis.zcard(source._get_time_index_key()) == 1
235235

236236

237237
@pytest.mark.anyio
238-
@freeze_time("2025-01-01 00:00:00")
239-
async def test_time_index_not_cleaned_when_other_schedules_remain(
240-
redis_url: str,
241-
) -> None:
242-
"""Test that deleting one schedule doesn't remove the index entry
243-
when other schedules still exist at the same time."""
238+
async def test_cleanup_removes_old_empty_entries(redis_url: str) -> None:
239+
"""Test that _cleanup_time_index removes index entries that are
240+
older than 1 hour and whose time key lists are empty."""
244241
prefix = uuid.uuid4().hex
245-
source = ListRedisScheduleSource(redis_url, prefix=prefix)
246-
schedule_time = datetime.datetime.now(
247-
datetime.timezone.utc,
248-
) + datetime.timedelta(minutes=5)
249-
schedule1 = ScheduledTask(
250-
task_name="test_task_1",
251-
labels={},
252-
args=[],
253-
kwargs={},
254-
time=schedule_time,
255-
)
256-
schedule2 = ScheduledTask(
257-
task_name="test_task_2",
258-
labels={},
259-
args=[],
260-
kwargs={},
261-
time=schedule_time,
262-
)
263-
await source.add_schedule(schedule1)
264-
await source.add_schedule(schedule2)
242+
with freeze_time("2025-01-01 00:00:00"):
243+
source = ListRedisScheduleSource(redis_url, prefix=prefix)
244+
old_time = datetime.datetime(
245+
2024, 12, 31, 22, 0, tzinfo=datetime.timezone.utc,
246+
)
247+
schedule = ScheduledTask(
248+
task_name="test_task",
249+
labels={},
250+
args=[],
251+
kwargs={},
252+
time=old_time,
253+
)
254+
await source.add_schedule(schedule)
255+
# Prevent delete_schedule from triggering cleanup by pretending
256+
# cleanup just ran (rate limiter blocks it).
257+
import time
265258

266-
await source.delete_schedule(schedule1.schedule_id)
259+
source._last_cleanup_time = time.monotonic()
260+
await source.delete_schedule(schedule.schedule_id)
267261

268-
# Index should still have the entry because schedule2 remains.
262+
# Index still has the stale entry (cleanup was rate-limited).
269263
async with Redis(connection_pool=source._connection_pool) as redis:
270264
assert await redis.zcard(source._get_time_index_key()) == 1
271265

272-
await source.delete_schedule(schedule2.schedule_id)
266+
# Run cleanup directly — entry is > 1 hour old and empty.
267+
with freeze_time("2025-01-01 00:00:00"):
268+
async with Redis(connection_pool=source._connection_pool) as redis:
269+
await source._cleanup_time_index(redis)
273270

274-
# Now the index should be empty.
271+
# Now it should be cleaned up.
275272
async with Redis(connection_pool=source._connection_pool) as redis:
276273
assert await redis.zcard(source._get_time_index_key()) == 0
277274

278275

276+
@pytest.mark.anyio
277+
async def test_cleanup_keeps_non_empty_entries(redis_url: str) -> None:
278+
"""Test that _cleanup_time_index does NOT remove index entries whose
279+
time key lists still have schedules, even if older than 1 hour."""
280+
prefix = uuid.uuid4().hex
281+
with freeze_time("2025-01-01 00:00:00"):
282+
source = ListRedisScheduleSource(redis_url, prefix=prefix)
283+
old_time = datetime.datetime(
284+
2024, 12, 31, 22, 0, tzinfo=datetime.timezone.utc,
285+
)
286+
schedule = ScheduledTask(
287+
task_name="test_task",
288+
labels={},
289+
args=[],
290+
kwargs={},
291+
time=old_time,
292+
)
293+
await source.add_schedule(schedule)
294+
295+
# Run cleanup — entry is > 1 hour old but list is NOT empty.
296+
with freeze_time("2025-01-01 00:00:00"):
297+
async with Redis(connection_pool=source._connection_pool) as redis:
298+
await source._cleanup_time_index(redis)
299+
300+
# Entry should still be present.
301+
async with Redis(connection_pool=source._connection_pool) as redis:
302+
assert await redis.zcard(source._get_time_index_key()) == 1
303+
304+
305+
@pytest.mark.anyio
306+
async def test_cleanup_keeps_recent_empty_entries(redis_url: str) -> None:
307+
"""Test that _cleanup_time_index does NOT remove index entries that
308+
are less than 1 hour old, even if their time key lists are empty."""
309+
prefix = uuid.uuid4().hex
310+
with freeze_time("2025-01-01 00:00:00"):
311+
source = ListRedisScheduleSource(redis_url, prefix=prefix)
312+
# 30 minutes ago — within the 1-hour safety window.
313+
recent_time = datetime.datetime(
314+
2024, 12, 31, 23, 30, tzinfo=datetime.timezone.utc,
315+
)
316+
schedule = ScheduledTask(
317+
task_name="test_task",
318+
labels={},
319+
args=[],
320+
kwargs={},
321+
time=recent_time,
322+
)
323+
await source.add_schedule(schedule)
324+
await source.delete_schedule(schedule.schedule_id)
325+
326+
# Run cleanup — entry is empty but only 30 min old.
327+
with freeze_time("2025-01-01 00:00:00"):
328+
async with Redis(connection_pool=source._connection_pool) as redis:
329+
await source._cleanup_time_index(redis)
330+
331+
# Entry should still be present (not old enough).
332+
async with Redis(connection_pool=source._connection_pool) as redis:
333+
assert await redis.zcard(source._get_time_index_key()) == 1
334+
335+
279336
@pytest.mark.anyio
280337
@freeze_time("2025-01-01 00:00:00")
281338
async def test_past_schedules_found_via_time_index(redis_url: str) -> None:
@@ -343,37 +400,83 @@ async def test_populate_time_index_from_existing_keys(redis_url: str) -> None:
343400

344401

345402
@pytest.mark.anyio
346-
@freeze_time("2025-01-01 00:00:00")
347-
async def test_post_send_cleans_time_index(redis_url: str) -> None:
348-
"""Test that post_send (which calls delete_schedule for time tasks)
349-
properly cleans up the time index."""
403+
async def test_post_send_triggers_cleanup(redis_url: str) -> None:
404+
"""Test the full lifecycle: add schedule, get it, post_send it,
405+
then verify cleanup (triggered from delete_schedule) removes
406+
the stale index entry when it's > 1 hour old."""
350407
prefix = uuid.uuid4().hex
351-
source = ListRedisScheduleSource(redis_url, prefix=prefix)
352-
schedule = ScheduledTask(
353-
task_name="test_task",
354-
labels={},
355-
args=[],
356-
kwargs={},
357-
time=datetime.datetime.now(datetime.timezone.utc)
358-
- datetime.timedelta(minutes=3),
359-
)
360-
await source.add_schedule(schedule)
361408

362-
# First run picks up past schedules.
363-
schedules = await source.get_schedules()
364-
assert schedules == [schedule]
409+
with freeze_time("2025-01-01 02:00:00"):
410+
source = ListRedisScheduleSource(redis_url, prefix=prefix)
411+
schedule = ScheduledTask(
412+
task_name="test_task",
413+
labels={},
414+
args=[],
415+
kwargs={},
416+
time=datetime.datetime(
417+
2025, 1, 1, 0, 30, tzinfo=datetime.timezone.utc,
418+
),
419+
)
420+
await source.add_schedule(schedule)
365421

366-
# Simulate sending the task.
367-
for s in schedules:
368-
await source.post_send(s)
422+
# First run picks up past schedules.
423+
schedules = await source.get_schedules()
424+
assert schedules == [schedule]
369425

370-
# Time index should be empty now.
371-
async with Redis(connection_pool=source._connection_pool) as redis:
372-
assert await redis.zcard(source._get_time_index_key()) == 0
426+
# post_send -> delete_schedule -> _maybe_cleanup_time_index.
427+
# The entry is > 1 hour old and the list becomes empty,
428+
# so cleanup should remove it.
429+
for s in schedules:
430+
await source.post_send(s)
431+
432+
async with Redis(connection_pool=source._connection_pool) as redis:
433+
assert await redis.zcard(source._get_time_index_key()) == 0
373434

374435
# Second run should return nothing.
375-
schedules = await source.get_schedules()
376-
assert schedules == []
436+
with freeze_time("2025-01-01 02:01:00"):
437+
schedules = await source.get_schedules()
438+
assert schedules == []
439+
440+
441+
@pytest.mark.anyio
442+
async def test_cleanup_rate_limited(redis_url: str) -> None:
443+
"""Test that _maybe_cleanup_time_index only runs once per minute."""
444+
prefix = uuid.uuid4().hex
445+
446+
with freeze_time("2025-01-01 02:00:00"):
447+
source = ListRedisScheduleSource(redis_url, prefix=prefix)
448+
old_time = datetime.datetime(
449+
2025, 1, 1, 0, 30, tzinfo=datetime.timezone.utc,
450+
)
451+
sched1 = ScheduledTask(
452+
task_name="task1",
453+
labels={},
454+
args=[],
455+
kwargs={},
456+
time=old_time,
457+
)
458+
sched2 = ScheduledTask(
459+
task_name="task2",
460+
labels={},
461+
args=[],
462+
kwargs={},
463+
time=old_time,
464+
)
465+
await source.add_schedule(sched1)
466+
await source.add_schedule(sched2)
467+
468+
# First delete triggers cleanup (first call, _last_cleanup_time=0).
469+
# But the time key list still has sched2, so the entry is kept.
470+
await source.delete_schedule(sched1.schedule_id)
471+
async with Redis(connection_pool=source._connection_pool) as redis:
472+
assert await redis.zcard(source._get_time_index_key()) == 1
473+
474+
# Second delete happens within the same minute, so cleanup
475+
# is rate-limited and does NOT run — index entry remains
476+
# even though the list is now empty.
477+
await source.delete_schedule(sched2.schedule_id)
478+
async with Redis(connection_pool=source._connection_pool) as redis:
479+
assert await redis.zcard(source._get_time_index_key()) == 1
377480

378481

379482
@pytest.mark.anyio

0 commit comments

Comments
 (0)