Skip to content

Commit d3d610b

Browse files
committed
feat(BA-5837): add ValkeyCache source for AppConfigFragment merged-view reads
Re-introduces a Valkey-backed cache layer for the merged AppConfig view, replacing the cache source that was dropped together with the legacy `app_configs` table in #11265 (BA-5822). The hot path — `AppConfigFragmentRepository.app_config(user_id, name)` — is the WebUI bootstrap loop (BEP-1052 §6) and was previously a DB hit per request. Mirrors the legacy `AppConfigCacheSource` shape on top of the `(user, name)`-keyed merged view: - `cache_source/cache_source.py`: - `get_merged_config(user_id, name)` cache-aside read keyed by `app_config:merged:{user_id}:{name}` - `set_merged_config(merged, domain_name=...)` writes the deep-merged payload + indexes the cache key in `app_config:user_keys:{user_id}` and the user in `app_config:domain_users:{domain_name}` so invalidation runs without `SCAN` - `invalidate_for_scope(scope_type, scope_id)`: - `USER`: drop the user's per-name set - `DOMAIN` / `DOMAIN_USER_DEFAULTS`: cascade through every member of the domain user set - `PUBLIC`: rely on TTL (broad invalidation TBD) - `invalidate_for_user(user_id)` convenience for self-service writes - `db_source.user_domain_name(user_id)` single-column lookup so the cache layer can tag merged-view entries with their owning domain - `AppConfigFragmentRepository.app_config(...)` is now cache-aside (cache hit returns the cached payload + fresh DB fragments; cache miss writes through) - `AppConfigFragmentAdminRepository.{create,update,purge}` invalidate the affected scope after the DB write - `repositories.py` builds the cache source from `args.valkey_stat_client` and threads it into both repos - All cache calls pass through `suppress_with_log` — cache failures log a warning and fall through to the DB
1 parent e6ad6e5 commit d3d610b

7 files changed

Lines changed: 298 additions & 33 deletions

File tree

changes/11297.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Front the `AppConfigFragment` merged-view repository with a Valkey cache. Read path is cache-aside (`get_merged_config(user_id, name)` → DB fallback → cache write); admin / self-service bulk writes invalidate per-user or per-domain via membership indexes (no `SCAN`). Cache failures fall through to the DB (BEP-1052 §5).

