11import io
22import json
3+ import logging
4+ import os
35from collections .abc import Awaitable , Callable
4- from dataclasses import dataclass
6+ from dataclasses import dataclass , field
57from typing import Annotated , Generic , TypeVar , cast
68
79import pandas as pd
810from fastapi import Depends , FastAPI , Request
9- from fluid .utils import log
1011from fluid .utils .redis import FluidRedis
1112from pydantic import BaseModel
1213from redis .asyncio import Redis
1314
1415from quantflow .data .fmp import FMP
1516
1617M = TypeVar ("M" , bound = BaseModel )
17- logger = log . get_logger (__name__ )
18+ logger = logging . getLogger (__name__ )
1819
1920
2021def instrument_app (app : FastAPI ) -> None :
@@ -44,7 +45,17 @@ class RedisCache(Generic[M]):
4445 redis : Redis
4546 Model : type [M ]
4647 key : str
47- ttl : int = 60
48+ prefix : str = field (
49+ default_factory = lambda : os .getenv (
50+ "QUANTFLOW_REDIS_CACHE_PREFIX" , "quantflow:cache"
51+ )
52+ )
53+ ttl : int = field (
54+ default_factory = lambda : int (os .getenv ("QUANTFLOW_REDIS_CACHE_TTL" , "60" ))
55+ )
56+
57+ def __post_init__ (self ) -> None :
58+ self .key = f"{ self .prefix } :{ self .key } "
4859
4960 async def from_cache (self , loader : Callable [[], Awaitable [M ]]) -> M :
5061 """Get a value from the cache"""
@@ -53,7 +64,7 @@ async def from_cache(self, loader: Callable[[], Awaitable[M]]) -> M:
5364 return await self .set_cache (await loader ())
5465 try :
5566 return self .Model .model_validate_json (value )
56- except json .JSONDecodeError :
67+ except json .JSONDecodeError : # pragma: no cover
5768 logger .exception (f"Failed to decode cache value for key { self .key } " )
5869 return await self .set_cache (await loader ())
5970
@@ -63,6 +74,15 @@ async def set_cache(self, value: M) -> M:
6374 await self .redis .set (self .key , payload , ex = self .ttl )
6475 return value
6576
77+ @classmethod
78+ async def clear (cls , redis : Redis ) -> int :
79+ """Delete all cache entries under the prefix"""
80+ cache = cls (redis = redis , Model = BaseModel , key = "*" )
81+ keys = [key async for key in cache .redis .scan_iter (f"{ cache .prefix } :*" )]
82+ if not keys :
83+ return 0
84+ return await cache .redis .delete (* keys )
85+
6686
6787@dataclass
6888class RedisDataframe :
0 commit comments