Skip to content

Commit 797e337

Browse files
committed
feat(BA-5837): add ValkeyCache source for AppConfigFragment merged-view reads
Re-introduces a Valkey-backed cache layer for the merged AppConfig view, replacing the cache source that was dropped together with the legacy `app_configs` table in #11265 (BA-5822). The hot path — `AppConfigFragmentRepository.app_config(user_id, name)` — is the WebUI bootstrap loop (BEP-1052 §6) and was previously a DB hit per request. Mirrors the legacy `AppConfigCacheSource` shape on top of the `(user, name)`-keyed merged view: - `cache_source/cache_source.py`: - `get_merged_config(user_id, name)` cache-aside read keyed by `app_config:merged:{user_id}:{name}` - `set_merged_config(merged, domain_name=...)` writes the deep-merged payload + indexes the cache key in `app_config:user_keys:{user_id}` and the user in `app_config:domain_users:{domain_name}` so invalidation runs without `SCAN` - `invalidate_for_scope(scope_type, scope_id)`: - `USER`: drop the user's per-name set - `DOMAIN` / `DOMAIN_USER_DEFAULTS`: cascade through every member of the domain user set - `PUBLIC`: rely on TTL (broad invalidation TBD) - `invalidate_for_user(user_id)` convenience for self-service writes - `db_source.user_domain_name(user_id)` single-column lookup so the cache layer can tag merged-view entries with their owning domain - `AppConfigFragmentRepository.app_config(...)` is now cache-aside (cache hit returns the cached payload + fresh DB fragments; cache miss writes through) - `AppConfigFragmentAdminRepository.{create,update,purge}` invalidate the affected scope after the DB write - `repositories.py` builds the cache source from `args.valkey_stat_client` and threads it into both repos - All cache calls pass through `suppress_with_log` — cache failures log a warning and fall through to the DB
1 parent 138a35d commit 797e337

7 files changed

Lines changed: 294 additions & 9 deletions

File tree

changes/11297.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Front the `AppConfigFragment` merged-view repository with a Valkey cache. Read path is cache-aside (`get_merged_config(user_id, name)` → DB fallback → cache write); admin / self-service bulk writes invalidate per-user or per-domain via membership indexes (no `SCAN`). Cache failures fall through to the DB (BEP-1052 §5).