src/ai/backend/manager/repositories/app_config_fragment/admin_repository.py

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
from ai.backend.manager.errors.app_config import AppConfigFragmentNotFound
1818
from ai.backend.manager.models.app_config_fragment.row import AppConfigFragmentRow
1919
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine
20+
from ai.backend.manager.repositories.app_config_fragment.cache_source import (
21+
AppConfigFragmentCacheSource,
22+
)
2023
from ai.backend.manager.repositories.app_config_fragment.creators import (
2124
AppConfigFragmentCreatorSpec,
2225
)
@@ -77,9 +80,15 @@ class AppConfigFragmentAdminRepository:
7780
"""
7881

7982
_db_source: AppConfigFragmentDBSource
83+
_cache_source: AppConfigFragmentCacheSource | None
8084

81-
def __init__(self, db: ExtendedAsyncSAEngine) -> None:
85+
def __init__(
86+
self,
87+
db: ExtendedAsyncSAEngine,
88+
cache_source: AppConfigFragmentCacheSource | None = None,
89+
) -> None:
8290
self._db_source = AppConfigFragmentDBSource(db)
91+
self._cache_source = cache_source
8392

8493
# ── Mutations ─────────────────────────────────────────────────
8594

@@ -97,43 +106,34 @@ async def create(
97106
config=config,
98107
),
99108
)
100-
return await self._db_source.create(creator)
109+
result = await self._db_source.create(creator)
110+
await self._invalidate(key)
111+
return result
101112

102113
@app_config_fragment_admin_repository_resilience.apply()
103114
async def update(
104115
self,
105116
key: AppConfigFragmentKey,
106117
config: Mapping[str, Any],
107118
) -> AppConfigFragmentData:
108-
"""Update a fragment by natural key. Resolves the natural key
109-
to the row's UUID, builds an ``Updater``, and delegates to the
110-
DB source. Raises :class:`AppConfigFragmentNotFound` when the
111-
row is missing (or vanishes between resolve and write)."""
112-
pk_value = await self._db_source.resolve_pk_by_key(key)
113-
if pk_value is None:
114-
raise _missing(key)
115-
updater: Updater[AppConfigFragmentRow] = Updater(
116-
spec=AppConfigFragmentUpdaterSpec(config=config),
117-
pk_value=pk_value,
118-
)
119-
result = await self._db_source.update(updater)
120-
if result is None:
121-
raise _missing(key)
119+
"""Update a fragment by natural key. Raises
120+
``AppConfigFragmentNotFound`` when missing."""
121+
spec = AppConfigFragmentUpdaterSpec(extra_config=extra_config)
122+
result = await self._db_source.update(key, spec)
123+
await self._invalidate(key)
122124
return result
123125

124126
@app_config_fragment_admin_repository_resilience.apply()
125127
async def purge(self, key: AppConfigFragmentKey) -> bool:
126-
"""Delete a fragment by natural key. Resolves the natural key,
127-
builds a ``Purger``, and delegates to the DB source. Returns
128-
``True`` only when a row was actually removed."""
129-
pk_value = await self._db_source.resolve_pk_by_key(key)
130-
if pk_value is None:
131-
return False
132-
purger: Purger[AppConfigFragmentRow] = Purger(
133-
row_class=AppConfigFragmentRow,
134-
pk_value=pk_value,
135-
)
136-
return await self._db_source.purge(purger)
128+
result = await self._db_source.purge(key)
129+
if result:
130+
await self._invalidate(key)
131+
return result
132+
133+
async def _invalidate(self, key: AppConfigFragmentKey) -> None:
134+
if self._cache_source is None:
135+
return
136+
await self._cache_source.invalidate_for_scope(key.scope_type, key.scope_id)
137137

138138
# ── Cross-scope reads ────────────────────────────────────────
139139

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .cache_source import AppConfigFragmentCacheSource
2+
3+
__all__ = ("AppConfigFragmentCacheSource",)
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
"""Cache source for AppConfigFragment merged-view reads.
2+
3+
The hot path is `AppConfigFragmentRepository.app_config(user_id, name)`,
4+
the per-`(user, name)` deep-merge of every contributing fragment in
5+
the policy's `scope_sources` chain (BEP-1052 §5). Each merged value
6+
gets cached under `app_config:merged:{user_id}:{name}` with a TTL,
7+
and is invalidated whenever a contributing fragment changes.
8+
9+
Membership indexes let invalidation work without `SCAN`:
10+
11+
- ``app_config:user_keys:{user_id}`` — set of `(user_id, name)` cache
12+
keys this user currently has cached. Per-user invalidation pops the
13+
set in one `SMEMBERS` + `DEL`.
14+
- ``app_config:domain_users:{domain_name}`` — set of `user_id`s
15+
currently observed in this domain. Per-domain invalidation expands
16+
to a per-user invalidation for each member.
17+
18+
Cache failures never break a request — every public method is wrapped
19+
in `suppress_with_log` so any Valkey error falls through to the DB.
20+
"""
21+
22+
from __future__ import annotations
23+
24+
import logging
25+
import uuid
26+
from collections.abc import Mapping
27+
from typing import Any, cast
28+
29+
from glide import Batch, ExpirySet, ExpiryType
30+
31+
from ai.backend.common.json import dump_json, load_json
32+
from ai.backend.logging.utils import BraceStyleAdapter
33+
from ai.backend.manager.clients.valkey_client.valkey_cache import ValkeyCache
34+
from ai.backend.manager.data.app_config.types import AppConfigData
35+
from ai.backend.manager.data.app_config_fragment.types import AppConfigScopeType
36+
from ai.backend.manager.repositories.utils import suppress_with_log
37+
38+
log = BraceStyleAdapter(logging.getLogger(__spec__.name))
39+
40+
41+
class AppConfigFragmentCacheSource:
42+
"""Valkey-backed cache for the per-`(user, name)` merged AppConfig view."""
43+
44+
_valkey_cache: ValkeyCache
45+
_cache_ttl: int
46+
47+
def __init__(self, valkey_cache: ValkeyCache, cache_ttl: int = 600) -> None:
48+
"""Args:
49+
valkey_cache: Manager-side Valkey cache client.
50+
cache_ttl: TTL in seconds (default: 10 minutes).
51+
"""
52+
self._valkey_cache = valkey_cache
53+
self._cache_ttl = cache_ttl
54+
55+
# ── Key derivation ─────────────────────────────────────────────
56+
57+
@staticmethod
58+
def _merged_key(user_id: uuid.UUID | str, name: str) -> str:
59+
return f"app_config:merged:{user_id}:{name}"
60+
61+
@staticmethod
62+
def _user_keys_set(user_id: uuid.UUID | str) -> str:
63+
return f"app_config:user_keys:{user_id}"
64+
65+
@staticmethod
66+
def _domain_users_set(domain_name: str) -> str:
67+
return f"app_config:domain_users:{domain_name}"
68+
69+
# ── Read ───────────────────────────────────────────────────────
70+
71+
async def get_merged_config(
72+
self,
73+
user_id: uuid.UUID,
74+
name: str,
75+
) -> Mapping[str, Any] | None:
76+
"""Return the cached `config` payload, or `None` on miss / failure.
77+
78+
Only the merged `config` mapping is cached — `fragments` are
79+
cheap to recompute when needed and would bloat the cache.
80+
"""
81+
with suppress_with_log([Exception], "Failed to read merged config from cache"):
82+
cache_key = self._merged_key(user_id, name)
83+
async with self._valkey_cache.client() as conn:
84+
cached_value = await conn.get(cache_key)
85+
if cached_value:
86+
log.debug("Cache hit for merged config: {} {}", user_id, name)
87+
return cast(Mapping[str, Any] | None, load_json(cached_value))
88+
log.debug("Cache miss for merged config: {} {}", user_id, name)
89+
return None
90+
91+
# ── Write ──────────────────────────────────────────────────────
92+
93+
async def set_merged_config(
94+
self,
95+
merged: AppConfigData,
96+
domain_name: str | None = None,
97+
) -> None:
98+
"""Cache the merged `config` payload + index it for invalidation.
99+
100+
Indexes the cache key in the user's key set; if `domain_name`
101+
is supplied, also adds the user to the domain's user set so
102+
domain-level invalidation can cascade.
103+
"""
104+
if merged.config is None:
105+
# Nothing useful to cache — leave as miss-on-next-read.
106+
return
107+
with suppress_with_log([Exception], "Failed to write merged config to cache"):
108+
user_id = str(merged.user_id)
109+
cache_key = self._merged_key(user_id, merged.name)
110+
111+
batch = Batch(is_atomic=False)
112+
batch.set(
113+
cache_key,
114+
dump_json(merged.config),
115+
expiry=ExpirySet(ExpiryType.SEC, self._cache_ttl),
116+
)
117+
user_keys = self._user_keys_set(user_id)
118+
batch.sadd(user_keys, [cache_key])
119+
batch.expire(user_keys, self._cache_ttl)
120+
if domain_name is not None:
121+
domain_users = self._domain_users_set(domain_name)
122+
batch.sadd(domain_users, [user_id])
123+
batch.expire(domain_users, self._cache_ttl)
124+
125+
async with self._valkey_cache.client() as conn:
126+
await conn.exec(batch, raise_on_error=True)
127+
128+
log.trace(
129+
"Cached merged config for user {} name {} (domain={})",
130+
user_id,
131+
merged.name,
132+
domain_name,
133+
)
134+
135+
# ── Invalidate ────────────────────────────────────────────────
136+
137+
async def invalidate_for_scope(
138+
self,
139+
scope_type: AppConfigScopeType,
140+
scope_id: str,
141+
) -> None:
142+
"""Invalidate every cached merged view affected by a fragment write.
143+
144+
`scope_id` is the user UUID for `USER`, the domain name for
145+
`DOMAIN` / `DOMAIN_USER_DEFAULTS`, or the literal `"public"`
146+
for `PUBLIC`. `PUBLIC` invalidation is intentionally not
147+
wired here — its blast radius is the whole cache; rely on TTL
148+
for now (admin-only, low-frequency operation).
149+
"""
150+
with suppress_with_log([Exception], "Failed to invalidate merged-config cache"):
151+
match scope_type:
152+
case AppConfigScopeType.USER:
153+
await self._invalidate_user(scope_id)
154+
case AppConfigScopeType.DOMAIN | AppConfigScopeType.DOMAIN_USER_DEFAULTS:
155+
await self._invalidate_domain(scope_id)
156+
case AppConfigScopeType.PUBLIC:
157+
log.debug(
158+
"PUBLIC-scope invalidation not wired; relying on TTL ({}s)",
159+
self._cache_ttl,
160+
)
161+
162+
async def invalidate_for_user(self, user_id: uuid.UUID | str) -> None:
163+
"""Drop every cached merged view owned by `user_id`.
164+
165+
Convenience wrapper used by self-service bulk writes that
166+
always operate on the caller's `USER` row.
167+
"""
168+
with suppress_with_log([Exception], "Failed to invalidate user merged-config cache"):
169+
await self._invalidate_user(str(user_id))
170+
171+
async def _invalidate_user(self, user_id: str) -> None:
172+
user_keys = self._user_keys_set(user_id)
173+
async with self._valkey_cache.client() as conn:
174+
cached_keys = await conn.smembers(user_keys)
175+
if not cached_keys:
176+
log.debug("No cached keys for user {}, skipping invalidation", user_id)
177+
return
178+
keys_to_delete: list[str | bytes] = list(cached_keys)
179+
keys_to_delete.append(user_keys)
180+
async with self._valkey_cache.client() as conn:
181+
removed = await conn.delete(keys_to_delete)
182+
log.debug("Invalidated {} merged-config keys for user {}", removed, user_id)
183+
184+
async def _invalidate_domain(self, domain_name: str) -> None:
185+
domain_users = self._domain_users_set(domain_name)
186+
async with self._valkey_cache.client() as conn:
187+
user_ids = await conn.smembers(domain_users)
188+
if not user_ids:
189+
log.debug(
190+
"No tracked users for domain {}, skipping cache invalidation",
191+
domain_name,
192+
)
193+
return
194+
for raw in user_ids:
195+
user_id = raw.decode() if isinstance(raw, bytes) else str(raw)
196+
await self._invalidate_user(user_id)
197+
async with self._valkey_cache.client() as conn:
198+
await conn.delete([domain_users])
199+
log.debug(
200+
"Invalidated merged-config caches for {} users in domain {}",
201+
len(user_ids),
202+
domain_name,
203+
)

src/ai/backend/manager/repositories/app_config_fragment/db_source/db_source.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,19 @@ async def get_by_id(self, id: uuid.UUID) -> AppConfigFragmentData | None:
103103
)
104104
return row.to_data() if row is not None else None
105105

106+
@app_config_fragment_db_source_resilience.apply()
107+
async def user_domain_name(self, user_id: uuid.UUID) -> str | None:
108+
"""Single-column lookup of a user's `domain_name`.
109+
110+
Used by the cache layer to tag merged-view entries with their
111+
owning domain so domain-scoped fragment writes can target a
112+
bounded user set during invalidation.
113+
"""
114+
async with self._db.begin_readonly_session() as db_sess:
115+
return await db_sess.scalar(
116+
sa.select(UserRow.domain_name).where(UserRow.uuid == user_id)
117+
)
118+
106119
@app_config_fragment_db_source_resilience.apply()
107120
async def create(self, creator: Creator[AppConfigFragmentRow]) -> AppConfigFragmentData:
108121
"""Insert a new fragment via the shared Creator helper.

