@@ -142,11 +142,17 @@ def __init__(
142142 self ._stop_wake_monitor = threading .Event ()
143143 self .wake_thread : threading .Thread | None = None
144144
145- if self .wake_queue :
145+ if self .wake_queue is not None :
146+ logger .info (
147+ "Starting wake monitoring thread" ,
148+ extra = {"app_name" : app_name },
149+ )
146150 self .wake_thread = threading .Thread (
147- target = self ._run_wake_monitoring , daemon = True
151+ target = self ._run_wake_monitoring , daemon = True , name = "wake-monitor"
148152 )
149153 self .wake_thread .start ()
154+ else :
155+ logger .warning ("Wake queue not provided, wake monitoring disabled" )
150156
151157 def stop (self ):
152158 self ._stop_wake_monitor .set ()
@@ -161,11 +167,24 @@ def _run_wake_monitoring(self):
161167
162168 The thread will automatically reconnect on Redis connection errors.
163169 """
170+ import traceback
171+
164172 import redis as sync_redis
165173
166174 from ..logging import setup_logger
167175
168176 logger = setup_logger (__name__ )
177+ logger .info (
178+ "Wake monitoring thread starting" ,
179+ extra = {
180+ "redis_host" : self .config .host ,
181+ "redis_port" : self .config .port ,
182+ "app_name" : self .app_name ,
183+ },
184+ )
185+
186+ heartbeat_interval = 60
187+ last_heartbeat = 0.0
169188
170189 while not self ._stop_wake_monitor .is_set ():
171190 rdb = None
@@ -179,7 +198,9 @@ def _run_wake_monitoring(self):
179198 ssl = self .config .ssl ,
180199 )
181200
182- logger .info ("Wake monitoring thread started" )
201+ rdb .ping ()
202+ logger .info ("Wake monitoring thread connected to Redis" )
203+
183204 due_count = self ._process_due_wakes (rdb )
184205 if due_count > 0 :
185206 logger .info (
@@ -204,6 +225,14 @@ def _run_wake_monitoring(self):
204225 },
205226 )
206227
228+ now = time .time ()
229+ if now - last_heartbeat >= heartbeat_interval :
230+ logger .info (
231+ "Wake monitor heartbeat" ,
232+ extra = {"queue_size" : self .wake_queue .qsize ()},
233+ )
234+ last_heartbeat = now
235+
207236 except sync_redis .exceptions .ConnectionError as e :
208237 logger .warning (
209238 f"Redis connection error in wake monitor: { e } , reconnecting"
@@ -216,7 +245,10 @@ def _run_wake_monitoring(self):
216245 )
217246 time .sleep (5 )
218247 except Exception as e :
219- logger .error (f"Wake monitoring thread error: { e } , retrying in 5s" )
248+ logger .error (
249+ f"Wake monitoring thread error: { e } , retrying in 5s" ,
250+ extra = {"traceback" : traceback .format_exc ()},
251+ )
220252 time .sleep (5 )
221253 finally :
222254 if rdb :
@@ -231,16 +263,39 @@ def _process_due_wakes(self, rdb) -> int:
231263 processed = 0
232264
233265 loop_schedule_key = RedisKeys .LOOP_WAKE_SCHEDULE .format (app_name = self .app_name )
266+
267+ # Check total pending wakes for debugging
268+ all_loop_wakes = rdb .zrange (loop_schedule_key , 0 , - 1 , withscores = True )
269+ if all_loop_wakes :
270+ pending_info = [
271+ {"id" : w [0 ].decode (), "wake_at" : w [1 ], "in_seconds" : w [1 ] - now }
272+ for w in all_loop_wakes [:5 ] # Limit to 5 for log size
273+ ]
274+ logger .debug (
275+ "Pending loop wakes in ZSET" ,
276+ extra = {
277+ "count" : len (all_loop_wakes ),
278+ "now" : now ,
279+ "pending" : pending_info ,
280+ },
281+ )
282+
234283 due_loop_wakes : list [bytes ] = rdb .zrangebyscore (loop_schedule_key , "-inf" , now )
235284 for loop_id_bytes in due_loop_wakes :
236285 loop_id = loop_id_bytes .decode ("utf-8" )
237- if rdb .zrem (loop_schedule_key , loop_id ):
286+ removed = rdb .zrem (loop_schedule_key , loop_id )
287+ if removed :
238288 logger .info (
239289 "Due loop wake found, queuing" ,
240- extra = {"loop_id" : loop_id },
290+ extra = {"loop_id" : loop_id , "now" : now },
241291 )
242292 self .wake_queue .put (loop_id )
243293 processed += 1
294+ else :
295+ logger .warning (
296+ "Due loop wake ZREM failed - already claimed by another replica" ,
297+ extra = {"loop_id" : loop_id },
298+ )
244299
245300 workflow_schedule_key = RedisKeys .WORKFLOW_WAKE_SCHEDULE .format (
246301 app_name = self .app_name
@@ -250,13 +305,19 @@ def _process_due_wakes(self, rdb) -> int:
250305 )
251306 for workflow_run_id_bytes in due_workflow_wakes :
252307 workflow_run_id = workflow_run_id_bytes .decode ("utf-8" )
253- if rdb .zrem (workflow_schedule_key , workflow_run_id ):
308+ removed = rdb .zrem (workflow_schedule_key , workflow_run_id )
309+ if removed :
254310 logger .info (
255311 "Due workflow wake found, queuing" ,
256312 extra = {"workflow_run_id" : workflow_run_id },
257313 )
258314 self .wake_queue .put (f"workflow:{ workflow_run_id } " )
259315 processed += 1
316+ else :
317+ logger .warning (
318+ "Due workflow wake ZREM failed - already claimed by another replica" ,
319+ extra = {"workflow_run_id" : workflow_run_id },
320+ )
260321
261322 return processed
262323
@@ -433,11 +494,33 @@ async def heartbeat():
433494 heartbeat_task .cancel ()
434495 with suppress (asyncio .CancelledError ):
435496 await heartbeat_task
436- await self ._script_conditional_delete (keys = [lease_key ], args = [owner_id ])
437- logger .debug (
438- f"{ entity_type .title ()} claim released" ,
439- extra = {f"{ entity_type } _id" : entity_id },
440- )
497+
498+ try :
499+ deleted = await self ._script_conditional_delete (
500+ keys = [lease_key ], args = [owner_id ]
501+ )
502+ if deleted :
503+ logger .info (
504+ f"{ entity_type .title ()} claim released" ,
505+ extra = {f"{ entity_type } _id" : entity_id , "owner_id" : owner_id },
506+ )
507+ else :
508+ current_value = await self .rdb .get (lease_key )
509+ logger .warning (
510+ f"{ entity_type .title ()} claim release failed - owner mismatch or already released" ,
511+ extra = {
512+ f"{ entity_type } _id" : entity_id ,
513+ "owner_id" : owner_id ,
514+ "current_owner" : current_value .decode ()
515+ if current_value
516+ else None ,
517+ },
518+ )
519+ except Exception as e :
520+ logger .error (
521+ f"{ entity_type .title ()} claim release error" ,
522+ extra = {f"{ entity_type } _id" : entity_id , "error" : str (e )},
523+ )
441524
442525 @asynccontextmanager
443526 async def with_claim (self , loop_id : str ) -> AsyncGenerator [None , None ]: # type: ignore
@@ -457,10 +540,21 @@ async def with_claim(self, loop_id: str) -> AsyncGenerator[None, None]: # type:
457540 yield
458541
async def has_claim(self, loop_id: str) -> bool:
    """Return True when a claim key currently exists for ``loop_id``.

    Reads the loop's claim key from Redis. When a claim is present and
    DEBUG logging is enabled, the current owner and the key's remaining
    TTL are logged as diagnostics.

    Args:
        loop_id: Identifier of the loop whose claim is checked.

    Returns:
        True if the claim key holds a value, False otherwise.
    """
    import logging

    claim_key = RedisKeys.LOOP_CLAIM.format(app_name=self.app_name, loop_id=loop_id)
    owner = await self.rdb.get(claim_key)
    if owner is None:
        return False

    # The TTL lookup exists only to enrich the debug log; skip the extra
    # Redis round trip when that log record would be discarded anyway.
    if logger.isEnabledFor(logging.DEBUG):
        ttl = await self.rdb.ttl(claim_key)
        logger.debug(
            "Loop has active claim",
            extra={
                "loop_id": loop_id,
                # A client created with decode_responses=True returns str,
                # which has no .decode(); only decode raw bytes.
                "owner_id": owner.decode() if isinstance(owner, bytes) else owner,
                "ttl": ttl,
            },
        )
    return True
464558
465559 async def try_claim_loop_recovery (self , loop_id : str ) -> bool :
466560 """Atomically claim right to recover an orphaned loop. Returns True if won."""
0 commit comments