Skip to content

Commit 410ebf8

Browse files
fregataaclaude
andcommitted
fix(BA-5967): pre-validate slot types in sokovan validator chain
Add two SessionSpec validator rules so slot keys absent from the target resource group's agent inventory are caught with a clear 4xx before they reach ResourceLimitRule, where the downstream humanizer would otherwise escalate the missing key into a 500. ImageSlotTypeRule rejects images that declare a resource_spec slot the target resource group has no agent for. RequestedSlotTypeRule rejects caller-supplied resource entries with the same property. Both rules also reject when the RG has no non-terminated agents at all. The per-RG slot inventory is sourced from the same ScalingGroupRow fetch via selectinload over agents -> agent_resource_rows, with TERMINATED agents filtered out in Python. The new bundle dataclass _ScalingGroupWithAgentResources keeps the SG row and its filtered agent list together so the inventory snapshot stays consistent with the same readonly transaction. The fetch helper raises ScalingGroupNotFound on missing RG, so the caller no longer needs a separate None check. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e5623ca commit 410ebf8

11 files changed

Lines changed: 314 additions & 10 deletions

File tree

changes/11515.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Pre-validate slot types in the Sokovan SessionSpec validator chain so a session request whose image or caller declares a slot the target resource group does not provide is rejected with a clear 4xx, instead of crashing later in error-message humanization with a 500.

