88
99import asyncio
1010import json
11+ import time
1112import traceback
1213import uuid
1314from collections .abc import Callable
1617from fastapi import HTTPException
1718from fastapi .responses import StreamingResponse
1819
19- from .constants import CANCEL_GRACE_PERIOD_S
20+ from .constants import CANCEL_GRACE_PERIOD_S , MEANINGFUL_WORK_THRESHOLD_S
2021from .exceptions import (
2122 EventTimeoutError ,
2223 LoopClaimError ,
@@ -76,6 +77,8 @@ async def _run(
7677 delay : float ,
7778 loop_start_func : Callable [..., Any ] | None ,
7879 loop_stop_func : Callable [..., Any ] | None ,
80+ stop_after_idle_seconds : float | None = None ,
81+ pause_after_idle_seconds : float | None = None ,
7982 ) -> None :
8083 try :
8184 async with self .state_manager .with_claim (loop_id ): # type: ignore
@@ -85,10 +88,12 @@ async def _run(
8588 await loop_start_func (context )
8689 else :
8790 loop_start_func (context ) # type: ignore
88- idle_cycles = 0
91+
92+ last_active_time = time .time ()
8993
9094 while not context .should_stop and not context .should_pause :
91- context .event_this_cycle = False
95+ context ._reset_cycle_tracking ()
96+ cycle_start = time .monotonic ()
9297
9398 try :
9499 if asyncio .iscoroutinefunction (func ):
@@ -122,15 +127,41 @@ async def _run(
122127 },
123128 )
124129
125- if not context .event_this_cycle :
126- idle_cycles += 1
130+ cycle_duration = time .monotonic () - cycle_start
131+ work_time = cycle_duration - context ._wait_time_this_cycle
132+ is_active = (
133+ work_time > MEANINGFUL_WORK_THRESHOLD_S
134+ or context .event_this_cycle
135+ )
136+
137+ if is_active :
138+ last_active_time = time .time ()
139+ else :
140+ idle_seconds = time .time () - last_active_time
127141 if (
128- idle_cycles >= self . config . max_idle_cycles
129- and self . config . shutdown_idle
142+ stop_after_idle_seconds is not None
143+ and idle_seconds >= stop_after_idle_seconds
130144 ):
145+ logger .info (
146+ "Loop idle timeout reached, stopping" ,
147+ extra = {
148+ "loop_id" : loop_id ,
149+ "idle_seconds" : idle_seconds ,
150+ },
151+ )
152+ raise LoopStoppedError ()
153+ if (
154+ pause_after_idle_seconds is not None
155+ and idle_seconds >= pause_after_idle_seconds
156+ ):
157+ logger .info (
158+ "Loop idle timeout reached, pausing" ,
159+ extra = {
160+ "loop_id" : loop_id ,
161+ "idle_seconds" : idle_seconds ,
162+ },
163+ )
131164 raise LoopPausedError ()
132- else :
133- idle_cycles = 0
134165
135166 try :
136167 await asyncio .sleep (delay )
@@ -180,13 +211,22 @@ async def start(
180211 context : Any ,
181212 loop : LoopState ,
182213 loop_delay : float = 0.1 ,
214+ stop_after_idle_seconds : float | None = None ,
215+ pause_after_idle_seconds : float | None = None ,
183216 ) -> bool :
184217 if loop .loop_id in self .loop_tasks :
185218 return False
186219
187220 self .loop_tasks [loop .loop_id ] = asyncio .create_task (
188221 self ._run (
189- func , context , loop .loop_id , loop_delay , loop_start_func , loop_stop_func
222+ func ,
223+ context ,
224+ loop .loop_id ,
225+ loop_delay ,
226+ loop_start_func ,
227+ loop_stop_func ,
228+ stop_after_idle_seconds ,
229+ pause_after_idle_seconds ,
190230 )
191231 )
192232
0 commit comments