src/ai/backend/manager/repositories/app_config_fragment/admin_repository.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
)
1212
from ai.backend.manager.models.app_config_fragment.row import AppConfigFragmentRow
1313
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine
14+
from ai.backend.manager.repositories.app_config_fragment.cache_source import (
15+
AppConfigFragmentCacheSource,
16+
)
1417
from ai.backend.manager.repositories.app_config_fragment.creators import (
1518
AppConfigFragmentCreatorSpec,
1619
)
@@ -42,9 +45,15 @@ class AppConfigFragmentAdminRepository:
4245
"""
4346

4447
_db_source: AppConfigFragmentDBSource
48+
_cache_source: AppConfigFragmentCacheSource | None
4549

46-
def __init__(self, db: ExtendedAsyncSAEngine) -> None:
50+
def __init__(
51+
self,
52+
db: ExtendedAsyncSAEngine,
53+
cache_source: AppConfigFragmentCacheSource | None = None,
54+
) -> None:
4755
self._db_source = AppConfigFragmentDBSource(db)
56+
self._cache_source = cache_source
4857

4958
# ── Mutations ─────────────────────────────────────────────────
5059

@@ -61,7 +70,9 @@ async def create(
6170
extra_config=extra_config,
6271
),
6372
)
64-
return await self._db_source.create(creator)
73+
result = await self._db_source.create(creator)
74+
await self._invalidate(key)
75+
return result
6576

6677
async def update(
6778
self,
@@ -71,10 +82,20 @@ async def update(
7182
"""Update a fragment by natural key. Raises
7283
``AppConfigFragmentNotFound`` when missing."""
7384
spec = AppConfigFragmentUpdaterSpec(extra_config=extra_config)
74-
return await self._db_source.update(key, spec)
85+
result = await self._db_source.update(key, spec)
86+
await self._invalidate(key)
87+
return result
7588

7689
async def purge(self, key: AppConfigFragmentKey) -> bool:
77-
return await self._db_source.purge(key)
90+
result = await self._db_source.purge(key)
91+
if result:
92+
await self._invalidate(key)
93+
return result
94+
95+
async def _invalidate(self, key: AppConfigFragmentKey) -> None:
96+
if self._cache_source is None:
97+
return
98+
await self._cache_source.invalidate_for_scope(key.scope_type, key.scope_id)
7899

79100
# ── Cross-scope reads ────────────────────────────────────────
80101

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .cache_source import AppConfigFragmentCacheSource
2+
3+
__all__ = ("AppConfigFragmentCacheSource",)
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
"""Cache source for AppConfigFragment merged-view reads.
2+
3+
The hot path is `AppConfigFragmentRepository.app_config(user_id, name)`,
4+
the per-`(user, name)` deep-merge of every contributing fragment in
5+
the policy's `scope_sources` chain (BEP-1052 §5). Each merged value
6+
gets cached under `app_config:merged:{user_id}:{name}` with a TTL,
7+
and is invalidated whenever a contributing fragment changes.
8+
9+
Membership indexes let invalidation work without `SCAN`:
10+
11+
- ``app_config:user_keys:{user_id}`` — set of `(user_id, name)` cache
12+
keys this user currently has cached. Per-user invalidation pops the
13+
set in one `SMEMBERS` + `DEL`.
14+
- ``app_config:domain_users:{domain_name}`` — set of `user_id`s
15+
currently observed in this domain. Per-domain invalidation expands
16+
to a per-user invalidation for each member.
17+
18+
Cache failures never break a request — every public method is wrapped
19+
in `suppress_with_log` so any Valkey error falls through to the DB.
20+
"""
21+
22+
from __future__ import annotations
23+
24+
import logging
25+
import uuid
26+
from collections.abc import Mapping
27+
from typing import Any, cast
28+
29+
from glide import Batch, ExpirySet, ExpiryType
30+
31+
from ai.backend.common.json import dump_json, load_json
32+
from ai.backend.logging.utils import BraceStyleAdapter
33+
from ai.backend.manager.clients.valkey_client.valkey_cache import ValkeyCache
34+
from ai.backend.manager.data.app_config.types import AppConfigData
35+
from ai.backend.manager.data.app_config_fragment.types import AppConfigScopeType
36+
from ai.backend.manager.repositories.utils import suppress_with_log
37+
38+
log = BraceStyleAdapter(logging.getLogger(__spec__.name))
39+
40+
41+
class AppConfigFragmentCacheSource:
42+
"""Valkey-backed cache for the per-`(user, name)` merged AppConfig view."""
43+
44+
_valkey_cache: ValkeyCache
45+
_cache_ttl: int
46+
47+
def __init__(self, valkey_cache: ValkeyCache, cache_ttl: int = 600) -> None:
48+
"""Args:
49+
valkey_cache: Manager-side Valkey cache client.
50+
cache_ttl: TTL in seconds (default: 10 minutes).
51+
"""
52+
self._valkey_cache = valkey_cache
53+
self._cache_ttl = cache_ttl
54+
55+
# ── Key derivation ─────────────────────────────────────────────
56+
57+
@staticmethod
58+
def _merged_key(user_id: uuid.UUID | str, name: str) -> str:
59+
return f"app_config:merged:{user_id}:{name}"
60+
61+
@staticmethod
62+
def _user_keys_set(user_id: uuid.UUID | str) -> str:
63+
return f"app_config:user_keys:{user_id}"
64+
65+
@staticmethod
66+
def _domain_users_set(domain_name: str) -> str:
67+
return f"app_config:domain_users:{domain_name}"
68+
69+
# ── Read ───────────────────────────────────────────────────────
70+
71+
async def get_merged_config(
72+
self,
73+
user_id: uuid.UUID,
74+
name: str,
75+
) -> Mapping[str, Any] | None:
76+
"""Return the cached `config` payload, or `None` on miss / failure.
77+
78+
Only the merged `config` mapping is cached — `fragments` are
79+
cheap to recompute when needed and would bloat the cache.
80+
"""
81+
with suppress_with_log([Exception], "Failed to read merged config from cache"):
82+
cache_key = self._merged_key(user_id, name)
83+
async with self._valkey_cache.client() as conn:
84+
cached_value = await conn.get(cache_key)
85+
if cached_value:
86+
log.debug("Cache hit for merged config: {} {}", user_id, name)
87+
return cast(Mapping[str, Any] | None, load_json(cached_value))
88+
log.debug("Cache miss for merged config: {} {}", user_id, name)
89+
return None
90+
91+
# ── Write ──────────────────────────────────────────────────────
92+
93+
async def set_merged_config(
94+
self,
95+
merged: AppConfigData,
96+
domain_name: str | None = None,
97+
) -> None:
98+
"""Cache the merged `config` payload + index it for invalidation.
99+
100+
Indexes the cache key in the user's key set; if `domain_name`
101+
is supplied, also adds the user to the domain's user set so
102+
domain-level invalidation can cascade.
103+
"""
104+
if merged.config is None:
105+
# Nothing useful to cache — leave as miss-on-next-read.
106+
return
107+
with suppress_with_log([Exception], "Failed to write merged config to cache"):
108+
user_id = str(merged.user_id)
109+
cache_key = self._merged_key(user_id, merged.name)
110+
111+
batch = Batch(is_atomic=False)
112+
batch.set(
113+
cache_key,
114+
dump_json(merged.config),
115+
expiry=ExpirySet(ExpiryType.SEC, self._cache_ttl),
116+
)
117+
user_keys = self._user_keys_set(user_id)
118+
batch.sadd(user_keys, [cache_key])
119+
batch.expire(user_keys, self._cache_ttl)
120+
if domain_name is not None:
121+
domain_users = self._domain_users_set(domain_name)
122+
batch.sadd(domain_users, [user_id])
123+
batch.expire(domain_users, self._cache_ttl)
124+
125+
async with self._valkey_cache.client() as conn:
126+
await conn.exec(batch, raise_on_error=True)
127+
128+
log.trace(
129+
"Cached merged config for user {} name {} (domain={})",
130+
user_id,
131+
merged.name,
132+
domain_name,
133+
)
134+
135+
# ── Invalidate ────────────────────────────────────────────────
136+
137+
async def invalidate_for_scope(
138+
self,
139+
scope_type: AppConfigScopeType,
140+
scope_id: str,
141+
) -> None:
142+
"""Invalidate every cached merged view affected by a fragment write.
143+
144+
`scope_id` is the user UUID for `USER`, the domain name for
145+
`DOMAIN` / `DOMAIN_USER_DEFAULTS`, or the literal `"public"`
146+
for `PUBLIC`. `PUBLIC` invalidation is intentionally not
147+
wired here — its blast radius is the whole cache; rely on TTL
148+
for now (admin-only, low-frequency operation).
149+
"""
150+
with suppress_with_log([Exception], "Failed to invalidate merged-config cache"):
151+
match scope_type:
152+
case AppConfigScopeType.USER:
153+
await self._invalidate_user(scope_id)
154+
case AppConfigScopeType.DOMAIN | AppConfigScopeType.DOMAIN_USER_DEFAULTS:
155+
await self._invalidate_domain(scope_id)
156+
case AppConfigScopeType.PUBLIC:
157+
log.debug(
158+
"PUBLIC-scope invalidation not wired; relying on TTL ({}s)",
159+
self._cache_ttl,
160+
)
161+
162+
async def invalidate_for_user(self, user_id: uuid.UUID | str) -> None:
163+
"""Drop every cached merged view owned by `user_id`.
164+
165+
Convenience wrapper used by self-service bulk writes that
166+
always operate on the caller's `USER` row.
167+
"""
168+
with suppress_with_log([Exception], "Failed to invalidate user merged-config cache"):
169+
await self._invalidate_user(str(user_id))
170+
171+
async def _invalidate_user(self, user_id: str) -> None:
172+
user_keys = self._user_keys_set(user_id)
173+
async with self._valkey_cache.client() as conn:
174+
cached_keys = await conn.smembers(user_keys)
175+
if not cached_keys:
176+
log.debug("No cached keys for user {}, skipping invalidation", user_id)
177+
return
178+
keys_to_delete: list[str | bytes] = list(cached_keys)
179+
keys_to_delete.append(user_keys)
180+
async with self._valkey_cache.client() as conn:
181+
removed = await conn.delete(keys_to_delete)
182+
log.debug("Invalidated {} merged-config keys for user {}", removed, user_id)
183+
184+
async def _invalidate_domain(self, domain_name: str) -> None:
185+
domain_users = self._domain_users_set(domain_name)
186+
async with self._valkey_cache.client() as conn:
187+
user_ids = await conn.smembers(domain_users)
188+
if not user_ids:
189+
log.debug(
190+
"No tracked users for domain {}, skipping cache invalidation",
191+
domain_name,
192+
)
193+
return
194+
for raw in user_ids:
195+
user_id = raw.decode() if isinstance(raw, bytes) else str(raw)
196+
await self._invalidate_user(user_id)
197+
async with self._valkey_cache.client() as conn:
198+
await conn.delete([domain_users])
199+
log.debug(
200+
"Invalidated merged-config caches for {} users in domain {}",
201+
len(user_ids),
202+
domain_name,
203+
)

src/ai/backend/manager/repositories/app_config_fragment/db_source/db_source.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ async def get_by_id(self, id: uuid.UUID) -> AppConfigFragmentData | None:
107107
)
108108
return row.to_data() if row is not None else None
109109

110+
@app_config_fragment_db_source_resilience.apply()
111+
async def user_domain_name(self, user_id: uuid.UUID) -> str | None:
112+
"""Single-column lookup of a user's `domain_name`.
113+
114+
Used by the cache layer to tag merged-view entries with their
115+
owning domain so domain-scoped fragment writes can target a
116+
bounded user set during invalidation.
117+
"""
118+
async with self._db.begin_readonly_session() as db_sess:
119+
return await db_sess.scalar(
120+
sa.select(UserRow.domain_name).where(UserRow.uuid == user_id)
121+
)
122+
110123
@app_config_fragment_db_source_resilience.apply()
111124
async def create(self, creator: Creator[AppConfigFragmentRow]) -> AppConfigFragmentData:
112125
"""Insert a new fragment via the shared Creator helper.

