Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
name: CI

on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
test:
runs-on: ubuntu-latest

services:
redis:
image: redis:7
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5

steps:
- uses: actions/checkout@v4

- name: Install uv
uses: astral-sh/setup-uv@v4
with:
version: "latest"

- name: Set up Python
run: uv python install 3.12

- name: Install dependencies
run: uv sync --all-extras --dev

- name: Run linter
run: uv run ruff check .

- name: Run formatter check
run: uv run ruff format --check .

- name: Run tests
run: uv run pytest tests/ -v --tb=short
env:
REDIS_TEST_HOST: localhost
REDIS_TEST_PORT: 6379

type-check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Install uv
uses: astral-sh/setup-uv@v4
with:
version: "latest"

- name: Set up Python
run: uv python install 3.12

- name: Install dependencies
run: uv sync --all-extras --dev

- name: Check Python syntax
run: |
uv run python -m py_compile fastloop/*.py
uv run python -m py_compile fastloop/**/*.py
35 changes: 35 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,40 @@
VERSION=$(shell grep '^version =' pyproject.toml | head -1 | cut -d'"' -f2)

.PHONY: test test-verbose install-dev lint format check publish redis-start redis-stop

install-dev:
uv sync --all-extras --dev

# Start Redis for local testing
redis-start:
docker run -d --name fastloop-test-redis -p 6379:6379 redis:7
@echo "Redis started. Run 'make test' to run tests."

redis-stop:
docker stop fastloop-test-redis && docker rm fastloop-test-redis

# Run tests (requires Redis - set REDIS_TEST_HOST=localhost or run 'make redis-start')
test:
REDIS_TEST_HOST=localhost uv run pytest tests/ -v

test-verbose:
REDIS_TEST_HOST=localhost uv run pytest tests/ -v -s

test-scheduling:
REDIS_TEST_HOST=localhost uv run pytest tests/test_scheduling.py -v -s

lint:
uv run ruff check .

format:
uv run ruff format .

format-check:
uv run ruff format --check .

check: lint format-check test
@echo "All checks passed!"

publish:
rm -rf dist/
uv build
Expand Down
46 changes: 25 additions & 21 deletions fastloop/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,19 @@
from .integrations import Integration

logger = setup_logger(__name__)

T = TypeVar("T", bound="LoopContext")

_DURATION_RE = re.compile(
r"^(\d+(?:\.\d+)?)\s*(seconds?|secs?|minutes?|mins?|hours?|hrs?|days?)$"
)
_UNIT_MULTIPLIERS = {
"sec": 1,
"min": 60,
"hour": 3600,
"hr": 3600,
"day": 86400,
}


