Skip to content

Commit a51fb94

Browse files
fix wake
1 parent 56f6ac7 commit a51fb94

3 files changed

Lines changed: 29 additions & 106 deletions

File tree

fastloop/state/state_redis.py

Lines changed: 27 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ class RedisKeys:
5252
LOOP_CONTEXT = f"{KEY_PREFIX}:{{app_name}}:context:{{loop_id}}:{{key}}"
5353
LOOP_NONCE = f"{KEY_PREFIX}:{{app_name}}:nonce:{{loop_id}}"
5454
LOOP_EVENT_CHANNEL = f"{KEY_PREFIX}:{{app_name}}:events:{{loop_id}}:notify"
55-
LOOP_WAKE_KEY = f"{KEY_PREFIX}:{{app_name}}:wake:{{loop_id}}"
5655
LOOP_WAKE_SCHEDULE = f"{KEY_PREFIX}:{{app_name}}:wake_schedule"
5756
LOOP_MAPPING = f"{KEY_PREFIX}:{{app_name}}:mapping:{{external_ref_id}}"
5857
LOOP_CONNECTION_INDEX = f"{KEY_PREFIX}:{{app_name}}:connection_index:{{loop_id}}"
@@ -63,7 +62,6 @@ class RedisKeys:
6362
WORKFLOW_INDEX = f"{KEY_PREFIX}:{{app_name}}:workflow_index"
6463
WORKFLOW_STATE = f"{KEY_PREFIX}:{{app_name}}:workflow:{{workflow_run_id}}"
6564
WORKFLOW_CLAIM = f"{KEY_PREFIX}:{{app_name}}:workflow_claim:{{workflow_run_id}}"
66-
WORKFLOW_WAKE_KEY = f"{KEY_PREFIX}:{{app_name}}:workflow_wake:{{workflow_run_id}}"
6765
WORKFLOW_WAKE_SCHEDULE = f"{KEY_PREFIX}:{{app_name}}:workflow_wake_schedule"
6866
WORKFLOW_BLOCK_OUTPUT = (
6967
f"{KEY_PREFIX}:{{app_name}}:workflow_block_output:{{workflow_run_id}}"
@@ -156,11 +154,10 @@ def stop(self):
156154
self.wake_thread.join(timeout=2.0)
157155

158156
def _run_wake_monitoring(self):
159-
"""Background thread for reliable wake scheduling using ZSET + periodic reconciliation.
157+
"""Background thread for wake scheduling using ZSET reconciliation.
160158
161-
This thread uses two mechanisms for reliability:
162-
1. Redis keyspace notifications for immediate wake on TTL key expiry
163-
2. Periodic ZSET reconciliation as a fallback
159+
Polls the wake schedule ZSET every WAKE_RECONCILIATION_INTERVAL_S seconds
160+
for due wakes. Uses atomic ZREM to ensure only one replica processes each wake.
164161
165162
The thread will automatically reconnect on Redis connection errors.
166163
"""
@@ -172,7 +169,6 @@ def _run_wake_monitoring(self):
172169

173170
while not self._stop_wake_monitor.is_set():
174171
rdb = None
175-
pubsub = None
176172

177173
try:
178174
rdb = sync_redis.Redis(
@@ -183,63 +179,36 @@ def _run_wake_monitoring(self):
183179
ssl=self.config.ssl,
184180
)
185181

186-
with suppress(sync_redis.exceptions.ResponseError):
187-
rdb.config_set("notify-keyspace-events", "Ex")
188-
189-
logger.info("Wake monitoring thread started, processing due wakes")
182+
logger.info("Wake monitoring thread started")
190183
due_count = self._process_due_wakes(rdb)
191184
if due_count > 0:
192185
logger.info(
193186
"Processed due wakes on startup",
194187
extra={"count": due_count},
195188
)
196189

197-
pubsub = rdb.pubsub()
198-
pubsub.psubscribe("__keyevent@*__:expired")
199-
last_reconciliation = time.time()
200-
201190
while not self._stop_wake_monitor.is_set():
202191
try:
203-
message = pubsub.get_message(timeout=0.1)
204-
205-
if message and message["type"] == "pmessage":
206-
try:
207-
key = message["data"].decode("utf-8")
208-
if f":{self.app_name}:wake:" in key:
209-
loop_id = key.split(":")[-1]
210-
logger.info(
211-
"Loop wake key expired",
212-
extra={"loop_id": loop_id},
213-
)
214-
self._queue_wake(rdb, loop_id)
215-
elif f":{self.app_name}:workflow_wake:" in key:
216-
workflow_run_id = key.split(":")[-1]
217-
logger.info(
218-
"Workflow wake key expired",
219-
extra={"workflow_run_id": workflow_run_id},
220-
)
221-
self._queue_wake(rdb, workflow_run_id)
222-
except Exception as e:
223-
logger.error(f"Error processing wake notification: {e}")
224-
225-
now = time.time()
226-
if now - last_reconciliation >= WAKE_RECONCILIATION_INTERVAL_S:
227-
due_count = self._process_due_wakes(rdb)
228-
if due_count > 0:
229-
logger.info(
230-
"Wake reconciliation processed due wakes",
231-
extra={
232-
"count": due_count,
233-
"queue_size": self.wake_queue.qsize(),
234-
},
235-
)
236-
last_reconciliation = now
192+
time.sleep(WAKE_RECONCILIATION_INTERVAL_S)
193+
194+
if self._stop_wake_monitor.is_set():
195+
break
196+
197+
due_count = self._process_due_wakes(rdb)
198+
if due_count > 0:
199+
logger.info(
200+
"Processed due wakes",
201+
extra={
202+
"count": due_count,
203+
"queue_size": self.wake_queue.qsize(),
204+
},
205+
)
237206

238207
except sync_redis.exceptions.ConnectionError as e:
239208
logger.warning(
240-
f"Redis connection error in wake monitor inner loop: {e}, reconnecting"
209+
f"Redis connection error in wake monitor: {e}, reconnecting"
241210
)
242-
break # Break inner loop to reconnect
211+
break
243212

244213
except sync_redis.exceptions.ConnectionError as e:
245214
logger.warning(
@@ -250,9 +219,6 @@ def _run_wake_monitoring(self):
250219
logger.error(f"Wake monitoring thread error: {e}, retrying in 5s")
251220
time.sleep(5)
252221
finally:
253-
if pubsub:
254-
with suppress(Exception):
255-
pubsub.close()
256222
if rdb:
257223
with suppress(Exception):
258224
rdb.close()
@@ -294,22 +260,6 @@ def _process_due_wakes(self, rdb) -> int:
294260

295261
return processed
296262

297-
def _queue_wake(self, rdb, loop_id: str) -> bool:
298-
"""Remove loop/workflow from schedule and queue wake. Returns True if queued."""
299-
loop_schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=self.app_name)
300-
if rdb.zrem(loop_schedule_key, loop_id):
301-
self.wake_queue.put(loop_id)
302-
return True
303-
304-
workflow_schedule_key = RedisKeys.WORKFLOW_WAKE_SCHEDULE.format(
305-
app_name=self.app_name
306-
)
307-
if rdb.zrem(workflow_schedule_key, loop_id):
308-
self.wake_queue.put(f"workflow:{loop_id}")
309-
return True
310-
311-
return False
312-
313263
async def set_loop_mapping(self, external_ref_id: str, loop_id: str):
314264
await self.rdb.set(
315265
RedisKeys.LOOP_MAPPING.format(
@@ -391,13 +341,9 @@ async def update_loop_status(self, loop_id: str, status: LoopStatus) -> LoopStat
391341

392342
if status == LoopStatus.STOPPED:
393343
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=self.app_name)
394-
wake_key = RedisKeys.LOOP_WAKE_KEY.format(
395-
app_name=self.app_name, loop_id=loop_id
396-
)
397344
async with self.rdb.pipeline(transaction=True) as pipe:
398345
pipe.set(state_key, loop.to_string())
399346
pipe.zrem(schedule_key, loop_id)
400-
pipe.delete(wake_key)
401347
await pipe.execute()
402348
else:
403349
await self.rdb.set(state_key, loop.to_string())
@@ -731,20 +677,16 @@ async def pop_event(
731677
return None
732678

733679
async def set_wake_time(self, loop_id: str, timestamp: float) -> None:
734-
"""Schedule a wake time. Uses ZSET (source of truth) + TTL key (fast notification)."""
680+
"""Schedule a wake time using ZSET. Reconciliation thread polls for due wakes."""
735681
if timestamp <= time.time():
736682
raise ValueError("Timestamp is in the past")
737683

738684
schedule_key = RedisKeys.LOOP_WAKE_SCHEDULE.format(app_name=self.app_name)
739-
wake_key = RedisKeys.LOOP_WAKE_KEY.format(
740-
app_name=self.app_name, loop_id=loop_id
685+
await self.rdb.zadd(schedule_key, {loop_id: timestamp})
686+
logger.info(
687+
"Loop wake scheduled",
688+
extra={"loop_id": loop_id, "wake_timestamp": timestamp},
741689
)
742-
ttl_ms = max(1, int((timestamp - time.time()) * 1000))
743-
744-
async with self.rdb.pipeline(transaction=True) as pipe:
745-
pipe.zadd(schedule_key, {loop_id: timestamp})
746-
pipe.set(wake_key, "1", px=ttl_ms)
747-
await pipe.execute()
748690

749691
async def get_initial_event(self, loop_id: str) -> "LoopEvent | None":
750692
"""Get the initial event for a loop."""
@@ -1032,23 +974,18 @@ async def get_all_workflows(
1032974
async def set_workflow_wake_time(
1033975
self, workflow_run_id: str, timestamp: float
1034976
) -> None:
1035-
"""Schedule a workflow wake time using ZSET + TTL key."""
977+
"""Schedule a workflow wake time using ZSET. Reconciliation thread polls for due wakes."""
1036978
if timestamp <= time.time():
1037979
raise ValueError("Timestamp is in the past")
1038980

1039981
schedule_key = RedisKeys.WORKFLOW_WAKE_SCHEDULE.format(app_name=self.app_name)
1040-
wake_key = RedisKeys.WORKFLOW_WAKE_KEY.format(
1041-
app_name=self.app_name, workflow_run_id=workflow_run_id
1042-
)
1043-
ttl_ms = max(1, int((timestamp - time.time()) * 1000))
1044982

1045983
workflow = await self.get_workflow(workflow_run_id)
1046984
workflow.scheduled_wake_time = timestamp
1047985
workflow.status = LoopStatus.IDLE
1048986

1049987
async with self.rdb.pipeline(transaction=True) as pipe:
1050988
pipe.zadd(schedule_key, {workflow_run_id: timestamp})
1051-
pipe.set(wake_key, "1", px=ttl_ms)
1052989
pipe.set(
1053990
RedisKeys.WORKFLOW_STATE.format(
1054991
app_name=self.app_name, workflow_run_id=workflow_run_id
@@ -1062,22 +999,13 @@ async def set_workflow_wake_time(
1062999
extra={
10631000
"workflow_run_id": workflow_run_id,
10641001
"wake_timestamp": timestamp,
1065-
"ttl_ms": ttl_ms,
10661002
},
10671003
)
10681004

10691005
async def clear_workflow_wake_time(self, workflow_run_id: str) -> None:
10701006
"""Clear any scheduled workflow wake time."""
10711007
schedule_key = RedisKeys.WORKFLOW_WAKE_SCHEDULE.format(app_name=self.app_name)
1072-
wake_key = RedisKeys.WORKFLOW_WAKE_KEY.format(
1073-
app_name=self.app_name, workflow_run_id=workflow_run_id
1074-
)
1075-
1076-
async with self.rdb.pipeline(transaction=True) as pipe:
1077-
pipe.zrem(schedule_key, workflow_run_id)
1078-
pipe.delete(wake_key)
1079-
await pipe.execute()
1080-
1008+
await self.rdb.zrem(schedule_key, workflow_run_id)
10811009
logger.info(
10821010
"Workflow wake cleared",
10831011
extra={"workflow_run_id": workflow_run_id},
@@ -1086,13 +1014,8 @@ async def clear_workflow_wake_time(self, workflow_run_id: str) -> None:
10861014
async def try_claim_workflow_wake(self, workflow_run_id: str) -> bool:
10871015
"""Atomically try to claim a workflow wake. Returns True if this caller won the race."""
10881016
schedule_key = RedisKeys.WORKFLOW_WAKE_SCHEDULE.format(app_name=self.app_name)
1089-
wake_key = RedisKeys.WORKFLOW_WAKE_KEY.format(
1090-
app_name=self.app_name, workflow_run_id=workflow_run_id
1091-
)
1092-
10931017
removed = await self.rdb.zrem(schedule_key, workflow_run_id)
10941018
if removed:
1095-
await self.rdb.delete(wake_key)
10961019
logger.info(
10971020
"Workflow wake claimed",
10981021
extra={"workflow_run_id": workflow_run_id},

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.117"
3+
version = "0.1.119"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)