Skip to content

Commit eff3c2d

Browse files
authored
feat: Allow remote cluster sessions to function on lock down cluster (#1193)
* Introduce K8s Subclasses for K8sResourceQuota & K8sPriorityClass * Properly handle quota.id * Move QuotaRepo to crc.db, and conversion methods to models.Quota * K8sClients * Always use the namespace from the ClusterConnection * Stabilise some tests by ensuring generated values do not overlap * K8sWatcher: prevent spurious logs in case of failing connection to remote clusters * Set AmaltheaSession.spec.url correctly
1 parent 136df8c commit eff3c2d

22 files changed

Lines changed: 523 additions & 569 deletions

File tree

bases/renku_data_services/data_api/dependencies.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from renku_data_services.connected_services.oauth_http import DefaultOAuthHttpClientFactory, OAuthHttpClientFactory
2929
from renku_data_services.crc import models as crc_models
3030
from renku_data_services.crc.constants import DEFAULT_RUNTIME_PLATFORM
31-
from renku_data_services.crc.db import ClusterRepository, ResourcePoolRepository, UserRepository
31+
from renku_data_services.crc.db import ClusterRepository, QuotaRepository, ResourcePoolRepository, UserRepository
3232
from renku_data_services.data_api.config import Config
3333
from renku_data_services.data_connectors.db import (
3434
DataConnectorRepository,
@@ -37,11 +37,11 @@
3737
from renku_data_services.git.gitlab import DummyGitlabAPI, EmptyGitlabAPI, GitlabAPI
3838
from renku_data_services.k8s.clients import (
3939
K8sClusterClientsPool,
40+
K8sPriorityClassClient,
4041
K8sResourceQuotaClient,
41-
K8sSchedulingClient,
4242
)
4343
from renku_data_services.k8s.config import KubeConfigEnv
44-
from renku_data_services.k8s.db import K8sDbCache, QuotaRepository
44+
from renku_data_services.k8s.db import K8sDbCache
4545
from renku_data_services.message_queue.db import ReprovisioningRepository
4646
from renku_data_services.metrics.core import StagingMetricsService
4747
from renku_data_services.metrics.db import MetricsRepository
@@ -236,9 +236,7 @@ def from_env(cls) -> DependencyManager:
236236
kinds_to_cache=[AMALTHEA_SESSION_GVK, JUPYTER_SESSION_GVK, BUILD_RUN_GVK, TASK_RUN_GVK],
237237
),
238238
)
239-
quota_repo = QuotaRepository(
240-
K8sResourceQuotaClient(client), K8sSchedulingClient(client), namespace=config.k8s_namespace
241-
)
239+
quota_repo = QuotaRepository(K8sResourceQuotaClient(client), K8sPriorityClassClient(client))
242240

243241
if config.dummy_stores:
244242
authenticator = DummyAuthenticator()

bases/renku_data_services/k8s_cache/dependencies.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
from dataclasses import dataclass, field
44

5-
from renku_data_services.crc.db import ClusterRepository, ResourcePoolRepository
6-
from renku_data_services.k8s.clients import DummyCoreClient, DummySchedulingClient
7-
from renku_data_services.k8s.db import K8sDbCache, QuotaRepository
5+
from renku_data_services.crc.db import ClusterRepository, QuotaRepository, ResourcePoolRepository
6+
from renku_data_services.k8s.clients import DummyPriorityClassClient, DummyResourceQuotaClient
7+
from renku_data_services.k8s.db import K8sDbCache
88
from renku_data_services.k8s_cache.config import Config
99
from renku_data_services.metrics.core import StagingMetricsService
1010
from renku_data_services.metrics.db import MetricsRepository
@@ -63,9 +63,7 @@ def quota_repo(self) -> QuotaRepository:
6363
# NOTE: We only need the QuotaRepository to instantiate the ResourcePoolRepository which is used to get
6464
# the resource class and pool information for metrics. We don't need quota information for metrics at all
6565
# so we use the dummy client for quotas here as we don't actually access k8s, just the db.
66-
self._quota_repo = QuotaRepository(
67-
DummyCoreClient(), DummySchedulingClient(), namespace=self.config.k8s.renku_namespace
68-
)
66+
self._quota_repo = QuotaRepository(DummyResourceQuotaClient(), DummyPriorityClassClient())
6967
return self._quota_repo
7068

7169
@classmethod

components/renku_data_services/crc/core.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,19 @@
1212

1313
def validate_quota(body: apispec.QuotaWithOptionalId) -> models.UnsavedQuota:
1414
"""Validate a quota object."""
15-
return models.UnsavedQuota(
16-
cpu=body.cpu,
17-
memory=body.memory,
18-
gpu=body.gpu,
19-
)
15+
if body.id is None:
16+
return models.UnsavedQuota(
17+
cpu=body.cpu,
18+
memory=body.memory,
19+
gpu=body.gpu,
20+
)
21+
else:
22+
return models.Quota(
23+
cpu=body.cpu,
24+
memory=body.memory,
25+
gpu=body.gpu,
26+
id=body.id,
27+
)
2028

