Skip to content

Commit d266eec

Browse files
Refactor: Improve Redis state manager and tests
Add stop method to RedisStateManager for graceful shutdown of the wake monitoring thread. Update tests to use unique app names and ensure proper cleanup. Add pytest-asyncio and ruff to dev dependencies. Update fastapi version to 0.115.14. Update fastloop version to 0.1.71. Co-authored-by: luke <luke@smartshare.io>
1 parent 96f6d1c commit d266eec

3 files changed

Lines changed: 781 additions & 704 deletions

File tree

fastloop/state/state_redis.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,20 +79,29 @@ def __init__(
7979

8080
self.wake_queue: Queue[str] = wake_queue
8181
self._stop_wake_monitor = threading.Event()
82+
self.wake_thread: threading.Thread | None = None
8283

8384
if self.wake_queue:
8485
self.wake_thread = threading.Thread(
8586
target=self._run_wake_monitoring, daemon=True
8687
)
8788
self.wake_thread.start()
8889

90+
def stop(self):
91+
"""Stop the wake monitoring thread."""
92+
self._stop_wake_monitor.set()
93+
if self.wake_thread and self.wake_thread.is_alive():
94+
self.wake_thread.join(timeout=2.0)
95+
8996
def _run_wake_monitoring(self):
9097
"""Background thread for reliable wake scheduling using ZSET + periodic reconciliation."""
9198
import redis as sync_redis
9299

93100
from ..logging import setup_logger
94101

95102
logger = setup_logger(__name__)
103+
rdb = None
104+
pubsub = None
96105

97106
try:
98107
rdb = sync_redis.Redis(
@@ -131,6 +140,13 @@ def _run_wake_monitoring(self):
131140

132141
except Exception as e:
133142
logger.error(f"Wake monitoring thread error: {e}")
143+
finally:
144+
if pubsub:
145+
with suppress(Exception):
146+
pubsub.close()
147+
if rdb:
148+
with suppress(Exception):
149+
rdb.close()
134150

135151
def _process_due_wakes(self, rdb) -> int:
136152
"""Process all wakes with score <= now. Returns count processed."""

tests/test_scheduling.py

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import asyncio
99
import os
1010
import time
11+
import uuid
1112
from queue import Queue
1213

1314
import pytest
@@ -40,27 +41,34 @@ def redis_config():
4041
)
4142

4243

44+
@pytest.fixture
45+
def app_name():
46+
"""Unique app name per test to prevent thread interference."""
47+
return f"test-app-{uuid.uuid4().hex[:8]}"
48+
49+
4350
@pytest.fixture
4451
def wake_queue():
4552
"""Create a queue for wake events."""
4653
return Queue()
4754

4855

4956
@pytest.fixture
50-
async def state_manager(redis_config, wake_queue):
57+
async def state_manager(redis_config, wake_queue, app_name):
5158
"""Create a Redis state manager connected to real Redis."""
5259
manager = RedisStateManager(
53-
app_name="test-app",
60+
app_name=app_name,
5461
config=redis_config,
5562
wake_queue=wake_queue,
5663
)
5764

58-
# Wait a moment for the wake monitoring thread to start and configure notifications
59-
await asyncio.sleep(0.1)
65+
# Wait for wake monitoring thread to start and configure notifications
66+
await asyncio.sleep(0.2)
6067

6168
yield manager
6269

63-
# Cleanup
70+
# Cleanup: stop wake thread first, then clear Redis
71+
manager.stop()
6472
await manager.rdb.flushdb()
6573

6674

@@ -88,25 +96,29 @@ async def loop_context(state_manager, loop_state):
8896
class TestSetWakeTime:
8997
"""Tests for the set_wake_time functionality."""
9098

91-
async def test_set_wake_time_adds_to_schedule(self, state_manager, loop_state):
99+
async def test_set_wake_time_adds_to_schedule(
100+
self, state_manager, loop_state, app_name
101+
):
92102
"""Test that set_wake_time adds the loop to the wake schedule ZSET."""
93103
wake_timestamp = time.time() + 5.0
94104
await state_manager.set_wake_time(loop_state.loop_id, wake_timestamp)
95105

96-
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name="test-app")
106+
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=app_name)
97107

