Skip to content

Commit 49b69cc

Browse files
clean up
1 parent d7b8eb7 commit 49b69cc

7 files changed

Lines changed: 250 additions & 513 deletions

File tree

fastloop/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@
99
LEASE_HEARTBEAT_INTERVAL_S = 5
1010
MAX_EVENT_HISTORY = 1000
1111
MEANINGFUL_WORK_THRESHOLD_S = 0.01
12+
WORKFLOW_WAKE_PREFIX = "workflow:"

fastloop/fastloop.py

Lines changed: 42 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ async def lifespan(_: FastAPI):
110110
self._monitor_restart_task: asyncio.Task[None] | None = None
111111
self._monitor_restart_delay_s: float = 0.5
112112
self._stopping: bool = False
113+
self._current_monitor: LoopMonitor | None = None
113114
self._loop_start_func: Callable[[LoopContext], None] | None = None
114115
self._loop_metadata: dict[str, dict[str, Any]] = {}
115116
self._workflow_metadata: dict[str, dict[str, Any]] = {}
@@ -142,7 +143,10 @@ async def events_sse_endpoint(entity_id: str): # type: ignore
142143
@self.middleware("http")
143144
async def _ensure_monitor_running(request, call_next): # type: ignore
144145
if self._monitor_task is None or self._monitor_task.done():
145-
self._start_monitor(reason="middleware_safety_net")
146+
self._start_monitor(reason="middleware")
147+
elif self._current_monitor and not self._current_monitor.is_healthy(60.0):
148+
self._monitor_task.cancel()
149+
self._start_monitor(reason="stuck")
146150
return await call_next(request)
147151

