Skip to content

Commit faba7ca

Browse files
fix wake condition
1 parent 1dab95b commit faba7ca

6 files changed

Lines changed: 146 additions & 6 deletions

File tree

fastloop/fastloop.py

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -801,13 +801,24 @@ async def _process_wake(self, wake_id: str) -> None:
801801
if await self.state_manager.workflow_has_claim(workflow_id):
802802
return
803803
logger.info(
804-
"Workflow woke up, restarting", extra={"workflow_id": workflow_id}
804+
"Workflow woke up, attempting restart",
805+
extra={"workflow_id": workflow_id},
805806
)
806-
await self.state_manager.clear_workflow_wake_time(workflow_id)
807-
if not await self.fastloop_instance.restart_workflow(workflow_id):
807+
if await self.fastloop_instance.restart_workflow(workflow_id):
808+
await self.state_manager.clear_workflow_wake_time(workflow_id)
809+
logger.info(
810+
"Workflow restarted successfully",
811+
extra={"workflow_id": workflow_id},
812+
)
813+
else:
814+
await self.state_manager.clear_workflow_wake_time(workflow_id)
808815
await self.state_manager.update_workflow_status(
809816
workflow_id, LoopStatus.STOPPED
810817
)
818+
logger.warning(
819+
"Workflow restart failed, marked as stopped",
820+
extra={"workflow_id": workflow_id},
821+
)
811822
else:
812823
loop_id = wake_id
813824
if await self.state_manager.has_claim(loop_id):
@@ -850,6 +861,38 @@ async def _check_orphaned_workflows(self) -> None:
850861
workflow.workflow_id, LoopStatus.STOPPED
851862
)
852863

864+
async def _check_scheduled_workflows(self) -> None:
865+
"""Check for IDLE workflows with past-due scheduled wake times."""
866+
import time
867+
868+
now = time.time()
869+
idle_workflows = await self.state_manager.get_all_workflows(
870+
status=LoopStatus.IDLE
871+
)
872+
for workflow in idle_workflows:
873+
if not workflow.scheduled_wake_time:
874+
continue
875+
if workflow.scheduled_wake_time > now:
876+
continue
877+
if await self.state_manager.workflow_has_claim(workflow.workflow_id):
878+
continue
879+
if not await self.state_manager.try_claim_workflow_wake(
880+
workflow.workflow_id
881+
):
882+
continue
883+
logger.info(
884+
"IDLE workflow has past-due wake time, restarting",
885+
extra={
886+
"workflow_id": workflow.workflow_id,
887+
"scheduled_wake_time": workflow.scheduled_wake_time,
888+
"block_index": workflow.current_block_index,
889+
},
890+
)
891+
if not await self.fastloop_instance.restart_workflow(workflow.workflow_id):
892+
await self.state_manager.update_workflow_status(
893+
workflow.workflow_id, LoopStatus.STOPPED
894+
)
895+
853896
async def _check_disconnect_stops(self) -> None:
854897
active_ids = await self.loop_manager.active_loop_ids()
855898
for loop_id in active_ids:
@@ -928,6 +971,7 @@ async def run(self):
928971

929972
await self._check_orphaned_loops()
930973
await self._check_orphaned_workflows()
974+
await self._check_scheduled_workflows()
931975
await self._check_disconnect_stops()
932976

933977
try:

fastloop/state/state.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,11 @@ async def set_workflow_wake_time(self, workflow_id: str, timestamp: float) -> No
247247
async def clear_workflow_wake_time(self, workflow_id: str) -> None:
248248
pass
249249

250+
@abstractmethod
251+
async def try_claim_workflow_wake(self, workflow_id: str) -> bool:
252+
"""Atomically try to claim a workflow wake. Returns True if this caller won the race."""
253+
pass
254+
250255
@abstractmethod
251256
async def set_workflow_block_output(self, workflow_id: str, output: Any) -> None:
252257
pass

fastloop/state/state_redis.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,13 @@ def _run_wake_monitoring(self):
128128
with suppress(sync_redis.exceptions.ResponseError):
129129
rdb.config_set("notify-keyspace-events", "Ex")
130130

131-
self._process_due_wakes(rdb)
131+
logger.info("Wake monitoring thread started, processing due wakes")
132+
due_count = self._process_due_wakes(rdb)
133+
if due_count > 0:
134+
logger.info(
135+
"Processed due wakes on startup",
136+
extra={"count": due_count},
137+
)
132138

133139
pubsub = rdb.pubsub()
134140
pubsub.psubscribe("__keyevent@*__:expired")
@@ -142,9 +148,17 @@ def _run_wake_monitoring(self):
142148
key = message["data"].decode("utf-8")
143149
if f":{self.app_name}:wake:" in key:
144150
loop_id = key.split(":")[-1]
151+
logger.info(
152+
"Loop wake key expired",
153+
extra={"loop_id": loop_id},
154+
)
145155
self._queue_wake(rdb, loop_id)
146156
elif f":{self.app_name}:workflow_wake:" in key:
147157
workflow_id = key.split(":")[-1]
158+
logger.info(
159+
"Workflow wake key expired",
160+
extra={"workflow_id": workflow_id},
161+
)
148162
self._queue_wake(rdb, workflow_id)
149163
except Exception as e:
150164
logger.error(f"Error processing wake notification: {e}")
@@ -174,6 +188,10 @@ def _process_due_wakes(self, rdb) -> int:
174188
for loop_id_bytes in due_loop_wakes:
175189
loop_id = loop_id_bytes.decode("utf-8")
176190
if rdb.zrem(loop_schedule_key, loop_id):
191+
logger.info(
192+
"Due loop wake found, queuing",
193+
extra={"loop_id": loop_id},
194+
)
177195
self.wake_queue.put(loop_id)
178196
processed += 1
179197

