Skip to content

Commit 60c1000

Browse files
more fixes
1 parent 49b69cc commit 60c1000

6 files changed

Lines changed: 865 additions & 43 deletions

File tree

fastloop/fastloop.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,10 +770,15 @@ async def restart_loop(self, loop_id: str) -> bool:
770770
try:
771771
loop = await self.state_manager.get_loop(loop_id)
772772
except LoopNotFoundError:
773+
logger.warning(f"restart_loop: loop {loop_id} not found")
773774
return False
774775

775776
meta = self._loop_metadata.get(loop.loop_name or "")
776777
if not meta:
778+
logger.warning(
779+
f"restart_loop: no metadata for loop_name={loop.loop_name!r}, "
780+
f"loop_id={loop_id}, available={list(self._loop_metadata.keys())}"
781+
)
777782
return False
778783

779784
ctx = LoopContext(

fastloop/monitor.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,11 @@ async def handle(wake_id: str) -> None:
151151

152152
async def _wake_loop(self, loop_id: str) -> None:
153153
if await self.state_manager.has_claim(loop_id):
154+
logger.debug(f"Loop {loop_id} already has claim, skipping wake")
154155
return
155-
await self.restart_callback(loop_id)
156+
logger.info(f"Waking loop: {loop_id}")
157+
if not await self.restart_callback(loop_id):
158+
logger.warning(f"Failed to restart loop: {loop_id}")
156159

157160
async def _wake_workflow(self, run_id: str) -> None:
158161
if await self.state_manager.workflow_has_claim(run_id):

fastloop/state/state_redis.py

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,10 @@ def _run_wake_monitoring(self):
171171
from ..logging import setup_logger
172172

173173
logger = setup_logger(__name__)
174-
logger.info("Wake thread starting")
174+
logger.info(
175+
"Wake thread starting",
176+
extra={"host": self.config.host, "port": self.config.port},
177+
)
175178

176179
while not self._stop_wake_monitor.is_set():
177180
rdb = None
@@ -184,12 +187,15 @@ def _run_wake_monitoring(self):
184187
ssl=self.config.ssl,
185188
)
186189
rdb.ping()
190+
logger.info("Wake thread connected to Redis")
187191

192+
# Process immediately on connect, then loop
188193
while not self._stop_wake_monitor.is_set():
189-
time.sleep(WAKE_RECONCILIATION_INTERVAL_S)
190194
self._process_due_wakes(rdb)
195+
time.sleep(WAKE_RECONCILIATION_INTERVAL_S)
191196

192-
except sync_redis.exceptions.ConnectionError:
197+
except sync_redis.exceptions.ConnectionError as e:
198+
logger.warning(f"Wake thread Redis connection error: {e}")
193199
time.sleep(5)
194200
except Exception as e:
195201
logger.error(f"Wake thread error: {e}")
@@ -201,16 +207,31 @@ def _run_wake_monitoring(self):
201207

202208
def _process_due_wakes(self, rdb) -> int:
203209
"""Process all wakes with score <= now. Returns count processed."""
210+
from ..logging import setup_logger
211+
212+
logger = setup_logger(__name__)
204213
now = time.time()
205214
processed = 0
206215

207216
# Process loop wakes
208217
loop_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=self.app_name)
218+
219+
# Debug: check what's pending in the ZSET
220+
all_pending = rdb.zrange(loop_key, 0, -1, withscores=True)
221+
if all_pending:
222+
first_id, first_score = all_pending[0]
223+
logger.info(
224+
f"Pending wakes: key={loop_key}, count={len(all_pending)}, "
225+
f"first=({first_id.decode()}, {first_score}), now={now}, "
226+
f"due_in={first_score - now:.1f}s"
227+
)
228+
209229
for loop_id_bytes in rdb.zrangebyscore(loop_key, "-inf", now):
210230
loop_id = loop_id_bytes.decode("utf-8")
211231
if rdb.zrem(loop_key, loop_id):
212232
self.wake_queue.put(loop_id)
213233
processed += 1
234+
logger.info(f"Queued wake: {loop_id}")
214235

215236
# Process workflow wakes
216237
wf_key = RedisKeys.WORKFLOW_WAKE_SCHEDULE.format(app_name=self.app_name)
@@ -219,6 +240,7 @@ def _process_due_wakes(self, rdb) -> int:
219240
if rdb.zrem(wf_key, wf_id):
220241
self.wake_queue.put(f"{WORKFLOW_WAKE_PREFIX}{wf_id}")
221242
processed += 1
243+
logger.info(f"Queued workflow wake: {wf_id}")
222244

223245
return processed
224246

@@ -259,7 +281,13 @@ async def get_or_create_loop(
259281
RedisKeys.LOOP_STATE.format(app_name=self.app_name, loop_id=loop_id)
260282
)
261283
if loop_str:
262-
return LoopState.from_json(loop_str.decode("utf-8")), False
284+
loop = LoopState.from_json(loop_str.decode("utf-8"))
285+
# Ensure loop_name is set if it was missing
286+
if loop_name and not loop.loop_name:
287+
loop.loop_name = loop_name
288+
await self.update_loop(loop_id, loop)
289+
await self.add_loop_to_name_index(loop_name, loop_id)
290+
return loop, False
263291
elif not create_with_id:
264292
raise LoopNotFoundError(f"Loop {loop_id} not found")
265293

@@ -680,7 +708,11 @@ async def set_wake_time(self, loop_id: str, timestamp: float) -> None:
680708
await self.rdb.zadd(schedule_key, {loop_id: timestamp})
681709
logger.info(
682710
"Loop wake scheduled",
683-
extra={"loop_id": loop_id, "wake_timestamp": timestamp},
711+
extra={
712+
"loop_id": loop_id,
713+
"wake_timestamp": timestamp,
714+
"key": schedule_key,
715+
},
684716
)
685717

686718
async def get_initial_event(self, loop_id: str) -> "LoopEvent | None":

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.125"
3+
version = "0.1.126"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

0 commit comments

Comments
 (0)