Skip to content

Commit 38148e9

Browse files
Audit and fix scheduling and state management (#15)
* Refactor: Improve sleep functionality and state management This commit introduces several improvements: - **Enhanced `LoopContext.sleep_for` and `LoopContext.sleep_until`**: - Added docstrings explaining functionality, arguments, and potential errors. - Implemented validation for sleep durations and timestamps to prevent invalid values. - `sleep_for` now raises a `ValueError` if the duration is not positive. - `sleep_until` now raises a `ValueError` if the timestamp is in the past. - **State Management Updates**: - **`LoopEvent` and `LoopState`**: - Renamed `to_json` methods to `to_dict` for clarity and consistency. - Added `to_string` methods for JSON serialization. - Kept `to_json` as a deprecated alias for `to_dict` for backward compatibility. - Changed `created_at` type in `LoopState` from `int` to `float` for better precision. - **`RedisStateManager`**: - Improved `_run_wake_monitoring` with better error handling and logging. - Modified `set_wake_time` to use milliseconds (`px`) for Redis TTL, allowing for more precise wake-up scheduling. - Updated `set_wake_time` to use the full wake key in `srem` for consistency. - Added a docstring to `has_claim` for better understanding. - **`S3StateManager`**: - Explicitly raised `NotImplementedError` in `set_wake_time` with a clear message indicating that S3 does not support wake time scheduling. - Added a private `_delete_object` method. - **Utility Function Update**: - **`infer_application_path`**: - Updated the function signature to accept `app_instance` instead of `self`. - Improved the docstring to clarify its purpose and arguments. - Enhanced introspection logic to correctly identify the application instance. Co-authored-by: luke <luke@smartshare.io> * Refactor: Use to_dict() instead of to_json() Co-authored-by: luke <luke@smartshare.io> * Changes made by Agent Co-authored-by: luke <luke@smartshare.io> * Add Ruff linting and formatting, and CI workflow Co-authored-by: luke <luke@smartshare.io> * Checkpoint before follow-up message Co-authored-by: luke <luke@smartshare.io> * Refactor: Use real Redis for scheduling tests This commit replaces fakeredis with a real Redis instance for scheduling tests. It also updates the CI to include a Redis service and modifies the Makefile to support starting/stopping a local Redis instance. The integration tests have been removed as they are now covered by the refactored unit tests. Co-authored-by: luke <luke@smartshare.io> * Refactor Redis wake scheduling for reliability Co-authored-by: luke <luke@smartshare.io> * Refactor: Improve state management and documentation This commit refactors the state management system, particularly for Redis, to enhance reliability and simplify the code. It also updates documentation and tests for better clarity and robustness. Co-authored-by: luke <luke@smartshare.io> * Refactor: Improve Redis state manager and tests Add stop method to RedisStateManager for graceful shutdown of the wake monitoring thread. Update tests to use unique app names and ensure proper cleanup. Add pytest-asyncio and ruff to dev dependencies. Update fastapi version to 0.115.14. Update fastloop version to 0.1.71. Co-authored-by: luke <luke@smartshare.io> * Refactor loop monitor and state management for efficiency Co-authored-by: luke <luke@smartshare.io> * Remove unused get_running_loop_ids method Co-authored-by: luke <luke@smartshare.io> --------- Co-authored-by: Cursor Agent <cursoragent@cursor.com>
1 parent a3dc7d1 commit 38148e9

15 files changed

Lines changed: 1509 additions & 912 deletions

File tree

.github/workflows/ci.yml

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
name: CI
2+
3+
on:
4+
push:
5+
branches: [main]
6+
pull_request:
7+
branches: [main]
8+
9+
jobs:
10+
test:
11+
runs-on: ubuntu-latest
12+
13+
services:
14+
redis:
15+
image: redis:7
16+
ports:
17+
- 6379:6379
18+
options: >-
19+
--health-cmd "redis-cli ping"
20+
--health-interval 10s
21+
--health-timeout 5s
22+
--health-retries 5
23+
24+
steps:
25+
- uses: actions/checkout@v4
26+
27+
- name: Install uv
28+
uses: astral-sh/setup-uv@v4
29+
with:
30+
version: "latest"
31+
32+
- name: Set up Python
33+
run: uv python install 3.12
34+
35+
- name: Install dependencies
36+
run: uv sync --all-extras --dev
37+
38+
- name: Run linter
39+
run: uv run ruff check .
40+
41+
- name: Run formatter check
42+
run: uv run ruff format --check .
43+
44+
- name: Run tests
45+
run: uv run pytest tests/ -v --tb=short
46+
env:
47+
REDIS_TEST_HOST: localhost
48+
REDIS_TEST_PORT: 6379
49+
50+
type-check:
51+
runs-on: ubuntu-latest
52+
steps:
53+
- uses: actions/checkout@v4
54+
55+
- name: Install uv
56+
uses: astral-sh/setup-uv@v4
57+
with:
58+
version: "latest"
59+
60+
- name: Set up Python
61+
run: uv python install 3.12
62+
63+
- name: Install dependencies
64+
run: uv sync --all-extras --dev
65+
66+
- name: Check Python syntax
67+
run: |
68+
uv run python -m py_compile fastloop/*.py
69+
uv run python -m py_compile fastloop/**/*.py

Makefile

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,40 @@
11
VERSION=$(shell grep '^version =' pyproject.toml | head -1 | cut -d'"' -f2)
22

3+
.PHONY: test test-verbose install-dev lint format check publish redis-start redis-stop
4+
5+
install-dev:
6+
uv sync --all-extras --dev
7+
8+
# Start Redis for local testing
9+
redis-start:
10+
docker run -d --name fastloop-test-redis -p 6379:6379 redis:7
11+
@echo "Redis started. Run 'make test' to run tests."
12+
13+
redis-stop:
14+
docker stop fastloop-test-redis && docker rm fastloop-test-redis
15+
16+
# Run tests (requires Redis - set REDIS_TEST_HOST=localhost or run 'make redis-start')
17+
test:
18+
REDIS_TEST_HOST=localhost uv run pytest tests/ -v
19+
20+
test-verbose:
21+
REDIS_TEST_HOST=localhost uv run pytest tests/ -v -s
22+
23+
test-scheduling:
24+
REDIS_TEST_HOST=localhost uv run pytest tests/test_scheduling.py -v -s
25+
26+
lint:
27+
uv run ruff check .
28+
29+
format:
30+
uv run ruff format .
31+
32+
format-check:
33+
uv run ruff format --check .
34+
35+
check: lint format-check test
36+
@echo "All checks passed!"
37+
338
publish:
439
rm -rf dist/
540
uv build

fastloop/context.py

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,19 @@
2121
from .integrations import Integration
2222

2323
logger = setup_logger(__name__)
24-
2524
T = TypeVar("T", bound="LoopContext")
2625

26+
_DURATION_RE = re.compile(
27+
r"^(\d+(?:\.\d+)?)\s*(seconds?|secs?|minutes?|mins?|hours?|hrs?|days?)$"
28+
)
29+
_UNIT_MULTIPLIERS = {
30+
"sec": 1,
31+
"min": 60,
32+
"hour": 3600,
33+
"hr": 3600,
34+
"day": 86400,
35+
}
36+
2737

2838
class LoopContext:
2939
def __init__(
@@ -72,9 +82,13 @@ def switch_to(self: T, func: Callable[[T], Awaitable[None]]):
7282
raise LoopContextSwitchError(func, self)
7383

7484
async def sleep_for(self, duration: float | str) -> None:
85+
"""Sleep the loop for a duration (float seconds or string like "5 seconds")."""
7586
if isinstance(duration, str):
7687
duration = self._parse_duration(duration)
7788

89+
if duration <= 0:
90+
raise ValueError("Sleep duration must be positive")
91+
7892
logger.info(
7993
f"Loop sleeping for {duration} seconds",
8094
extra={"loop_id": self.loop_id, "duration": duration},
@@ -84,6 +98,10 @@ async def sleep_for(self, duration: float | str) -> None:
8498
self.pause()
8599

86100
async def sleep_until(self, timestamp: float) -> None:
101+
"""Sleep the loop until a specific Unix timestamp."""
102+
if timestamp <= time.time():
103+
raise ValueError("Cannot sleep until a time in the past")
104+
87105
logger.info(
88106
f"Loop sleeping until {timestamp}",
89107
extra={"loop_id": self.loop_id, "timestamp": timestamp},
@@ -223,26 +241,12 @@ def should_pause(self) -> bool:
223241
return self._pause_requested
224242

225243
def _parse_duration(self, duration_str: str) -> float:
226-
duration_str = duration_str.lower().strip()
227-
228-
match = re.match(
229-
r"^(\d+(?:\.\d+)?)\s*(seconds?|secs?|minutes?|mins?|hours?|hrs?|days?)$",
230-
duration_str,
231-
)
244+
match = _DURATION_RE.match(duration_str.lower().strip())
232245
if not match:
233246
raise ValueError(f"Invalid duration format: {duration_str}")
234247

235-
value = float(match.group(1))
236-
unit = match.group(2)
237-
238-
# Convert to seconds
239-
if unit.startswith("sec"):
240-
return value
241-
elif unit.startswith("min"):
242-
return value * 60
243-
elif unit.startswith("hour") or unit.startswith("hr"):
244-
return value * 3600
245-
elif unit.startswith("day"):
246-
return value * 86400
247-
else:
248-
raise ValueError(f"Unknown time unit: {unit}")
248+
value, unit = float(match.group(1)), match.group(2)
249+
for prefix, mult in _UNIT_MULTIPLIERS.items():
250+
if unit.startswith(prefix):
251+
return value * mult
252+
return value

fastloop/fastloop.py

Lines changed: 57 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from .integrations import Integration
2323
from .logging import configure_logging, setup_logger
2424
from .loop import LoopEvent, LoopManager
25-
from .state.state import LoopState, StateManager, create_state_manager
25+
from .state.state import StateManager, create_state_manager
2626
from .types import BaseConfig, LoopStatus
2727
from .utils import get_func_import_path, import_func_from_path, infer_application_path
2828

@@ -338,7 +338,7 @@ async def _retrieve_handler(loop_id: str):
338338
) from e
339339

340340
return JSONResponse(
341-
content=loop.to_json(), media_type="application/json"
341+
content=loop.to_dict(), media_type="application/json"
342342
)
343343

344344
async def _stop_handler(loop_id: str):
@@ -499,120 +499,76 @@ def __init__(
499499
wake_queue: Queue[str],
500500
fastloop_instance: FastLoop,
501501
):
502-
self.state_manager: StateManager = state_manager
503-
self.loop_manager: LoopManager = loop_manager
504-
self.restart_callback: Callable[[str], Coroutine[Any, Any, bool]] = (
505-
restart_callback
506-
)
507-
self.wake_queue: Queue[str] = wake_queue
508-
self.fastloop_instance: FastLoop = fastloop_instance
509-
self._stop_event: asyncio.Event = asyncio.Event()
502+
self.state_manager = state_manager
503+
self.loop_manager = loop_manager
504+
self.restart_callback = restart_callback
505+
self.wake_queue = wake_queue
506+
self.fastloop_instance = fastloop_instance
507+
self._stop_event = asyncio.Event()
510508

511509
def stop(self) -> None:
512510
self._stop_event.set()
513511

514-
async def run(self):
515-
while not self._stop_event.is_set():
516-
try:
517-
if not self.wake_queue.empty():
518-
loop_id = self.wake_queue.get()
519-
if await self.state_manager.has_claim(loop_id):
520-
continue
521-
522-
logger.info(
523-
"Loop woke up, restarting",
524-
extra={"loop_id": loop_id},
525-
)
526-
if not await self.restart_callback(loop_id):
527-
await self.state_manager.update_loop_status(
528-
loop_id, LoopStatus.STOPPED
529-
)
530-
await self.loop_manager.stop(loop_id)
531-
532-
continue
533-
534-
loop_ids: set[str] = await self.state_manager.get_all_loop_ids()
535-
active_loop_ids: set[str] = await self.loop_manager.active_loop_ids()
536-
loops_running: set[str] = active_loop_ids.intersection(loop_ids)
537-
538-
for loop_id in loops_running:
539-
loop = await self.state_manager.get_loop(loop_id)
540-
541-
if (
542-
loop.status in LoopStatus.IDLE
543-
or loop.status == LoopStatus.STOPPED
544-
):
545-
if await self.state_manager.has_claim(loop_id):
546-
continue
547-
548-
logger.info(
549-
"Loop is idle or stopped, stopping",
550-
extra={"loop_id": loop_id},
551-
)
512+
async def _process_wake(self, loop_id: str) -> None:
513+
if await self.state_manager.has_claim(loop_id):
514+
return
515+
logger.info("Loop woke up, restarting", extra={"loop_id": loop_id})
516+
if not await self.restart_callback(loop_id):
517+
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)
552518

553-
await self.loop_manager.stop(loop_id)
554-
continue
519+
async def _check_orphaned_loops(self) -> None:
520+
running_loops = await self.state_manager.get_all_loops(
521+
status=LoopStatus.RUNNING
522+
)
523+
for loop in running_loops:
524+
if await self.state_manager.has_claim(loop.loop_id):
525+
continue
526+
logger.info(
527+
"Loop has no claim, restarting", extra={"loop_id": loop.loop_id}
528+
)
529+
if not await self.restart_callback(loop.loop_id):
530+
await self.state_manager.update_loop_status(
531+
loop.loop_id, LoopStatus.STOPPED
532+
)
555533

556-
loops: list[LoopState] = await self.state_manager.get_all_loops(
557-
status=LoopStatus.RUNNING
534+
async def _check_disconnect_stops(self) -> None:
535+
active_ids = await self.loop_manager.active_loop_ids()
536+
for loop_id in active_ids:
537+
try:
538+
loop = await self.state_manager.get_loop(loop_id)
539+
except LoopNotFoundError:
540+
continue
541+
if not loop.loop_name:
542+
continue
543+
metadata = self.fastloop_instance._loop_metadata.get(loop.loop_name)
544+
if not metadata or not metadata.get("stop_on_disconnect"):
545+
continue
546+
if not await self.fastloop_instance.has_active_clients(loop_id):
547+
logger.info(
548+
"Loop has no clients, stopping",
549+
extra={"loop_id": loop_id, "loop_name": loop.loop_name},
558550
)
559-
for loop in loops:
560-
# Restart loop if it has no claim and is not idle (maybe the task crashed or was interrupted)
561-
if not await self.state_manager.has_claim(loop.loop_id):
562-
logger.info(
563-
"Loop has no claim, restarting",
564-
extra={
565-
"loop_id": loop.loop_id,
566-
},
567-
)
551+
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)
552+
await self.loop_manager.stop(loop_id)
553+
554+
async def run(self):
555+
while not self._stop_event.is_set():
556+
try:
557+
while not self.wake_queue.empty():
558+
await self._process_wake(self.wake_queue.get_nowait())
568559

569-
if not await self.restart_callback(loop.loop_id):
570-
await self.state_manager.update_loop_status(
571-
loop.loop_id, LoopStatus.STOPPED
572-
)
573-
await self.loop_manager.stop(loop.loop_id)
574-
575-
continue
576-
577-
# Check for loops with stop_on_disconnect=true that have no active clients
578-
for loop in loops:
579-
if (
580-
loop.loop_name
581-
and loop.loop_name in self.fastloop_instance._loop_metadata
582-
):
583-
metadata = self.fastloop_instance._loop_metadata[loop.loop_name]
584-
if metadata.get("stop_on_disconnect", False):
585-
has_clients = (
586-
await self.fastloop_instance.has_active_clients(
587-
loop.loop_id
588-
)
589-
)
590-
if not has_clients:
591-
logger.info(
592-
"Loop has stop_on_disconnect=true and no active clients, stopping",
593-
extra={
594-
"loop_id": loop.loop_id,
595-
"loop_name": loop.loop_name,
596-
},
597-
)
598-
await self.state_manager.update_loop_status(
599-
loop.loop_id, LoopStatus.STOPPED
600-
)
601-
await self.loop_manager.stop(loop.loop_id)
560+
await self._check_orphaned_loops()
561+
await self._check_disconnect_stops()
602562

603563
try:
604564
await asyncio.wait_for(
605565
self._stop_event.wait(), timeout=WATCHDOG_INTERVAL_S
606566
)
607567
break
608568
except TimeoutError:
609-
continue
610-
569+
pass
611570
except asyncio.CancelledError:
612571
break
613-
except BaseException as e:
614-
logger.error(
615-
"Error in loop monitor",
616-
extra={"error": str(e)},
617-
)
572+
except Exception as e:
573+
logger.error("Error in loop monitor", extra={"error": str(e)})
618574
await asyncio.sleep(WATCHDOG_INTERVAL_S)

0 commit comments

Comments
 (0)