Skip to content

Commit 9a343fa

Browse files
authored
feat: Enable resource requests tracking (#1207)
* Record certain data in a `resource_request_log` table
* Snapshots are stored periodically from data_tasks
* Resource request tracking is disabled by default; it requires `ENABLE_RESOURCE_REQUEST_TRACKING=true` to be set in the env
1 parent 4336619 commit 9a343fa

38 files changed

Lines changed: 4833 additions & 20 deletions

File tree

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ API_SPECS := \
5353
components/renku_data_services/data_connectors/apispec.py \
5454
components/renku_data_services/search/apispec.py \
5555
components/renku_data_services/notifications/apispec.py \
56-
components/renku_data_services/capacity_reservation/apispec.py
56+
components/renku_data_services/capacity_reservation/apispec.py \
57+
components/renku_data_services/resource_usage/apispec.py
5758

5859
schemas: ${API_SPECS} ## Generate pydantic classes from apispec yaml files
5960
@echo "generated classes based on ApiSpec"

bases/renku_data_services/data_api/app.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from renku_data_services.platform.blueprints import PlatformConfigBP, PlatformUrlRedirectBP
3232
from renku_data_services.project.blueprints import ProjectsBP, ProjectSessionSecretBP
3333
from renku_data_services.repositories.blueprints import RepositoriesBP
34+
from renku_data_services.resource_usage.blueprints import ResourceUsageBP
3435
from renku_data_services.search.blueprints import SearchBP
3536
from renku_data_services.search.reprovision import SearchReprovision
3637
from renku_data_services.search.solr_user_query import UsernameResolve
@@ -273,6 +274,13 @@ def register_all_handlers(app: Sanic, dm: DependencyManager) -> Sanic:
273274
occurrence_repo=dm.occurrence_repo,
274275
authenticator=dm.authenticator,
275276
)
277+
resource_usage = ResourceUsageBP(
278+
name="resource_usage",
279+
url_prefix=url_prefix,
280+
rr_repo=dm.resource_requests_repo,
281+
rr_svc=dm.resource_usage_service,
282+
authenticator=dm.authenticator,
283+
)
276284
app.blueprint(
277285
[
278286
resource_pools.blueprint(),
@@ -302,6 +310,7 @@ def register_all_handlers(app: Sanic, dm: DependencyManager) -> Sanic:
302310
platform_redirects.blueprint(),
303311
notifications.blueprint(),
304312
capacity_reservation.blueprint(),
313+
resource_usage.blueprint(),
305314
]
306315
)
307316
if builds is not None:

bases/renku_data_services/data_api/dependencies.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
DataConnectorSecretRepository,
3838
)
3939
from renku_data_services.git.gitlab import DummyGitlabAPI, EmptyGitlabAPI, GitlabAPI
40+
from renku_data_services.k8s.client_interfaces import K8sClient
4041
from renku_data_services.k8s.clients import (
4142
K8sClusterClientsPool,
4243
K8sPriorityClassClient,
@@ -62,6 +63,8 @@
6263
ProjectSessionSecretRepository,
6364
)
6465
from renku_data_services.repositories.db import GitRepositoriesRepository
66+
from renku_data_services.resource_usage.core import ResourceUsageService
67+
from renku_data_services.resource_usage.db import ResourceRequestsRepo
6568
from renku_data_services.search import query_manual
6669
from renku_data_services.search.db import SearchUpdatesRepo
6770
from renku_data_services.search.reprovision import SearchReprovision
@@ -118,6 +121,7 @@ class DependencyManager:
118121

119122
config: Config
120123

124+
k8s_client: K8sClient
121125
user_store: base_models.UserStore
122126
authenticator: base_models.Authenticator
123127
gitlab_authenticator: base_models.Authenticator
@@ -158,6 +162,8 @@ class DependencyManager:
158162
oauth_http_client_factory: OAuthHttpClientFactory
159163
capacity_reservation_repo: CapacityReservationRepository
160164
occurrence_repo: OccurrenceRepository
165+
resource_requests_repo: ResourceRequestsRepo
166+
resource_usage_service: ResourceUsageService
161167

162168
spec: dict[str, Any] = field(init=False, repr=False, default_factory=dict)
163169
app_name: str = "renku_data_services"
@@ -187,6 +193,7 @@ def load_apispec() -> dict[str, Any]:
187193
renku_data_services.search.__file__,
188194
renku_data_services.notifications.__file__,
189195
renku_data_services.capacity_reservation.__file__,
196+
renku_data_services.resource_usage.__file__,
190197
]
191198

