Skip to content

Commit 82b72b9

Browse files
authored
feat: add capacity reservation (#1205)
1 parent 28f9997 commit 82b72b9

31 files changed

Lines changed: 1976 additions & 13 deletions

File tree

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ API_SPECS := \
5252
components/renku_data_services/platform/apispec.py \
5353
components/renku_data_services/data_connectors/apispec.py \
5454
components/renku_data_services/search/apispec.py \
55-
components/renku_data_services/notifications/apispec.py
55+
components/renku_data_services/notifications/apispec.py \
56+
components/renku_data_services/capacity_reservation/apispec.py
5657

5758
schemas: ${API_SPECS} ## Generate pydantic classes from apispec yaml files
5859
@echo "generated classes based on ApiSpec"

bases/renku_data_services/data_api/app.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from renku_data_services.base_api.error_handler import CustomErrorHandler
1313
from renku_data_services.base_api.misc import MiscBP
1414
from renku_data_services.base_models.core import Slug
15+
from renku_data_services.capacity_reservation.blueprints import CapacityReservationBP
1516
from renku_data_services.connected_services.blueprints import OAuth2ClientsBP, OAuth2ConnectionsBP
1617
from renku_data_services.crc import apispec
1718
from renku_data_services.crc.blueprints import (
@@ -265,6 +266,13 @@ def register_all_handlers(app: Sanic, dm: DependencyManager) -> Sanic:
265266
authenticator=dm.authenticator,
266267
alertmanager_webhook_role=dm.config.alertmanager_webhook_role,
267268
)
269+
capacity_reservation = CapacityReservationBP(
270+
name="capacity_reservation",
271+
url_prefix=url_prefix,
272+
capacity_reservation_repo=dm.capacity_reservation_repo,
273+
occurrence_repo=dm.occurrence_repo,
274+
authenticator=dm.authenticator,
275+
)
268276
app.blueprint(
269277
[
270278
resource_pools.blueprint(),
@@ -293,6 +301,7 @@ def register_all_handlers(app: Sanic, dm: DependencyManager) -> Sanic:
293301
data_connectors.blueprint(),
294302
platform_redirects.blueprint(),
295303
notifications.blueprint(),
304+
capacity_reservation.blueprint(),
296305
]
297306
)
298307
if builds is not None:

bases/renku_data_services/data_api/dependencies.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from yaml import safe_load
1212

1313
import renku_data_services.base_models as base_models
14+
import renku_data_services.capacity_reservation
1415
import renku_data_services.connected_services
1516
import renku_data_services.crc
1617
import renku_data_services.data_connectors
@@ -24,6 +25,7 @@
2425
from renku_data_services.authn.gitlab import EmptyGitlabAuthenticator, GitlabAuthenticator
2526
from renku_data_services.authn.keycloak import KcUserStore, KeycloakAuthenticator
2627
from renku_data_services.authz.authz import Authz
28+
from renku_data_services.capacity_reservation.db import CapacityReservationRepository, OccurrenceRepository
2729
from renku_data_services.connected_services.db import ConnectedServicesRepository
2830
from renku_data_services.connected_services.oauth_http import DefaultOAuthHttpClientFactory, OAuthHttpClientFactory
2931
from renku_data_services.crc import models as crc_models
@@ -154,6 +156,8 @@ class DependencyManager:
154156
git_provider_helper: GitProviderHelperProto
155157
notifications_repo: NotificationsRepository
156158
oauth_http_client_factory: OAuthHttpClientFactory
159+
capacity_reservation_repo: CapacityReservationRepository
160+
occurrence_repo: OccurrenceRepository
157161

158162
spec: dict[str, Any] = field(init=False, repr=False, default_factory=dict)
159163
app_name: str = "renku_data_services"
@@ -182,6 +186,7 @@ def load_apispec() -> dict[str, Any]:
182186
renku_data_services.data_connectors.__file__,
183187
renku_data_services.search.__file__,
184188
renku_data_services.notifications.__file__,
189+
renku_data_services.capacity_reservation.__file__,
185190
]
186191

