Skip to content

Commit 63447da

Browse files
more reliable wake events
1 parent 60c1000 commit 63447da

6 files changed

Lines changed: 98 additions & 22 deletions

File tree

fastloop/monitor.py

Lines changed: 38 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,7 @@ def is_healthy(self, max_stale_s: float = 30.0) -> bool:
5454

5555
async def run(self) -> None:
5656
"""Main loop."""
57-
logger.info("LoopMonitor started")
57+
logger.info(f"LoopMonitor started, queue_id={id(self.wake_queue)}")
5858
try:
5959
await self._app_start()
6060
await self._main_loop()
@@ -108,24 +108,47 @@ async def _main_loop(self) -> None:
108108
self._iteration += 1
109109
self._last_tick = time.time()
110110

111-
with contextlib.suppress(Exception):
111+
try:
112112
wakes = await self._drain_queue()
113113
if wakes:
114+
logger.info(f"Processing {len(wakes)} wakes: {wakes}")
114115
await self._process_wakes(wakes)
115116
await self._maintenance()
117+
except Exception as e:
118+
logger.error(f"Monitor iteration error: {e}")
116119

117120
if time.time() - last_heartbeat >= HEARTBEAT_S:
118121
logger.info(
119-
f"LoopMonitor heartbeat: iter={self._iteration} queue={self.wake_queue.qsize()}"
122+
f"LoopMonitor heartbeat: iter={self._iteration} queue={self.wake_queue.qsize()} queue_id={id(self.wake_queue)}"
120123
)
121124
last_heartbeat = time.time()
122125

