@@ -394,6 +394,10 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
394394 logger .warning (f'Marking request { request .unique_key } as handled that is not in progress.' )
395395 return None
396396
397+ # Update the request's handled_at timestamp.
398+ if request .handled_at is None :
399+ request .handled_at = datetime .now (timezone .utc )
400+
397401 async with self ._get_pipeline () as pipe :
398402 if self ._dedup_strategy == 'default' :
399403 await await_redis_response (pipe .sadd (self ._handled_set_key , request .unique_key ))
@@ -402,6 +406,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
402406 await await_redis_response (pipe .bf ().add (self ._handled_filter_key , request .unique_key ))
403407
404408 await await_redis_response (pipe .hdel (self ._in_progress_key , request .unique_key ))
409+ await await_redis_response (pipe .hset (self ._data_key , request .unique_key , request .model_dump_json ()))
405410
406411 await self ._update_metadata (
407412 pipe ,
@@ -444,6 +449,7 @@ async def reclaim_request(
444449 f'{{"client_id":"{ self .client_key } ","blocked_until_timestamp":{ blocked_until_timestamp } }}' ,
445450 )
446451 )
452+ await await_redis_response (pipe .hset (self ._data_key , request .unique_key , request .model_dump_json ()))
447453 self ._pending_fetch_cache .appendleft (request )
448454 else :
449455 await await_redis_response (pipe .rpush (self ._queue_key , request .unique_key ))
0 commit comments