Skip to content

Commit 6b7cf4d

Browse files
authored
feat: add cache layer for llm (#981)
1 parent 0928b32 commit 6b7cf4d

16 files changed

Lines changed: 1504 additions & 225 deletions

aperag/app.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from fastapi import FastAPI
1616

1717
from aperag.exception_handlers import register_exception_handlers
18-
from aperag.llm.litellm_track import register_opik_llm_track
18+
from aperag.llm.litellm_track import register_custom_llm_track
1919
from aperag.views.api_key import router as api_key_router
2020
from aperag.views.audit import router as audit_router
2121
from aperag.views.auth import router as auth_router
@@ -34,7 +34,7 @@
3434
# Register global exception handlers
3535
register_exception_handlers(app)
3636

37-
register_opik_llm_track()
37+
register_custom_llm_track()
3838

3939
app.include_router(auth_router, prefix="/api/v1")
4040
app.include_router(main_router, prefix="/api/v1")

aperag/db/redis_manager.py

Lines changed: 132 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -16,68 +16,114 @@
1616
Redis connection manager for the application.
1717
1818
This module provides a simple and efficient Redis connection management system
19-
using redis-py's built-in connection pooling capabilities.
19+
using redis-py's built-in connection pooling capabilities for both sync and async operations.
2020
"""
2121

2222
import logging
2323
from typing import Optional
2424

25-
import redis.asyncio as redis
25+
import redis
26+
import redis.asyncio as async_redis
2627

2728
logger = logging.getLogger(__name__)
2829

2930

3031
class RedisConnectionManager:
3132
"""
32-
Simple Redis connection manager using redis-py's built-in connection pooling.
33+
Redis connection manager supporting both sync and async operations.
3334
34-
This provides a shared connection pool for the entire application, avoiding
35+
This provides shared connection pools for the entire application, avoiding
3536
the overhead of creating multiple connections for different Redis operations.
3637
3738
Features:
38-
- Automatic connection pooling with redis-py
39+
- Automatic connection pooling with redis-py (both sync and async)
3940
- Configurable pool size and timeouts
4041
- Global shared instance for efficiency
4142
- Proper cleanup handling
4243
"""
4344

4445
_instance: Optional["RedisConnectionManager"] = None
45-
_client: Optional[redis.Redis] = None
46-
_pool: Optional[redis.ConnectionPool] = None
46+
_async_client: Optional[async_redis.Redis] = None
47+
_sync_client: Optional[redis.Redis] = None
48+
_async_pool: Optional[async_redis.ConnectionPool] = None
49+
_sync_pool: Optional[redis.ConnectionPool] = None
4750

4851
def __new__(cls):
    """Return the single shared manager instance, creating it on first use."""
    existing = cls._instance
    if existing is not None:
        return existing

    # First instantiation: build the object and remember it on the class so
    # every later call hands back the same instance.
    cls._instance = super().__new__(cls)
    return cls._instance
5255

5356
@classmethod
async def get_async_client(cls, redis_url: str = None) -> async_redis.Redis:
    """
    Return the shared async Redis client, initializing it on first use.

    Args:
        redis_url: Redis connection URL, forwarded to the initializer when no
            async client exists yet. It is not consulted once a client has
            already been created.

    Returns:
        The class-wide async Redis client backed by the shared pool.
    """
    if cls._async_client is not None:
        return cls._async_client

    await cls._initialize_async_client(redis_url)
    return cls._async_client
6770

6871
@classmethod
def get_sync_client(cls, redis_url: str = None) -> redis.Redis:
    """
    Return the shared sync Redis client, initializing it on first use.

    Args:
        redis_url: Redis connection URL, forwarded to the initializer when no
            sync client exists yet. It is not consulted once a client has
            already been created.

    Returns:
        The class-wide sync Redis client backed by the shared pool.
    """
    if cls._sync_client is not None:
        return cls._sync_client

    cls._initialize_sync_client(redis_url)
    return cls._sync_client
85+
86+
@classmethod
async def _initialize_async_client(cls, redis_url: Optional[str] = None):
    """
    Create the shared async connection pool and client, then verify them.

    Args:
        redis_url: Redis connection URL. When None, the URL is obtained from
            ``cls._get_redis_url()``.

    Raises:
        ConnectionError: If the initial PING fails. The class-level client
            and pool are reset to None before raising, so a later call to
            ``get_async_client`` retries initialization instead of returning
            a client that was never verified.
    """
    if redis_url is None:
        redis_url = cls._get_redis_url()

    # NOTE(review): the URL is logged verbatim; if it embeds credentials they
    # will show up in debug logs - confirm this is acceptable.
    logger.debug(f"Initializing async Redis connection pool: {redis_url}")

    # Create async connection pool
    cls._async_pool = async_redis.ConnectionPool.from_url(
        redis_url,
        max_connections=20,  # Pool size
        encoding="utf-8",
        decode_responses=True,
        socket_connect_timeout=5,
        socket_timeout=5,
        retry_on_timeout=True,
        health_check_interval=30,
    )

    # Create async client using the pool
    cls._async_client = async_redis.Redis(connection_pool=cls._async_pool)

    # Test connection
    try:
        await cls._async_client.ping()
        logger.debug("Async Redis connection pool initialized successfully")
    except Exception as e:
        # Forget the unverified client/pool; otherwise the "is None" check in
        # get_async_client would skip initialization on the next call.
        cls._async_client = None
        cls._async_pool = None
        logger.error(f"Failed to connect to async Redis: {e}")
        raise ConnectionError(f"Cannot connect to async Redis: {e}") from e
74116

75-
redis_url = settings.memory_redis_url
117+
@classmethod
118+
def _initialize_sync_client(cls, redis_url: str = None):
119+
"""Initialize sync Redis client with connection pool."""
120+
if redis_url is None:
121+
redis_url = cls._get_redis_url()
76122

77-
logger.debug(f"Initializing Redis connection pool: {redis_url}")
123+
logger.debug(f"Initializing sync Redis connection pool: {redis_url}")
78124

79-
# Create connection pool with reasonable defaults
80-
cls._pool = redis.ConnectionPool.from_url(
125+
# Create sync connection pool
126+
cls._sync_pool = redis.ConnectionPool.from_url(
81127
redis_url,
82128
max_connections=20, # Pool size
83129
encoding="utf-8",
@@ -88,46 +134,88 @@ async def _initialize_client(cls, redis_url: str = None):
88134
health_check_interval=30,
89135
)
90136

91-
# Create client using the pool
92-
cls._client = redis.Redis(connection_pool=cls._pool)
137+
# Create sync client using the pool
138+
cls._sync_client = redis.Redis(connection_pool=cls._sync_pool)
93139

94140
# Test connection
95141
try:
96-
await cls._client.ping()
97-
logger.debug("Redis connection pool initialized successfully")
142+
cls._sync_client.ping()
143+
logger.debug("Sync Redis connection pool initialized successfully")
98144
except Exception as e:
99-
logger.error(f"Failed to connect to Redis: {e}")
100-
raise ConnectionError(f"Cannot connect to Redis: {e}")
145+
logger.error(f"Failed to connect to sync Redis: {e}")
146+
raise ConnectionError(f"Cannot connect to sync Redis: {e}")
147+
148+
@classmethod
def _get_redis_url(cls) -> str:
    """Return the Redis URL stored in ``settings.MEMORY_REDIS_URL``."""
    # Deferred import: pulling in aperag.config at module import time would
    # create a circular import.
    from aperag.config import settings

    # NOTE(review): the code this replaced read ``settings.memory_redis_url``
    # (lowercase) - confirm the settings object exposes the uppercase name.
    configured_url = settings.MEMORY_REDIS_URL
    return configured_url
101155

102156
@classmethod
async def close(cls):
    """
    Close both Redis clients, disconnect their pools, and reset class state.

    The class-level references are cleared before any teardown runs, and the
    sync teardown sits in a ``finally`` block, so a failure while closing the
    async side can no longer leave the sync side open or leave stale
    clients registered on the class. Any teardown exception still propagates
    to the caller.

    The pools are disconnected explicitly because both clients were built
    with ``connection_pool=...``; per the redis-py documentation a client's
    ``close()`` leaves an externally supplied pool open.
    """
    async_client, async_pool = cls._async_client, cls._async_pool
    sync_client, sync_pool = cls._sync_client, cls._sync_pool

    # Reset state up front so it is cleared even if teardown raises.
    cls._async_client = None
    cls._async_pool = None
    cls._sync_client = None
    cls._sync_pool = None

    try:
        # Close async client
        if async_client is not None:
            logger.debug("Closing async Redis connection pool")
            await async_client.close()
            if async_pool is not None:
                await async_pool.disconnect()
    finally:
        # Close sync client (runs even when the async teardown failed)
        if sync_client is not None:
            logger.debug("Closing sync Redis connection pool")
            sync_client.close()
            if sync_pool is not None:
                sync_pool.disconnect()

    logger.debug("All Redis connection pools closed")
111174

112175
@classmethod
def get_pool_info(cls) -> dict:
    """
    Get connection pool information for monitoring.

    Returns:
        A dict with an ``"async_pool"`` and/or ``"sync_pool"`` entry for each
        pool that has been created. Each entry holds ``max_connections``,
        ``created_connections``, ``available_connections`` and
        ``in_use_connections``. When neither pool exists the result is
        ``{"status": "not_initialized"}``.
    """

    def _describe(pool) -> dict:
        # Snapshot one pool's counters.
        available = len(pool._available_connections)
        in_use = len(pool._in_use_connections)

        # Not every pool object exposes a public ``created_connections``
        # attribute; fall back to the private counter, then to the number of
        # connections the pool currently tracks, instead of raising
        # AttributeError from a monitoring call.
        created = getattr(pool, "created_connections", None)
        if created is None:
            created = getattr(pool, "_created_connections", available + in_use)

        return {
            "max_connections": pool.max_connections,
            "created_connections": created,
            "available_connections": available,
            "in_use_connections": in_use,
        }

    info = {}

    if cls._async_pool is not None:
        info["async_pool"] = _describe(cls._async_pool)

    if cls._sync_pool is not None:
        info["sync_pool"] = _describe(cls._sync_pool)

    if not info:
        info["status"] = "not_initialized"

    return info
123200

124201

125-
# Convenience functions for backward compatibility with history.py
126-
async def get_async_redis_client() -> redis.Redis:
202+
# Convenience functions for backward compatibility
203+
async def get_async_redis_client() -> async_redis.Redis:
    """Return the shared async Redis client (kept for history.py callers)."""
    shared_client = await RedisConnectionManager.get_async_client()
    return shared_client
206+
207+
208+
def get_sync_redis_client() -> redis.Redis:
    """Return the shared sync Redis client, for cache and other sync callers."""
    shared_client = RedisConnectionManager.get_sync_client()
    return shared_client
129211

130212

131213
def get_redis_connection_manager() -> RedisConnectionManager:
    """Return the RedisConnectionManager instance (the class hands out one shared object)."""
    manager = RedisConnectionManager()
    return manager
216+
217+
218+
# Legacy compatibility - keep the old function name
219+
async def get_client(redis_url: str = None) -> async_redis.Redis:
    """Legacy alias: delegates to RedisConnectionManager.get_async_client."""
    shared_client = await RedisConnectionManager.get_async_client(redis_url)
    return shared_client

aperag/flow/runners/rerank.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ async def run(self, ui: RerankInput, si: SystemInput) -> Tuple[RerankOutput, dic
114114
)
115115

116116
# Perform reranking
117-
result = await rerank_service.rank(query, docs)
117+
result = await rerank_service.async_rerank(query, docs)
118118
logger.info(f"Successfully reranked {len(result)} documents")
119119

120120
except (InvalidConfigurationError, ProviderNotFoundError) as e:

0 commit comments

Comments
 (0)