Skip to content

Commit 96f6d1c

Browse files
Refactor: Improve state management and documentation
This commit refactors the state management system, particularly for Redis, to enhance reliability and simplify the code. It also updates documentation and tests for better clarity and robustness. Co-authored-by: luke <luke@smartshare.io>
1 parent da84238 commit 96f6d1c

7 files changed

Lines changed: 112 additions & 187 deletions

File tree

fastloop/context.py

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -72,19 +72,10 @@ def switch_to(self: T, func: Callable[[T], Awaitable[None]]):
7272
raise LoopContextSwitchError(func, self)
7373

7474
async def sleep_for(self, duration: float | str) -> None:
75-
"""
76-
Sleep the loop for a specified duration.
77-
78-
Args:
79-
duration: Either a float (seconds) or a string like "5 seconds", "2 minutes", "1 hour"
80-
81-
Raises:
82-
ValueError: If duration is invalid or negative
83-
NotImplementedError: If the state backend doesn't support wake times
84-
"""
75+
"""Sleep the loop for a duration (float seconds or string like "5 seconds")."""
8576
if isinstance(duration, str):
8677
duration = self._parse_duration(duration)
87-
78+
8879
if duration <= 0:
8980
raise ValueError("Sleep duration must be positive")
9081

@@ -93,24 +84,14 @@ async def sleep_for(self, duration: float | str) -> None:
9384
extra={"loop_id": self.loop_id, "duration": duration},
9485
)
9586

96-
wake_time = time.time() + duration
97-
await self.state_manager.set_wake_time(self.loop_id, wake_time)
87+
await self.state_manager.set_wake_time(self.loop_id, time.time() + duration)
9888
self.pause()
9989

10090
async def sleep_until(self, timestamp: float) -> None:
101-
"""
102-
Sleep the loop until a specific timestamp.
103-
104-
Args:
105-
timestamp: Unix timestamp when the loop should wake up
106-
107-
Raises:
108-
ValueError: If timestamp is in the past
109-
NotImplementedError: If the state backend doesn't support wake times
110-
"""
91+
"""Sleep the loop until a specific Unix timestamp."""
11192
if timestamp <= time.time():
11293
raise ValueError("Cannot sleep until a time in the past")
113-
94+
11495
logger.info(
11596
f"Loop sleeping until {timestamp}",
11697
extra={"loop_id": self.loop_id, "timestamp": timestamp},

fastloop/integrations/telnyx.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import httpx
44
from fastapi import Request
5+
from pydantic import Field
56

67
from ..integrations import Integration
78
from ..logging import setup_logger
@@ -22,14 +23,12 @@ class TelnyxRxMessageEvent(LoopEvent):
2223
text: str
2324
from_number: str
2425
to_numbers: list[str]
25-
media: list[dict[str, Any]] = []
26+
media: list[dict[str, Any]] = Field(default_factory=list)
2627
messaging_profile_id: str | None = None
2728
organization_id: str | None = None
2829
received_at: str | None = None
29-
tags: list[str] = []
30+
tags: list[str] = Field(default_factory=list)
3031
subject: str | None = None
31-
32-
# Raw payload just in case
3332
raw_payload: dict[str, Any]
3433

3534

fastloop/state/state_redis.py

Lines changed: 10 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -79,30 +79,19 @@ def __init__(
7979

8080
self.wake_queue: Queue[str] = wake_queue
8181
self._stop_wake_monitor = threading.Event()
82-
82+
8383
if self.wake_queue:
8484
self.wake_thread = threading.Thread(
8585
target=self._run_wake_monitoring, daemon=True
8686
)
8787
self.wake_thread.start()
8888

8989
def _run_wake_monitoring(self):
90-
"""
91-
Background thread for reliable wake scheduling.
92-
93-
Uses a hybrid approach for reliability:
94-
1. ZSET (sorted set) is the source of truth for all scheduled wakes
95-
2. Periodic reconciliation checks for due wakes every WAKE_RECONCILIATION_INTERVAL_S
96-
3. TTL keys + keyspace notifications provide low-latency wake for normal operation
97-
98-
This ensures wakes are never missed even if:
99-
- The service was down when a wake was due
100-
- Keyspace notifications were missed
101-
- Redis pub/sub disconnected temporarily
102-
"""
90+
"""Background thread for reliable wake scheduling using ZSET + periodic reconciliation."""
10391
import redis as sync_redis
92+
10493
from ..logging import setup_logger
105-
94+
10695
logger = setup_logger(__name__)
10796

10897
try:
@@ -114,23 +103,18 @@ def _run_wake_monitoring(self):
114103
ssl=self.config.ssl,
115104
)
116105

117-
# Enable keyspace notifications (best-effort, not required for reliability)
118106
with suppress(sync_redis.exceptions.ResponseError):
119107
rdb.config_set("notify-keyspace-events", "Ex")
120108

121-
# Process any wakes that were missed while we were down
122109
self._process_due_wakes(rdb)
123110

124-
# Set up pub/sub for low-latency wake notifications
125111
pubsub = rdb.pubsub()
126112
pubsub.psubscribe("__keyevent@*__:expired")
127-
128113
last_reconciliation = time.time()
129114

130115
while not self._stop_wake_monitor.is_set():
131-
# Non-blocking check for keyspace notifications
132116
message = pubsub.get_message(timeout=0.1)
133-
117+
134118
if message and message["type"] == "pmessage":
135119
try:
136120
key = message["data"].decode("utf-8")
@@ -140,7 +124,6 @@ def _run_wake_monitoring(self):
140124
except Exception as e:
141125
logger.error(f"Error processing wake notification: {e}")
142126

143-
# Periodic reconciliation - the reliability guarantee
144127
now = time.time()
145128
if now - last_reconciliation >= WAKE_RECONCILIATION_INTERVAL_S:
146129
self._process_due_wakes(rdb)
@@ -150,44 +133,24 @@ def _run_wake_monitoring(self):
150133
logger.error(f"Wake monitoring thread error: {e}")
151134

152135
def _process_due_wakes(self, rdb) -> int:
153-
"""
154-
Process all wakes that are due (score <= now).
155-
156-
Uses ZRANGEBYSCORE to atomically get and remove due entries.
157-
Returns the number of wakes processed.
158-
"""
136+
"""Process all wakes with score <= now. Returns count processed."""
159137
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=self.app_name)
160138
now = time.time()
161139
processed = 0
162140