98108
# Check the loop is in the schedule with correct timestamp
99109
score = await state_manager.rdb.zscore(schedule_key, loop_state.loop_id)
100110
assert score is not None
101111
assert abs(score - wake_timestamp) < 0.1 # Within 100ms tolerance
102112

103-
async def test_set_wake_time_creates_ttl_key(self, state_manager, loop_state):
113+
async def test_set_wake_time_creates_ttl_key(
114+
self, state_manager, loop_state, app_name
115+
):
104116
"""Test that set_wake_time also creates a TTL key for fast wake."""
105117
wake_timestamp = time.time() + 5.0
106118
await state_manager.set_wake_time(loop_state.loop_id, wake_timestamp)
107119

108120
wake_key = RedisKeys.LOOP_WAKE_KEY.format(
109-
app_name="test-app",
121+
app_name=app_name,
110122
loop_id=loop_state.loop_id,
111123
)
112124

@@ -119,13 +131,13 @@ async def test_set_wake_time_creates_ttl_key(self, state_manager, loop_state):
119131
assert 4000 <= ttl_ms <= 5100
120132

121133
async def test_set_wake_time_with_subsecond_precision(
122-
self, state_manager, loop_state
134+
self, state_manager, loop_state, app_name
123135
):
124136
"""Test that sub-second durations work correctly."""
125137
wake_timestamp = time.time() + 0.5
126138
await state_manager.set_wake_time(loop_state.loop_id, wake_timestamp)
127139

128-
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name="test-app")
140+
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=app_name)
129141
score = await state_manager.rdb.zscore(schedule_key, loop_state.loop_id)
130142
assert abs(score - wake_timestamp) < 0.1
131143

@@ -135,7 +147,9 @@ async def test_set_wake_time_past_timestamp_raises(self, state_manager, loop_sta
135147
with pytest.raises(ValueError, match="Timestamp is in the past"):
136148
await state_manager.set_wake_time(loop_state.loop_id, past_timestamp)
137149

138-
async def test_set_wake_time_overwrites_previous(self, state_manager, loop_state):
150+
async def test_set_wake_time_overwrites_previous(
151+
self, state_manager, loop_state, app_name
152+
):
139153
"""Test that setting a new wake time overwrites the previous one."""
140154
# Set initial wake time
141155
await state_manager.set_wake_time(loop_state.loop_id, time.time() + 60)
@@ -144,7 +158,7 @@ async def test_set_wake_time_overwrites_previous(self, state_manager, loop_state
144158
new_timestamp = time.time() + 5.0
145159
await state_manager.set_wake_time(loop_state.loop_id, new_timestamp)
146160

147-
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name="test-app")
161+
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=app_name)
148162
score = await state_manager.rdb.zscore(schedule_key, loop_state.loop_id)
149163

150164
# Should have the new timestamp, not the old one
@@ -161,15 +175,17 @@ async def test_wake_via_keyspace_notification(
161175
await state_manager.set_wake_time(loop_state.loop_id, time.time() + 0.3)
162176

163177
# Wait for TTL expiration + notification processing
164-
await asyncio.sleep(1.0)
178+
await asyncio.sleep(1.5)
165179

166180
assert not wake_queue.empty(), "Wake queue should have the loop_id"
167181
assert wake_queue.get_nowait() == loop_state.loop_id
168182

169-
async def test_wake_via_reconciliation(self, state_manager, wake_queue, loop_state):
183+
async def test_wake_via_reconciliation(
184+
self, state_manager, wake_queue, loop_state, app_name
185+
):
170186
"""Test that periodic reconciliation catches due wakes."""
171187
# Directly add to schedule (simulating a wake that was set before restart)
172-
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name="test-app")
188+
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=app_name)
173189
past_timestamp = time.time() - 1.0 # Already due
174190
await state_manager.rdb.zadd(schedule_key, {loop_state.loop_id: past_timestamp})
175191