187192
api_specs = []
@@ -405,6 +410,13 @@ def from_env(cls) -> DependencyManager:
405410
session_maker=config.db.async_session_maker,
406411
alertmanager_webhook_role=config.alertmanager_webhook_role,
407412
)
413+
capacity_reservation_repo = CapacityReservationRepository(
414+
session_maker=config.db.async_session_maker,
415+
cluster_repo=cluster_repo,
416+
)
417+
occurrence_repo = OccurrenceRepository(
418+
session_maker=config.db.async_session_maker,
419+
)
408420
return cls(
409421
config,
410422
authenticator=authenticator,
@@ -445,4 +457,6 @@ def from_env(cls) -> DependencyManager:
445457
git_provider_helper=git_provider_helper,
446458
notifications_repo=notifications_repo,
447459
oauth_http_client_factory=oauth_http_client_factory,
460+
capacity_reservation_repo=capacity_reservation_repo,
461+
occurrence_repo=occurrence_repo,
448462
)

bases/renku_data_services/data_tasks/config.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@ class Config:
4747
main_log_interval_seconds: int
4848
tcp_host: str
4949
tcp_port: int
50+
x_short_task_period_s: int
5051
short_task_period_s: int
5152
long_task_period_s: int
53+
k8s_config_root: str
5254

5355
@classmethod
5456
def from_env(cls) -> Config:
@@ -63,9 +65,12 @@ def from_env(cls) -> Config:
6365
tcp_host = os.environ.get("TCP_HOST", "127.0.0.1")
6466
tcp_port = int(os.environ.get("TCP_PORT", "8001"))
6567

68+
x_short_task_period = int(os.environ.get("X_SHORT_TASK_PERIOD_S", 30))
6669
short_task_period = int(os.environ.get("SHORT_TASK_PERIOD_S", 2 * 60))
6770
long_task_period = int(os.environ.get("LONG_TASK_PERIOD_S", 3 * 60 * 60))
6871

72+
k8s_config_root = os.environ.get("K8S_CONFIG_ROOT", "/secrets/kube_configs")
73+
6974
authz = AuthzConfig.from_env()
7075

7176
keycloak = None if dummy_stores else KeycloakConfig.from_env()
@@ -79,7 +84,9 @@ def from_env(cls) -> Config:
7984
keycloak=keycloak,
8085
tcp_host=tcp_host,
8186
tcp_port=tcp_port,
87+
x_short_task_period_s=x_short_task_period,
8288
short_task_period_s=short_task_period,
8389
long_task_period_s=long_task_period,
90+
k8s_config_root=k8s_config_root,
8491
dummy_stores=dummy_stores,
8592
)

bases/renku_data_services/data_tasks/dependencies.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,19 @@
33
from dataclasses import dataclass
44

55
from renku_data_services.authz.authz import Authz
6+
from renku_data_services.capacity_reservation.db import CapacityReservationRepository, OccurrenceRepository
7+
from renku_data_services.capacity_reservation.k8s_client import CapacityReservationK8sClient
8+
from renku_data_services.capacity_reservation.tasks import CapacityReservationTasks
9+
from renku_data_services.crc.db import ClusterRepository
610
from renku_data_services.data_tasks.config import Config
11+
from renku_data_services.k8s.clients import K8sClusterClientsPool
12+
from renku_data_services.k8s.config import KubeConfigEnv
13+
from renku_data_services.k8s.db import K8sDbCache
714
from renku_data_services.metrics.core import StagingMetricsService
815
from renku_data_services.metrics.db import MetricsRepository
916
from renku_data_services.namespace.db import GroupRepository
17+
from renku_data_services.notebooks.config import get_clusters
18+
from renku_data_services.notebooks.constants import AMALTHEA_SESSION_GVK
1019
from renku_data_services.project.db import ProjectRepository
1120
from renku_data_services.search.db import SearchUpdatesRepo
1221
from renku_data_services.session.db import SessionRepository
@@ -30,6 +39,7 @@ class DependencyManager:
3039
syncer: UsersSync
3140
kc_api: IKeycloakAPI
3241
session_tasks: SessionTasks
42+
capacity_reservation_tasks: CapacityReservationTasks
3343

