Skip to content

Commit ffa35db

Browse files
jopemachine and claude committed
feat(BA-6626): app_config_fragment bulk repository layer & conditional-bulk primitives
Repository-layer foundation for app_config_fragment bulk operations (split out of BA-6618):
- Generic conditional-bulk primitives in repositories/base: ConditionalCreator / ConditionalUpdater / ConditionalPurger and BulkConditionalCreator / BulkConditionalUpdater / BulkConditionalPurger (a write spec paired with an only_if existence gate).
- WriteOps.bulk_conditional_create_partial / bulk_conditional_update_partial / bulk_conditional_purge_partial — each item runs in its own savepoint for partial success (gate-rejected / missing / failed items are reported, the rest proceed).
- Repository errors ConditionalWriteNotAllowed / ConditionalWriteTargetNotFound.
- app_config_fragment repository: AppConfigFragmentCreatorSpec as a plain CreatorSpec with scope-based rank; bulk_create / bulk_update / bulk_purge in db_source + repository; the AppConfigFragmentBulkWriteResult data type (succeeded + failed[index, message]).
- Repository and ops-provider unit tests.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent d4c9971 commit ffa35db

14 files changed

Lines changed: 920 additions & 48 deletions

File tree

changes/12426.misc.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add generic conditional-bulk repository primitives (`ConditionalCreator`/`ConditionalUpdater`/`ConditionalPurger`, each pairing a write spec with an `only_if` gate, and `WriteOps.bulk_conditional_*_partial` for per-item partial success) and the `app_config_fragment` bulk repository operations built on them.

src/ai/backend/manager/data/app_config_fragment/types.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,23 @@ class AppConfigFragmentSearchResult:
3030
total_count: int
3131
has_next_page: bool
3232
has_previous_page: bool
33+
34+
35+
@dataclass(frozen=True)
class AppConfigFragmentBulkItemError:
    """Describe a single item that failed inside a partial bulk mutation.

    Instances are immutable; each one pairs the item's position in the submitted
    batch with a human-readable reason for the failure.
    """

    # Position of the failed item within the batch that was submitted.
    index: int
    # Reason the item was not written.
    message: str
41+
42+
43+
@dataclass(frozen=True)
class AppConfigFragmentBulkWriteResult:
    """Outcome of a bulk mutation that allows partial success.

    The batch is split into two lists: items that were written, and items that
    were not (gate rejected or write failed). Failed items keep their batch
    ``index`` and a reason so callers can map them back to their input.
    """

    # Fragments that were created / updated / purged.
    succeeded: list[AppConfigFragmentData]
    # One entry per item that was not written.
    failed: list[AppConfigFragmentBulkItemError]

src/ai/backend/manager/errors/repository.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
ErrorDomain,
1616
ErrorOperation,
1717
)
18+
from ai.backend.manager.errors.common import GenericForbidden, ObjectNotFound
1819

1920

2021
class RepositoryError(BackendAIError):
@@ -192,3 +193,25 @@ def error_code(self) -> ErrorCode:
192193
operation=ErrorOperation.GENERIC,
193194
error_detail=ErrorDetail.CONFLICT,
194195
)
196+
197+
198+
class ConditionalWriteNotAllowed(GenericForbidden):
    """Signal that one gated bulk-write item failed its ``only_if`` gate.

    The partial ``WriteOps.bulk_conditional_*_partial`` methods record this as a
    per-item failure: the offending item is skipped and the remaining items are
    still processed.
    """

    error_type = "https://api.backend.ai/probs/conditional-write-not-allowed"
    error_title = "Conditional write is not allowed (gate not satisfied)."
207+
208+
209+
class ConditionalWriteTargetNotFound(ObjectNotFound):
    """Signal that a gated bulk update/purge item pointed at a non-existent row.

    ``WriteOps.bulk_conditional_update_partial`` and
    ``WriteOps.bulk_conditional_purge_partial`` record this as a per-item failure.
    """

    error_type = "https://api.backend.ai/probs/conditional-write-target-not-found"
    object_name = "target row"