src/ai/backend/manager/repositories/scheduler/db_source/db_source.py

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from collections import defaultdict
77
from collections.abc import AsyncIterator, Mapping, Sequence
88
from contextlib import asynccontextmanager as actxmgr
9+
from dataclasses import dataclass
910
from datetime import datetime
1011
from decimal import Decimal
1112
from typing import TYPE_CHECKING, Any, cast
@@ -187,6 +188,20 @@ def _create_resource_slot_from_policy(
187188
return ResourceSlot.from_policy(resource_policy_map, cast(Mapping[str, Any], known_slot_types))
188189

189190

191+
@dataclass(frozen=True)
192+
class _ScalingGroupWithSlotInventory:
193+
"""Scaling group bundled with the slot-name set served by its agents.
194+
195+
``active_slot_names`` is the set of slot names served by agents in
196+
this scaling group whose status is not TERMINATED — i.e. the per-RG
197+
slot inventory the validator chain consults to reject session
198+
requests for slots the resource group does not provide.
199+
"""
200+
201+
sg_row: ScalingGroupRow
202+
active_slot_names: frozenset[SlotName]
203+
204+
190205
class ScheduleDBSource:
191206
"""
192207
Database source for schedule-related operations.
@@ -289,6 +304,44 @@ async def get_scheduling_data(self, scaling_group: str, spec: SchedulingSpec) ->
289304
spec=spec,
290305
)
291306

307+
async def _fetch_scaling_group_with_slot_inventory(
308+
self,
309+
db_sess: SASession,
310+
name: str,
311+
) -> _ScalingGroupWithSlotInventory:
312+
"""Load a scaling group together with its per-RG slot inventory.
313+
314+
Eager-loads ``agents`` and per-agent ``agent_resource_rows`` via
315+
``selectinload``, filters out TERMINATED agents, and projects the
316+
remaining ``agent_resource_rows`` into a ``slot_name`` set. The
317+
``AgentRow`` instances themselves are not exposed — callers only
318+
see the SG row and the derived inventory.
319+
320+
Raises:
321+
ScalingGroupNotFound: when the scaling group does not exist.
322+
"""
323+
sg_row = (
324+
await db_sess.scalars(
325+
sa.select(ScalingGroupRow)
326+
.options(
327+
selectinload(ScalingGroupRow.agents).selectinload(AgentRow.agent_resource_rows)
328+
)
329+
.where(ScalingGroupRow.name == name)
330+
)
331+
).one_or_none()
332+
if sg_row is None:
333+
raise ScalingGroupNotFound(f"Resource group {name} not found")
334+
active_slot_names: frozenset[SlotName] = frozenset(
335+
SlotName(ar.slot_name)
336+
for agent in sg_row.agents
337+
if agent.status != AgentStatus.TERMINATED
338+
for ar in agent.agent_resource_rows
339+
)
340+
return _ScalingGroupWithSlotInventory(
341+
sg_row=sg_row,
342+
active_slot_names=active_slot_names,
343+
)
344+
292345
async def _fetch_scaling_group(
293346
self, db_sess: SASession, scaling_group: str
294347
) -> ScalingGroupMeta:
@@ -1463,16 +1516,13 @@ async def fetch_session_spec_contexts(
14631516
network_info: ScalingGroupNetworkInfo | None = None
14641517
rg_defaults = None
14651518
resource_group_allow_fractional = False
1519+
resource_group_slot_names: frozenset[SlotName] = frozenset()
14661520
if resource_group_name:
1467-
sg_row = (
1468-
await db_sess.scalars(
1469-
sa.select(ScalingGroupRow).where(
1470-
ScalingGroupRow.name == resource_group_name
1471-
)
1472-
)
1473-
).one_or_none()
1474-
if sg_row is None:
1475-
raise ScalingGroupNotFound(f"Resource group {resource_group_name} not found")
1521+
rg_bundle = await self._fetch_scaling_group_with_slot_inventory(
1522+
db_sess, resource_group_name
1523+
)
1524+
sg_row = rg_bundle.sg_row
1525+
resource_group_slot_names = rg_bundle.active_slot_names
14761526
# Every production caller of ``enqueue_session_from_draft`` populates
14771527
# access_key/domain_name/project_id alongside resource_group_name; this
14781528
# branch flags the contract violation rather than letting the RG
@@ -1632,6 +1682,7 @@ async def fetch_session_spec_contexts(
16321682
dotfile_data=dotfile_bundle,
16331683
active_session_count=active_session_count,
16341684
keypair_resource_policy=keypair_policy,
1685+
resource_group_slot_names=resource_group_slot_names,
16351686
)
16361687

16371688
async def pick_default_resource_group(

src/ai/backend/manager/repositories/scheduler/types/session_creation.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from ai.backend.common.types import (
1111
AccessKey,
1212
SessionId,
13+
SlotName,
1314
VFolderMount,
1415
)
1516
from ai.backend.manager.data.dotfile.types import DotfileBundle
@@ -143,4 +144,5 @@ class SessionSpecContextFetch:
143144
vfolder_mounts_by_role: dict[str, tuple[VFolderMount, ...]]
144145
dotfile_data: DotfileBundle
145146
keypair_resource_policy: Any | None # KeyPairResourcePolicyData
147+
resource_group_slot_names: frozenset[SlotName] = field(default_factory=frozenset)
146148
active_session_count: int = 0

src/ai/backend/manager/sokovan/scheduling_controller/scheduling_controller.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@
4848
ConcurrentSessionLimitRule,
4949
ContainerLimitRule,
5050
DotfileVFolderConflictRule,
51+
ImageSlotTypeRule,
5152
InferenceModelFolderRule,
5253
MountNameValidationRule,
54+
RequestedSlotTypeRule,
5355
ResourceLimitRule,
5456
ServicePortRule,
5557
SessionSpecValidationContext,
@@ -125,6 +127,8 @@ def __init__(self, args: SchedulingControllerArgs) -> None:
125127
self._spec_validator = SessionSpecValidator([
126128
ConcurrentSessionLimitRule(),
127129
ContainerLimitRule(),
130+
ImageSlotTypeRule(),
131+
RequestedSlotTypeRule(),
128132
ResourceLimitRule(),
129133
ServicePortRule(),
130134
MountNameValidationRule(),
@@ -187,6 +191,7 @@ async def enqueue_session_from_draft(
187191
keypair_resource_policy=fetched.keypair_resource_policy,
188192
image_infos=fetched.image_infos,
189193
known_slot_types=known_slot_types,
194+
resource_group_slot_names=fetched.resource_group_slot_names,
190195
dotfile_data=fetched.dotfile_data,
191196
active_session_count=fetched.active_session_count,
192197
)

src/ai/backend/manager/sokovan/scheduling_controller/validators/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
from .concurrent_session_limit_rule import ConcurrentSessionLimitRule
44
from .container_limit_rule import ContainerLimitRule
55
from .dotfile_vfolder_conflict_rule import DotfileVFolderConflictRule
6+
from .image_slot_type_rule import ImageSlotTypeRule
67
from .inference_model_folder_rule import InferenceModelFolderRule
78
from .mount_name_validation_rule import MountNameValidationRule
9+
from .requested_slot_type_rule import RequestedSlotTypeRule
810
from .resource_limit_rule import ResourceLimitRule
911
from .service_port_rule import ServicePortRule
1012
from .session_spec_base import (
@@ -17,8 +19,10 @@
1719
"ConcurrentSessionLimitRule",
1820
"ContainerLimitRule",
1921
"DotfileVFolderConflictRule",
22+
"ImageSlotTypeRule",
2023
"InferenceModelFolderRule",
2124
"MountNameValidationRule",
25+
"RequestedSlotTypeRule",
2226
"ResourceLimitRule",
2327
"ServicePortRule",
2428
"SessionSpecValidationContext",
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""Image-declared slot-type compatibility validator.
2+
3+
Every slot key declared in an image's ``resource_spec`` must be served
4+
by some non-terminated agent in the requested resource group. The
5+
context's ``resource_group_slot_names`` is sourced from
6+
``agent_resources`` joined with ``agents`` (status != TERMINATED), so it
7+
reflects the RG's hardware inventory rather than the system-wide etcd
8+
slot registry.
9+
10+
When the RG has no non-terminated agents the request is rejected
11+
outright — an empty inventory cannot satisfy any image declaration and
12+
would otherwise let the session reach the scheduler only to fail there.
13+
"""
14+
15+
from __future__ import annotations
16+
17+
from ai.backend.manager.data.session.spec import SessionSpec
18+
from ai.backend.manager.errors.api import InvalidAPIParameters
19+
from ai.backend.manager.sokovan.scheduling_controller.validators.session_spec_base import (
20+
SessionSpecValidationContext,
21+
SessionSpecValidatorRule,
22+
)
23+
24+
25+
class ImageSlotTypeRule(SessionSpecValidatorRule):
26+
"""Image-declared slot keys must be served by an agent in the target RG."""
27+
28+
def name(self) -> str:
29+
return "image_slot_type"
30+
31+
def validate(
32+
self,
33+
spec: SessionSpec,
34+
context: SessionSpecValidationContext,
35+
) -> None:
36+
rg_slot_names = context.resource_group_slot_names
37+
if not rg_slot_names:
38+
raise InvalidAPIParameters(
39+
extra_msg=(
40+
f"resource group '{spec.scope.resource_group_name}' has no "
41+
f"agents serving any resource slot."
42+
),
43+
)
44+
for idx, kernel in enumerate(spec.kernel_specs):
45+
image_info = context.image_infos.get(kernel.execution_spec.image_id)
46+
if image_info is None:
47+
continue
48+
unknown = sorted(
49+
slot_name
50+
for slot_name in image_info.resource_spec
51+
if slot_name not in rg_slot_names
52+
)
53+
if unknown:
54+
raise InvalidAPIParameters(
55+
extra_msg=(
56+
f"kernel_specs[{idx}]: image '{image_info.canonical}' "
57+
f"requires resource slot(s) {unknown} that resource "
58+
f"group '{spec.scope.resource_group_name}' does not "
59+
f"serve. Pick an image whose required slots are "
60+
f"available here, or switch to a resource group that "
61+
f"supports these slots."
62+
),
63+
)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
"""User-requested slot-type compatibility validator.
2+
3+
Every ``resource_type`` in a kernel's requested resource list must be
4+
served by some non-terminated agent in the requested resource group.
5+
The context's ``resource_group_slot_names`` is sourced from
6+
``agent_resources`` joined with ``agents`` (status != TERMINATED), so it
7+
reflects the RG's hardware inventory rather than the system-wide etcd
8+
slot registry.
9+
10+
When the RG has no non-terminated agents the request is rejected
11+
outright — an empty inventory cannot satisfy any caller-supplied
12+
request and would otherwise let the session reach the scheduler only
13+
to fail there.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
from ai.backend.manager.data.session.spec import SessionSpec
19+
from ai.backend.manager.errors.api import InvalidAPIParameters
20+
from ai.backend.manager.sokovan.scheduling_controller.validators.session_spec_base import (
21+
SessionSpecValidationContext,
22+
SessionSpecValidatorRule,
23+
)
24+
25+
26+
class RequestedSlotTypeRule(SessionSpecValidatorRule):
27+
"""Requested slot keys must be served by an agent in the target RG."""
28+
29+
def name(self) -> str:
30+
return "requested_slot_type"
31+
32+
def validate(
33+
self,
34+
spec: SessionSpec,
35+
context: SessionSpecValidationContext,
36+
) -> None:
37+
rg_slot_names = context.resource_group_slot_names
38+
if not rg_slot_names:
39+
raise InvalidAPIParameters(
40+
extra_msg=(
41+
f"resource group '{spec.scope.resource_group_name}' has no "
42+
f"agents serving any resource slot."
43+
),
44+
)
45+
for idx, kernel in enumerate(spec.kernel_specs):
46+
unknown = sorted({
47+
entry.resource_type
48+
for entry in kernel.execution_spec.resources
49+
if entry.resource_type not in rg_slot_names
50+
})
51+
if unknown:
52+
raise InvalidAPIParameters(
53+
extra_msg=(
54+
f"kernel_specs[{idx}]: the request asks for resource "
55+
f"slot(s) {unknown} that resource group "
56+
f"'{spec.scope.resource_group_name}' does not serve. "
57+
f"Drop these slots from the request or switch to a "
58+
f"resource group that supports them."
59+
),
60+
)

src/ai/backend/manager/sokovan/scheduling_controller/validators/session_spec_base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class SessionSpecValidationContext:
5757
keypair_resource_policy: KeyPairResourcePolicyData | None = None
5858
image_infos: Mapping[ImageID, ImageInfo] = field(default_factory=dict)
5959
known_slot_types: Mapping[SlotName, SlotTypes] = field(default_factory=dict)
60+
resource_group_slot_names: frozenset[SlotName] = field(default_factory=frozenset)
6061
dotfile_data: DotfileBundle = field(default_factory=DotfileBundle)
6162
active_session_count: int = 0
6263

tests/unit/manager/repositories/scheduler/test_owner_resource_group_access.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
SessionSpecDraft,
3434
)
3535
from ai.backend.manager.errors.api import InvalidAPIParameters
36+
from ai.backend.manager.models.agent import AgentRow
37+
from ai.backend.manager.models.resource_slot import AgentResourceRow, ResourceSlotTypeRow
3638
from ai.backend.manager.models.scaling_group import ScalingGroupOpts, ScalingGroupRow
3739
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine
3840
from ai.backend.manager.repositories.scheduler.db_source.db_source import ScheduleDBSource
@@ -110,7 +112,13 @@ async def db_with_rg(
110112
short-circuit on ``ScalingGroupNotFound`` and never exercise the
111113
invariant under test.
112114
"""
113-
async with with_tables(database_connection, [ScalingGroupRow]):
115+
# Include the agent tables so the SG fetch's
116+
# ``selectinload(agents).selectinload(agent_resource_rows)`` chain
117+
# has tables to query, even though we seed no rows below.
118+
async with with_tables(
119+
database_connection,
120+
[ScalingGroupRow, ResourceSlotTypeRow, AgentRow, AgentResourceRow],
121+
):
114122
async with database_connection.begin_session() as db_sess:
115123
db_sess.add(
116124
ScalingGroupRow(

tests/unit/manager/sokovan/scheduling_controller/test_enqueue_session_from_draft.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ def _fetch_bundle(image_id: ImageID) -> SessionSpecContextFetch:
208208
vfolder_mounts_by_role={"main": (_vfolder_mount(),)},
209209
dotfile_data=DotfileBundle(),
210210
keypair_resource_policy=_keypair_policy(),
211+
resource_group_slot_names=frozenset({SlotName("cpu"), SlotName("mem")}),
211212
)
212213

213214

0 commit comments

Comments
 (0)