src/ai/backend/manager/repositories/app_config_fragment/repositories.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
from dataclasses import dataclass
22
from typing import Self
33

4+
from ai.backend.manager.clients.valkey_client.valkey_cache import ValkeyCache
45
from ai.backend.manager.repositories.app_config_fragment.admin_repository import (
56
AppConfigFragmentAdminRepository,
67
)
8+
from ai.backend.manager.repositories.app_config_fragment.cache_source import (
9+
AppConfigFragmentCacheSource,
10+
)
711
from ai.backend.manager.repositories.app_config_fragment.repository import (
812
AppConfigFragmentRepository,
913
)
@@ -17,7 +21,12 @@ class AppConfigFragmentRepositories:
1721

1822
@classmethod
1923
def create(cls, args: RepositoryArgs) -> Self:
24+
# The merged-view read path is fronted by a Valkey cache; the
25+
# repository falls through to the DB transparently if the
26+
# cache layer fails.
27+
valkey_cache = ValkeyCache(args.valkey_stat_client._client)
28+
cache_source = AppConfigFragmentCacheSource(valkey_cache)
2029
return cls(
21-
repository=AppConfigFragmentRepository(args.db),
22-
admin_repository=AppConfigFragmentAdminRepository(args.db),
30+
repository=AppConfigFragmentRepository(args.db, cache_source=cache_source),
31+
admin_repository=AppConfigFragmentAdminRepository(args.db, cache_source=cache_source),
2332
)