class LoopContext:
def __init__(
Expand Down Expand Up @@ -72,9 +82,13 @@ def switch_to(self: T, func: Callable[[T], Awaitable[None]]):
raise LoopContextSwitchError(func, self)

async def sleep_for(self, duration: float | str) -> None:
"""Sleep the loop for a duration (float seconds or string like "5 seconds")."""
if isinstance(duration, str):
duration = self._parse_duration(duration)

if duration <= 0:
raise ValueError("Sleep duration must be positive")

logger.info(
f"Loop sleeping for {duration} seconds",
extra={"loop_id": self.loop_id, "duration": duration},
Expand All @@ -84,6 +98,10 @@ async def sleep_for(self, duration: float | str) -> None:
self.pause()

async def sleep_until(self, timestamp: float) -> None:
"""Sleep the loop until a specific Unix timestamp."""
if timestamp <= time.time():
raise ValueError("Cannot sleep until a time in the past")

logger.info(
f"Loop sleeping until {timestamp}",
extra={"loop_id": self.loop_id, "timestamp": timestamp},
Expand Down Expand Up @@ -223,26 +241,12 @@ def should_pause(self) -> bool:
return self._pause_requested

def _parse_duration(self, duration_str: str) -> float:
duration_str = duration_str.lower().strip()

match = re.match(
r"^(\d+(?:\.\d+)?)\s*(seconds?|secs?|minutes?|mins?|hours?|hrs?|days?)$",
duration_str,
)
match = _DURATION_RE.match(duration_str.lower().strip())
if not match:
raise ValueError(f"Invalid duration format: {duration_str}")

value = float(match.group(1))
unit = match.group(2)

# Convert to seconds
if unit.startswith("sec"):
return value
elif unit.startswith("min"):
return value * 60
elif unit.startswith("hour") or unit.startswith("hr"):
return value * 3600
elif unit.startswith("day"):
return value * 86400
else:
raise ValueError(f"Unknown time unit: {unit}")
value, unit = float(match.group(1)), match.group(2)
for prefix, mult in _UNIT_MULTIPLIERS.items():
if unit.startswith(prefix):
return value * mult
return value
158 changes: 57 additions & 101 deletions fastloop/fastloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .integrations import Integration
from .logging import configure_logging, setup_logger
from .loop import LoopEvent, LoopManager
from .state.state import LoopState, StateManager, create_state_manager
from .state.state import StateManager, create_state_manager
from .types import BaseConfig, LoopStatus
from .utils import get_func_import_path, import_func_from_path, infer_application_path

Expand Down Expand Up @@ -338,7 +338,7 @@ async def _retrieve_handler(loop_id: str):
) from e

return JSONResponse(
content=loop.to_json(), media_type="application/json"
content=loop.to_dict(), media_type="application/json"
)

async def _stop_handler(loop_id: str):
Expand Down Expand Up @@ -499,120 +499,76 @@ def __init__(
wake_queue: Queue[str],
fastloop_instance: FastLoop,
):
self.state_manager: StateManager = state_manager
self.loop_manager: LoopManager = loop_manager
self.restart_callback: Callable[[str], Coroutine[Any, Any, bool]] = (
restart_callback
)
self.wake_queue: Queue[str] = wake_queue
self.fastloop_instance: FastLoop = fastloop_instance
self._stop_event: asyncio.Event = asyncio.Event()
self.state_manager = state_manager
self.loop_manager = loop_manager
self.restart_callback = restart_callback
self.wake_queue = wake_queue
self.fastloop_instance = fastloop_instance
self._stop_event = asyncio.Event()

def stop(self) -> None:
self._stop_event.set()

async def run(self):
while not self._stop_event.is_set():
try:
if not self.wake_queue.empty():
loop_id = self.wake_queue.get()
if await self.state_manager.has_claim(loop_id):
continue

logger.info(
"Loop woke up, restarting",
extra={"loop_id": loop_id},
)
if not await self.restart_callback(loop_id):
await self.state_manager.update_loop_status(
loop_id, LoopStatus.STOPPED
)
await self.loop_manager.stop(loop_id)

continue

loop_ids: set[str] = await self.state_manager.get_all_loop_ids()
active_loop_ids: set[str] = await self.loop_manager.active_loop_ids()
loops_running: set[str] = active_loop_ids.intersection(loop_ids)

for loop_id in loops_running:
loop = await self.state_manager.get_loop(loop_id)

if (
loop.status in LoopStatus.IDLE
or loop.status == LoopStatus.STOPPED
):
if await self.state_manager.has_claim(loop_id):
continue

logger.info(
"Loop is idle or stopped, stopping",
extra={"loop_id": loop_id},
)
async def _process_wake(self, loop_id: str) -> None:
if await self.state_manager.has_claim(loop_id):
return
logger.info("Loop woke up, restarting", extra={"loop_id": loop_id})
if not await self.restart_callback(loop_id):
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)

await self.loop_manager.stop(loop_id)
continue
async def _check_orphaned_loops(self) -> None:
running_loops = await self.state_manager.get_all_loops(
status=LoopStatus.RUNNING
)
for loop in running_loops:
if await self.state_manager.has_claim(loop.loop_id):
continue
logger.info(
"Loop has no claim, restarting", extra={"loop_id": loop.loop_id}
)
if not await self.restart_callback(loop.loop_id):
await self.state_manager.update_loop_status(
loop.loop_id, LoopStatus.STOPPED
)

loops: list[LoopState] = await self.state_manager.get_all_loops(
status=LoopStatus.RUNNING
async def _check_disconnect_stops(self) -> None:
active_ids = await self.loop_manager.active_loop_ids()
for loop_id in active_ids:
try:
loop = await self.state_manager.get_loop(loop_id)
except LoopNotFoundError:
continue
if not loop.loop_name:
continue
metadata = self.fastloop_instance._loop_metadata.get(loop.loop_name)
if not metadata or not metadata.get("stop_on_disconnect"):
continue
if not await self.fastloop_instance.has_active_clients(loop_id):
logger.info(
"Loop has no clients, stopping",
extra={"loop_id": loop_id, "loop_name": loop.loop_name},
)
for loop in loops:
# Restart loop if it has no claim and is not idle (maybe the task crashed or was interrupted)
if not await self.state_manager.has_claim(loop.loop_id):
logger.info(
"Loop has no claim, restarting",
extra={
"loop_id": loop.loop_id,
},
)
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)
await self.loop_manager.stop(loop_id)

async def run(self):
while not self._stop_event.is_set():
try:
while not self.wake_queue.empty():
await self._process_wake(self.wake_queue.get_nowait())

if not await self.restart_callback(loop.loop_id):
await self.state_manager.update_loop_status(
loop.loop_id, LoopStatus.STOPPED
)
await self.loop_manager.stop(loop.loop_id)

continue

# Check for loops with stop_on_disconnect=true that have no active clients
for loop in loops:
if (
loop.loop_name
and loop.loop_name in self.fastloop_instance._loop_metadata
):
metadata = self.fastloop_instance._loop_metadata[loop.loop_name]
if metadata.get("stop_on_disconnect", False):
has_clients = (
await self.fastloop_instance.has_active_clients(
loop.loop_id
)
)
if not has_clients:
logger.info(
"Loop has stop_on_disconnect=true and no active clients, stopping",
extra={
"loop_id": loop.loop_id,
"loop_name": loop.loop_name,
},
)
await self.state_manager.update_loop_status(
loop.loop_id, LoopStatus.STOPPED
)
await self.loop_manager.stop(loop.loop_id)
await self._check_orphaned_loops()
await self._check_disconnect_stops()

try:
await asyncio.wait_for(
self._stop_event.wait(), timeout=WATCHDOG_INTERVAL_S
)
break
except TimeoutError:
continue

pass
except asyncio.CancelledError:
break
except BaseException as e:
logger.error(
"Error in loop monitor",
extra={"error": str(e)},
)
except Exception as e:
logger.error("Error in loop monitor", extra={"error": str(e)})
await asyncio.sleep(WATCHDOG_INTERVAL_S)
Loading