123126
async def _drain_queue(self) -> list[str]:
124-
"""Get all pending wakes from the queue."""
127+
"""Drain wakes.
128+
129+
Prefer Redis-backed wake queue (cross-process), fallback to in-memory queue.
130+
"""
131+
# Redis-backed: fixes multi-worker + multi-replica.
132+
if hasattr(self.state_manager, "drain_wake_queue"):
133+
try:
134+
wakes = await self.state_manager.drain_wake_queue( # type: ignore[attr-defined]
135+
timeout_s=WATCHDOG_INTERVAL_S
136+
)
137+
if wakes:
138+
logger.info(f"Got {len(wakes)} wakes from redis queue: {wakes[:5]}")
139+
return wakes
140+
except Exception as e:
141+
logger.error(f"Error draining redis wake queue: {e}")
142+
143+
# Fallback: in-memory queue (single-process only).
125144
wakes: list[str] = []
126145
try:
127-
wakes.append(
128-
await asyncio.to_thread(self.wake_queue.get, True, WATCHDOG_INTERVAL_S)
146+
item = await asyncio.to_thread(
147+
self.wake_queue.get, True, WATCHDOG_INTERVAL_S
148+
)
149+
wakes.append(item)
150+
logger.info(
151+
f"Got wake from in-memory queue: {item}, queue_id={id(self.wake_queue)}"
129152
)
130153
while True:
131154
try:
@@ -134,6 +157,8 @@ async def _drain_queue(self) -> list[str]:
134157
break
135158
except Empty:
136159
pass
160+
except Exception as e:
161+
logger.error(f"Error draining in-memory queue: {e}")
137162
return wakes
138163

139164
async def _process_wakes(self, wakes: list[str]) -> None:
@@ -153,15 +178,22 @@ async def _wake_loop(self, loop_id: str) -> None:
153178
if await self.state_manager.has_claim(loop_id):
154179
logger.debug(f"Loop {loop_id} already has claim, skipping wake")
155180
return
181+
182+
if not await self.state_manager.try_claim_loop_wake(loop_id):
183+
logger.debug(f"Loop {loop_id} wake already claimed, skipping")
184+
return
185+
156186
logger.info(f"Waking loop: {loop_id}")
157187
if not await self.restart_callback(loop_id):
158188
logger.warning(f"Failed to restart loop: {loop_id}")
159189

160190
async def _wake_workflow(self, run_id: str) -> None:
161191
if await self.state_manager.workflow_has_claim(run_id):
162192
return
193+
163194
if not await self.fastloop.restart_workflow(run_id):
164195
await self.state_manager.update_workflow_status(run_id, LoopStatus.STOPPED)
196+
165197
await self.state_manager.clear_workflow_wake_time(run_id)
166198

167199
async def _maintenance(self) -> None:

fastloop/state/state.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -178,6 +178,11 @@ async def try_acquire_app_start_lock(self, loop_id: str) -> bool:
178178
"""Try to acquire an app start lock for a loop."""
179179
pass
180180

181+
@abstractmethod
182+
async def try_claim_loop_wake(self, loop_id: str) -> bool:
183+
"""Atomically try to claim a loop wake (dedupe across replicas)."""
184+
pass
185+
181186
@abstractmethod
182187
async def release_app_start_lock(self, loop_id: str) -> None:
183188
"""Release the app start lock for a loop."""

fastloop/state/state_redis.py

Lines changed: 46 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -50,10 +50,12 @@ class RedisKeys:
5050
LOOP_INITIAL_EVENT = f"{KEY_PREFIX}:{{app_name}}:initial_event:{{loop_id}}"
5151
LOOP_STATE = f"{KEY_PREFIX}:{{app_name}}:state:{{loop_id}}"
5252
LOOP_CLAIM = f"{KEY_PREFIX}:{{app_name}}:claim:{{loop_id}}"
53+
LOOP_WAKE_CLAIM = f"{KEY_PREFIX}:{{app_name}}:wake_claim:{{loop_id}}"
5354
LOOP_CONTEXT = f"{KEY_PREFIX}:{{app_name}}:context:{{loop_id}}:{{key}}"
5455
LOOP_NONCE = f"{KEY_PREFIX}:{{app_name}}:nonce:{{loop_id}}"
5556
LOOP_EVENT_CHANNEL = f"{KEY_PREFIX}:{{app_name}}:events:{{loop_id}}:notify"
5657
LOOP_WAKE_SCHEDULE = f"{KEY_PREFIX}:{{app_name}}:wake_schedule"
58+
WAKE_QUEUE = f"{KEY_PREFIX}:{{app_name}}:wake_queue"
5759
LOOP_MAPPING = f"{KEY_PREFIX}:{{app_name}}:mapping:{{external_ref_id}}"
5860
LOOP_CONNECTION_INDEX = f"{KEY_PREFIX}:{{app_name}}:connection_index:{{loop_id}}"
5961
LOOP_CONNECTION_KEY = (
@@ -140,6 +142,7 @@ def __init__(
140142
)
141143

142144
self.wake_queue: Queue[str] = wake_queue
145+
self._wake_queue_key = RedisKeys.WAKE_QUEUE.format(app_name=self.app_name)
143146
self._stop_wake_monitor = threading.Event()
144147
self.wake_thread: threading.Thread | None = None
145148

@@ -216,34 +219,54 @@ def _process_due_wakes(self, rdb) -> int:
216219
# Process loop wakes
217220
loop_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=self.app_name)
218221

219-
# Debug: check what's pending in the ZSET
220-
all_pending = rdb.zrange(loop_key, 0, -1, withscores=True)
221-
if all_pending:
222-
first_id, first_score = all_pending[0]
223-
logger.info(
224-
f"Pending wakes: key={loop_key}, count={len(all_pending)}, "
225-
f"first=({first_id.decode()}, {first_score}), now={now}, "
226-
f"due_in={first_score - now:.1f}s"
227-
)
222+
# Keep the wake thread quiet; it runs continuously and can be noisy.
228223

229224
for loop_id_bytes in rdb.zrangebyscore(loop_key, "-inf", now):
230225
loop_id = loop_id_bytes.decode("utf-8")
231226
if rdb.zrem(loop_key, loop_id):
232-
self.wake_queue.put(loop_id)
233227
processed += 1
234-
logger.info(f"Queued wake: {loop_id}")
228+
logger.info(f"Queued wake: {loop_id} -> redis:{self._wake_queue_key}")
229+
# Redis-backed wake queue so any process/replica can consume.
230+
rdb.rpush(self._wake_queue_key, loop_id)
235231

236232
# Process workflow wakes
237233
wf_key = RedisKeys.WORKFLOW_WAKE_SCHEDULE.format(app_name=self.app_name)
238234
for wf_id_bytes in rdb.zrangebyscore(wf_key, "-inf", now):
239235
wf_id = wf_id_bytes.decode("utf-8")
240236
if rdb.zrem(wf_key, wf_id):
241-
self.wake_queue.put(f"{WORKFLOW_WAKE_PREFIX}{wf_id}")
242237
processed += 1
243-
logger.info(f"Queued workflow wake: {wf_id}")
238+
payload = f"{WORKFLOW_WAKE_PREFIX}{wf_id}"
239+
logger.info(
240+
f"Queued workflow wake: {wf_id} -> redis:{self._wake_queue_key}"
241+
)
242+
rdb.rpush(self._wake_queue_key, payload)
244243

245244
return processed
246245

246+
async def drain_wake_queue(
247+
self, *, timeout_s: float, max_items: int = 100
248+
) -> list[str]:
249+
"""Drain wake events from the Redis-backed wake queue.
250+
251+
Uses BLPOP to wait for a single item, then drains remaining items with LPOP.
252+
"""
253+
wakes: list[str] = []
254+
255+
item = await self.rdb.blpop(self._wake_queue_key, timeout=timeout_s) # type: ignore
256+
if not item:
257+
return wakes
258+
259+
_key, value = item
260+
wakes.append(value.decode("utf-8"))
261+
262+
for _ in range(max_items - 1):
263+
v = await self.rdb.lpop(self._wake_queue_key) # type: ignore
264+
if not v:
265+
break
266+
wakes.append(v.decode("utf-8"))
267+
268+
return wakes
269+
247270
async def set_loop_mapping(self, external_ref_id: str, loop_id: str):
248271
await self.rdb.set(
249272
RedisKeys.LOOP_MAPPING.format(
@@ -491,6 +514,14 @@ async def try_claim_loop_recovery(self, loop_id: str) -> bool:
491514
acquired = await self.rdb.set(claim_key, "1", nx=True, ex=60)
492515
return acquired is not None
493516

517+
async def try_claim_loop_wake(self, loop_id: str) -> bool:
518+
"""Atomically try to claim a loop wake. Returns True if this caller won."""
519+
claim_key = RedisKeys.LOOP_WAKE_CLAIM.format(
520+
app_name=self.app_name, loop_id=loop_id
521+
)
522+
acquired = await self.rdb.set(claim_key, "1", nx=True, ex=30)
523+
return acquired is not None
524+
494525
async def get_all_loop_ids(self) -> set[str]:
495526
members = await self.rdb.smembers(
496527
RedisKeys.LOOP_INDEX.format(app_name=self.app_name)
@@ -632,7 +663,8 @@ async def push_event(self, loop_id: str, event: "LoopEvent"):
632663
await pipe.execute()
633664

634665
if event.sender == LoopEventSender.CLIENT:
635-
self.wake_queue.put_nowait(loop_id)
666+
# Wake via Redis-backed wake queue so any process/replica can consume it.
667+
await self.rdb.rpush(self._wake_queue_key, loop_id) # type: ignore
636668

637669
async def get_context_value(self, loop_id: str, key: str) -> Any:
638670
value_str = await self.rdb.get(

fastloop/state/state_s3.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -559,6 +559,13 @@ async def try_acquire_app_start_lock(self, loop_id: str) -> bool:
559559
return False
560560
raise
561561

562+
async def try_claim_loop_wake(self, loop_id: str) -> bool:
563+
"""Best-effort loop wake claim for S3 state.
564+
565+
Redis is the intended scalable state backend for multi-replica wakes.
566+
"""
567+
return True
568+
562569
async def release_app_start_lock(self, loop_id: str) -> None:
563570
"""Release the app start lock."""
564571
lock_key = f"{self.prefix}/{self.app_name}/app_start_lock/{loop_id}.lock"

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.126"
3+
version = "0.1.128"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)