Skip to content

Commit 48dde9d

Browse files
fix races
1 parent e3d27c7 commit 48dde9d

4 files changed

Lines changed: 54 additions & 30 deletions

File tree

fastloop/monitor.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ async def _check_orphaned_loops(self) -> None:
100100
for loop in running_loops:
101101
if await self.state_manager.has_claim(loop.loop_id):
102102
continue
103-
logger.info(
104-
"Loop has no claim, restarting", extra={"loop_id": loop.loop_id}
105-
)
103+
if not await self.state_manager.try_claim_loop_recovery(loop.loop_id):
104+
continue
105+
106+
logger.info("Orphaned loop recovered", extra={"loop_id": loop.loop_id})
106107
if not await self.restart_callback(loop.loop_id):
107108
await self.state_manager.update_loop_status(
108109
loop.loop_id, LoopStatus.STOPPED
@@ -115,12 +116,14 @@ async def _check_orphaned_workflows(self) -> None:
115116
for workflow in running_workflows:
116117
if await self.state_manager.workflow_has_claim(workflow.workflow_run_id):
117118
continue
119+
if not await self.state_manager.try_claim_workflow_recovery(
120+
workflow.workflow_run_id
121+
):
122+
continue
123+
118124
logger.info(
119-
"Workflow has no claim, restarting",
120-
extra={
121-
"workflow_run_id": workflow.workflow_run_id,
122-
"block_index": workflow.current_block_index,
123-
},
125+
"Orphaned workflow recovered",
126+
extra={"workflow_run_id": workflow.workflow_run_id},
124127
)
125128
if not await self.fastloop_instance.restart_workflow(
126129
workflow.workflow_run_id
@@ -136,16 +139,17 @@ async def _check_orphaned_tasks(self) -> None:
136139
for task in running_tasks:
137140
if await self.state_manager.task_has_claim(task.task_id):
138141
continue
142+
if not await self.state_manager.try_claim_task_recovery(task.task_id):
143+
continue
144+
145+
await self.state_manager.update_task_status(task.task_id, TaskStatus.FAILED)
139146

140147
metadata = self.fastloop_instance._task_metadata.get(task.task_name)
141148
if not metadata:
142-
await self.state_manager.update_task_status(
143-
task.task_id, TaskStatus.FAILED
144-
)
145149
continue
146150

147151
logger.info(
148-
"Task has no claim, restarting",
152+
"Orphaned task recovered",
149153
extra={"task_id": task.task_id, "task_name": task.task_name},
150154
)
151155
await self.fastloop_instance.task_manager.submit(
@@ -157,12 +161,6 @@ async def _check_orphaned_tasks(self) -> None:
157161
)
158162

159163
async def _check_scheduled_workflows(self) -> None:
160-
"""Check for IDLE workflows with past-due scheduled wake times.
161-
162-
This is a backup mechanism that catches workflows that may have been
163-
removed from the ZSET but not yet processed (e.g., if the wake queue
164-
consumer failed or the wake monitoring thread died).
165-
"""
166164
now = time.time()
167165
idle_workflows = await self.state_manager.get_all_workflows(
168166
status=LoopStatus.IDLE
@@ -174,17 +172,14 @@ async def _check_scheduled_workflows(self) -> None:
174172
continue
175173
if await self.state_manager.workflow_has_claim(workflow.workflow_run_id):
176174
continue
177-
claimed_from_zset = await self.state_manager.try_claim_workflow_wake(
175+
if not await self.state_manager.try_claim_workflow_wake(
178176
workflow.workflow_run_id
179-
)
177+
):
178+
continue
179+
180180
logger.info(
181-
"IDLE workflow has past-due wake time, restarting",
182-
extra={
183-
"workflow_run_id": workflow.workflow_run_id,
184-
"scheduled_wake_time": workflow.scheduled_wake_time,
185-
"block_index": workflow.current_block_index,
186-
"claimed_from_zset": claimed_from_zset,
187-
},
181+
"IDLE workflow past-due, restarting",
182+
extra={"workflow_run_id": workflow.workflow_run_id},
188183
)
189184
if await self.fastloop_instance.restart_workflow(workflow.workflow_run_id):
190185
await self.state_manager.clear_workflow_wake_time(
@@ -200,8 +195,12 @@ async def _check_scheduled_workflows(self) -> None:
200195

201196
async def _check_scheduled_tasks(self) -> None:
202197
for schedule_id, schedule in await self.state_manager.get_due_schedules():
198+
if not await self.state_manager.try_claim_schedule(schedule_id):
199+
continue
200+
203201
metadata = self.fastloop_instance._task_metadata.get(schedule.task_name)
204202
if not metadata:
203+
await self.state_manager.advance_schedule(schedule_id, schedule)
205204
continue
206205

207206
try:
@@ -212,12 +211,13 @@ async def _check_scheduled_tasks(self) -> None:
212211
retry_policy=metadata.get("retry"),
213212
executor_type=metadata.get("executor"),
214213
)
215-
await self.state_manager.advance_schedule(schedule_id, schedule)
216214
except Exception as e:
217215
logger.error(
218216
"Scheduled task failed",
219217
extra={"schedule_id": schedule_id, "error": str(e)},
220218
)
219+
finally:
220+
await self.state_manager.advance_schedule(schedule_id, schedule)
221221

222222
async def _check_disconnect_stops(self) -> None:
223223
active_ids = await self.loop_manager.active_loop_ids()

fastloop/state/state_redis.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,12 @@ async def has_claim(self, loop_id: str) -> bool:
449449
)
450450
return result is not None
451451

452+
async def try_claim_loop_recovery(self, loop_id: str) -> bool:
    """Try to win the right to recover an orphaned loop.

    Writes a marker under the loop's claim key suffixed with ``:recovery``
    using ``SET`` with ``nx=True`` and a 60 second expiry.

    Args:
        loop_id: Identifier of the loop being recovered.

    Returns:
        True when the ``SET`` call returned a non-None result (the marker
        was written by this caller), False otherwise.
    """
    base_key = RedisKeys.LOOP_CLAIM.format(app_name=self.app_name, loop_id=loop_id)
    recovery_key = f"{base_key}:recovery"
    # nx=True: only the first caller within the expiry window gets a result.
    outcome = await self.rdb.set(recovery_key, "1", nx=True, ex=60)
    if outcome is None:
        return False
    return True
457+
452458
async def get_all_loop_ids(self) -> set[str]:
453459
members = await self.rdb.smembers(
454460
RedisKeys.LOOP_INDEX.format(app_name=self.app_name)
@@ -864,6 +870,12 @@ async def workflow_has_claim(self, workflow_run_id: str) -> bool:
864870
)
865871
return result is not None
866872

873+
async def try_claim_workflow_recovery(self, workflow_run_id: str) -> bool:
    """Try to win the right to recover an orphaned workflow.

    Writes a marker under the workflow's claim key suffixed with
    ``:recovery`` using ``SET`` with ``nx=True`` and a 60 second expiry.

    Args:
        workflow_run_id: Identifier of the workflow run being recovered.

    Returns:
        True when the ``SET`` call returned a non-None result (the marker
        was written by this caller), False otherwise.
    """
    base_key = RedisKeys.WORKFLOW_CLAIM.format(
        app_name=self.app_name, workflow_run_id=workflow_run_id
    )
    recovery_key = f"{base_key}:recovery"
    # nx=True: only the first caller within the expiry window gets a result.
    outcome = await self.rdb.set(recovery_key, "1", nx=True, ex=60)
    if outcome is None:
        return False
    return True
878+
867879
@asynccontextmanager
868880
async def with_workflow_claim(
869881
self, workflow_run_id: str
@@ -1072,6 +1084,12 @@ async def task_has_claim(self, task_id: str) -> bool:
10721084
result = await self.rdb.get(claim_key)
10731085
return result is not None
10741086

1087+
async def try_claim_task_recovery(self, task_id: str) -> bool:
    """Try to win the right to recover an orphaned task.

    Writes a marker under the task's claim key suffixed with ``:recovery``
    using ``SET`` with ``nx=True`` and a 60 second expiry.

    Args:
        task_id: Identifier of the task being recovered.

    Returns:
        True when the ``SET`` call returned a non-None result (the marker
        was written by this caller), False otherwise.
    """
    base_key = RedisKeys.TASK_CLAIM.format(app_name=self.app_name, task_id=task_id)
    recovery_key = f"{base_key}:recovery"
    # nx=True: only the first caller within the expiry window gets a result.
    outcome = await self.rdb.set(recovery_key, "1", nx=True, ex=60)
    if outcome is None:
        return False
    return True
1092+
10751093
@asynccontextmanager
10761094
async def with_task_claim(self, task_id: str) -> AsyncGenerator[None, None]:
10771095
lease_key = RedisKeys.TASK_CLAIM.format(app_name=self.app_name, task_id=task_id)
@@ -1170,6 +1188,12 @@ async def get_due_schedules(self) -> list[tuple[str, Schedule]]:
11701188

11711189
return results
11721190

1191+
async def try_claim_schedule(self, schedule_id: str) -> bool:
    """Try to claim a due schedule by removing it from the schedule queue.

    Issues ``ZREM`` for ``schedule_id`` against the app's schedule queue key.

    Args:
        schedule_id: Identifier of the schedule to claim.

    Returns:
        True when the removal count reported by ``ZREM`` is greater than
        zero (this caller removed the entry), False otherwise.
    """
    schedule_queue = RedisKeys.SCHEDULE_QUEUE.format(app_name=self.app_name)
    removed_count = await self.rdb.zrem(schedule_queue, schedule_id)
    won_claim = removed_count > 0
    return won_claim
1196+
11731197
async def advance_schedule(self, schedule_id: str, schedule: Schedule) -> None:
11741198
schedule.next_run = schedule.compute_next_run()
11751199
await self.save_schedule(schedule_id, schedule)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.99"
3+
version = "0.1.100"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)