Skip to content

Commit 9810ac4

Browse files
fregataaclaude
andcommitted
fix(BA-5967): pre-validate slot types in sokovan validator chain
Add two SessionSpec validator rules so slot keys absent from the target resource group's agent inventory are caught with a clear 4xx before they reach ResourceLimitRule, where the downstream humanizer would otherwise escalate the missing key into a 500. ImageSlotTypeRule rejects images that declare a resource_spec slot the target resource group has no agent for. RequestedSlotTypeRule rejects caller-supplied resource entries with the same property. Both rules also reject when the RG has no non-terminated agents at all. The per-RG slot inventory is sourced from the same ScalingGroupRow fetch via selectinload over agents -> agent_resource_rows, with TERMINATED agents filtered out in Python. The new bundle dataclass _ScalingGroupWithAgentResources keeps the SG row and its filtered agent list together so the inventory snapshot stays consistent with the same readonly transaction. The fetch helper raises ScalingGroupNotFound on missing RG, so the caller no longer needs a separate None check. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5dfe5c6 commit 9810ac4

10 files changed

Lines changed: 322 additions & 17 deletions

File tree

changes/11515.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Pre-validate slot types in the Sokovan SessionSpec validator chain so a session request whose image or caller declares a slot the target resource group does not provide is rejected with a clear 4xx, instead of crashing later in error-message humanization with a 500.