148152
def _start_monitor(self, *, reason: str) -> None:
@@ -151,15 +155,14 @@ def _start_monitor(self, *, reason: str) -> None:
151155
if self._monitor_task is not None and not self._monitor_task.done():
152156
return
153157
logger.info("Starting LoopMonitor", extra={"reason": reason})
154-
self._monitor_task = asyncio.create_task(
155-
LoopMonitor(
156-
state_manager=self.state_manager,
157-
loop_manager=self.loop_manager,
158-
restart_callback=self.restart_loop,
159-
wake_queue=self.wake_queue,
160-
fastloop_instance=self,
161-
).run()
158+
self._current_monitor = LoopMonitor(
159+
state_manager=self.state_manager,
160+
loop_manager=self.loop_manager,
161+
restart_callback=self.restart_loop,
162+
wake_queue=self.wake_queue,
163+
fastloop_instance=self,
162164
)
165+
self._monitor_task = asyncio.create_task(self._current_monitor.run())
163166
self._monitor_task.add_done_callback(self._on_monitor_done)
164167

165168
def _on_monitor_done(self, task: asyncio.Task[Any]) -> None:
@@ -763,76 +766,43 @@ async def _event_handler(request: dict[str, Any]):
763766
return _decorator
764767

765768
async def restart_loop(self, loop_id: str) -> bool:
766-
"""Restart a loop using stored metadata (keyed by loop name)."""
769+
"""Restart a loop using stored metadata."""
767770
try:
768771
loop = await self.state_manager.get_loop(loop_id)
769-
loop_name = loop.loop_name
770-
771-
logger.info(
772-
"Attempting to restart loop",
773-
extra={
774-
"loop_id": loop_id,
775-
"loop_name": loop_name,
776-
"loop_status": loop.status.value if loop.status else None,
777-
"registered_loops": list(self._loop_metadata.keys()),
778-
},
779-
)
772+
except LoopNotFoundError:
773+
return False
780774

781-
if not loop_name or loop_name not in self._loop_metadata:
782-
logger.warning(
783-
"No metadata found for loop",
784-
extra={"loop_name": loop_name, "loop_id": loop_id},
785-
)
786-
return False
775+
meta = self._loop_metadata.get(loop.loop_name or "")
776+
if not meta:
777+
return False
787778

788-
metadata = self._loop_metadata[loop_name]
789-
initial_event = await self.state_manager.get_initial_event(loop_id)
790-
context = LoopContext(
791-
loop_id=loop.loop_id,
792-
initial_event=initial_event,
793-
state_manager=self.state_manager,
794-
integrations=metadata.get("integrations", []),
795-
)
779+
ctx = LoopContext(
780+
loop_id=loop.loop_id,
781+
initial_event=await self.state_manager.get_initial_event(loop_id),
782+
state_manager=self.state_manager,
783+
integrations=meta.get("integrations", []),
784+
)
796785

797-
await context.setup_integrations()
786+
with suppress(asyncio.TimeoutError):
787+
await asyncio.wait_for(ctx.setup_integrations(), timeout=10.0)
798788

799-
loop_instance: Loop | None = metadata.get("loop_instance")
800-
if loop_instance:
801-
loop_instance.ctx = context
802-
func = loop_instance.loop
803-
else:
804-
func = import_func_from_path(loop.current_function_path)
805-
started = await self.loop_manager.start(
806-
func=func,
807-
loop_start_func=metadata.get("on_start"),
808-
loop_stop_func=metadata.get("on_stop"),
809-
context=context,
810-
loop=loop,
811-
loop_delay=metadata["loop_delay"],
812-
stop_after_idle_seconds=metadata.get("stop_after_idle_seconds"),
813-
pause_after_idle_seconds=metadata.get("pause_after_idle_seconds"),
814-
)
815-
if started:
816-
logger.info("Restarted loop", extra={"loop_id": loop.loop_id})
817-
return True
818-
else:
819-
logger.warning(
820-
"Failed to restart loop - task already exists in loop_manager",
821-
extra={
822-
"loop_id": loop.loop_id,
823-
},
824-
)
825-
return False
789+
instance: Loop | None = meta.get("loop_instance")
790+
if instance:
791+
instance.ctx = ctx
792+
func = instance.loop
793+
else:
794+
func = import_func_from_path(loop.current_function_path)
826795

827-
except BaseException as e:
828-
logger.error(
829-
"Failed to restart loop",
830-
extra={
831-
"loop_id": loop.loop_id, # type: ignore
832-
"error": str(e),
833-
},
834-
)
835-
return False
796+
return await self.loop_manager.start(
797+
func=func,
798+
loop_start_func=meta.get("on_start"),
799+
loop_stop_func=meta.get("on_stop"),
800+
context=ctx,
801+
loop=loop,
802+
loop_delay=meta["loop_delay"],
803+
stop_after_idle_seconds=meta.get("stop_after_idle_seconds"),
804+
pause_after_idle_seconds=meta.get("pause_after_idle_seconds"),
805+
)
836806

837807
async def has_active_clients(self, loop_id: str) -> bool:
838808
"""Check if a loop has any active SSE client connections."""

fastloop/models.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ def to_string(self) -> str:
5454

5555
@classmethod
5656
def from_json(cls, json_str: str) -> "LoopState":
57-
return cls(**json.loads(json_str))
57+
data = json.loads(json_str)
58+
if "status" in data and isinstance(data["status"], str):
59+
data["status"] = LoopStatus(data["status"])
60+
return cls(**data)
5861

5962

6063
class WorkflowBlock(BaseModel):
@@ -88,6 +91,8 @@ def to_string(self) -> str:
8891
@classmethod
8992
def from_json(cls, json_str: str) -> "WorkflowState":
9093
data = json.loads(json_str)
94+
if "status" in data and isinstance(data["status"], str):
95+
data["status"] = LoopStatus(data["status"])
9196
if "block_attempts" in data and isinstance(data["block_attempts"], dict):
9297
data["block_attempts"] = {
9398
int(k): v for k, v in data["block_attempts"].items()
@@ -115,4 +120,7 @@ def to_string(self) -> str:
115120

116121
@classmethod
117122
def from_json(cls, json_str: str) -> "TaskState":
118-
return cls(**json.loads(json_str))
123+
data = json.loads(json_str)
124+
if "status" in data and isinstance(data["status"], str):
125+
data["status"] = TaskStatus(data["status"])
126+
return cls(**data)

0 commit comments

Comments
 (0)