@@ -358,7 +358,6 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
358358 self ._cache_request (
359359 cache_key ,
360360 processed_request ,
361- forefront = False ,
362361 hydrated_request = request ,
363362 )
364363 except Exception as exc :
@@ -405,7 +404,6 @@ async def reclaim_request(
405404 self ._cache_request (
406405 cache_key ,
407406 processed_request ,
408- forefront = forefront ,
409407 hydrated_request = request ,
410408 )
411409
@@ -463,9 +461,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
463461 # Try to prolong the lock if it's expired
464462 try :
465463 lock_secs = int (self ._DEFAULT_LOCK_TIME .total_seconds ())
466- response = await self ._prolong_request_lock (
467- request_id , forefront = cached_entry .forefront , lock_secs = lock_secs
468- )
464+ response = await self ._prolong_request_lock (request_id , lock_secs = lock_secs )
469465 cached_entry .lock_expires_at = response .lock_expires_at
470466 except Exception :
471467 # If prolonging the lock fails, we lost the request
@@ -478,7 +474,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
478474 try :
479475 # Try to acquire or prolong the lock
480476 lock_secs = int (self ._DEFAULT_LOCK_TIME .total_seconds ())
481- await self ._prolong_request_lock (request_id , forefront = False , lock_secs = lock_secs )
477+ await self ._prolong_request_lock (request_id , lock_secs = lock_secs )
482478
483479 # Fetch the request data
484480 request = await self .get_request (request_id )
@@ -498,7 +494,6 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
498494 was_already_present = True ,
499495 was_already_handled = request .handled_at is not None ,
500496 ),
501- forefront = False ,
502497 hydrated_request = request ,
503498 )
504499 except Exception as exc :
@@ -569,6 +564,12 @@ async def _list_head(
569564 lock_time = lock_time ,
570565 )
571566
567+ leftover_buffer = list [str ]()
568+ if self ._should_check_for_forefront_requests :
569+ leftover_buffer = list (self ._queue_head )
570+ self ._queue_head .clear ()
571+ self ._should_check_for_forefront_requests = False
572+
572573 # Otherwise fetch from API
573574 lock_time = lock_time or self ._DEFAULT_LOCK_TIME
574575 lock_secs = int (lock_time .total_seconds ())
@@ -581,15 +582,6 @@ async def _list_head(
581582 # Update the queue head cache
582583 self ._queue_has_locked_requests = response .get ('queueHasLockedRequests' , False )
583584
584- # Clear current queue head if we're checking for forefront requests
585- if self ._should_check_for_forefront_requests :
586- self ._queue_head .clear ()
587- self ._should_check_for_forefront_requests = False
588-
589- # Process and cache the requests
590- head_id_buffer = list [str ]()
591- forefront_head_id_buffer = list [str ]()
592-
593585 for request_data in response .get ('items' , []):
594586 request = Request .model_validate (request_data )
595587
@@ -604,59 +596,46 @@ async def _list_head(
604596 )
605597 continue
606598
607- # Check if this request was already cached and if it was added to forefront
608- cache_key = unique_key_to_request_id (request .unique_key )
609- cached_request = self ._requests_cache .get (cache_key )
610- forefront = cached_request .forefront if cached_request else False
611-
612- # Add to appropriate buffer based on forefront flag
613- if forefront :
614- forefront_head_id_buffer .insert (0 , request .id )
615- else :
616- head_id_buffer .append (request .id )
617-
618599 # Cache the request
619600 self ._cache_request (
620- cache_key ,
601+ unique_key_to_request_id ( request . unique_key ) ,
621602 ProcessedRequest (
622603 id = request .id ,
623604 unique_key = request .unique_key ,
624605 was_already_present = True ,
625606 was_already_handled = False ,
626607 ),
627- forefront = forefront ,
628608 hydrated_request = request ,
629609 )
630610
631- # Update the queue head deque
632- for request_id in head_id_buffer :
633- self ._queue_head .append (request_id )
611+ self ._queue_head .append (request .id )
634612
635- for request_id in forefront_head_id_buffer :
636- self ._queue_head .appendleft (request_id )
613+ for leftover_request_id in leftover_buffer :
614+ # After adding new requests to the forefront, any existing leftover locked requests are kept at the end.
615+ self ._queue_head .append (leftover_request_id )
637616
638617 return RequestQueueHead .model_validate (response )
639618
640619 async def _prolong_request_lock (
641620 self ,
642621 request_id : str ,
643622 * ,
644- forefront : bool = False ,
645623 lock_secs : int ,
646624 ) -> ProlongRequestLockResponse :
647625 """Prolong the lock on a specific request in the queue.
648626
649627 Args:
650628 request_id: The identifier of the request whose lock is to be prolonged.
651- forefront: Whether to put the request in the beginning or the end of the queue after lock expires.
652629 lock_secs: The additional amount of time, in seconds, that the request will remain locked.
653630
654631 Returns:
655632 A response containing the time at which the lock will expire.
656633 """
657634 response = await self ._api_client .prolong_request_lock (
658635 request_id = request_id ,
659- forefront = forefront ,
636+ # All requests reaching this code were at the tip of the queue at the moment they were fetched,
637+ # so if their lock expires, they should be put back at the forefront, as their handling is long overdue.
638+ forefront = True ,
660639 lock_secs = lock_secs ,
661640 )
662641
@@ -703,7 +682,6 @@ def _cache_request(
703682 cache_key : str ,
704683 processed_request : ProcessedRequest ,
705684 * ,
706- forefront : bool ,
707685 hydrated_request : Request | None = None ,
708686 ) -> None :
709687 """Cache a request for future use.
@@ -719,5 +697,4 @@ def _cache_request(
719697 was_already_handled = processed_request .was_already_handled ,
720698 hydrated = hydrated_request ,
721699 lock_expires_at = None ,
722- forefront = forefront ,
723700 )
0 commit comments