3444
@classmethod
3545
def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
@@ -71,6 +81,25 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
7181
authz=authz,
7282
)
7383
session_tasks = SessionTasks(session_environment_repo=session_environment_repo)
84+
cluster_repo = ClusterRepository(session_maker=cfg.db.async_session_maker)
85+
k8s_db_cache = K8sDbCache(cfg.db.async_session_maker)
86+
k8s_client = K8sClusterClientsPool(
87+
lambda: get_clusters(
88+
kube_conf_root_dir=cfg.k8s_config_root,
89+
default_kubeconfig=KubeConfigEnv(),
90+
cluster_repo=cluster_repo,
91+
cache=k8s_db_cache,
92+
kinds_to_cache=[AMALTHEA_SESSION_GVK],
93+
)
94+
)
95+
cr_k8s_client = CapacityReservationK8sClient(client=k8s_client, cluster_repo=cluster_repo)
96+
capacity_reservation_tasks = CapacityReservationTasks(
97+
occurrence_repo=OccurrenceRepository(cfg.db.async_session_maker),
98+
capacity_reservation_repo=CapacityReservationRepository(
99+
cfg.db.async_session_maker, cluster_repo=cluster_repo
100+
),
101+
k8s_client=cr_k8s_client,
102+
)
74103
kc_api: IKeycloakAPI
75104
if cfg.dummy_stores:
76105
dummy_users = [
@@ -97,4 +126,5 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
97126
syncer=syncer,
98127
kc_api=kc_api,
99128
session_tasks=session_tasks,
129+
capacity_reservation_tasks=capacity_reservation_tasks,
100130
)

bases/renku_data_services/data_tasks/task_defs.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,39 @@ async def initialize_session_environments(dm: DependencyManager) -> None:
400400
await dm.session_tasks.initialize_session_environments_task(requested_by=api_user)
401401

402402

403+
async def activate_capacity_reservations(dm: DependencyManager) -> None:
404+
"""Activate pending capacity reservation occurrences."""
405+
while True:
406+
try:
407+
await dm.capacity_reservation_tasks.activate_pending_occurrences_task()
408+
except (asyncio.CancelledError, KeyboardInterrupt) as e:
409+
logger.warning(f"Exiting: {e}")
410+
else:
411+
await asyncio.sleep(dm.config.x_short_task_period_s)
412+
413+
414+
async def monitor_capacity_reservations(dm: DependencyManager) -> None:
415+
"""Monitor active capacity reservation occurrences."""
416+
while True:
417+
try:
418+
await dm.capacity_reservation_tasks.monitor_active_occurrences_task()
419+
except (asyncio.CancelledError, KeyboardInterrupt) as e:
420+
logger.warning(f"Exiting: {e}")
421+
else:
422+
await asyncio.sleep(dm.config.x_short_task_period_s)
423+
424+
425+
async def cleanup_orphaned_capacity_reservations(dm: DependencyManager) -> None:
426+
"""Clean up capacity reservation deployments whose occurrences no longer exist."""
427+
while True:
428+
try:
429+
await dm.capacity_reservation_tasks.cleanup_orphaned_deployments_task()
430+
except (asyncio.CancelledError, KeyboardInterrupt) as e:
431+
logger.warning(f"Exiting: {e}")
432+
else:
433+
await asyncio.sleep(dm.config.x_short_task_period_s)
434+
435+
403436
def all_tasks(dm: DependencyManager) -> TaskDefininions:
404437
"""A dict of task factories to be managed in main."""
405438
# Impl. note: We pass the entire config to the coroutines, because
@@ -421,5 +454,8 @@ def all_tasks(dm: DependencyManager) -> TaskDefininions:
421454
"users_sync": lambda: users_sync(dm),
422455
"sync_admins_from_keycloak": lambda: sync_admins_from_keycloak(dm),
423456
"initialize_session_environments": lambda: initialize_session_environments(dm),
457+
"activate_capacity_reservations": lambda: activate_capacity_reservations(dm),
458+
"monitor_capacity_reservations": lambda: monitor_capacity_reservations(dm),
459+
"cleanup_orphaned_capacity_reservations": lambda: cleanup_orphaned_capacity_reservations(dm),
424460
}
425461
)

components/renku_data_services/base_models/core.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class ServiceAdminId(StrEnum):
8686
secrets_rotation = "secrets_rotation"
8787
k8s_watcher = "k8s_watcher"
8888
search_reprovision = "search_reprovision"
89+
capacity_reservation = "capacity_reservation"
8990

9091

9192
@dataclass(kw_only=True, frozen=True)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Blueprints for capacity reservations."""

0 commit comments

Comments
 (0)