Skip to content

Commit ef372bc

Browse files
Refactor loop monitor and state management for efficiency
Co-authored-by: luke <luke@smartshare.io>
1 parent d266eec commit ef372bc

5 files changed

Lines changed: 144 additions & 169 deletions

File tree

fastloop/context.py

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,19 @@
2121
from .integrations import Integration
2222

2323
logger = setup_logger(__name__)
24-
2524
T = TypeVar("T", bound="LoopContext")
2625

26+
_DURATION_RE = re.compile(
27+
r"^(\d+(?:\.\d+)?)\s*(seconds?|secs?|minutes?|mins?|hours?|hrs?|days?)$"
28+
)
29+
_UNIT_MULTIPLIERS = {
30+
"sec": 1,
31+
"min": 60,
32+
"hour": 3600,
33+
"hr": 3600,
34+
"day": 86400,
35+
}
36+
2737

2838
class LoopContext:
2939
def __init__(
@@ -231,26 +241,12 @@ def should_pause(self) -> bool:
231241
return self._pause_requested
232242

233243
def _parse_duration(self, duration_str: str) -> float:
234-
duration_str = duration_str.lower().strip()
235-
236-
match = re.match(
237-
r"^(\d+(?:\.\d+)?)\s*(seconds?|secs?|minutes?|mins?|hours?|hrs?|days?)$",
238-
duration_str,
239-
)
244+
match = _DURATION_RE.match(duration_str.lower().strip())
240245
if not match:
241246
raise ValueError(f"Invalid duration format: {duration_str}")
242247

243-
value = float(match.group(1))
244-
unit = match.group(2)
245-
246-
# Convert to seconds
247-
if unit.startswith("sec"):
248-
return value
249-
elif unit.startswith("min"):
250-
return value * 60
251-
elif unit.startswith("hour") or unit.startswith("hr"):
252-
return value * 3600
253-
elif unit.startswith("day"):
254-
return value * 86400
255-
else:
256-
raise ValueError(f"Unknown time unit: {unit}")
248+
value, unit = float(match.group(1)), match.group(2)
249+
for prefix, mult in _UNIT_MULTIPLIERS.items():
250+
if unit.startswith(prefix):
251+
return value * mult
252+
return value

fastloop/fastloop.py

Lines changed: 56 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from .integrations import Integration
2323
from .logging import configure_logging, setup_logger
2424
from .loop import LoopEvent, LoopManager
25-
from .state.state import LoopState, StateManager, create_state_manager
25+
from .state.state import StateManager, create_state_manager
2626
from .types import BaseConfig, LoopStatus
2727
from .utils import get_func_import_path, import_func_from_path, infer_application_path
2828

@@ -499,120 +499,76 @@ def __init__(
499499
wake_queue: Queue[str],
500500
fastloop_instance: FastLoop,
501501
):
502-
self.state_manager: StateManager = state_manager
503-
self.loop_manager: LoopManager = loop_manager
504-
self.restart_callback: Callable[[str], Coroutine[Any, Any, bool]] = (
505-
restart_callback
506-
)
507-
self.wake_queue: Queue[str] = wake_queue
508-
self.fastloop_instance: FastLoop = fastloop_instance
509-
self._stop_event: asyncio.Event = asyncio.Event()
502+
self.state_manager = state_manager
503+
self.loop_manager = loop_manager
504+
self.restart_callback = restart_callback
505+
self.wake_queue = wake_queue
506+
self.fastloop_instance = fastloop_instance
507+
self._stop_event = asyncio.Event()
510508

511509
def stop(self) -> None:
512510
self._stop_event.set()
513511

514-
async def run(self):
515-
while not self._stop_event.is_set():
516-
try:
517-
if not self.wake_queue.empty():
518-
loop_id = self.wake_queue.get()
519-
if await self.state_manager.has_claim(loop_id):
520-
continue
521-
522-
logger.info(
523-
"Loop woke up, restarting",
524-
extra={"loop_id": loop_id},
525-
)
526-
if not await self.restart_callback(loop_id):
527-
await self.state_manager.update_loop_status(
528-
loop_id, LoopStatus.STOPPED
529-
)
530-
await self.loop_manager.stop(loop_id)
531-
532-
continue
533-
534-
loop_ids: set[str] = await self.state_manager.get_all_loop_ids()
535-
active_loop_ids: set[str] = await self.loop_manager.active_loop_ids()
536-
loops_running: set[str] = active_loop_ids.intersection(loop_ids)
537-
538-
for loop_id in loops_running:
539-
loop = await self.state_manager.get_loop(loop_id)
540-
541-
if (
542-
loop.status in LoopStatus.IDLE
543-
or loop.status == LoopStatus.STOPPED
544-
):
545-
if await self.state_manager.has_claim(loop_id):
546-
continue
547-
548-
logger.info(
549-
"Loop is idle or stopped, stopping",
550-
extra={"loop_id": loop_id},
551-
)
512+
async def _process_wake(self, loop_id: str) -> None:
513+
if await self.state_manager.has_claim(loop_id):
514+
return
515+
logger.info("Loop woke up, restarting", extra={"loop_id": loop_id})
516+
if not await self.restart_callback(loop_id):
517+
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)
552518

