Skip to content

Commit c0a59e3

Browse files
clean up and refactor
1 parent 073379e commit c0a59e3

22 files changed

Lines changed: 1155 additions & 1136 deletions

fastloop/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1+
"""
2+
FastLoop - A framework for building event-driven loop applications.
3+
"""
4+
15
from . import integrations
26
from .context import LoopContext
37
from .fastloop import FastLoop
4-
from .loop import Loop, LoopEvent, Workflow, WorkflowBlock
8+
from .loop import Loop
9+
from .models import LoopEvent, WorkflowBlock
510
from .types import BlockPlan, RetryPolicy, ScheduleType
11+
from .workflow import Workflow
612

713
__all__ = [
814
"BlockPlan",

fastloop/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
WorkflowRepeatError,
1616
)
1717
from .logging import setup_logger
18-
from .loop import LoopEvent
18+
from .models import LoopEvent
1919
from .state.state import StateManager
2020
from .types import E, LoopEventSender
2121
from .utils import get_func_import_path

fastloop/fastloop.py

Lines changed: 31 additions & 263 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
1+
"""
2+
FastLoop application class.
3+
4+
This module contains the main FastLoop class that provides:
5+
- HTTP server with FastAPI
6+
- Loop and Workflow decorators
7+
- Event registration
8+
"""
9+
110
import asyncio
2-
from collections.abc import Callable, Coroutine
11+
from collections.abc import Callable
312
from contextlib import asynccontextmanager
413
from enum import Enum
514
from http import HTTPStatus
@@ -16,7 +25,6 @@
1625
from pydantic_core import PydanticUndefined
1726

1827
from .config import ConfigManager, create_config_manager
19-
from .constants import WATCHDOG_INTERVAL_S
2028
from .context import LoopContext
2129
from .exceptions import (
2230
LoopAlreadyDefinedError,
@@ -25,10 +33,13 @@
2533
)
2634
from .integrations import Integration
2735
from .logging import configure_logging, setup_logger
28-
from .loop import Loop, LoopEvent, LoopManager, Workflow, WorkflowManager
36+
from .loop import Loop, LoopManager
37+
from .models import LoopEvent
38+
from .monitor import LoopMonitor
2939
from .state.state import StateManager, create_state_manager
3040
from .types import BaseConfig, LoopStatus, RetryPolicy
3141
from .utils import get_func_import_path, import_func_from_path, infer_application_path
42+
from .workflow import Workflow, WorkflowManager
3243

3344
logger = setup_logger()
3445

@@ -45,6 +56,8 @@ def _resolve_event_key(event: str | Enum | type[LoopEvent] | None) -> str | None
4556

4657

4758
class FastLoop(FastAPI):
59+
"""Main application class that extends FastAPI with loop and workflow support."""
60+
4861
def __init__(
4962
self,
5063
name: str,
@@ -192,6 +205,8 @@ def loop(
192205
integrations: list[Integration] | None = None,
193206
stop_on_disconnect: bool = False,
194207
) -> Callable[[Callable[..., Any] | type[Loop]], Callable[..., Any] | type[Loop]]:
208+
"""Decorator to register a loop function or class."""
209+
195210
def _decorator(
196211
func_or_class: Callable[..., Any] | type[Loop],
197212
) -> Callable[..., Any] | type[Loop]:
@@ -433,6 +448,13 @@ async def _pause_handler(loop_id: str):
433448
response_model=None,
434449
)
435450