2129

2230
def validate_quota_put_patch(body: apispec.QuotaWithId | apispec.QuotaPatch) -> models.QuotaPatch:

components/renku_data_services/crc/db.py

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66
it all in one place.
77
"""
88

9+
from __future__ import annotations
10+
911
from asyncio import gather
1012
from collections.abc import AsyncGenerator, Callable, Collection, Coroutine, Sequence
1113
from dataclasses import asdict, dataclass, field
1214
from functools import wraps
1315
from typing import Any, Concatenate, Optional, ParamSpec, TypeVar
16+
from uuid import uuid4
1417

1518
from sqlalchemy import NullPool, delete, false, select, true
1619
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
@@ -27,8 +30,9 @@
2730
from renku_data_services.crc.core import validate_resource_class_update, validate_resource_pool_update
2831
from renku_data_services.crc.models import ClusterPatch, ClusterSettings, SavedClusterSettings, SessionProtocol
2932
from renku_data_services.crc.orm import ClusterORM
30-
from renku_data_services.k8s.constants import DEFAULT_K8S_CLUSTER
31-
from renku_data_services.k8s.db import QuotaRepository
33+
from renku_data_services.k8s.client_interfaces import PriorityClassClient, ResourceQuotaClient
34+
from renku_data_services.k8s.constants import DEFAULT_K8S_CLUSTER, ClusterId
35+
from renku_data_services.k8s.models import DeletePropagationPolicy, K8sPriorityClass
3236
from renku_data_services.users.db import UserRepo
3337

3438

@@ -290,7 +294,6 @@ async def insert_resource_pool(
290294
self, api_user: base_models.APIUser, new_resource_pool: models.UnsavedResourcePool
291295
) -> models.ResourcePool:
292296
"""Insert resource pool into database."""
293-
294297
cluster = None
295298
if new_resource_pool.cluster_id:
296299
cluster = await self.__cluster_repo.select(cluster_id=new_resource_pool.cluster_id)
@@ -1108,3 +1111,65 @@ async def delete(self, api_user: base_models.APIUser, cluster_id: ULID) -> None:
11081111
cluster = r.one_or_none()
11091112
if cluster is not None:
11101113
await session.delete(cluster)
1114+
1115+
1116+
@dataclass
1117+
class QuotaRepository:
1118+
"""Adapter for CRUD operations on resource quotas and priority classes in k8s."""
1119+
1120+
rq_client: ResourceQuotaClient
1121+
pc_client: PriorityClassClient
1122+
_label_name: str = field(init=False, default="app")
1123+
_label_value: str = field(init=False, default="renku")
1124+
1125+
async def get_quota(self, name: str | None, cluster_id: ClusterId) -> models.Quota | None:
1126+
"""Get a specific quota by name."""
1127+
if not name:
1128+
return None
1129+
try:
1130+
res_quota = await self.rq_client.read_resource_quota(name=name, cluster_id=cluster_id)
1131+
except errors.MissingResourceError:
1132+
return None
1133+
return models.Quota.from_k8s_resource_quota(res_quota)
1134+
1135+
async def create_quota(self, new_quota: models.UnsavedQuota, cluster_id: ClusterId) -> models.Quota:
1136+
"""Create a resource quota and priority class."""
1137+
quota = models.Quota(
1138+
cpu=new_quota.cpu,
1139+
memory=new_quota.memory,
1140+
gpu=new_quota.gpu,
1141+
gpu_kind=new_quota.gpu_kind,
1142+
id=new_quota.id if isinstance(new_quota, models.Quota) else str(uuid4()),
1143+
)
1144+
labels = {self._label_name: self._label_value}
1145+
1146+
# Check if we have a priority class with the given name, if not, create one it.
1147+
pc = await self.pc_client.read_priority_class(K8sPriorityClass.meta(quota.id, cluster_id))
1148+
if pc is None:
1149+
await self.pc_client.create_priority_class(
1150+
K8sPriorityClass.new(
1151+
name=quota.id,
1152+
cluster=cluster_id,
1153+
global_default=False,
1154+
value=100,
1155+
preemption_policy="Never",
1156+
description="Renku resource quota priority class",
1157+
labels=labels,
1158+
),
1159+
)
1160+
1161+
res = await self.rq_client.create_resource_quota(quota.to_patch(labels), cluster_id)
1162+
return models.Quota.from_k8s_resource_quota(res)
1163+
1164+
async def delete_quota(self, name: str, cluster_id: ClusterId) -> None:
1165+
"""Delete a resource quota and priority class."""
1166+
await self.pc_client.delete_priority_class(
1167+
meta=K8sPriorityClass.meta(name, cluster_id), propagation_policy=DeletePropagationPolicy.foreground
1168+
)
1169+
await self.rq_client.delete_resource_quota(name=name, cluster_id=cluster_id)
1170+
1171+
async def update_quota(self, quota: models.Quota, cluster_id: ClusterId) -> models.Quota:
1172+
"""Update a specific resource quota."""
1173+
patch = quota.to_patch({self._label_name: self._label_value})
1174+
patched_quota = await self.rq_client.patch_resource_quota(quota.id, patch, cluster_id)
1175+
return models.Quota.from_k8s_resource_quota(patched_quota)

components/renku_data_services/crc/models.py

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
from enum import StrEnum
88
from typing import Any, Optional, Protocol, Self
99

10+
from kubernetes.utils import parse_quantity
11+
1012
from renku_data_services import errors
1113
from renku_data_services.base_models import ResetType
1214
from renku_data_services.k8s.constants import DEFAULT_K8S_CLUSTER, ClusterId
15+
from renku_data_services.k8s.models import K8sPatch, K8sResourceQuota
1316

1417

1518
class ResourcesProtocol(Protocol):
@@ -152,15 +155,65 @@ def is_resource_class_compatible(self, rc: ResourceClass | UnsavedResourceClass)
152155

153156

154157
@dataclass(frozen=True, eq=True, kw_only=True)
155-
class Quota(ResourcesCompareMixin):
158+
class Quota(UnsavedQuota):
156159
"""Quota model."""
157160

158-
cpu: float
159-
memory: int
160-
gpu: int
161-
gpu_kind: GpuKind = GpuKind.NVIDIA
162161
id: str
163162

163+
@classmethod
164+
def from_k8s_resource_quota(cls, quota: K8sResourceQuota) -> Quota:
165+
"""Convert a K8s Resource Quota."""
166+
167+
def require_key(manifest: dict, key: str, prefix: str = "") -> Any:
168+
try:
169+
return manifest.get(key)
170+
except Exception:
171+
raise errors.ValidationError(
172+
message=f"Kubernetes resource quota with missing {prefix}{key} is not supported"
173+
) from None
174+
175+
spec = require_key(quota.manifest.to_dict(), "spec", "")
176+
hard = require_key(spec, "hard", "spec.")
177+
178+
gpu = 0
179+
gpu_kind = GpuKind.NVIDIA
180+
for igpu_kind in GpuKind:
181+
key = f"requests.{igpu_kind}/gpu"
182+
if key in hard:
183+
gpu = int(parse_quantity(hard.get(key)))
184+
gpu_kind = igpu_kind
185+
break
186+
187+
memory_raw = require_key(hard, "requests.memory", "hard.")
188+
cpu_raw = require_key(hard, "requests.cpu", "hard.")
189+
return cls(
190+
cpu=float(parse_quantity(cpu_raw)) / 1_000_000_000,
191+
memory=round(parse_quantity(memory_raw) / 1_000_000_000),
192+
gpu=gpu,
193+
gpu_kind=gpu_kind,
194+
id=quota.name,
195+
)
196+
197+
def to_patch(self, labels: dict[str, str]) -> K8sPatch:
198+
"""Convert to a manifest."""
199+
200+
return {
201+
"metadata": {
202+
"name": self.id,
203+
"labels": labels,
204+
},
205+
"spec": {
206+
"hard": {
207+
"requests.cpu": str(round(self.cpu * 1_000_000_000)),
208+
"requests.memory": str(self.memory * 1_000_000_000),
209+
f"requests.{self.gpu_kind}/gpu": self.gpu,
210+
},
211+
"scopeSelector": {
212+
"matchExpressions": [{"operator": "In", "scopeName": "PriorityClass", "values": [self.id]}]
213+
},
214+
},
215+
}
216+
164217
def is_resource_class_compatible(self, rc: ResourceClass | UnsavedResourceClass) -> bool:
165218
"""Determine if a resource class is compatible with the quota."""
166219
return rc <= self

components/renku_data_services/k8s/client_interfaces.py

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,18 @@
33
from __future__ import annotations
44

55
from collections.abc import AsyncIterable
6-
from typing import Any, Protocol, overload
7-
8-
from kubernetes.client import V1PriorityClass, V1ResourceQuota
6+
from typing import Protocol
97

108
from renku_data_services.k8s.constants import ClusterId
119
from renku_data_services.k8s.models import (
12-
ClusterScopedK8sObject,
1310
DeletePropagationPolicy,
1411
K8sObject,
1512
K8sObjectFilter,
1613
K8sObjectMeta,
14+
K8sPatch,
15+
K8sPatches,
16+
K8sPriorityClass,
17+
K8sResourceQuota,
1718
K8sSecret,
1819
)
1920

@@ -22,28 +23,24 @@ class ResourceQuotaClient(Protocol):
2223
"""Methods to manipulate ResourceQuota kubernetes resources."""
2324

2425
def list_resource_quota(
25-
self, namespace: str, label_selector: dict[str, str], cluster_id: ClusterId
26-
) -> AsyncIterable[V1ResourceQuota]:
26+
self, label_selector: dict[str, str], cluster_id: ClusterId
27+
) -> AsyncIterable[K8sResourceQuota]:
2728
"""List resource quotas."""
2829
...
2930

30-
async def read_resource_quota(self, name: str, namespace: str, cluster_id: ClusterId) -> V1ResourceQuota:
31+
async def read_resource_quota(self, name: str, cluster_id: ClusterId) -> K8sResourceQuota:
3132
"""Get a resource quota."""
3233
...
3334

34-
async def create_resource_quota(
35-
self, namespace: str, body: V1ResourceQuota, cluster_id: ClusterId
36-
) -> V1ResourceQuota:
35+
async def create_resource_quota(self, quota: K8sPatch, cluster_id: ClusterId) -> K8sResourceQuota:
3736
"""Create a resource quota."""
3837
...
3938

40-
async def delete_resource_quota(self, name: str, namespace: str, cluster_id: ClusterId) -> None:
39+
async def delete_resource_quota(self, name: str, cluster_id: ClusterId) -> None:
4140
"""Delete a resource quota."""
4241
...
4342

44-
async def patch_resource_quota(
45-
self, name: str, namespace: str, body: V1ResourceQuota, cluster_id: ClusterId
46-
) -> V1ResourceQuota:
43+
async def patch_resource_quota(self, name: str, patch: K8sPatches, cluster_id: ClusterId) -> K8sResourceQuota:
4744
"""Update a resource quota."""
4845
...
4946

@@ -59,7 +56,7 @@ async def create_secret(self, secret: K8sSecret) -> K8sSecret:
5956
"""Create a secret."""
6057
...
6158

62-
async def patch_secret(self, secret: K8sObjectMeta, patch: dict[str, Any] | list[dict[str, Any]]) -> K8sSecret:
59+
async def patch_secret(self, secret: K8sObjectMeta, patch: K8sPatches) -> K8sSecret:
6360
"""Patch an existing secret."""
6461
...
6562

@@ -71,18 +68,17 @@ async def delete_secret(self, secret: K8sObjectMeta) -> None:
7168
class PriorityClassClient(Protocol):
7269
"""Methods to manipulate kubernetes Priority Class resources."""
7370

74-
async def create_priority_class(self, body: V1PriorityClass, cluster_id: ClusterId) -> V1PriorityClass:
71+
async def create_priority_class(self, priority_class: K8sPriorityClass) -> K8sPriorityClass:
7572
"""Create a priority class."""
7673
...
7774

78-
async def read_priority_class(self, name: str, cluster_id: ClusterId) -> V1PriorityClass | None:
75+
async def read_priority_class(self, meta: K8sObjectMeta) -> K8sPriorityClass | None:
7976
"""Retrieve a priority class."""
8077
...
8178

8279
async def delete_priority_class(
8380
self,
84-
name: str,
85-
cluster_id: ClusterId,
81+
meta: K8sObjectMeta,
8682
propagation_policy: DeletePropagationPolicy = DeletePropagationPolicy.foreground,
8783
) -> None:
8884
"""Delete a priority class."""
@@ -92,17 +88,11 @@ async def delete_priority_class(
9288
class K8sClient(Protocol):
9389
"""Methods to manipulate resources on a Kubernetes cluster."""
9490

95-
@overload
96-
async def create(self, obj: K8sObject, refresh: bool) -> K8sObject: ...
97-
@overload
98-
async def create(self, obj: ClusterScopedK8sObject, refresh: bool) -> ClusterScopedK8sObject: ...
99-
async def create(
100-
self, obj: K8sObject | ClusterScopedK8sObject, refresh: bool
101-
) -> K8sObject | ClusterScopedK8sObject:
91+
async def create(self, obj: K8sObject, refresh: bool) -> K8sObject:
10292
"""Create the k8s object."""
10393
...
10494

105-
async def patch(self, meta: K8sObjectMeta, patch: dict[str, Any] | list[dict[str, Any]]) -> K8sObject:
95+
async def patch(self, meta: K8sObjectMeta, patch: K8sPatches) -> K8sObject:
10696
"""Patch a k8s object.
10797
10898
If the patch is a list we assume that we have a rfc6902 json patch like

0 commit comments

Comments
 (0)