Skip to content

Commit 5350913

Browse files
fix race
1 parent faba7ca commit 5350913

4 files changed

Lines changed: 154 additions & 81 deletions

File tree

fastloop/fastloop.py

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -796,28 +796,42 @@ def stop(self) -> None:
796796
self._stop_event.set()
797797

798798
async def _process_wake(self, wake_id: str) -> None:
799+
logger.info(
800+
"Processing wake from queue",
801+
extra={"wake_id": wake_id},
802+
)
799803
if wake_id.startswith("workflow:"):
800804
workflow_id = wake_id[9:]
801805
if await self.state_manager.workflow_has_claim(workflow_id):
806+
logger.info(
807+
"Workflow has active claim, skipping wake",
808+
extra={"workflow_id": workflow_id},
809+
)
802810
return
803811
logger.info(
804812
"Workflow woke up, attempting restart",
805813
extra={"workflow_id": workflow_id},
806814
)
807-
if await self.fastloop_instance.restart_workflow(workflow_id):
808-
await self.state_manager.clear_workflow_wake_time(workflow_id)
809-
logger.info(
810-
"Workflow restarted successfully",
811-
extra={"workflow_id": workflow_id},
812-
)
813-
else:
814-
await self.state_manager.clear_workflow_wake_time(workflow_id)
815-
await self.state_manager.update_workflow_status(
816-
workflow_id, LoopStatus.STOPPED
817-
)
818-
logger.warning(
819-
"Workflow restart failed, marked as stopped",
820-
extra={"workflow_id": workflow_id},
815+
try:
816+
if await self.fastloop_instance.restart_workflow(workflow_id):
817+
await self.state_manager.clear_workflow_wake_time(workflow_id)
818+
logger.info(
819+
"Workflow restarted successfully",
820+
extra={"workflow_id": workflow_id},
821+
)
822+
else:
823+
await self.state_manager.clear_workflow_wake_time(workflow_id)
824+
await self.state_manager.update_workflow_status(
825+
workflow_id, LoopStatus.STOPPED
826+
)
827+
logger.warning(
828+
"Workflow restart failed, marked as stopped",
829+
extra={"workflow_id": workflow_id},
830+
)
831+
except Exception as e:
832+
logger.error(
833+
"Error restarting workflow from wake",
834+
extra={"workflow_id": workflow_id, "error": str(e)},
821835
)
822836
else:
823837
loop_id = wake_id
@@ -862,7 +876,12 @@ async def _check_orphaned_workflows(self) -> None:
862876
)
863877

864878
async def _check_scheduled_workflows(self) -> None:
865-
"""Check for IDLE workflows with past-due scheduled wake times."""
879+
"""Check for IDLE workflows with past-due scheduled wake times.
880+
881+
This is a backup mechanism that catches workflows that may have been
882+
removed from the ZSET but not yet processed (e.g., if the wake queue
883+
consumer failed or the wake monitoring thread died).
884+
"""
866885
import time
867886

868887
now = time.time()
@@ -876,19 +895,25 @@ async def _check_scheduled_workflows(self) -> None:
876895
continue
877896
if await self.state_manager.workflow_has_claim(workflow.workflow_id):
878897
continue
879-
if not await self.state_manager.try_claim_workflow_wake(
898+
# Try to claim from ZSET first (atomic dedup across replicas)
899+
# If that fails, the workflow may have already been removed from ZSET
900+
# but not processed - still try to restart it as a fallback
901+
claimed_from_zset = await self.state_manager.try_claim_workflow_wake(
880902
workflow.workflow_id
881-
):
882-
continue
903+
)
883904
logger.info(
884905
"IDLE workflow has past-due wake time, restarting",
885906
extra={
886907
"workflow_id": workflow.workflow_id,
887908
"scheduled_wake_time": workflow.scheduled_wake_time,
888909
"block_index": workflow.current_block_index,
910+
"claimed_from_zset": claimed_from_zset,
889911
},
890912
)
891-
if not await self.fastloop_instance.restart_workflow(workflow.workflow_id):
913+
if await self.fastloop_instance.restart_workflow(workflow.workflow_id):
914+
await self.state_manager.clear_workflow_wake_time(workflow.workflow_id)
915+
else:
916+
await self.state_manager.clear_workflow_wake_time(workflow.workflow_id)
892917
await self.state_manager.update_workflow_status(
893918
workflow.workflow_id, LoopStatus.STOPPED
894919
)
@@ -960,14 +985,30 @@ async def _process_app_start_callbacks(self) -> None:
960985
await self.state_manager.release_app_start_lock(loop.loop_id)
961986

