Skip to content

Commit 46e6f58

Browse files
ensure restarts
1 parent 1c72a06 commit 46e6f58

4 files changed

Lines changed: 115 additions & 37 deletions

File tree

fastloop/fastloop.py

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import asyncio
1111
from collections.abc import Callable
12-
from contextlib import asynccontextmanager
12+
from contextlib import asynccontextmanager, suppress
1313
from enum import Enum
1414
from http import HTTPStatus
1515
from queue import Queue
@@ -71,19 +71,18 @@ def __init__(
7171
):
7272
@asynccontextmanager
7373
async def lifespan(_: FastAPI):
74-
self._monitor_task = asyncio.create_task(
75-
LoopMonitor(
76-
state_manager=self.state_manager,
77-
loop_manager=self.loop_manager,
78-
restart_callback=self.restart_loop,
79-
wake_queue=self.wake_queue,
80-
fastloop_instance=self,
81-
).run()
82-
)
74+
self._stopping = False
75+
self._start_monitor(reason="lifespan")
8376

8477
yield
8578

86-
self._monitor_task.cancel()
79+
self._stopping = True
80+
if self._monitor_restart_task:
81+
self._monitor_restart_task.cancel()
82+
if self._monitor_task:
83+
self._monitor_task.cancel()
84+
with suppress(asyncio.CancelledError):
85+
await self._monitor_task
8786
await self.loop_manager.stop_all()
8887
await self.workflow_manager.stop_all()
8988
await self.task_manager.stop_all()
@@ -108,6 +107,9 @@ async def lifespan(_: FastAPI):
108107
self.workflow_manager: WorkflowManager = WorkflowManager(self.state_manager)
109108
self.task_manager: TaskManager = TaskManager(self.state_manager)
110109
self._monitor_task: asyncio.Task[None] | None = None
110+
self._monitor_restart_task: asyncio.Task[None] | None = None
111+
self._monitor_restart_delay_s: float = 0.5
112+
self._stopping: bool = False
111113
self._loop_start_func: Callable[[LoopContext], None] | None = None
112114
self._loop_metadata: dict[str, dict[str, Any]] = {}
113115
self._workflow_metadata: dict[str, dict[str, Any]] = {}
@@ -137,6 +139,58 @@ async def events_history_endpoint(entity_id: str): # type: ignore
137139
async def events_sse_endpoint(entity_id: str): # type: ignore
138140
return await self.loop_manager.events_sse(entity_id)
139141

142+
@self.middleware("http")
143+
async def _ensure_monitor_running(request, call_next): # type: ignore
144+
if self._monitor_task is None or self._monitor_task.done():
145+
self._start_monitor(reason="middleware_safety_net")
146+
return await call_next(request)
147+
148+
def _start_monitor(self, *, reason: str) -> None:
149+
if self._stopping:
150+
return
151+
if self._monitor_task is not None and not self._monitor_task.done():
152+
return
153+
logger.info("Starting LoopMonitor", extra={"reason": reason})
154+
self._monitor_task = asyncio.create_task(
155+
LoopMonitor(
156+
state_manager=self.state_manager,
157+
loop_manager=self.loop_manager,
158+
restart_callback=self.restart_loop,
159+
wake_queue=self.wake_queue,
160+
fastloop_instance=self,
161+
).run()
162+
)
163+
self._monitor_task.add_done_callback(self._on_monitor_done)
164+
165+
def _on_monitor_done(self, task: asyncio.Task[Any]) -> None:
166+
if self._stopping:
167+
return
168+
with suppress(asyncio.CancelledError):
169+
exc = task.exception()
170+
if exc is None:
171+
logger.warning("LoopMonitor stopped unexpectedly; restarting")
172+
else:
173+
logger.error("LoopMonitor crashed; restarting", extra={"error": str(exc)})
174+
self._schedule_monitor_restart()
175+
176+
def _schedule_monitor_restart(self) -> None:
177+
if self._stopping:
178+
return
179+
if (
180+
self._monitor_restart_task is not None
181+
and not self._monitor_restart_task.done()
182+
):
183+
return
184+
185+
delay = self._monitor_restart_delay_s
186+
self._monitor_restart_delay_s = min(self._monitor_restart_delay_s * 2, 10.0)
187+
188+
async def _restart() -> None:
189+
await asyncio.sleep(delay)
190+
self._start_monitor(reason="restart_after_crash")
191+
192+
self._monitor_restart_task = asyncio.create_task(_restart())
193+
140194
@property
141195
def config(self) -> BaseConfig:
142196
return self.config_manager.get_config()

fastloop/monitor.py

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -292,42 +292,66 @@ async def _register_schedules(self) -> None:
292292

293293
async def run(self):
294294
"""Main monitor loop."""
295+
logger.info("LoopMonitor started")
296+
295297
if not self._app_start_processed:
296-
await self._process_app_start_callbacks()
297-
await self._register_schedules()
298-
self._app_start_processed = True
298+
try:
299+
await self._process_app_start_callbacks()
300+
await self._register_schedules()
301+
except Exception as e:
302+
logger.error(
303+
"LoopMonitor startup failed (continuing without app-start processing)",
304+
extra={"error": str(e)},
305+
)
306+
finally:
307+
self._app_start_processed = True
299308

300309
while not self._stop_event.is_set():
301310
try:
302-
# Process all pending wakes, handling errors individually
303-
# Use get_nowait in a try/except to avoid race between empty() and get()
304-
while True:
305-
try:
306-
wake_id = self.wake_queue.get_nowait()
311+
# Block for up to WATCHDOG_INTERVAL_S to avoid polling the event loop.
312+
# This is a threadsafe Queue fed by the Redis wake thread.
313+
wake_batch: list[str] = []
314+
try:
315+
first = await asyncio.to_thread(
316+
self.wake_queue.get, True, WATCHDOG_INTERVAL_S
317+
)
318+
wake_batch.append(first)
319+
except Empty:
320+
pass
321+
322+
# Drain any additional wakes immediately.
323+
if wake_batch:
324+
while True:
307325
try:
308-
await self._process_wake(wake_id)
309-
except Exception as e:
310-
logger.error(
311-
"Error processing wake",
312-
extra={"wake_id": wake_id, "error": str(e)},
313-
)
314-
except Empty:
315-
break
326+
wake_batch.append(self.wake_queue.get_nowait())
327+
except Empty:
328+
break
329+
330+
processed_wakes = 0
331+
for wake_id in wake_batch:
332+
try:
333+
await self._process_wake(wake_id)
334+
processed_wakes += 1
335+
except Exception as e:
336+
logger.error(
337+
"Error processing wake",
338+
extra={"wake_id": wake_id, "error": str(e)},
339+
)
340+
if processed_wakes:
341+
logger.info(
342+
"Processed wakes from queue",
343+
extra={
344+
"count": processed_wakes,
345+
"queue_size": self.wake_queue.qsize(),
346+
},
347+
)
316348

317349
await self._check_orphaned_loops()
318350
await self._check_orphaned_workflows()
319351
await self._check_orphaned_tasks()
320352
await self._check_scheduled_workflows()
321353
await self._check_scheduled_tasks()
322354
await self._check_disconnect_stops()
323-
324-
try:
325-
await asyncio.wait_for(
326-
self._stop_event.wait(), timeout=WATCHDOG_INTERVAL_S
327-
)
328-
break
329-
except TimeoutError:
330-
pass
331355
except asyncio.CancelledError:
332356
break
333357
except Exception as e:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.106"
3+
version = "0.1.107"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)