Skip to content

Commit 08ee387

Browse files
authored
Refactor cache key and prefix APIs (#1210)
1 parent c7e6865 commit 08ee387

13 files changed

Lines changed: 105 additions & 80 deletions

File tree

backend/app/admin/api/v1/monitor/online.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
async def get_sessions(
1919
username: Annotated[str | None, Query(description='用户名')] = None,
2020
) -> ResponseSchemaModel[list[GetTokenDetail]]:
21-
token_keys = await redis_client.get_prefix(f'{settings.TOKEN_REDIS_PREFIX}:*')
21+
token_keys = await redis_client.get_by_prefix(settings.TOKEN_REDIS_PREFIX)
2222
online_clients = await redis_client.smembers(settings.TOKEN_ONLINE_REDIS_PREFIX)
2323
data: list[GetTokenDetail] = []
2424
if not token_keys:

backend/app/admin/service/auth_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ async def refresh_token(*, db: AsyncSession, request: Request, response: Respons
218218
raise errors.NotFoundError(msg='用户不存在')
219219
if not user.status:
220220
raise errors.AuthorizationError(msg='用户已被锁定, 请联系统管理员')
221-
token_keys = await redis_client.get_prefix(f'{settings.TOKEN_REDIS_PREFIX}:{user.id}:*')
221+
token_keys = await redis_client.get_by_prefix(f'{settings.TOKEN_REDIS_PREFIX}:{user.id}')
222222
if not user.is_multi_login and [
223223
key for key in token_keys if not key.endswith(f':{token_payload.session_uuid}')
224224
]:

backend/app/admin/service/plugin_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ async def get_all() -> list[dict[str, Any]]:
2727
"""获取所有插件"""
2828

2929
changed_key = f'{settings.PLUGIN_REDIS_PREFIX}:changed'
30-
keys = [key for key in await redis_client.get_prefix(f'{settings.PLUGIN_REDIS_PREFIX}:') if key != changed_key]
30+
keys = [key for key in await redis_client.get_by_prefix(settings.PLUGIN_REDIS_PREFIX) if key != changed_key]
3131
if not keys:
3232
return []
3333

backend/app/admin/service/user_service.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,15 @@ async def update_permission(*, db: AsyncSession, request: Request, pk: int, type
178178
# 系统管理员修改自身时,除当前 token 外,其他 token 失效
179179
if not new_multi_login:
180180
key_prefix = f'{settings.TOKEN_REDIS_PREFIX}:{user.id}'
181-
await redis_client.delete_prefix(
181+
await redis_client.delete_by_prefix(
182182
key_prefix,
183-
exclude=f'{key_prefix}:{token_payload.session_uuid}',
183+
exclude_keys=f'{key_prefix}:{token_payload.session_uuid}',
184184
)
185185
else:
186186
# 系统管理员修改他人时,他人 token 全部失效
187187
if not new_multi_login:
188188
key_prefix = f'{settings.TOKEN_REDIS_PREFIX}:{user.id}'
189-
await redis_client.delete_prefix(key_prefix)
189+
await redis_client.delete_by_prefix(key_prefix)
190190
case _:
191191
raise errors.RequestError(msg='权限类型不存在')
192192

@@ -213,9 +213,9 @@ async def reset_password(*, db: AsyncSession, pk: int, password: str) -> int:
213213
history_obj = CreateUserPasswordHistoryParam(user_id=user.id, password=user.password)
214214
await password_security_service.save_password_history(db, history_obj)
215215
await user_dao.update_password_changed_time(db, user.id)
216-
await redis_client.delete_prefix(f'{settings.TOKEN_REDIS_PREFIX}:{user.id}')
217-
await redis_client.delete_prefix(f'{settings.TOKEN_REFRESH_REDIS_PREFIX}:{user.id}')
218-
await redis_client.delete_prefix(f'{settings.JWT_USER_REDIS_PREFIX}:{user.id}')
216+
await redis_client.delete_by_prefix(f'{settings.TOKEN_REDIS_PREFIX}:{user.id}')
217+
await redis_client.delete_by_prefix(f'{settings.TOKEN_REFRESH_REDIS_PREFIX}:{user.id}')
218+
await redis_client.delete_by_prefix(f'{settings.JWT_USER_REDIS_PREFIX}:{user.id}')
219219
return count
220220

221221
@staticmethod
@@ -294,9 +294,9 @@ async def update_password(*, db: AsyncSession, user_id: int, obj: ResetPasswordP
294294
history_obj = CreateUserPasswordHistoryParam(user_id=user.id, password=user.password)
295295
await password_security_service.save_password_history(db, history_obj)
296296
await user_dao.update_password_changed_time(db, user.id)
297-
await redis_client.delete_prefix(f'{settings.TOKEN_REDIS_PREFIX}:{user_id}')
298-
await redis_client.delete_prefix(f'{settings.TOKEN_REFRESH_REDIS_PREFIX}:{user_id}')
299-
await redis_client.delete_prefix(f'{settings.JWT_USER_REDIS_PREFIX}:{user_id}')
297+
await redis_client.delete_by_prefix(f'{settings.TOKEN_REDIS_PREFIX}:{user_id}')
298+
await redis_client.delete_by_prefix(f'{settings.TOKEN_REFRESH_REDIS_PREFIX}:{user_id}')
299+
await redis_client.delete_by_prefix(f'{settings.JWT_USER_REDIS_PREFIX}:{user_id}')
300300
return count
301301

302302
@staticmethod
@@ -312,9 +312,9 @@ async def delete(*, db: AsyncSession, pk: int) -> int:
312312
if not user:
313313
raise errors.NotFoundError(msg='用户不存在')
314314
count = await user_dao.delete(db, user.id)
315-
await redis_client.delete_prefix(f'{settings.TOKEN_REDIS_PREFIX}:{user.id}')
316-
await redis_client.delete_prefix(f'{settings.TOKEN_REFRESH_REDIS_PREFIX}:{user.id}')
317-
await redis_client.delete_prefix(f'{settings.JWT_USER_REDIS_PREFIX}:{user.id}')
315+
await redis_client.delete_by_prefix(f'{settings.TOKEN_REDIS_PREFIX}:{user.id}')
316+
await redis_client.delete_by_prefix(f'{settings.TOKEN_REFRESH_REDIS_PREFIX}:{user.id}')
317+
await redis_client.delete_by_prefix(f'{settings.JWT_USER_REDIS_PREFIX}:{user.id}')
318318
return count
319319

320320

backend/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ async def init(db: AsyncSession, redis: RedisCli) -> None:
264264
settings.TOKEN_REDIS_PREFIX,
265265
settings.TOKEN_REFRESH_REDIS_PREFIX,
266266
]:
267-
await redis.delete_prefix(prefix)
267+
await redis.delete_by_prefix(prefix)
268268

269269
console.note('重建数据库表')
270270
conn = await db.connection()

backend/common/cache/decorator.py

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import functools
22

3-
from collections.abc import Callable, Sequence
3+
from collections.abc import Awaitable, Callable, Sequence
4+
from inspect import isawaitable
45
from typing import Any, ParamSpec, TypeVar
56

67
from msgspec import json
@@ -16,22 +17,23 @@
1617

1718
P = ParamSpec('P')
1819
T = TypeVar('T')
20+
_MISSING = object()
1921

2022

21-
def _build_cache_key(
22-
name: str,
23+
async def _build_cache_key(
24+
namespace: str,
2325
key: str | None,
24-
key_builder: Callable[..., str] | None,
26+
key_builder: Callable[..., str | Awaitable[str]] | None,
2527
*args: Any,
2628
**kwargs: Any,
2729
) -> str:
2830
"""构建缓存 Key"""
2931
if key:
3032
if '.' in key:
3133
param, field = key.split('.', 1)
32-
value = kwargs.get(param)
33-
if value is None:
34-
raise errors.ServerError(msg=f'缓存键构建失败,参数 "{param}" 不存在或值为空')
34+
value = kwargs.get(param, _MISSING)
35+
if value is _MISSING:
36+
raise errors.ServerError(msg=f'缓存键构建失败,参数 "{param}" 不存在')
3537

3638
if isinstance(value, list):
3739
raise errors.ServerError(msg='缓存键构建失败:不支持从列表中提取字段,请使用 key_builder 处理列表参数')
@@ -43,16 +45,19 @@ def _build_cache_key(
4345
else:
4446
raise errors.ServerError(msg=f'缓存键构建失败,对象中不存在字段 "{field}"')
4547
else:
46-
value = kwargs.get(key)
47-
if value is None:
48-
raise errors.ServerError(msg=f'缓存键构建失败,参数 "{key}" 不存在或值为空')
48+
value = kwargs.get(key, _MISSING)
49+
if value is _MISSING:
50+
raise errors.ServerError(msg=f'缓存键构建失败,参数 "{key}" 不存在')
4951

50-
return f'{name}:{value}'
52+
return f'{namespace}:{value if value is not None else "none"}'
5153

5254
if key_builder:
53-
return f'{name}:{key_builder(*args, **kwargs)}'
55+
value = key_builder(*args, **kwargs)
56+
if isawaitable(value):
57+
value = await value
58+
return f'{namespace}:{value}'
5459

55-
return name
60+
return namespace
5661

5762

5863
def _serialize_result(result: Any) -> bytes:
@@ -101,15 +106,15 @@ def user_key_builder() -> str:
101106

102107

103108
def cached( # noqa: C901
104-
name: str,
109+
namespace: str,
105110
*,
106111
key: str | None = None,
107-
key_builder: Callable[..., str] | None = None,
112+
key_builder: Callable[..., str | Awaitable[str]] | None = None,
108113
) -> Callable[[Callable[P, T]], Callable[P, T]]:
109114
"""
110115
缓存装饰器
111116
112-
:param name: 缓存名称(通常为缓存 Key 前缀)
117+
:param namespace: 缓存命名空间(通常为缓存 Key 前缀)
113118
:param key: 从方法参数中获取指定参数名的值作为缓存 Key,与 key_builder 互斥
114119
:param key_builder: 自定义 Key 生成函数,与 key 互斥
115120
:return:
@@ -120,7 +125,7 @@ def cached( # noqa: C901
120125
def decorator(func: Callable[P, T]) -> Callable[P, T]: # noqa: C901
121126
@functools.wraps(func)
122127
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
123-
cache_key = _build_cache_key(name, key, key_builder, *args, **kwargs)
128+
cache_key = await _build_cache_key(namespace, key, key_builder, *args, **kwargs)
124129

125130
# L1: 本地缓存
126131
if settings.CACHE_LOCAL_ENABLED:
@@ -168,16 +173,16 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
168173

169174

170175
def cache_invalidate( # noqa: C901
171-
name: str,
176+
namespace: str,
172177
*,
173178
key: str | None = None,
174-
key_builder: Callable[..., str] | None = None,
179+
key_builder: Callable[..., str | Awaitable[str]] | None = None,
175180
atomic: bool = True,
176181
) -> Callable[[Callable[P, T]], Callable[P, T]]:
177182
"""
178183
缓存失效装饰器
179184
180-
:param name: 缓存名称(通常为缓存 Key 前缀)
185+
:param namespace: 缓存命名空间(通常为缓存 Key 前缀)
181186
:param key: 从方法参数中获取指定参数名的值作为缓存 Key,与 key_builder 互斥
182187
:param key_builder: 自定义 Key 生成函数,与 key 互斥
183188
:param atomic: 是否保证缓存原子性
@@ -196,25 +201,25 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
196201
invalidate_error = None
197202

198203
try:
199-
invalidate_key = _build_cache_key(name, key, key_builder, *args, **kwargs)
204+
invalidate_key = await _build_cache_key(namespace, key, key_builder, *args, **kwargs)
200205

201206
# L1 缓存失效
202207
if settings.CACHE_LOCAL_ENABLED:
203-
if invalidate_key == name:
204-
local_cache_manager.delete_prefix(invalidate_key)
208+
if invalidate_key == namespace:
209+
local_cache_manager.delete_by_prefix(invalidate_key)
205210
else:
206211
local_cache_manager.delete(invalidate_key)
207212

208213
# 广播失效消息(通知其他节点清除本地缓存)
209214
if settings.CACHE_LOCAL_ENABLED:
210-
if invalidate_key == name:
211-
await cache_pubsub_manager.publish_invalidation(invalidate_key, is_delete_prefix=True)
215+
if invalidate_key == namespace:
216+
await cache_pubsub_manager.publish_invalidation(invalidate_key, delete_by_prefix=True)
212217
else:
213-
await cache_pubsub_manager.publish_invalidation(invalidate_key, is_delete_prefix=False)
218+
await cache_pubsub_manager.publish_invalidation(invalidate_key, delete_by_prefix=False)
214219

215220
# L2 缓存失效
216-
if invalidate_key == name:
217-
await redis_client.delete_prefix(invalidate_key)
221+
if invalidate_key == namespace:
222+
await redis_client.delete_by_prefix(invalidate_key)
218223
else:
219224
await redis_client.delete(invalidate_key)
220225

backend/common/cache/local.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,23 @@ def clear(self) -> None:
3636
"""清空缓存"""
3737
self.hot_cache.clear()
3838

39-
def delete_prefix(self, prefix: str, exclude: str | list[str] | None = None) -> None:
39+
def delete_by_prefix(self, key_prefix: str, exclude_keys: str | list[str] | None = None) -> None:
4040
"""
4141
删除指定前缀的缓存
4242
43-
:param prefix: 要删除的键前缀
44-
:param exclude: 要排除的键或键列表
43+
:param key_prefix: 要删除的键前缀
44+
:param exclude_keys: 要排除的键或键列表
4545
:return:
4646
"""
47-
exclude_set = set(exclude) if isinstance(exclude, list) else {exclude} if isinstance(exclude, str) else set()
47+
exclude_set = (
48+
set(exclude_keys)
49+
if isinstance(exclude_keys, list)
50+
else {exclude_keys}
51+
if isinstance(exclude_keys, str)
52+
else set()
53+
)
4854
for key in list(self.hot_cache.keys()):
49-
if key.startswith(prefix) and key not in exclude_set:
55+
if (key == key_prefix or key.startswith(f'{key_prefix}:')) and key not in exclude_set:
5056
try:
5157
del self.hot_cache[key]
5258
except KeyError:

backend/common/cache/pubsub.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@ class CachePubSubManager:
1313
_pubsub_task: asyncio.Task | None = None
1414

1515
@staticmethod
16-
async def publish_invalidation(key: str, *, is_delete_prefix: bool) -> None:
16+
async def publish_invalidation(cache_key: str, *, delete_by_prefix: bool) -> None:
1717
"""
1818
发布缓存失效通知
1919
20-
:param key: 缓存键
21-
:param is_delete_prefix: 是否删除符合前缀的所有缓存
20+
:param cache_key: 缓存键
21+
:param delete_by_prefix: 是否删除符合前缀的所有缓存
2222
:return:
2323
"""
2424
try:
25-
message = json.dumps({'key': key, 'is_delete_prefix': is_delete_prefix})
25+
message = json.dumps({'cache_key': cache_key, 'delete_by_prefix': delete_by_prefix})
2626
await redis_client.publish(settings.CACHE_PUBSUB_CHANNEL, message)
2727
except Exception as e:
2828
log.warning(f'[CachePubSub] 发布通知失败: {e}')
@@ -49,11 +49,11 @@ async def subscribe_and_listen() -> None: # noqa: C901
4949
if message['type'] == 'message':
5050
try:
5151
data = json.loads(message['data'])
52-
key = data['key']
53-
if not data['is_delete_prefix']:
54-
local_cache_manager.delete(key)
52+
cache_key = data['cache_key']
53+
if not data['delete_by_prefix']:
54+
local_cache_manager.delete(cache_key)
5555
else:
56-
local_cache_manager.delete_prefix(key)
56+
local_cache_manager.delete_by_prefix(cache_key)
5757
except json.JSONDecodeError as e:
5858
log.warning(f'[CachePubSub] 消息格式错误 {e}')
5959
except Exception as e:

backend/common/security/jwt.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ async def create_access_token(user_id: int, *, multi_login: bool, **kwargs) -> A
8484
})
8585

8686
if not multi_login:
87-
await redis_client.delete_prefix(f'{settings.TOKEN_REDIS_PREFIX}:{user_id}')
87+
await redis_client.delete_by_prefix(f'{settings.TOKEN_REDIS_PREFIX}:{user_id}')
8888

8989
await redis_client.set(
9090
f'{settings.TOKEN_REDIS_PREFIX}:{user_id}:{session_uuid}',
@@ -120,7 +120,7 @@ async def create_refresh_token(session_uuid: str, user_id: int, *, multi_login:
120120
})
121121

122122
if not multi_login:
123-
await redis_client.delete_prefix(f'{settings.TOKEN_REFRESH_REDIS_PREFIX}:{user_id}')
123+
await redis_client.delete_by_prefix(f'{settings.TOKEN_REFRESH_REDIS_PREFIX}:{user_id}')
124124

125125
await redis_client.set(
126126
f'{settings.TOKEN_REFRESH_REDIS_PREFIX}:{user_id}:{session_uuid}',

backend/database/redis.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,33 @@ async def init(self) -> None:
6262
log.error('Redis 服务器连接异常 {}', e)
6363
sys.exit()
6464

65-
async def delete_prefix(self, prefix: str, exclude: str | list[str] | None = None, batch_size: int = 1000) -> None:
65+
async def delete_by_prefix(
66+
self,
67+
key_prefix: str,
68+
exclude_keys: str | list[str] | None = None,
69+
batch_size: int = 1000,
70+
) -> None:
6671
"""
6772
删除指定前缀的所有 key
6873
69-
:param prefix: 要删除的键前缀
70-
:param exclude: 要排除的键或键列表
74+
:param key_prefix: 要删除的键前缀
75+
:param exclude_keys: 要排除的键或键列表
7176
:param batch_size: 批量删除的大小,避免一次性删除过多键导致 Redis 阻塞
7277
:return:
7378
"""
74-
exclude_set = set(exclude) if isinstance(exclude, list) else {exclude} if isinstance(exclude, str) else set()
79+
exclude_set = (
80+
set(exclude_keys)
81+
if isinstance(exclude_keys, list)
82+
else {exclude_keys}
83+
if isinstance(exclude_keys, str)
84+
else set()
85+
)
7586
batch_keys = []
7687

77-
async for key in self.scan_iter(match=f'{prefix}*'):
88+
if key_prefix not in exclude_set and await self.exists(key_prefix):
89+
batch_keys.append(key_prefix)
90+
91+
async for key in self.scan_iter(match=f'{key_prefix}:*'):
7892
if key not in exclude_set:
7993
batch_keys.append(key)
8094

@@ -85,15 +99,15 @@ async def delete_prefix(self, prefix: str, exclude: str | list[str] | None = Non
8599
if batch_keys:
86100
await self.delete(*batch_keys)
87101

88-
async def get_prefix(self, prefix: str, count: int = 100) -> list[str]:
102+
async def get_by_prefix(self, key_prefix: str, count: int = 100) -> list[str]:
89103
"""
90104
获取指定前缀的所有 key
91105
92-
:param prefix: 要搜索的键前缀
106+
:param key_prefix: 要搜索的键前缀
93107
:param count: 每次扫描批次的数量,值越大扫描速度越快,但会占用更多服务器资源
94108
:return:
95109
"""
96-
return [key async for key in self.scan_iter(match=f'{prefix}*', count=count)]
110+
return [key async for key in self.scan_iter(match=f'{key_prefix}:*', count=count)]
97111

98112

99113
# 创建 redis 客户端单例

0 commit comments

Comments
 (0)