src/ai/backend/manager/repositories/app_config_fragment/repositories.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
from dataclasses import dataclass
22
from typing import Self
33

4+
from ai.backend.manager.clients.valkey_client.valkey_cache import ValkeyCache
45
from ai.backend.manager.repositories.app_config_fragment.admin_repository import (
56
AppConfigFragmentAdminRepository,
67
)
8+
from ai.backend.manager.repositories.app_config_fragment.cache_source import (
9+
AppConfigFragmentCacheSource,
10+
)
711
from ai.backend.manager.repositories.app_config_fragment.repository import (
812
AppConfigFragmentRepository,
913
)
@@ -17,7 +21,12 @@ class AppConfigFragmentRepositories:
1721

1822
@classmethod
1923
def create(cls, args: RepositoryArgs) -> Self:
24+
# The merged-view read path is fronted by a Valkey cache; the
25+
# repository falls through to the DB transparently if the
26+
# cache layer fails.
27+
valkey_cache = ValkeyCache(args.valkey_stat_client._client)
28+
cache_source = AppConfigFragmentCacheSource(valkey_cache)
2029
return cls(
21-
repository=AppConfigFragmentRepository(args.db),
22-
admin_repository=AppConfigFragmentAdminRepository(args.db),
30+
repository=AppConfigFragmentRepository(args.db, cache_source=cache_source),
31+
admin_repository=AppConfigFragmentAdminRepository(args.db, cache_source=cache_source),
2332
)

0 commit comments

Comments
 (0)