192199
api_specs = []
@@ -417,8 +424,13 @@ def from_env(cls) -> DependencyManager:
417424
occurrence_repo = OccurrenceRepository(
418425
session_maker=config.db.async_session_maker,
419426
)
427+
resource_requests_repo = ResourceRequestsRepo(
428+
session_maker=config.db.async_session_maker,
429+
)
430+
resource_usage_service = ResourceUsageService(repo=resource_requests_repo)
420431
return cls(
421432
config,
433+
k8s_client=client,
422434
authenticator=authenticator,
423435
gitlab_authenticator=gitlab_authenticator,
424436
gitlab_client=gitlab_client,
@@ -459,4 +471,6 @@ def from_env(cls) -> DependencyManager:
459471
oauth_http_client_factory=oauth_http_client_factory,
460472
capacity_reservation_repo=capacity_reservation_repo,
461473
occurrence_repo=occurrence_repo,
474+
resource_requests_repo=resource_requests_repo,
475+
resource_usage_service=resource_usage_service,
462476
)

bases/renku_data_services/data_tasks/config.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class Config:
4242
posthog: PosthogConfig
4343
authz: AuthzConfig
4444
keycloak: KeycloakConfig | None
45+
k8s_config_root: str
4546
dummy_stores: bool
4647
max_retry_wait_seconds: int
4748
main_log_interval_seconds: int
@@ -50,7 +51,7 @@ class Config:
5051
x_short_task_period_s: int
5152
short_task_period_s: int
5253
long_task_period_s: int
53-
k8s_config_root: str
54+
enable_resource_request_tracking: bool
5455

5556
@classmethod
5657
def from_env(cls) -> Config:
@@ -71,6 +72,7 @@ def from_env(cls) -> Config:
7172

7273
k8s_config_root = os.environ.get("K8S_CONFIG_ROOT", "/secrets/kube_configs")
7374

75+
enable_resource_request_tracking = os.environ.get("ENABLE_RESOURCE_REQUEST_TRACKING", "false").lower() == "true"
7476
authz = AuthzConfig.from_env()
7577

7678
keycloak = None if dummy_stores else KeycloakConfig.from_env()
@@ -82,11 +84,12 @@ def from_env(cls) -> Config:
8284
posthog=posthog_config,
8385
authz=authz,
8486
keycloak=keycloak,
87+
k8s_config_root=k8s_config_root,
8588
tcp_host=tcp_host,
8689
tcp_port=tcp_port,
8790
x_short_task_period_s=x_short_task_period,
8891
short_task_period_s=short_task_period,
8992
long_task_period_s=long_task_period,
90-
k8s_config_root=k8s_config_root,
9193
dummy_stores=dummy_stores,
94+
enable_resource_request_tracking=enable_resource_request_tracking,
9295
)

bases/renku_data_services/data_tasks/dependencies.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,28 @@
22

33
from dataclasses import dataclass
44

5+
from renku_data_services.app_config import logging
56
from renku_data_services.authz.authz import Authz
67
from renku_data_services.capacity_reservation.db import CapacityReservationRepository, OccurrenceRepository
78
from renku_data_services.capacity_reservation.k8s_client import CapacityReservationK8sClient
89
from renku_data_services.capacity_reservation.tasks import CapacityReservationTasks
910
from renku_data_services.crc.db import ClusterRepository
1011
from renku_data_services.data_tasks.config import Config
1112
from renku_data_services.k8s.clients import K8sClusterClientsPool
12-
from renku_data_services.k8s.config import KubeConfigEnv
13+
from renku_data_services.k8s.config import KubeConfigEnv, get_clusters
1314
from renku_data_services.k8s.db import K8sDbCache
1415
from renku_data_services.metrics.core import StagingMetricsService
1516
from renku_data_services.metrics.db import MetricsRepository
1617
from renku_data_services.namespace.db import GroupRepository
17-
from renku_data_services.notebooks.config import get_clusters
1818
from renku_data_services.notebooks.constants import AMALTHEA_SESSION_GVK
1919
from renku_data_services.project.db import ProjectRepository
20+
from renku_data_services.resource_usage.core import (
21+
DefaultResourcesRequestRecorder,
22+
NoopResourcesRequestRecorder,
23+
ResourceRequestsFetch,
24+
ResourcesRequestRecorder,
25+
)
26+
from renku_data_services.resource_usage.db import ResourceRequestsRepo
2027
from renku_data_services.search.db import SearchUpdatesRepo
2128
from renku_data_services.session.db import SessionRepository
2229
from renku_data_services.session.tasks import SessionTasks
@@ -25,6 +32,8 @@
2532
from renku_data_services.users.kc_api import IKeycloakAPI, KeycloakAPI
2633
from renku_data_services.users.models import UnsavedUserInfo
2734

35+
logger = logging.getLogger(__file__)
36+
2837