@@ -180,19 +196,19 @@ async def test_wake_via_reconciliation(self, state_manager, wake_queue, loop_sta
180196
assert wake_queue.get_nowait() == loop_state.loop_id
181197

182198
async def test_wake_removes_from_schedule(
183-
self, state_manager, wake_queue, loop_state
199+
self, state_manager, wake_queue, loop_state, app_name
184200
):
185201
"""Test that woken loops are removed from the schedule."""
186202
await state_manager.set_wake_time(loop_state.loop_id, time.time() + 0.3)
187203

188-
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name="test-app")
204+
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=app_name)
189205

190206
# Should be in schedule initially
191207
score = await state_manager.rdb.zscore(schedule_key, loop_state.loop_id)
192208
assert score is not None
193209

194210
# Wait for wake
195-
await asyncio.sleep(1.0)
211+
await asyncio.sleep(1.5)
196212

197213
# Drain the wake queue
198214
while not wake_queue.empty():
@@ -213,8 +229,8 @@ async def test_multiple_loops_wake_correctly(self, state_manager, wake_queue):
213229
loops.append(loop)
214230
await state_manager.set_wake_time(loop.loop_id, time.time() + 0.2 * (i + 1))
215231

216-
# Wait for all to wake
217-
await asyncio.sleep(1.5)
232+
# Wait for all to wake (longest is 0.6s + reconciliation buffer)
233+
await asyncio.sleep(2.0)
218234

219235
woken_ids = set()
220236
while not wake_queue.empty():
@@ -228,7 +244,7 @@ async def test_no_duplicate_wakes(self, state_manager, wake_queue, loop_state):
228244
await state_manager.set_wake_time(loop_state.loop_id, time.time() + 0.3)
229245

230246
# Wait long enough for both TTL expiry and reconciliation
231-
await asyncio.sleep(WAKE_RECONCILIATION_INTERVAL_S + 1.0)
247+
await asyncio.sleep(WAKE_RECONCILIATION_INTERVAL_S + 1.5)
232248

233249
# Should only have one wake
234250
woken_ids = []
@@ -246,7 +262,7 @@ async def test_overwriting_wake_time(self, state_manager, wake_queue, loop_state
246262
# Overwrite with short wake time
247263
await state_manager.set_wake_time(loop_state.loop_id, time.time() + 0.3)
248264

249-
await asyncio.sleep(1.0)
265+
await asyncio.sleep(1.5)
250266

251267
assert not wake_queue.empty()
252268
assert wake_queue.get_nowait() == loop_state.loop_id
@@ -260,18 +276,20 @@ async def test_sleep_for_triggers_wake(self, loop_context, wake_queue):
260276
with pytest.raises(LoopPausedError):
261277
await loop_context.sleep_for(0.3)
262278

263-
await asyncio.sleep(1.0)
279+
await asyncio.sleep(1.5)
264280

265281
assert not wake_queue.empty()
266282
assert wake_queue.get_nowait() == loop_context.loop_id
267283

268-
async def test_sleep_for_string_duration(self, loop_context, state_manager):
284+
async def test_sleep_for_string_duration(
285+
self, loop_context, state_manager, app_name
286+
):
269287
"""Test sleep_for with string durations."""
270288
with pytest.raises(LoopPausedError):
271289
await loop_context.sleep_for("5 seconds")
272290

273291
wake_key = RedisKeys.LOOP_WAKE_KEY.format(
274-
app_name="test-app",
292+
app_name=app_name,
275293
loop_id=loop_context.loop_id,
276294
)
277295
ttl_ms = await state_manager.rdb.pttl(wake_key)
@@ -303,7 +321,7 @@ async def test_sleep_until_triggers_wake(self, loop_context, wake_queue):
303321
with pytest.raises(LoopPausedError):
304322
await loop_context.sleep_until(future_time)
305323

306-
await asyncio.sleep(1.0)
324+
await asyncio.sleep(1.5)
307325

308326
assert not wake_queue.empty()
309327
assert wake_queue.get_nowait() == loop_context.loop_id

0 commit comments

Comments
 (0)