-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcache.py
More file actions
211 lines (171 loc) · 6 KB
/
Copy pathcache.py
File metadata and controls
211 lines (171 loc) · 6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
"""
Response caching middleware and utilities
Implements caching for GET endpoints using fastapi-cache2 with Redis backend.
Provides cache invalidation utilities for write operations.
Features:
- Automatic caching of GET endpoints
- Custom cache key generation
- TTL configuration per endpoint
- Cache invalidation patterns
- Prometheus metrics for cache hits/misses
"""
import hashlib
from typing import Optional
from fastapi import Request, Response
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from redis import asyncio as aioredis
import logging
from prometheus_client import Counter
logger = logging.getLogger(__name__)
# Prometheus counters for cache activity.
# Hits and misses carry an ``endpoint`` label; invalidations carry the
# ``pattern`` label supplied by the invalidation helpers.
cache_hits = Counter(
    name='cache_hits_total',
    documentation='Total cache hits',
    labelnames=['endpoint'],
)
cache_misses = Counter(
    name='cache_misses_total',
    documentation='Total cache misses',
    labelnames=['endpoint'],
)
cache_invalidations = Counter(
    name='cache_invalidations_total',
    documentation='Total cache invalidations',
    labelnames=['pattern'],
)
def generate_cache_key(
    func,
    namespace: str = "",
    request: Request = None,
    response: Response = None,
    *args,
    **kwargs,
) -> str:
    """
    Generate the cache key for a request handled by ``func``.

    The key is the colon-joined sequence of:
    - the namespace prefix (``"<namespace>:"``), when a namespace is given
    - the endpoint's ``module:name``
    - the path parameter values
    - the query parameters as ``name=value`` pairs, ordered by name

    Empty parts are left out. Every occurrence of a repeated query parameter
    (``?tag=a&tag=b``) contributes to the key, in URL order, so requests that
    differ in any repeated value never share a cache entry.

    Keys longer than 200 characters are replaced by the prefix followed by
    ``hash:`` and the MD5 hex digest of the full key.

    Args:
        func: The endpoint function
        namespace: Cache namespace prefix
        request: FastAPI request object; when ``None`` only the prefix and
            function name are used
        response: FastAPI response object (not used for the key)
        *args: Positional arguments (not used for the key)
        **kwargs: Keyword arguments (not used for the key)

    Returns:
        Cache key string
    """
    prefix = f"{namespace}:" if namespace else ""

    # Identify the endpoint by module and function name.
    func_name = f"{func.__module__}:{func.__name__}"

    path_params = ""
    query_params = ""
    if request is not None:
        path_params = ":".join(str(v) for v in request.path_params.values())

        params = request.query_params
        if params:
            # A multi-dict's items() keeps only the last value of a repeated
            # name; multi_items() yields every (name, value) pair. Plain
            # mappings without multi_items() fall back to items().
            multi_items = getattr(params, "multi_items", None)
            pairs = multi_items() if callable(multi_items) else params.items()
            # Stable sort by name only: consistent ordering across requests
            # while repeated values keep the order they had in the URL.
            ordered = sorted(pairs, key=lambda pair: pair[0])
            query_params = ":".join(f"{k}={v}" for k, v in ordered)

    # Combine all non-empty parts.
    key_parts = [prefix, func_name, path_params, query_params]
    key = ":".join(filter(None, key_parts))

    # Hash if too long (Redis keys should be kept under 250 chars for performance)
    if len(key) > 200:
        key_hash = hashlib.md5(key.encode()).hexdigest()
        # Return just the prefix and hash to keep it short
        return f"{prefix}hash:{key_hash}"
    return key