163-
# Get all due wakes (score <= now)
164141
due_wakes: list[bytes] = rdb.zrangebyscore(schedule_key, "-inf", now)
165-
166142
for loop_id_bytes in due_wakes:
167143
loop_id = loop_id_bytes.decode("utf-8")
168-
169-
# Atomically remove from schedule (only if still there with same score)
170-
# This prevents double-processing in multi-replica scenarios
171-
removed = rdb.zrem(schedule_key, loop_id)
172-
173-
if removed:
144+
if rdb.zrem(schedule_key, loop_id):
174145
self.wake_queue.put(loop_id)
175146
processed += 1
176147

177148
return processed
178149

179150
def _queue_wake(self, rdb, loop_id: str) -> bool:
180-
"""
181-
Queue a wake for a loop, removing it from the schedule.
182-
183-
Returns True if the wake was queued, False if already processed.
184-
"""
151+
"""Remove loop from schedule and queue wake. Returns True if queued."""
185152
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=self.app_name)
186-
187-
# Remove from schedule - if it was there, queue the wake
188-
removed = rdb.zrem(schedule_key, loop_id)
189-
190-
if removed:
153+
if rdb.zrem(schedule_key, loop_id):
191154
self.wake_queue.put(loop_id)
192155
return True
193156
return False
@@ -474,27 +437,16 @@ async def pop_event(
474437
return None
475438

476439
async def set_wake_time(self, loop_id: str, timestamp: float) -> None:
477-
"""
478-
Schedule a wake time for a loop.
479-
480-
Uses two mechanisms for reliability:
481-
1. ZSET (sorted set) - Source of truth, survives restarts
482-
2. TTL key - Triggers keyspace notification for low-latency wake
483-
484-
The periodic reconciliation in _process_due_wakes ensures wakes
485-
are never missed even if keyspace notifications fail.
486-
"""
440+
"""Schedule a wake time. Uses ZSET (source of truth) + TTL key (fast notification)."""
487441
if timestamp <= time.time():
488442
raise ValueError("Timestamp is in the past")
489443

490444
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=self.app_name)
491445
wake_key = RedisKeys.LOOP_WAKE_KEY.format(
492446
app_name=self.app_name, loop_id=loop_id
493447
)
494-
495448
ttl_ms = max(1, int((timestamp - time.time()) * 1000))
496449

497-
# Atomic: add to schedule and set TTL key
498450
async with self.rdb.pipeline(transaction=True) as pipe:
499451
pipe.zadd(schedule_key, {loop_id: timestamp})
500452
pipe.set(wake_key, "1", px=ttl_ms)

fastloop/state/state_s3.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,8 @@ def _put_bytes(self, key: str, data: bytes):
108108

109109
def _delete_object(self, key: str):
110110
"""Delete an object from S3."""
111-
try:
111+
with suppress(ClientError):
112112
self.s3.delete_object(Bucket=self.bucket, Key=key)
113-
except ClientError:
114-
pass # Ignore errors if object doesn't exist
115113

116114
async def _renew_lock_periodically(self, lock_key: str, renewal_interval: float):
117115
"""Background task to continuously renew the lock while process is alive"""
@@ -400,7 +398,7 @@ async def pop_event(
400398
async def set_wake_time(self, loop_id: str, timestamp: float) -> None:
401399
"""
402400
S3 does not support scheduled wake times.
403-
401+
404402
Wake time scheduling requires a pub/sub mechanism that S3 lacks.
405403
Consider using the Redis backend if you need sleep_for/sleep_until functionality.
406404
"""

fastloop/utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ def import_func_from_path(path: str) -> Callable[..., Any]:
4646
def infer_application_path(app_instance: Any, fallback_var: str = "app") -> str | None:
4747
"""
4848
Infer the application path for Hypercorn reload support.
49-
49+
5050
Try (1) to locate the app in its defining module and use 'module:var',
5151
else (2) derive module from sys.argv[0] and use 'module:fallback_var'.
52-
52+
5353
Args:
5454
app_instance: The FastLoop/FastAPI application instance
5555
fallback_var: Variable name to use as fallback (default: "app")
56-
56+
5757
Returns:
5858
Application path string like "module.path:app" or None if cannot be determined
5959
"""

tests/conftest.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,10 @@
22
Pytest configuration and shared fixtures for fastloop tests.
33
"""
44

5-
import pytest
6-
75

86
def pytest_configure(config):
97
"""Configure pytest with custom markers."""
108
config.addinivalue_line(
119
"markers", "slow: marks tests as slow (deselect with '-m \"not slow\"')"
1210
)
13-
config.addinivalue_line(
14-
"markers", "integration: marks tests as integration tests"
15-
)
11+
config.addinivalue_line("markers", "integration: marks tests as integration tests")

0 commit comments

Comments
 (0)