553-
await self.loop_manager.stop(loop_id)
554-
continue
519+
async def _check_orphaned_loops(self) -> None:
520+
running_loops = await self.state_manager.get_all_loops(
521+
status=LoopStatus.RUNNING
522+
)
523+
for loop in running_loops:
524+
if await self.state_manager.has_claim(loop.loop_id):
525+
continue
526+
logger.info(
527+
"Loop has no claim, restarting", extra={"loop_id": loop.loop_id}
528+
)
529+
if not await self.restart_callback(loop.loop_id):
530+
await self.state_manager.update_loop_status(
531+
loop.loop_id, LoopStatus.STOPPED
532+
)
555533

556-
loops: list[LoopState] = await self.state_manager.get_all_loops(
557-
status=LoopStatus.RUNNING
534+
async def _check_disconnect_stops(self) -> None:
535+
active_ids = await self.loop_manager.active_loop_ids()
536+
for loop_id in active_ids:
537+
try:
538+
loop = await self.state_manager.get_loop(loop_id)
539+
except LoopNotFoundError:
540+
continue
541+
if not loop.loop_name:
542+
continue
543+
metadata = self.fastloop_instance._loop_metadata.get(loop.loop_name)
544+
if not metadata or not metadata.get("stop_on_disconnect"):
545+
continue
546+
if not await self.fastloop_instance.has_active_clients(loop_id):
547+
logger.info(
548+
"Loop has no clients, stopping",
549+
extra={"loop_id": loop_id, "loop_name": loop.loop_name},
558550
)
559-
for loop in loops:
560-
# Restart loop if it has no claim and is not idle (maybe the task crashed or was interrupted)
561-
if not await self.state_manager.has_claim(loop.loop_id):
562-
logger.info(
563-
"Loop has no claim, restarting",
564-
extra={
565-
"loop_id": loop.loop_id,
566-
},
567-
)
551+
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)
552+
await self.loop_manager.stop(loop_id)
553+
554+
async def run(self):
555+
while not self._stop_event.is_set():
556+
try:
557+
while not self.wake_queue.empty():
558+
await self._process_wake(self.wake_queue.get_nowait())
568559

569-
if not await self.restart_callback(loop.loop_id):
570-
await self.state_manager.update_loop_status(
571-
loop.loop_id, LoopStatus.STOPPED
572-
)
573-
await self.loop_manager.stop(loop.loop_id)
574-
575-
continue
576-
577-
# Check for loops with stop_on_disconnect=true that have no active clients
578-
for loop in loops:
579-
if (
580-
loop.loop_name
581-
and loop.loop_name in self.fastloop_instance._loop_metadata
582-
):
583-
metadata = self.fastloop_instance._loop_metadata[loop.loop_name]
584-
if metadata.get("stop_on_disconnect", False):
585-
has_clients = (
586-
await self.fastloop_instance.has_active_clients(
587-
loop.loop_id
588-
)
589-
)
590-
if not has_clients:
591-
logger.info(
592-
"Loop has stop_on_disconnect=true and no active clients, stopping",
593-
extra={
594-
"loop_id": loop.loop_id,
595-
"loop_name": loop.loop_name,
596-
},
597-
)
598-
await self.state_manager.update_loop_status(
599-
loop.loop_id, LoopStatus.STOPPED
600-
)
601-
await self.loop_manager.stop(loop.loop_id)
560+
await self._check_orphaned_loops()
561+
await self._check_disconnect_stops()
602562

603563
try:
604564
await asyncio.wait_for(
605565
self._stop_event.wait(), timeout=WATCHDOG_INTERVAL_S
606566
)
607567
break
608568
except TimeoutError:
609-
continue
610-
569+
pass
611570
except asyncio.CancelledError:
612571
break
613-
except BaseException as e:
614-
logger.error(
615-
"Error in loop monitor",
616-
extra={"error": str(e)},
617-
)
572+
except Exception as e:
573+
logger.error("Error in loop monitor", extra={"error": str(e)})
618574
await asyncio.sleep(WATCHDOG_INTERVAL_S)

fastloop/state/state.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,15 @@ def from_json(cls, json_str: str) -> "LoopState":
3636

3737
class StateManager(ABC):
3838
@abstractmethod
39-
async def get_all_loop_ids(
40-
self,
41-
) -> set[str]:
39+
async def get_all_loop_ids(self) -> set[str]:
4240
pass
4341

4442
@abstractmethod
45-
async def get_all_loops(
46-
self,
47-
status: LoopStatus | None = None,
48-
) -> list[LoopState]:
43+
async def get_running_loop_ids(self) -> set[str]:
44+
pass
45+
46+
@abstractmethod
47+
async def get_all_loops(self, status: LoopStatus | None = None) -> list[LoopState]:
4948
pass
5049

5150
@abstractmethod

0 commit comments

Comments
 (0)