async def invalidate_cache_pattern(
    pattern: str,
    redis_client: aioredis.Redis,
    batch_size: int = 1000,
):
    """
    Invalidate cache keys matching a pattern.

    Matching keys are found with an incremental scan and deleted in batches
    of at most ``batch_size`` keys, so neither the in-memory key list nor a
    single delete command grows with the number of matches.

    Errors are logged and not re-raised; batches deleted before a failure
    stay deleted and are already counted in the invalidation metric.

    Args:
        pattern: Redis key pattern (supports wildcards *)
        redis_client: Redis client instance
        batch_size: Maximum number of keys sent in one delete call
            (values below 1 are treated as 1)

    Example:
        await invalidate_cache_pattern("cache:examples:*", redis_client)
    """
    batch_size = max(1, batch_size)
    total = 0
    try:
        batch = []
        async for key in redis_client.scan_iter(match=pattern):
            batch.append(key)
            if len(batch) >= batch_size:
                await redis_client.delete(*batch)
                cache_invalidations.labels(pattern=pattern).inc(len(batch))
                total += len(batch)
                batch = []

        # Flush whatever is left after the scan completes.
        if batch:
            await redis_client.delete(*batch)
            cache_invalidations.labels(pattern=pattern).inc(len(batch))
            total += len(batch)

        if total:
            logger.info(f"Invalidated {total} cache keys matching pattern: {pattern}")
        else:
            logger.debug(f"No cache keys found matching pattern: {pattern}")
    except Exception as e:
        logger.error(f"Failed to invalidate cache pattern {pattern}: {e}")
async def invalidate_cache_key(key: str, redis_client: aioredis.Redis):
    """
    Delete one exact cache key.

    A successful delete increments the invalidation counter (labelled with
    the key) and is logged at info level; a missing key is logged at debug
    level. Any exception is logged as an error and not re-raised.

    Args:
        key: Exact cache key to invalidate
        redis_client: Redis client instance
    """
    try:
        removed = await redis_client.delete(key)
        if not removed:
            logger.debug(f"Cache key not found: {key}")
            return
        cache_invalidations.labels(pattern=key).inc()
        logger.info(f"Invalidated cache key: {key}")
    except Exception as exc:
        logger.error(f"Failed to invalidate cache key {key}: {exc}")
class CacheManager:
    """
    Manages cache initialization and operations.

    Holds the Redis client, the key prefix passed to ``FastAPICache`` and an
    ``enabled`` flag that is only true after ``init`` has succeeded.
    """

    def __init__(self):
        self.redis_client: Optional[aioredis.Redis] = None
        self.enabled = False
        # Key prefix in effect; updated by init() and used by clear_all().
        self.prefix = "cache:"

    async def init(self, redis_url: str, prefix: str = "cache:"):
        """
        Initialize cache with Redis backend.

        Connection or initialization errors are logged and swallowed; the
        manager is then left disabled so the application can run uncached.

        Args:
            redis_url: Redis connection URL
            prefix: Cache key prefix
        """
        try:
            # Note: decode_responses must be False (default) for fastapi-cache2
            # as it stores cached data as binary/bytes
            self.redis_client = aioredis.from_url(redis_url)

            # Test connection
            await self.redis_client.ping()

            # Initialize FastAPI Cache
            FastAPICache.init(
                RedisBackend(self.redis_client),
                prefix=prefix,
            )
            # Remember the prefix so clear_all() scans the same keyspace.
            self.prefix = prefix
            self.enabled = True

            # Log success without exposing credentials (URL may contain password)
            logger.info("Cache initialized with Redis successfully")
        except Exception as e:
            logger.error(f"Failed to initialize cache: {e}")
            logger.warning("Application will continue without caching")
            self.enabled = False

    async def clear_all(self):
        """
        Clear all cache entries stored under the configured prefix.

        Does nothing when the cache is not enabled. With an empty prefix the
        call is skipped with a warning, because the only matching pattern
        would be ``*`` and that would delete every key in the database.
        """
        if not (self.redis_client and self.enabled):
            return
        if not self.prefix:
            logger.warning("Cache prefix is empty; refusing to clear all Redis keys")
            return
        try:
            await invalidate_cache_pattern(f"{self.prefix}*", self.redis_client)
            logger.info("Cleared all cache entries")
        except Exception as e:
            logger.error(f"Failed to clear cache: {e}")

    async def close(self):
        """Close Redis connection; errors are logged, not raised."""
        client = self.redis_client
        if client is None:
            return
        try:
            # Newer redis-py deprecates the async close() in favour of
            # aclose(); use it when the client provides it.
            closer = getattr(client, "aclose", None) or client.close
            await closer()
            logger.info("Cache connection closed")
        except Exception as e:
            logger.error(f"Error closing cache connection: {e}")
# Global cache manager instance.
# Created at import time with no Redis client and caching disabled; it only
# becomes enabled once its ``init`` coroutine has completed successfully.
cache_manager = CacheManager()