962987
async def run(self):
988+
from queue import Empty
989+
963990
if not self._app_start_processed:
964991
await self._process_app_start_callbacks()
965992
self._app_start_processed = True
966993

967994
while not self._stop_event.is_set():
968995
try:
969-
while not self.wake_queue.empty():
970-
await self._process_wake(self.wake_queue.get_nowait())
996+
# Process all pending wakes, handling errors individually
997+
# Use get_nowait in a try/except to avoid race between empty() and get()
998+
wakes_processed = 0
999+
while True:
1000+
try:
1001+
wake_id = self.wake_queue.get_nowait()
1002+
wakes_processed += 1
1003+
try:
1004+
await self._process_wake(wake_id)
1005+
except Exception as e:
1006+
logger.error(
1007+
"Error processing wake",
1008+
extra={"wake_id": wake_id, "error": str(e)},
1009+
)
1010+
except Empty:
1011+
break
9711012

9721013
await self._check_orphaned_loops()
9731014
await self._check_orphaned_workflows()

fastloop/state/state_redis.py

Lines changed: 90 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -107,76 +107,108 @@ def stop(self):
107107
self.wake_thread.join(timeout=2.0)
108108

109109
def _run_wake_monitoring(self):
110-
"""Background thread for reliable wake scheduling using ZSET + periodic reconciliation."""
110+
"""Background thread for reliable wake scheduling using ZSET + periodic reconciliation.
111+
112+
This thread uses two mechanisms for reliability:
113+
1. Redis keyspace notifications for immediate wake on TTL key expiry
114+
2. Periodic ZSET reconciliation as a fallback
115+
116+
The thread will automatically reconnect on Redis connection errors.
117+
"""
111118
import redis as sync_redis
112119

113120
from ..logging import setup_logger
114121

115122
logger = setup_logger(__name__)
116-
rdb = None
117-
pubsub = None
118-
119-
try:
120-
rdb = sync_redis.Redis(
121-
host=self.config.host,
122-
port=self.config.port,
123-
db=self.config.database,
124-
password=self.config.password,
125-
ssl=self.config.ssl,
126-
)
127123

128-
with suppress(sync_redis.exceptions.ResponseError):
129-
rdb.config_set("notify-keyspace-events", "Ex")
124+
while not self._stop_wake_monitor.is_set():
125+
rdb = None
126+
pubsub = None
130127