src/ai/backend/manager/repositories/app_config_fragment/repository.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
AppConfigFragmentSearchResult,
1010
)
1111
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine
12+
from ai.backend.manager.repositories.app_config_fragment.cache_source import (
13+
AppConfigFragmentCacheSource,
14+
)
1215
from ai.backend.manager.repositories.app_config_fragment.db_source import (
1316
AppConfigFragmentDBSource,
1417
)
@@ -25,13 +28,21 @@ class AppConfigFragmentRepository:
2528
Scope-bound reads on raw fragments plus the per-user merged
2629
`AppConfig` view (BEP-1052 §5). Mutations and admin cross-scope
2730
reads live on `AppConfigFragmentAdminRepository`. Retry + metric
28-
policies are applied at the DB-source layer.
31+
policies are applied at the DB-source layer; the merged-view read
32+
path is fronted by a Valkey cache so repeated WebUI bootstrap
33+
queries don't hammer the DB.
2934
"""
3035

3136
_db_source: AppConfigFragmentDBSource
37+
_cache_source: AppConfigFragmentCacheSource | None
3238

33-
def __init__(self, db: ExtendedAsyncSAEngine) -> None:
39+
def __init__(
40+
self,
41+
db: ExtendedAsyncSAEngine,
42+
cache_source: AppConfigFragmentCacheSource | None = None,
43+
) -> None:
3444
self._db_source = AppConfigFragmentDBSource(db)
45+
self._cache_source = cache_source
3546

3647
# ── Raw fragment reads ────────────────────────────────────────
3748

@@ -55,7 +66,31 @@ async def app_config(
5566
user_id: uuid.UUID,
5667
config_name: str,
5768
) -> AppConfigData:
58-
return await self._db_source.get_user_app_config(user_id, config_name)
69+
"""Cache-aside read for the per-`(user, name)` merged view.
70+
71+
Returns a fresh `AppConfigData` whose `config` payload may
72+
come from cache (fragments still resolve from the DB on
73+
demand). Cache failures fall through transparently.
74+
"""
75+
if self._cache_source is not None:
76+
cached_config = await self._cache_source.get_merged_config(user_id, config_name)
77+
if cached_config is not None:
78+
# Re-fetch the fragment list from the DB; only the
79+
# deep-merged `config` payload is cached. Keeps the
80+
# response-shape contract unchanged.
81+
merged = await self._db_source.get_user_app_config(user_id, config_name)
82+
return AppConfigData(
83+
user_id=merged.user_id,
84+
name=merged.name,
85+
fragments=merged.fragments,
86+
config=cached_config,
87+
)
88+
89+
merged = await self._db_source.get_user_app_config(user_id, config_name)
90+
if self._cache_source is not None:
91+
domain_name = await self._db_source.user_domain_name(user_id)
92+
await self._cache_source.set_merged_config(merged, domain_name=domain_name)
93+
return merged
5994

6095
async def search_app_configs(
6196
self,

0 commit comments

Comments
 (0)