@@ -186,6 +204,10 @@ def _process_due_wakes(self, rdb) -> int:
186204
for workflow_id_bytes in due_workflow_wakes:
187205
workflow_id = workflow_id_bytes.decode("utf-8")
188206
if rdb.zrem(workflow_schedule_key, workflow_id):
207+
logger.info(
208+
"Due workflow wake found, queuing",
209+
extra={"workflow_id": workflow_id},
210+
)
189211
self.wake_queue.put(f"workflow:{workflow_id}")
190212
processed += 1
191213

@@ -858,6 +880,15 @@ async def set_workflow_wake_time(self, workflow_id: str, timestamp: float) -> No
858880
)
859881
await pipe.execute()
860882

883+
logger.info(
884+
"Workflow wake scheduled",
885+
extra={
886+
"workflow_id": workflow_id,
887+
"wake_timestamp": timestamp,
888+
"ttl_ms": ttl_ms,
889+
},
890+
)
891+
861892
async def clear_workflow_wake_time(self, workflow_id: str) -> None:
862893
"""Clear any scheduled workflow wake time."""
863894
schedule_key = RedisKeys.WORKFLOW_WAKE_SCHEDULE.format(app_name=self.app_name)
@@ -870,6 +901,28 @@ async def clear_workflow_wake_time(self, workflow_id: str) -> None:
870901
pipe.delete(wake_key)
871902
await pipe.execute()
872903

904+
logger.info(
905+
"Workflow wake cleared",
906+
extra={"workflow_id": workflow_id},
907+
)
908+
909+
async def try_claim_workflow_wake(self, workflow_id: str) -> bool:
910+
"""Atomically try to claim a workflow wake. Returns True if this caller won the race."""
911+
schedule_key = RedisKeys.WORKFLOW_WAKE_SCHEDULE.format(app_name=self.app_name)
912+
wake_key = RedisKeys.WORKFLOW_WAKE_KEY.format(
913+
app_name=self.app_name, workflow_id=workflow_id
914+
)
915+
916+
removed = await self.rdb.zrem(schedule_key, workflow_id)
917+
if removed:
918+
await self.rdb.delete(wake_key)
919+
logger.info(
920+
"Workflow wake claimed",
921+
extra={"workflow_id": workflow_id},
922+
)
923+
return True
924+
return False
925+
873926
async def set_workflow_block_output(self, workflow_id: str, output: Any) -> None:
874927
"""Store the block output for a workflow using cloudpickle."""
875928
output_key = RedisKeys.WORKFLOW_BLOCK_OUTPUT.format(

fastloop/state/state_s3.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,3 +701,41 @@ async def get_all_workflows(
701701
continue
702702

703703
return workflows
704+
705+
async def set_workflow_wake_time(self, workflow_id: str, timestamp: float) -> None:
706+
workflow = await self.get_workflow(workflow_id)
707+
workflow.scheduled_wake_time = timestamp
708+
workflow.status = LoopStatus.IDLE
709+
await self.update_workflow(workflow_id, workflow)
710+
711+
async def clear_workflow_wake_time(self, workflow_id: str) -> None:
712+
try:
713+
workflow = await self.get_workflow(workflow_id)
714+
workflow.scheduled_wake_time = None
715+
await self.update_workflow(workflow_id, workflow)
716+
except WorkflowNotFoundError:
717+
pass
718+
719+
async def try_claim_workflow_wake(self, workflow_id: str) -> bool:
720+
try:
721+
workflow = await self.get_workflow(workflow_id)
722+
if workflow.scheduled_wake_time is None:
723+
return False
724+
workflow.scheduled_wake_time = None
725+
await self.update_workflow(workflow_id, workflow)
726+
return True
727+
except WorkflowNotFoundError:
728+
return False
729+
730+
async def set_workflow_block_output(self, workflow_id: str, output: Any) -> None:
731+
key = f"{self.prefix}/{self.app_name}/workflow_output/{workflow_id}.pkl"
732+
output_bytes: bytes = cloudpickle.dumps(output)
733+
self.s3.put_object(Bucket=self.bucket, Key=key, Body=output_bytes)
734+
735+
async def get_workflow_block_output(self, workflow_id: str) -> Any:
736+
key = f"{self.prefix}/{self.app_name}/workflow_output/{workflow_id}.pkl"
737+
try:
738+
response = self.s3.get_object(Bucket=self.bucket, Key=key)
739+
return cloudpickle.loads(response["Body"].read())
740+
except ClientError:
741+
return None

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.90"
3+
version = "0.1.91"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)