131-
logger.info("Wake monitoring thread started, processing due wakes")
132-
due_count = self._process_due_wakes(rdb)
133-
if due_count > 0:
134-
logger.info(
135-
"Processed due wakes on startup",
136-
extra={"count": due_count},
128+
try:
129+
rdb = sync_redis.Redis(
130+
host=self.config.host,
131+
port=self.config.port,
132+
db=self.config.database,
133+
password=self.config.password,
134+
ssl=self.config.ssl,
137135
)
138136

139-
pubsub = rdb.pubsub()
140-
pubsub.psubscribe("__keyevent@*__:expired")
141-
last_reconciliation = time.time()
137+
with suppress(sync_redis.exceptions.ResponseError):
138+
rdb.config_set("notify-keyspace-events", "Ex")
139+
140+
logger.info("Wake monitoring thread started, processing due wakes")
141+
due_count = self._process_due_wakes(rdb)
142+
if due_count > 0:
143+
logger.info(
144+
"Processed due wakes on startup",
145+
extra={"count": due_count},
146+
)
142147

143-
while not self._stop_wake_monitor.is_set():
144-
message = pubsub.get_message(timeout=0.1)
148+
pubsub = rdb.pubsub()
149+
pubsub.psubscribe("__keyevent@*__:expired")
150+
last_reconciliation = time.time()
145151

146-
if message and message["type"] == "pmessage":
152+
while not self._stop_wake_monitor.is_set():
147153
try:
148-
key = message["data"].decode("utf-8")
149-
if f":{self.app_name}:wake:" in key:
150-
loop_id = key.split(":")[-1]
151-
logger.info(
152-
"Loop wake key expired",
153-
extra={"loop_id": loop_id},
154-
)
155-
self._queue_wake(rdb, loop_id)
156-
elif f":{self.app_name}:workflow_wake:" in key:
157-
workflow_id = key.split(":")[-1]
158-
logger.info(
159-
"Workflow wake key expired",
160-
extra={"workflow_id": workflow_id},
161-
)
162-
self._queue_wake(rdb, workflow_id)
163-
except Exception as e:
164-
logger.error(f"Error processing wake notification: {e}")
165-
166-
now = time.time()
167-
if now - last_reconciliation >= WAKE_RECONCILIATION_INTERVAL_S:
168-
self._process_due_wakes(rdb)
169-
last_reconciliation = now
170-
171-
except Exception as e:
172-
logger.error(f"Wake monitoring thread error: {e}")
173-
finally:
174-
if pubsub:
175-
with suppress(Exception):
176-
pubsub.close()
177-
if rdb:
178-
with suppress(Exception):
179-
rdb.close()
154+
message = pubsub.get_message(timeout=0.1)
155+
156+
if message and message["type"] == "pmessage":
157+
try:
158+
key = message["data"].decode("utf-8")
159+
if f":{self.app_name}:wake:" in key:
160+
loop_id = key.split(":")[-1]
161+
logger.info(
162+
"Loop wake key expired",
163+
extra={"loop_id": loop_id},
164+
)
165+
self._queue_wake(rdb, loop_id)
166+
elif f":{self.app_name}:workflow_wake:" in key:
167+
workflow_id = key.split(":")[-1]
168+
logger.info(
169+
"Workflow wake key expired",
170+
extra={"workflow_id": workflow_id},
171+
)
172+
self._queue_wake(rdb, workflow_id)
173+
except Exception as e:
174+
logger.error(f"Error processing wake notification: {e}")
175+
176+
now = time.time()
177+
if now - last_reconciliation >= WAKE_RECONCILIATION_INTERVAL_S:
178+
due_count = self._process_due_wakes(rdb)
179+
if due_count > 0:
180+
logger.info(
181+
"Wake reconciliation processed due wakes",
182+
extra={
183+
"count": due_count,
184+
"queue_size": self.wake_queue.qsize(),
185+
},
186+
)
187+
last_reconciliation = now
188+
189+
except sync_redis.exceptions.ConnectionError as e:
190+
logger.warning(
191+
f"Redis connection error in wake monitor inner loop: {e}, reconnecting"
192+
)
193+
break # Break inner loop to reconnect
194+
195+
except sync_redis.exceptions.ConnectionError as e:
196+
logger.warning(
197+
f"Redis connection error in wake monitor: {e}, retrying in 5s"
198+
)
199+
time.sleep(5)
200+
except Exception as e:
201+
logger.error(f"Wake monitoring thread error: {e}, retrying in 5s")
202+
time.sleep(5)
203+
finally:
204+
if pubsub:
205+
with suppress(Exception):
206+
pubsub.close()
207+
if rdb:
208+
with suppress(Exception):
209+
rdb.close()
210+
211+
logger.info("Wake monitoring thread stopped")
180212

181213
def _process_due_wakes(self, rdb) -> int:
182214
"""Process all wakes with score <= now. Returns count processed."""

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.91"
3+
version = "0.1.93"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)