451+
self.add_api_route(
452+
path=f"/{name}/{{loop_id}}/stop",
453+
endpoint=_cancel_handler,
454+
methods=["POST"],
455+
response_model=None,
456+
)
457+
436458
self.add_api_route(
437459
path=f"/{name}/{{loop_id}}/pause",
438460
endpoint=_pause_handler,
@@ -445,6 +467,8 @@ async def _pause_handler(loop_id: str):
445467
return _decorator
446468

447469
def event(self, event_type: str) -> Callable[[type[LoopEvent]], type[LoopEvent]]:
470+
"""Decorator to register an event type."""
471+
448472
def _decorator(cls: type[LoopEvent]) -> type[LoopEvent]:
449473
cls.type = event_type
450474
self.register_event(cls)
@@ -465,6 +489,8 @@ def workflow(
465489
) -> Callable[
466490
[Callable[..., Any] | type[Workflow]], Callable[..., Any] | type[Workflow]
467491
]:
492+
"""Decorator to register a workflow function or class."""
493+
468494
def _decorator(
469495
func_or_class: Callable[..., Any] | type[Workflow],
470496
) -> Callable[..., Any] | type[Workflow]:
@@ -644,8 +670,7 @@ async def _event_handler(request: dict[str, Any]):
644670
return _decorator
645671

646672
async def restart_loop(self, loop_id: str) -> bool:
647-
"""Restart a loop using stored metadata (keyed by loop name)"""
648-
673+
"""Restart a loop using stored metadata (keyed by loop name)."""
649674
try:
650675
loop = await self.state_manager.get_loop(loop_id)
651676
loop_name = loop.loop_name
@@ -711,7 +736,7 @@ async def restart_loop(self, loop_id: str) -> bool:
711736
return False
712737

713738
async def has_active_clients(self, loop_id: str) -> bool:
714-
"""Check if a loop has any active SSE client connections"""
739+
"""Check if a loop has any active SSE client connections."""
715740
client_count = await self.state_manager.get_active_client_count(loop_id)
716741
return client_count > 0
717742

@@ -777,260 +802,3 @@ async def restart_workflow(self, workflow_run_id: str) -> bool:
777802
extra={"workflow_run_id": workflow_run_id, "error": str(e)},
778803
)
779804
return False
780-
781-
782-
class LoopMonitor:
783-
def __init__(
784-
self,
785-
state_manager: StateManager,
786-
loop_manager: LoopManager,
787-
restart_callback: Callable[[str], Coroutine[Any, Any, bool]],
788-
wake_queue: Queue[str],
789-
fastloop_instance: FastLoop,
790-
):
791-
self.state_manager = state_manager
792-
self.loop_manager = loop_manager
793-
self.restart_callback = restart_callback
794-
self.wake_queue = wake_queue
795-
self.fastloop_instance = fastloop_instance
796-
self._stop_event = asyncio.Event()
797-
self._app_start_processed = False
798-
799-
def stop(self) -> None:
800-
self._stop_event.set()
801-
802-
async def _process_wake(self, wake_id: str) -> None:
803-
logger.info(
804-
"Processing wake from queue",
805-
extra={"wake_id": wake_id},
806-
)
807-
if wake_id.startswith("workflow:"):
808-
workflow_run_id = wake_id[9:]
809-
if await self.state_manager.workflow_has_claim(workflow_run_id):
810-
logger.info(
811-
"Workflow has active claim, skipping wake",
812-
extra={"workflow_run_id": workflow_run_id},
813-
)
814-
return
815-
logger.info(
816-
"Workflow woke up, attempting restart",
817-
extra={"workflow_run_id": workflow_run_id},
818-
)
819-
try:
820-
if await self.fastloop_instance.restart_workflow(workflow_run_id):
821-
await self.state_manager.clear_workflow_wake_time(workflow_run_id)
822-
logger.info(
823-
"Workflow restarted successfully",
824-
extra={"workflow_run_id": workflow_run_id},
825-
)
826-
else:
827-
await self.state_manager.clear_workflow_wake_time(workflow_run_id)
828-
await self.state_manager.update_workflow_status(
829-
workflow_run_id, LoopStatus.STOPPED
830-
)
831-
logger.warning(
832-
"Workflow restart failed, marked as stopped",
833-
extra={"workflow_run_id": workflow_run_id},
834-
)
835-
except Exception as e:
836-
logger.error(
837-
"Error restarting workflow from wake",
838-
extra={"workflow_run_id": workflow_run_id, "error": str(e)},
839-
)
840-
else:
841-
loop_id = wake_id
842-
if await self.state_manager.has_claim(loop_id):
843-
return
844-
logger.info("Loop woke up, restarting", extra={"loop_id": loop_id})
845-
if not await self.restart_callback(loop_id):
846-
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)
847-
848-
async def _check_orphaned_loops(self) -> None:
849-
running_loops = await self.state_manager.get_all_loops(
850-
status=LoopStatus.RUNNING
851-
)
852-
for loop in running_loops:
853-
if await self.state_manager.has_claim(loop.loop_id):
854-
continue
855-
logger.info(
856-
"Loop has no claim, restarting", extra={"loop_id": loop.loop_id}
857-
)
858-
if not await self.restart_callback(loop.loop_id):
859-
await self.state_manager.update_loop_status(
860-
loop.loop_id, LoopStatus.STOPPED
861-
)
862-
863-
async def _check_orphaned_workflows(self) -> None:
864-
running_workflows = await self.state_manager.get_all_workflows(
865-
status=LoopStatus.RUNNING
866-
)
867-
for workflow in running_workflows:
868-
if await self.state_manager.workflow_has_claim(workflow.workflow_run_id):
869-
continue
870-
logger.info(
871-
"Workflow has no claim, restarting",
872-
extra={
873-
"workflow_run_id": workflow.workflow_run_id,
874-
"block_index": workflow.current_block_index,
875-
},
876-
)
877-
if not await self.fastloop_instance.restart_workflow(
878-
workflow.workflow_run_id
879-
):
880-
await self.state_manager.update_workflow_status(
881-
workflow.workflow_run_id, LoopStatus.STOPPED
882-
)
883-
884-
async def _check_scheduled_workflows(self) -> None:
885-
"""Check for IDLE workflows with past-due scheduled wake times.
886-
887-
This is a backup mechanism that catches workflows that may have been
888-
removed from the ZSET but not yet processed (e.g., if the wake queue
889-
consumer failed or the wake monitoring thread died).
890-
"""
891-
import time
892-
893-
now = time.time()
894-
idle_workflows = await self.state_manager.get_all_workflows(
895-
status=LoopStatus.IDLE
896-
)
897-
for workflow in idle_workflows:
898-
if not workflow.scheduled_wake_time:
899-
continue
900-
if workflow.scheduled_wake_time > now:
901-
continue
902-
if await self.state_manager.workflow_has_claim(workflow.workflow_run_id):
903-
continue
904-
claimed_from_zset = await self.state_manager.try_claim_workflow_wake(
905-
workflow.workflow_run_id
906-
)
907-
logger.info(
908-
"IDLE workflow has past-due wake time, restarting",
909-
extra={
910-
"workflow_run_id": workflow.workflow_run_id,
911-
"scheduled_wake_time": workflow.scheduled_wake_time,
912-
"block_index": workflow.current_block_index,
913-
"claimed_from_zset": claimed_from_zset,
914-
},
915-
)
916-
if await self.fastloop_instance.restart_workflow(workflow.workflow_run_id):
917-
await self.state_manager.clear_workflow_wake_time(
918-
workflow.workflow_run_id
919-
)
920-
else:
921-
await self.state_manager.clear_workflow_wake_time(
922-
workflow.workflow_run_id
923-
)
924-
await self.state_manager.update_workflow_status(
925-
workflow.workflow_run_id, LoopStatus.STOPPED
926-
)
927-
928-
async def _check_disconnect_stops(self) -> None:
929-
active_ids = await self.loop_manager.active_loop_ids()
930-
for loop_id in active_ids:
931-
try:
932-
loop = await self.state_manager.get_loop(loop_id)
933-
except LoopNotFoundError:
934-
continue
935-
if not loop.loop_name:
936-
continue
937-
metadata = self.fastloop_instance._loop_metadata.get(loop.loop_name)
938-
if not metadata or not metadata.get("stop_on_disconnect"):
939-
continue
940-
if not await self.fastloop_instance.has_active_clients(loop_id):
941-
logger.info(
942-
"Loop has no clients, stopping",
943-
extra={"loop_id": loop_id, "loop_name": loop.loop_name},
944-
)
945-
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)
946-
await self.loop_manager.stop(loop_id)
947-
948-
async def _process_app_start_callbacks(self) -> None:
949-
"""Call on_app_start for each non-stopped class-based loop instance."""
950-
all_loops = await self.state_manager.get_all_loops()
951-
952-
for loop in all_loops:
953-
if loop.status == LoopStatus.STOPPED or not loop.loop_name:
954-
continue
955-
956-
metadata = self.fastloop_instance._loop_metadata.get(loop.loop_name)
957-
loop_instance: Loop | None = (
958-
metadata.get("loop_instance") if metadata else None
959-
)
960-
if not loop_instance:
961-
continue
962-
963-
if not await self.state_manager.try_acquire_app_start_lock(loop.loop_id):
964-
continue
965-
966-
try:
967-
context = LoopContext(
968-
loop_id=loop.loop_id,
969-
initial_event=await self.state_manager.get_initial_event(
970-
loop.loop_id
971-
),
972-
state_manager=self.state_manager,
973-
integrations=metadata.get("integrations", []), # type: ignore
974-
)
975-
loop_instance.ctx = context
976-
977-
if await loop_instance.on_app_start(context):
978-
logger.info(
979-
"on_app_start returned True, starting loop",
980-
extra={"loop_id": loop.loop_id, "loop_name": loop.loop_name},
981-
)
982-
if not await self.restart_callback(loop.loop_id):
983-
await self.state_manager.update_loop_status(
984-
loop.loop_id, LoopStatus.STOPPED
985-
)
986-
except Exception as e:
987-
logger.error(
988-
"Error in on_app_start",
989-
extra={"loop_id": loop.loop_id, "error": str(e)},
990-
)
991-
finally:
992-
await self.state_manager.release_app_start_lock(loop.loop_id)
993-
994-
async def run(self):
995-
from queue import Empty
996-
997-
if not self._app_start_processed:
998-
await self._process_app_start_callbacks()
999-
self._app_start_processed = True
1000-
1001-
while not self._stop_event.is_set():
1002-
try:
1003-
# Process all pending wakes, handling errors individually
1004-
# Use get_nowait in a try/except to avoid race between empty() and get()
1005-
wakes_processed = 0
1006-
while True:
1007-
try:
1008-
wake_id = self.wake_queue.get_nowait()
1009-
wakes_processed += 1
1010-
try:
1011-
await self._process_wake(wake_id)
1012-
except Exception as e:
1013-
logger.error(
1014-
"Error processing wake",
1015-
extra={"wake_id": wake_id, "error": str(e)},
1016-
)
1017-
except Empty:
1018-
break
1019-
1020-
await self._check_orphaned_loops()
1021-
await self._check_orphaned_workflows()
1022-
await self._check_scheduled_workflows()
1023-
await self._check_disconnect_stops()
1024-
1025-
try:
1026-
await asyncio.wait_for(
1027-
self._stop_event.wait(), timeout=WATCHDOG_INTERVAL_S
1028-
)
1029-
break
1030-
except TimeoutError:
1031-
pass
1032-
except asyncio.CancelledError:
1033-
break
1034-
except Exception as e:
1035-
logger.error("Error in monitor", extra={"error": str(e)})
1036-
await asyncio.sleep(WATCHDOG_INTERVAL_S)

0 commit comments

Comments
 (0)