src/ai/backend/manager/repositories/scheduler/db_source/db_source.py

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from collections import defaultdict
77
from collections.abc import AsyncIterator, Mapping, Sequence
88
from contextlib import asynccontextmanager as actxmgr
9+
from dataclasses import dataclass
910
from datetime import datetime
1011
from decimal import Decimal
1112
from typing import TYPE_CHECKING, Any, cast
@@ -187,6 +188,21 @@ def _create_resource_slot_from_policy(
187188
return ResourceSlot.from_policy(resource_policy_map, cast(Mapping[str, Any], known_slot_types))
188189

189190

191+
@dataclass(frozen=True)
192+
class _ScalingGroupWithSlotInventory:
193+
"""Scaling group bundled with the slot inventory served by its agents.
194+
195+
``active_slot_types`` maps each slot name served by a non-terminated
196+
agent in this scaling group to its registered :class:`SlotTypes`
197+
unit. The validator chain consults this map both for membership
198+
(reject requests for slots the RG does not provide) and for unit
199+
metadata (humanize values during error formatting).
200+
"""
201+
202+
sg_row: ScalingGroupRow
203+
active_slot_types: Mapping[SlotName, SlotTypes]
204+
205+
190206
class ScheduleDBSource:
191207
"""
192208
Database source for schedule-related operations.
@@ -289,6 +305,46 @@ async def get_scheduling_data(self, scaling_group: str, spec: SchedulingSpec) ->
289305
spec=spec,
290306
)
291307

308+
async def _fetch_scaling_group_with_slot_inventory(
309+
self,
310+
db_sess: SASession,
311+
name: str,
312+
) -> _ScalingGroupWithSlotInventory:
313+
"""Load a scaling group together with its per-RG slot inventory.
314+
315+
Eager-loads ``agents`` -> ``agent_resource_rows`` -> ``slot_type_row``
316+
via ``selectinload``, filters out TERMINATED agents, and projects
317+
the remaining rows into ``{slot_name: SlotTypes}``. The ``AgentRow``
318+
instances themselves are not exposed — callers only see the SG row
319+
and the derived inventory.
320+
321+
Raises:
322+
ScalingGroupNotFound: when the scaling group does not exist.
323+
"""
324+
sg_row = (
325+
await db_sess.scalars(
326+
sa.select(ScalingGroupRow)
327+
.options(
328+
selectinload(ScalingGroupRow.agents)
329+
.selectinload(AgentRow.agent_resource_rows)
330+
.selectinload(AgentResourceRow.slot_type_row)
331+
)
332+
.where(ScalingGroupRow.name == name)
333+
)
334+
).one_or_none()
335+
if sg_row is None:
336+
raise ScalingGroupNotFound(f"Resource group {name} not found")
337+
active_slot_types: dict[SlotName, SlotTypes] = {
338+
SlotName(ar.slot_name): SlotTypes(ar.slot_type_row.slot_type)
339+
for agent in sg_row.agents
340+
if agent.status != AgentStatus.TERMINATED
341+
for ar in agent.agent_resource_rows
342+
}
343+
return _ScalingGroupWithSlotInventory(
344+
sg_row=sg_row,
345+
active_slot_types=active_slot_types,
346+
)
347+
292348
async def _fetch_scaling_group(
293349
self, db_sess: SASession, scaling_group: str
294350
) -> ScalingGroupMeta:
@@ -1463,16 +1519,13 @@ async def fetch_session_spec_contexts(
14631519
network_info: ScalingGroupNetworkInfo | None = None
14641520
rg_defaults = None
14651521
resource_group_allow_fractional = False
1522+
known_slot_types: Mapping[SlotName, SlotTypes] = {}
14661523
if resource_group_name:
1467-
sg_row = (
1468-
await db_sess.scalars(
1469-
sa.select(ScalingGroupRow).where(
1470-
ScalingGroupRow.name == resource_group_name
1471-
)
1472-
)
1473-
).one_or_none()
1474-
if sg_row is None:
1475-
raise ScalingGroupNotFound(f"Resource group {resource_group_name} not found")
1524+
rg_bundle = await self._fetch_scaling_group_with_slot_inventory(
1525+
db_sess, resource_group_name
1526+
)
1527+
sg_row = rg_bundle.sg_row
1528+
known_slot_types = rg_bundle.active_slot_types
14761529
# Every production caller of ``enqueue_session_from_draft`` populates
14771530
# access_key/domain_name/project_id alongside resource_group_name; this
14781531
# branch flags the contract violation rather than letting the RG
@@ -1632,6 +1685,7 @@ async def fetch_session_spec_contexts(
16321685
dotfile_data=dotfile_bundle,
16331686
active_session_count=active_session_count,
16341687
keypair_resource_policy=keypair_policy,
1688+
known_slot_types=known_slot_types,
16351689
)
16361690

16371691
async def pick_default_resource_group(

src/ai/backend/manager/repositories/scheduler/types/session_creation.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Types for session creation and enqueueing."""
22

3+
from collections.abc import Mapping
34
from dataclasses import dataclass, field
45
from decimal import Decimal
56
from typing import Any
@@ -10,6 +11,8 @@
1011
from ai.backend.common.types import (
1112
AccessKey,
1213
SessionId,
14+
SlotName,
15+
SlotTypes,
1316
VFolderMount,
1417
)
1518
from ai.backend.manager.data.dotfile.types import DotfileBundle
@@ -143,4 +146,5 @@ class SessionSpecContextFetch:
143146
vfolder_mounts_by_role: dict[str, tuple[VFolderMount, ...]]
144147
dotfile_data: DotfileBundle
145148
keypair_resource_policy: Any | None # KeyPairResourcePolicyData
149+
known_slot_types: Mapping[SlotName, SlotTypes] = field(default_factory=dict)
146150
active_session_count: int = 0

src/ai/backend/manager/sokovan/scheduling_controller/scheduling_controller.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@
4848
ConcurrentSessionLimitRule,
4949
ContainerLimitRule,
5050
DotfileVFolderConflictRule,
51+
ImageSlotTypeRule,
5152
InferenceModelFolderRule,
5253
MountNameValidationRule,
54+
RequestedSlotTypeRule,
5355
ResourceLimitRule,
5456
ServicePortRule,
5557
SessionSpecValidationContext,
@@ -125,6 +127,8 @@ def __init__(self, args: SchedulingControllerArgs) -> None:
125127
self._spec_validator = SessionSpecValidator([
126128
ConcurrentSessionLimitRule(),
127129
ContainerLimitRule(),
130+
ImageSlotTypeRule(),
131+
RequestedSlotTypeRule(),
128132
ResourceLimitRule(),
129133
ServicePortRule(),
130134
MountNameValidationRule(),
@@ -161,9 +165,6 @@ async def enqueue_session_from_draft(
161165
allowed_vfolder_types = list(
162166
await self._config_provider.legacy_etcd_config_loader.get_vfolder_types()
163167
)
164-
known_slot_types = (
165-
await self._config_provider.legacy_etcd_config_loader.get_resource_slots()
166-
)
167168

168169
with self._metric_observer.measure_phase(
169170
"scheduling_controller", rg_name, "spec_fetch_contexts"
@@ -186,7 +187,7 @@ async def enqueue_session_from_draft(
186187
val_ctx = SessionSpecValidationContext(
187188
keypair_resource_policy=fetched.keypair_resource_policy,
188189
image_infos=fetched.image_infos,
189-
known_slot_types=known_slot_types,
190+
known_slot_types=fetched.known_slot_types,
190191
dotfile_data=fetched.dotfile_data,
191192
active_session_count=fetched.active_session_count,
192193
)

src/ai/backend/manager/sokovan/scheduling_controller/validators/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
from .concurrent_session_limit_rule import ConcurrentSessionLimitRule
44
from .container_limit_rule import ContainerLimitRule
55
from .dotfile_vfolder_conflict_rule import DotfileVFolderConflictRule
6+
from .image_slot_type_rule import ImageSlotTypeRule
67
from .inference_model_folder_rule import InferenceModelFolderRule
78
from .mount_name_validation_rule import MountNameValidationRule
9+
from .requested_slot_type_rule import RequestedSlotTypeRule
810
from .resource_limit_rule import ResourceLimitRule
911
from .service_port_rule import ServicePortRule
1012
from .session_spec_base import (
@@ -17,8 +19,10 @@
1719
"ConcurrentSessionLimitRule",
1820
"ContainerLimitRule",
1921
"DotfileVFolderConflictRule",
22+
"ImageSlotTypeRule",
2023
"InferenceModelFolderRule",
2124
"MountNameValidationRule",
25+
"RequestedSlotTypeRule",
2226
"ResourceLimitRule",
2327
"ServicePortRule",
2428
"SessionSpecValidationContext",
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""Image-declared slot-type compatibility validator.
2+
3+
Every slot key declared in an image's ``resource_spec`` must be served
4+
by some non-terminated agent in the requested resource group. The
5+
context's ``known_slot_types`` is sourced from
6+
``agent_resources`` joined with ``agents`` (status != TERMINATED) and
7+
``resource_slot_types``, so it reflects the RG's hardware inventory and
8+
the registered unit metadata in one mapping.
9+
10+
When the RG has no non-terminated agents the request is rejected
11+
outright — an empty inventory cannot satisfy any image declaration and
12+
would otherwise let the session reach the scheduler only to fail there.
13+
"""
14+
15+
from __future__ import annotations
16+
17+
from ai.backend.manager.data.session.spec import SessionSpec
18+
from ai.backend.manager.errors.api import InvalidAPIParameters
19+
from ai.backend.manager.sokovan.scheduling_controller.validators.session_spec_base import (
20+
SessionSpecValidationContext,
21+
SessionSpecValidatorRule,
22+
)
23+
24+
25+
class ImageSlotTypeRule(SessionSpecValidatorRule):
26+
"""Image-declared slot keys must be served by an agent in the target RG."""
27+
28+
def name(self) -> str:
29+
return "image_slot_type"
30+
31+
def validate(
32+
self,
33+
spec: SessionSpec,
34+
context: SessionSpecValidationContext,
35+
) -> None:
36+
rg_slot_types = context.known_slot_types
37+
if not rg_slot_types:
38+
raise InvalidAPIParameters(
39+
extra_msg=(
40+
f"resource group '{spec.scope.resource_group_name}' has no "
41+
f"agents serving any resource slot."
42+
),
43+
)
44+
for idx, kernel in enumerate(spec.kernel_specs):
45+
image_info = context.image_infos.get(kernel.execution_spec.image_id)
46+
if image_info is None:
47+
continue
48+
unknown = sorted(
49+
slot_name
50+
for slot_name in image_info.resource_spec
51+
if slot_name not in rg_slot_types
52+
)
53+
if unknown:
54+
raise InvalidAPIParameters(
55+
extra_msg=(
56+
f"kernel_specs[{idx}]: image '{image_info.canonical}' "
57+
f"requires resource slot(s) {unknown} that resource "
58+
f"group '{spec.scope.resource_group_name}' does not "
59+
f"serve. Pick an image whose required slots are "
60+
f"available here, or switch to a resource group that "
61+
f"supports these slots."
62+
),
63+
)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
"""User-requested slot-type compatibility validator.
2+
3+
Every ``resource_type`` in a kernel's requested resource list must be
4+
served by some non-terminated agent in the requested resource group.
5+
The context's ``known_slot_types`` is sourced from ``agent_resources``
6+
joined with ``agents`` (status != TERMINATED) and
7+
``resource_slot_types``, so it reflects the RG's hardware inventory and
8+
the registered unit metadata in one mapping.
9+
10+
When the RG has no non-terminated agents the request is rejected
11+
outright — an empty inventory cannot satisfy any caller-supplied
12+
request and would otherwise let the session reach the scheduler only
13+
to fail there.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
from ai.backend.manager.data.session.spec import SessionSpec
19+
from ai.backend.manager.errors.api import InvalidAPIParameters
20+
from ai.backend.manager.sokovan.scheduling_controller.validators.session_spec_base import (
21+
SessionSpecValidationContext,
22+
SessionSpecValidatorRule,
23+
)
24+
25+
26+
class RequestedSlotTypeRule(SessionSpecValidatorRule):
27+
"""Requested slot keys must be served by an agent in the target RG."""
28+
29+
def name(self) -> str:
30+
return "requested_slot_type"
31+
32+
def validate(
33+
self,
34+
spec: SessionSpec,
35+
context: SessionSpecValidationContext,
36+
) -> None:
37+
rg_slot_types = context.known_slot_types
38+
if not rg_slot_types:
39+
raise InvalidAPIParameters(
40+
extra_msg=(
41+
f"resource group '{spec.scope.resource_group_name}' has no "
42+
f"agents serving any resource slot."
43+
),
44+
)
45+
for idx, kernel in enumerate(spec.kernel_specs):
46+
unknown = sorted({
47+
entry.resource_type
48+
for entry in kernel.execution_spec.resources
49+
if entry.resource_type not in rg_slot_types
50+
})
51+
if unknown:
52+
raise InvalidAPIParameters(
53+
extra_msg=(
54+
f"kernel_specs[{idx}]: the request asks for resource "
55+
f"slot(s) {unknown} that resource group "
56+
f"'{spec.scope.resource_group_name}' does not serve. "
57+
f"Drop these slots from the request or switch to a "
58+
f"resource group that supports them."
59+
),
60+
)

tests/unit/manager/repositories/scheduler/test_owner_resource_group_access.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
SessionSpecDraft,
3434
)
3535
from ai.backend.manager.errors.api import InvalidAPIParameters
36+
from ai.backend.manager.models.agent import AgentRow
37+
from ai.backend.manager.models.resource_slot import AgentResourceRow, ResourceSlotTypeRow
3638
from ai.backend.manager.models.scaling_group import ScalingGroupOpts, ScalingGroupRow
3739
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine
3840
from ai.backend.manager.repositories.scheduler.db_source.db_source import ScheduleDBSource
@@ -110,7 +112,13 @@ async def db_with_rg(
110112
short-circuit on ``ScalingGroupNotFound`` and never exercise the
111113
invariant under test.
112114
"""
113-
async with with_tables(database_connection, [ScalingGroupRow]):
115+
# Include the agent tables so the SG fetch's
116+
# ``selectinload(agents).selectinload(agent_resource_rows)`` chain
117+
# has tables to query, even though we seed no rows below.
118+
async with with_tables(
119+
database_connection,
120+
[ScalingGroupRow, ResourceSlotTypeRow, AgentRow, AgentResourceRow],
121+
):
114122
async with database_connection.begin_session() as db_sess:
115123
db_sess.add(
116124
ScalingGroupRow(

tests/unit/manager/sokovan/scheduling_controller/test_enqueue_session_from_draft.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,10 @@ def _fetch_bundle(image_id: ImageID) -> SessionSpecContextFetch:
208208
vfolder_mounts_by_role={"main": (_vfolder_mount(),)},
209209
dotfile_data=DotfileBundle(),
210210
keypair_resource_policy=_keypair_policy(),
211+
known_slot_types={
212+
SlotName("cpu"): SlotTypes.COUNT,
213+
SlotName("mem"): SlotTypes.BYTES,
214+
},
211215
)
212216

213217

@@ -219,9 +223,6 @@ def _build_controller(
219223
config_provider = MagicMock()
220224
etcd_loader = MagicMock()
221225
etcd_loader.get_vfolder_types = AsyncMock(return_value=["user"])
222-
etcd_loader.get_resource_slots = AsyncMock(
223-
return_value={SlotName("cpu"): SlotTypes.COUNT, SlotName("mem"): SlotTypes.BYTES}
224-
)
225226
config_provider.legacy_etcd_config_loader = etcd_loader
226227

227228
storage_manager = MagicMock()

0 commit comments

Comments
 (0)