2938
@dataclass
3039
class DependencyManager:
@@ -40,6 +49,7 @@ class DependencyManager:
4049
kc_api: IKeycloakAPI
4150
session_tasks: SessionTasks
4251
capacity_reservation_tasks: CapacityReservationTasks
52+
resource_requests_recorder: ResourcesRequestRecorder
4353

4454
@classmethod
4555
def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
@@ -83,10 +93,11 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
8393
session_tasks = SessionTasks(session_environment_repo=session_environment_repo)
8494
cluster_repo = ClusterRepository(session_maker=cfg.db.async_session_maker)
8595
k8s_db_cache = K8sDbCache(cfg.db.async_session_maker)
96+
default_kubeconfig = KubeConfigEnv()
8697
k8s_client = K8sClusterClientsPool(
8798
lambda: get_clusters(
8899
kube_conf_root_dir=cfg.k8s_config_root,
89-
default_kubeconfig=KubeConfigEnv(),
100+
default_kubeconfig=default_kubeconfig,
90101
cluster_repo=cluster_repo,
91102
cache=k8s_db_cache,
92103
kinds_to_cache=[AMALTHEA_SESSION_GVK],
@@ -100,6 +111,16 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
100111
),
101112
k8s_client=cr_k8s_client,
102113
)
114+
115+
resource_requests_recorder: ResourcesRequestRecorder
116+
if cfg.enable_resource_request_tracking:
117+
resource_requests_recorder = DefaultResourcesRequestRecorder(
118+
repo=ResourceRequestsRepo(cfg.db.async_session_maker), fetch=ResourceRequestsFetch(k8s_client)
119+
)
120+
else:
121+
logger.warning("Resource request tracking is disabled!")
122+
resource_requests_recorder = NoopResourcesRequestRecorder()
123+
103124
kc_api: IKeycloakAPI
104125
if cfg.dummy_stores:
105126
dummy_users = [
@@ -127,4 +148,5 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
127148
kc_api=kc_api,
128149
session_tasks=session_tasks,
129150
capacity_reservation_tasks=capacity_reservation_tasks,
151+
resource_requests_recorder=resource_requests_recorder,
130152
)

bases/renku_data_services/data_tasks/task_defs.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""The task definitions in form of coroutines."""
22

33
import asyncio
4+
from datetime import timedelta
45

56
from authzed.api.v1 import (
67
Consistency,
@@ -433,6 +434,14 @@ async def cleanup_orphaned_capacity_reservations(dm: DependencyManager) -> None:
433434
await asyncio.sleep(dm.config.x_short_task_period_s)
434435

435436

437+
async def record_resource_requests(dm: DependencyManager) -> None:
438+
"""Periodically record all resource requests."""
439+
interval_seconds = 600
440+
while True:
441+
await dm.resource_requests_recorder.record_resource_requests(timedelta(seconds=interval_seconds))
442+
await asyncio.sleep(interval_seconds)
443+
444+
436445
def all_tasks(dm: DependencyManager) -> TaskDefininions:
437446
"""A dict of task factories to be managed in main."""
438447
# Impl. note: We pass the entire config to the coroutines, because
@@ -457,5 +466,6 @@ def all_tasks(dm: DependencyManager) -> TaskDefininions:
457466
"activate_capacity_reservations": lambda: activate_capacity_reservations(dm),
458467
"monitor_capacity_reservations": lambda: monitor_capacity_reservations(dm),
459468
"cleanup_orphaned_capacity_reservations": lambda: cleanup_orphaned_capacity_reservations(dm),
469+
"record_resource_requests": lambda: record_resource_requests(dm),
460470
}
461471
)

components/renku_data_services/migrations/env.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from renku_data_services.notifications.orm import BaseORM as notifications
1414
from renku_data_services.platform.orm import BaseORM as platform
1515
from renku_data_services.project.orm import BaseORM as project
16+
from renku_data_services.resource_usage.orm import BaseORM as resource_usage
1617
from renku_data_services.search.orm import BaseORM as search
1718
from renku_data_services.secrets.orm import BaseORM as secrets
1819
from renku_data_services.session.orm import BaseORM as sessions
@@ -37,6 +38,7 @@
3738
sessions.metadata,
3839
storage.metadata,
3940
users.metadata,
41+
resource_usage.metadata,
4042
]
4143

4244
run_migrations(all_metadata)

components/renku_data_services/migrations/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def include_object(
3030
compare_to: SchemaItem | None,
3131
) -> bool:
3232
"""Prevents from alembic migrating the alembic_version tables."""
33-
return type_ != "table" or name != "alembic_version"
33+
return type_ != "table" or (name != "alembic_version" and name != "resource_requests_view")
3434

3535

3636
def combine_version_tables(conn: Connection, metadata_schema: str | None) -> None:

0 commit comments

Comments
 (0)