11from __future__ import annotations
22
33import asyncio
4+ import re
5+ from base64 import b64encode
46from collections import deque
57from datetime import datetime , timedelta , timezone
8+ from hashlib import sha256
69from logging import getLogger
710from typing import TYPE_CHECKING , Final
811
@@ -320,16 +323,16 @@ async def add_batch_of_requests(
320323 return api_response
321324
322325 @override
323- async def get_request (self , request_unique_key : str ) -> Request | None :
324- """Get a request by ID .
326+ async def get_request (self , unique_key : str ) -> Request | None :
327+ """Get a request by unique key .
325328
326329 Args:
327- request_unique_key : Unique key of the request to get.
330+ unique_key : Unique key of the request to get.
328331
329332 Returns:
330333 The request or None if not found.
331334 """
332- response = await self ._api_client .get_request_by_unique_key ( request_unique_key )
335+ response = await self ._api_client .get_request ( unique_key_to_request_id ( unique_key ) )
333336
334337 if response is None :
335338 return None
@@ -357,23 +360,23 @@ async def fetch_next_request(self) -> Request | None:
357360 return None
358361
359362 # Get the next request ID from the queue head
360- next_request_id = self ._queue_head .popleft ()
363+ next_unique_key = self ._queue_head .popleft ()
361364
362- request = await self ._get_or_hydrate_request (next_request_id )
365+ request = await self ._get_or_hydrate_request (next_unique_key )
363366
364367 # Handle potential inconsistency where request might not be in the main table yet
365368 if request is None :
366369 logger .debug (
367370 'Cannot find a request from the beginning of queue, will be retried later' ,
368- extra = {'nextRequestId ' : next_request_id },
371+ extra = {'nextRequestUniqueKey ' : next_unique_key },
369372 )
370373 return None
371374
372375 # If the request was already handled, skip it
373376 if request .handled_at is not None :
374377 logger .debug (
375378 'Request fetched from the beginning of queue was already handled' ,
376- extra = {'nextRequestId ' : next_request_id },
379+ extra = {'nextRequestUniqueKey ' : next_unique_key },
377380 )
378381 return None
379382
@@ -382,7 +385,7 @@ async def fetch_next_request(self) -> Request | None:
382385 if request is None :
383386 logger .debug (
384387 'Request fetched from the beginning of queue was not found in the RQ' ,
385- extra = {'nextRequestId ' : next_request_id },
388+ extra = {'nextRequestUniqueKey ' : next_unique_key },
386389 )
387390 return None
388391
@@ -509,29 +512,29 @@ async def _ensure_head_is_non_empty(self) -> None:
509512 # Fetch requests from the API and populate the queue head
510513 await self ._list_head (lock_time = self ._DEFAULT_LOCK_TIME )
511514
512- async def _get_or_hydrate_request (self , request_id : str ) -> Request | None :
513- """Get a request by ID , either from cache or by fetching from API.
515+ async def _get_or_hydrate_request (self , unique_key : str ) -> Request | None :
516+ """Get a request by unique key , either from cache or by fetching from API.
514517
515518 Args:
516- request_id: The ID of the request to get.
519+ unique_key: Unique keu of the request to get.
517520
518521 Returns:
519522 The request if found and valid, otherwise None.
520523 """
521524 # First check if the request is in our cache
522- cached_entry = self ._requests_cache .get (request_id )
525+ cached_entry = self ._requests_cache .get (unique_key )
523526
524527 if cached_entry and cached_entry .hydrated :
525528 # If we have the request hydrated in cache, check if lock is expired
526529 if cached_entry .lock_expires_at and cached_entry .lock_expires_at < datetime .now (tz = timezone .utc ):
527530 # Try to prolong the lock if it's expired
528531 try :
529532 lock_secs = int (self ._DEFAULT_LOCK_TIME .total_seconds ())
530- response = await self ._prolong_request_lock (request_id , lock_secs = lock_secs )
533+ response = await self ._prolong_request_lock (unique_key , lock_secs = lock_secs )
531534 cached_entry .lock_expires_at = response .lock_expires_at
532535 except Exception :
533536 # If prolonging the lock fails, we lost the request
534- logger .debug (f'Failed to prolong lock for request { request_id } , returning None' )
537+ logger .debug (f'Failed to prolong lock for request { unique_key } , returning None' )
535538 return None
536539
537540 return cached_entry .hydrated
@@ -540,30 +543,29 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
540543 try :
541544 # Try to acquire or prolong the lock
542545 lock_secs = int (self ._DEFAULT_LOCK_TIME .total_seconds ())
543- await self ._prolong_request_lock (request_id , lock_secs = lock_secs )
546+ await self ._prolong_request_lock (unique_key , lock_secs = lock_secs )
544547
545548 # Fetch the request data
546- request = await self .get_request (request_id )
549+ request = await self .get_request (unique_key )
547550
548551 # If request is not found, release lock and return None
549552 if not request :
550- await self ._delete_request_lock (request_id )
553+ await self ._delete_request_lock (unique_key )
551554 return None
552555
553556 # Update cache with hydrated request
554557 cache_key = request .unique_key
555558 self ._cache_request (
556559 cache_key ,
557560 ProcessedRequest (
558- id = request_id ,
559561 unique_key = request .unique_key ,
560562 was_already_present = True ,
561563 was_already_handled = request .handled_at is not None ,
562564 ),
563565 hydrated_request = request ,
564566 )
565567 except Exception as exc :
566- logger .debug (f'Error fetching or locking request { request_id } : { exc !s} ' )
568+ logger .debug (f'Error fetching or locking request { unique_key } : { exc !s} ' )
567569 return None
568570 else :
569571 return request
@@ -613,8 +615,8 @@ async def _list_head(
613615 logger .debug (f'Using cached queue head with { len (self ._queue_head )} requests' )
614616 # Create a list of requests from the cached queue head
615617 items = []
616- for request_id in list (self ._queue_head )[:limit ]:
617- cached_request = self ._requests_cache .get (request_id )
618+ for unique_key in list (self ._queue_head )[:limit ]:
619+ cached_request = self ._requests_cache .get (unique_key )
618620 if cached_request and cached_request .hydrated :
619621 items .append (cached_request .hydrated )
620622
@@ -671,28 +673,28 @@ async def _list_head(
671673 )
672674 self ._queue_head .append (request .unique_key )
673675
674- for leftover_request_id in leftover_buffer :
676+ for leftover_unique_key in leftover_buffer :
675677 # After adding new requests to the forefront, any existing leftover locked request is kept in the end.
676- self ._queue_head .append (leftover_request_id )
678+ self ._queue_head .append (leftover_unique_key )
677679 return RequestQueueHead .model_validate (response )
678680
679681 async def _prolong_request_lock (
680682 self ,
681- request_unique_key : str ,
683+ unique_key : str ,
682684 * ,
683685 lock_secs : int ,
684686 ) -> ProlongRequestLockResponse :
685687 """Prolong the lock on a specific request in the queue.
686688
687689 Args:
688- request_unique_key : Unique key of the request whose lock is to be prolonged.
690+ unique_key : Unique key of the request whose lock is to be prolonged.
689691 lock_secs: The additional amount of time, in seconds, that the request will remain locked.
690692
691693 Returns:
692694 A response containing the time at which the lock will expire.
693695 """
694- response = await self ._api_client .prolong_request_lock_by_unique_key (
695- request_unique_key = request_unique_key ,
696+ response = await self ._api_client .prolong_request_lock (
697+ request_id = unique_key_to_request_id ( unique_key ) ,
696698 # All requests reaching this code were the tip of the queue at the moment when they were fetched,
697699 # so if their lock expires, they should be put back to the forefront as their handling is long overdue.
698700 forefront = True ,
@@ -705,37 +707,37 @@ async def _prolong_request_lock(
705707
706708 # Update the cache with the new lock expiration
707709 for cached_request in self ._requests_cache .values ():
708- if cached_request .unique_key == request_unique_key :
710+ if cached_request .unique_key == unique_key :
709711 cached_request .lock_expires_at = result .lock_expires_at
710712 break
711713
712714 return result
713715
714716 async def _delete_request_lock (
715717 self ,
716- request_unique_key : str ,
718+ unique_key : str ,
717719 * ,
718720 forefront : bool = False ,
719721 ) -> None :
720722 """Delete the lock on a specific request in the queue.
721723
722724 Args:
723- request_unique_key : Unique key of the request to delete the lock.
725+ unique_key : Unique key of the request to delete the lock.
724726 forefront: Whether to put the request in the beginning or the end of the queue after the lock is deleted.
725727 """
726728 try :
727- await self ._api_client .delete_request_lock_by_unique_key (
728- request_unique_key = request_unique_key ,
729+ await self ._api_client .delete_request_lock (
730+ request_id = unique_key_to_request_id ( unique_key ) ,
729731 forefront = forefront ,
730732 )
731733
732734 # Update the cache to remove the lock
733735 for cached_request in self ._requests_cache .values ():
734- if cached_request .unique_key == request_unique_key :
736+ if cached_request .unique_key == unique_key :
735737 cached_request .lock_expires_at = None
736738 break
737739 except Exception as err :
738- logger .debug (f'Failed to delete request lock for request { request_unique_key } ' , exc_info = err )
740+ logger .debug (f'Failed to delete request lock for request { unique_key } ' , exc_info = err )
739741
740742 def _cache_request (
741743 self ,
@@ -758,3 +760,26 @@ def _cache_request(
758760 hydrated = hydrated_request ,
759761 lock_expires_at = None ,
760762 )
763+
764+
765+ def unique_key_to_request_id (unique_key : str , * , request_id_length : int = 15 ) -> str :
766+ """Generate a deterministic request ID based on a unique key.
767+
768+ Args:
769+ unique_key: The unique key to convert into a request ID.
770+ request_id_length: The length of the request ID.
771+
772+ Returns:
773+ A URL-safe, truncated request ID based on the unique key.
774+ """
775+ # Encode the unique key and compute its SHA-256 hash
776+ hashed_key = sha256 (unique_key .encode ('utf-8' )).digest ()
777+
778+ # Encode the hash in base64 and decode it to get a string
779+ base64_encoded = b64encode (hashed_key ).decode ('utf-8' )
780+
781+ # Remove characters that are not URL-safe ('+', '/', or '=')
782+ url_safe_key = re .sub (r'(\+|\/|=)' , '' , base64_encoded )
783+
784+ # Truncate the key to the desired length
785+ return url_safe_key [:request_id_length ]
0 commit comments