src/ai/backend/manager/repositories/app_config_fragment/creators.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,26 @@
77

88
from ai.backend.common.data.app_config.types import AppConfigScopeType
99
from ai.backend.manager.models.app_config_fragment.row import AppConfigFragmentRow
10-
from ai.backend.manager.repositories.base.creator import DependentCreatorSpec
10+
from ai.backend.manager.repositories.base.creator import CreatorSpec
11+
12+
# Scope -> merge rank. Fragments with a lower rank are merged first, which lets a
# more specific scope (higher rank) override a broader one. The unique key
# ``(config_name, scope_type, scope_id)`` means a single resolve sees at most one
# fragment per scope_type, so the ranks taking part in one merge never collide.
_SCOPE_RANK: dict[AppConfigScopeType, int] = {
    AppConfigScopeType.PUBLIC: 100,
    AppConfigScopeType.DOMAIN: 200,
    AppConfigScopeType.USER: 300,
}
1120

1221

1322
@dataclass
14-
class AppConfigFragmentCreatorSpec(DependentCreatorSpec[int, AppConfigFragmentRow]):
15-
"""Fragment creator whose ``rank`` is assigned by the ops layer (next-value) at execution.
23+
class AppConfigFragmentCreatorSpec(CreatorSpec[AppConfigFragmentRow]):
24+
"""Fragment creator whose ``rank`` is derived from its ``scope_type`` (scope precedence).
1625
17-
``build_row`` receives the computed next rank, so a newly created fragment is placed
18-
after the existing fragments for the same ``config_name``.
26+
A fragment's rank is fixed by its scope (``public`` < ``domain`` < ``user``), so a more
27+
specific scope overrides a broader one when fragments are merged — independent of creation
28+
order. Being a plain ``CreatorSpec`` (rank known at build time), it composes with
29+
``BulkConditionalCreator`` for gated bulk inserts.
1930
"""
2031

2132
config_name: str
@@ -24,11 +35,11 @@ class AppConfigFragmentCreatorSpec(DependentCreatorSpec[int, AppConfigFragmentRo
2435
config: dict[str, Any]
2536

2637
@override
27-
def build_row(self, next_rank: int) -> AppConfigFragmentRow:
38+
def build_row(self) -> AppConfigFragmentRow:
2839
return AppConfigFragmentRow(
2940
config_name=self.config_name,
3041
scope_type=self.scope_type,
3142
scope_id=self.scope_id,
32-
rank=next_rank,
43+
rank=_SCOPE_RANK[self.scope_type],
3344
config=self.config,
3445
)

src/ai/backend/manager/repositories/app_config_fragment/db_source/db_source.py

Lines changed: 114 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
from ai.backend.common.resilience.policies.retry import BackoffStrategy, RetryArgs, RetryPolicy
1414
from ai.backend.common.resilience.resilience import Resilience
1515
from ai.backend.manager.data.app_config_fragment.types import (
16+
AppConfigFragmentBulkItemError,
17+
AppConfigFragmentBulkWriteResult,
1618
AppConfigFragmentData,
1719
AppConfigFragmentSearchResult,
1820
)
@@ -21,27 +23,26 @@
2123
AppConfigFragmentWriteNotAllowed,
2224
)
2325
from ai.backend.manager.models.app_config_allow_list.row import AppConfigAllowListRow
24-
from ai.backend.manager.models.app_config_definition.row import AppConfigDefinitionRow
2526
from ai.backend.manager.models.app_config_fragment.row import AppConfigFragmentRow
2627
from ai.backend.manager.models.scopes import SearchScope
2728
from ai.backend.manager.repositories.app_config_fragment.creators import (
2829
AppConfigFragmentCreatorSpec,
2930
)
3031
from ai.backend.manager.repositories.base import (
3132
BatchQuerier,
33+
BulkConditionalCreator,
34+
BulkConditionalPurger,
35+
BulkConditionalUpdater,
36+
Creator,
3237
ExistsQuerier,
3338
Purger,
3439
Querier,
3540
Updater,
3641
)
37-
from ai.backend.manager.repositories.base.creator import NextValuePolicy
38-
from ai.backend.manager.repositories.ops import DBOpsProvider
42+
from ai.backend.manager.repositories.ops import DBOpsProvider, WriteOps
3943

4044
__all__ = ("AppConfigFragmentDBSource",)
4145

42-
# Gap between successive ranks, leaving room to re-order fragments without renumbering.
43-
RANK_GAP = 100
44-
4546
app_config_fragment_db_source_resilience = Resilience(
4647
policies=[
4748
MetricPolicy(
@@ -70,29 +71,62 @@ class AppConfigFragmentDBSource:
7071
def __init__(self, ops_provider: DBOpsProvider) -> None:
7172
self._ops = ops_provider
7273

74+
async def _update_in_tx(
    self,
    w: WriteOps,
    updater: Updater[AppConfigFragmentRow],
    only_if: ExistsQuerier[AppConfigAllowListRow],
) -> AppConfigFragmentData:
    """Check the gate, then update a single fragment using the caller's ``WriteOps``.

    No transaction is opened here; both steps run on the ``w`` handed in by the caller.

    Raises:
        AppConfigFragmentWriteNotAllowed: ``w.exists(only_if)`` reported no match.
        AppConfigFragmentNotFound: ``w.update(updater)`` returned ``None``.
    """
    gate_passed = await w.exists(only_if)
    if not gate_passed:
        raise AppConfigFragmentWriteNotAllowed(
            f"Writing app config fragment {updater.pk_value} is not allowed."
        )

    updated = await w.update(updater)
    if updated is None:
        # The gate passed but there was no row to update.
        raise AppConfigFragmentNotFound(f"App config fragment {updater.pk_value} not found")
    return updated.row.to_data()
92+
93+
async def _purge_in_tx(
    self,
    w: WriteOps,
    purger: Purger[AppConfigFragmentRow],
    only_if: ExistsQuerier[AppConfigAllowListRow],
) -> AppConfigFragmentData:
    """Check the gate, then purge a single fragment using the caller's ``WriteOps``.

    No transaction is opened here; both steps run on the ``w`` handed in by the caller.

    Raises:
        AppConfigFragmentWriteNotAllowed: ``w.exists(only_if)`` reported no match.
        AppConfigFragmentNotFound: ``w.purge(purger)`` returned ``None``.
    """
    gate_passed = await w.exists(only_if)
    if not gate_passed:
        raise AppConfigFragmentWriteNotAllowed(
            f"Writing app config fragment {purger.pk_value} is not allowed."
        )

    purged = await w.purge(purger)
    if purged is None:
        # The gate passed but there was no row to delete.
        raise AppConfigFragmentNotFound(f"App config fragment {purger.pk_value} not found")
    return purged.row.to_data()
111+
73112
@app_config_fragment_db_source_resilience.apply()
74113
async def create(
75114
self,
76115
spec: AppConfigFragmentCreatorSpec,
77116
only_if: ExistsQuerier[AppConfigAllowListRow],
78117
) -> AppConfigFragmentData:
79-
policy = NextValuePolicy(
80-
column=AppConfigFragmentRow.rank,
81-
scope_condition=lambda: AppConfigFragmentRow.config_name == spec.config_name,
82-
lock_selector=sa.select(AppConfigDefinitionRow).where(
83-
AppConfigDefinitionRow.config_name == spec.config_name
84-
),
85-
gap=RANK_GAP,
86-
)
87-
# ``only_if`` (built by the caller) and the write run in one transaction, so the gate
88-
# check and the write commit atomically — no check-then-write race.
118+
"""Gate + create one fragment in a single write transaction.
119+
120+
The gate check and the write run in one transaction, so they commit atomically —
121+
no check-then-write race. ``rank`` is derived from the fragment's ``scope_type``.
122+
"""
89123
async with self._ops.write_ops() as w:
90124
if not await w.exists(only_if):
91125
raise AppConfigFragmentWriteNotAllowed(
92126
f"Writing app config {spec.config_name!r} at scope "
93127
f"{spec.scope_type.value!r} is not allowed."
94128
)
95-
created = await w.create_with_next_value(policy, spec)
129+
created = await w.create(Creator(spec=spec))
96130
return created.row.to_data()
97131

98132
@app_config_fragment_db_source_resilience.apply()
@@ -109,35 +143,78 @@ async def update(
109143
updater: Updater[AppConfigFragmentRow],
110144
only_if: ExistsQuerier[AppConfigAllowListRow],
111145
) -> AppConfigFragmentData:
112-
# Gate first, then write — both in one transaction so the check and the write commit
113-
# atomically. A missing fragment surfaces as the update returning None below.
114146
async with self._ops.write_ops() as w:
115-
if not await w.exists(only_if):
116-
raise AppConfigFragmentWriteNotAllowed(
117-
f"Writing app config fragment {updater.pk_value} is not allowed."
118-
)
119-
result = await w.update(updater)
120-
if result is None:
121-
raise AppConfigFragmentNotFound(f"App config fragment {updater.pk_value} not found")
122-
return result.row.to_data()
147+
return await self._update_in_tx(w, updater, only_if)
123148

124149
@app_config_fragment_db_source_resilience.apply()
125150
async def purge(
126151
self,
127152
purger: Purger[AppConfigFragmentRow],
128153
only_if: ExistsQuerier[AppConfigAllowListRow],
129154
) -> AppConfigFragmentData:
130-
# Gate first, then write — both in one transaction so the check and the write commit
131-
# atomically. A missing fragment surfaces as the purge returning None below.
132155
async with self._ops.write_ops() as w:
133-
if not await w.exists(only_if):
134-
raise AppConfigFragmentWriteNotAllowed(
135-
f"Writing app config fragment {purger.pk_value} is not allowed."
136-
)
137-
result = await w.purge(purger)
138-
if result is None:
139-
raise AppConfigFragmentNotFound(f"App config fragment {purger.pk_value} not found")
140-
return result.row.to_data()
156+
return await self._purge_in_tx(w, purger, only_if)
157+
158+
@app_config_fragment_db_source_resilience.apply()
async def bulk_create(
    self,
    bulk: BulkConditionalCreator[AppConfigFragmentRow, AppConfigAllowListRow],
) -> AppConfigFragmentBulkWriteResult:
    """Insert a batch of fragments, tolerating per-item failures.

    Every item is gated and inserted on its own (one savepoint per item). An item
    whose gate is rejected or whose insert fails ends up in ``failed`` together
    with its batch index; the remaining items are still created. All items share
    a single transaction, so the successful inserts are committed together.
    """
    async with self._ops.write_ops() as w:
        outcome = await w.bulk_conditional_create_partial(bulk)
        created = [row.to_data() for row in outcome.successes]
        # NOTE(review): str() of a database-driver exception may embed SQL text or
        # parameters — confirm this message is not surfaced verbatim to clients.
        failures = [
            AppConfigFragmentBulkItemError(index=err.index, message=str(err.exception))
            for err in outcome.errors
        ]
        return AppConfigFragmentBulkWriteResult(succeeded=created, failed=failures)
178+
179+
@app_config_fragment_db_source_resilience.apply()
async def bulk_update(
    self,
    bulk: BulkConditionalUpdater[AppConfigFragmentRow, AppConfigAllowListRow],
) -> AppConfigFragmentBulkWriteResult:
    """Update a batch of fragments, tolerating per-item failures.

    Every item is gated and updated on its own (one savepoint per item). A
    rejected gate, a missing target, or a failed update puts the item in
    ``failed``; the remaining items are still updated.
    """
    async with self._ops.write_ops() as w:
        outcome = await w.bulk_conditional_update_partial(bulk)
        updated = [row.to_data() for row in outcome.successes]
        # Each failure keeps its batch index plus the stringified exception.
        failures = [
            AppConfigFragmentBulkItemError(index=err.index, message=str(err.exception))
            for err in outcome.errors
        ]
        return AppConfigFragmentBulkWriteResult(succeeded=updated, failed=failures)
198+
199+
@app_config_fragment_db_source_resilience.apply()
async def bulk_purge(
    self,
    bulk: BulkConditionalPurger[AppConfigFragmentRow, AppConfigAllowListRow],
) -> AppConfigFragmentBulkWriteResult:
    """Purge a batch of fragments, tolerating per-item failures.

    Every item is gated and deleted on its own (one savepoint per item). A
    rejected gate, a missing target, or a failed delete puts the item in
    ``failed``; the remaining items are still purged.
    """
    async with self._ops.write_ops() as w:
        outcome = await w.bulk_conditional_purge_partial(bulk)
        purged = [row.to_data() for row in outcome.successes]
        # Each failure keeps its batch index plus the stringified exception.
        failures = [
            AppConfigFragmentBulkItemError(index=err.index, message=str(err.exception))
            for err in outcome.errors
        ]
        return AppConfigFragmentBulkWriteResult(succeeded=purged, failed=failures)
141218

142219
@app_config_fragment_db_source_resilience.apply()
143220
async def admin_search(self, querier: BatchQuerier) -> AppConfigFragmentSearchResult:

src/ai/backend/manager/repositories/app_config_fragment/repository.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from ai.backend.common.resilience.policies.retry import BackoffStrategy, RetryArgs, RetryPolicy
1010
from ai.backend.common.resilience.resilience import Resilience
1111
from ai.backend.manager.data.app_config_fragment.types import (
12+
AppConfigFragmentBulkWriteResult,
1213
AppConfigFragmentData,
1314
AppConfigFragmentSearchResult,
1415
)
@@ -21,7 +22,15 @@
2122
from ai.backend.manager.repositories.app_config_fragment.db_source import (
2223
AppConfigFragmentDBSource,
2324
)
24-
from ai.backend.manager.repositories.base import BatchQuerier, ExistsQuerier, Purger, Updater
25+
from ai.backend.manager.repositories.base import (
26+
BatchQuerier,
27+
BulkConditionalCreator,
28+
BulkConditionalPurger,
29+
BulkConditionalUpdater,
30+
ExistsQuerier,
31+
Purger,
32+
Updater,
33+
)
2534
from ai.backend.manager.repositories.ops import DBOpsProvider
2635

2736
__all__ = ("AppConfigFragmentRepository",)
@@ -91,3 +100,24 @@ async def scoped_search(
91100
self, querier: BatchQuerier, scopes: Sequence[SearchScope]
92101
) -> AppConfigFragmentSearchResult:
93102
return await self._db_source.scoped_search(querier, scopes)
103+
104+
@app_config_fragment_repository_resilience.apply()
async def bulk_create(
    self,
    bulk: BulkConditionalCreator[AppConfigFragmentRow, AppConfigAllowListRow],
) -> AppConfigFragmentBulkWriteResult:
    """Create many fragments by delegating to the DB source's ``bulk_create``.

    Returns the DB source's result unchanged.
    """
    outcome = await self._db_source.bulk_create(bulk)
    return outcome
110+
111+
@app_config_fragment_repository_resilience.apply()
async def bulk_update(
    self,
    bulk: BulkConditionalUpdater[AppConfigFragmentRow, AppConfigAllowListRow],
) -> AppConfigFragmentBulkWriteResult:
    """Update many fragments by delegating to the DB source's ``bulk_update``.

    Returns the DB source's result unchanged.
    """
    outcome = await self._db_source.bulk_update(bulk)
    return outcome
117+
118+
@app_config_fragment_repository_resilience.apply()
async def bulk_purge(
    self,
    bulk: BulkConditionalPurger[AppConfigFragmentRow, AppConfigAllowListRow],
) -> AppConfigFragmentBulkWriteResult:
    """Purge many fragments by delegating to the DB source's ``bulk_purge``.

    Returns the DB source's result unchanged.
    """
    outcome = await self._db_source.bulk_purge(bulk)
    return outcome

src/ai/backend/manager/repositories/app_config_fragment/types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""Types for app config fragment repository operations (search scopes)."""
1+
"""Types for app config fragment repository operations (search scopes, gated writes)."""
22

33
from __future__ import annotations
44

0 commit comments

Comments
 (0)