Skip to content

Commit 8bcf22f

Browse files
feat: task to check session quota and send alerts
1 parent 839648e commit 8bcf22f

3 files changed

Lines changed: 106 additions & 3 deletions

File tree

bases/renku_data_services/data_tasks/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class Config:
5252
short_task_period_s: int
5353
long_task_period_s: int
5454
enable_resource_request_tracking: bool
55+
session_quota_alert_check_interval_s: int
5556

5657
@classmethod
5758
def from_env(cls) -> Config:
@@ -69,6 +70,7 @@ def from_env(cls) -> Config:
6970
x_short_task_period = int(os.environ.get("X_SHORT_TASK_PERIOD_S", 30))
7071
short_task_period = int(os.environ.get("SHORT_TASK_PERIOD_S", 2 * 60))
7172
long_task_period = int(os.environ.get("LONG_TASK_PERIOD_S", 3 * 60 * 60))
73+
session_quota_alert_check_interval = int(os.environ.get("SESSION_QUOTA_ALERT_CHECK_INTERVAL_S", 5 * 60))
7274

7375
k8s_config_root = os.environ.get("K8S_CONFIG_ROOT", "/secrets/kube_configs")
7476

@@ -92,4 +94,5 @@ def from_env(cls) -> Config:
9294
long_task_period_s=long_task_period,
9395
dummy_stores=dummy_stores,
9496
enable_resource_request_tracking=enable_resource_request_tracking,
97+
session_quota_alert_check_interval_s=session_quota_alert_check_interval,
9598
)

bases/renku_data_services/data_tasks/dependencies.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Dependency management for data tasks."""
22

3+
import os
34
from dataclasses import dataclass
45

56
from renku_data_services.app_config import logging
@@ -16,12 +17,14 @@
1617
from renku_data_services.metrics.db import MetricsRepository
1718
from renku_data_services.namespace.db import GroupRepository
1819
from renku_data_services.notebooks.constants import AMALTHEA_SESSION_GVK
20+
from renku_data_services.notifications.db import NotificationsRepository
1921
from renku_data_services.project.db import ProjectRepository
2022
from renku_data_services.resource_usage.core import (
2123
DefaultResourcesRequestRecorder,
2224
NoopResourcesRequestRecorder,
2325
ResourceRequestsFetch,
2426
ResourcesRequestRecorder,
27+
ResourceUsageService,
2528
)
2629
from renku_data_services.resource_usage.db import ResourceRequestsRepo
2730
from renku_data_services.search.db import SearchUpdatesRepo
@@ -50,6 +53,9 @@ class DependencyManager:
5053
session_tasks: SessionTasks
5154
capacity_reservation_tasks: CapacityReservationTasks
5255
resource_requests_recorder: ResourcesRequestRecorder
56+
k8s_client: K8sClusterClientsPool
57+
notifications_repo: NotificationsRepository
58+
resource_usage_service: ResourceUsageService
5359

5460
@classmethod
5561
def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
@@ -112,15 +118,24 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
112118
k8s_client=cr_k8s_client,
113119
)
114120

121+
resource_requests_repo = ResourceRequestsRepo(cfg.db.async_session_maker)
122+
resource_usage_service = ResourceUsageService(repo=resource_requests_repo)
123+
115124
resource_requests_recorder: ResourcesRequestRecorder
116125
if cfg.enable_resource_request_tracking:
117126
resource_requests_recorder = DefaultResourcesRequestRecorder(
118-
repo=ResourceRequestsRepo(cfg.db.async_session_maker), fetch=ResourceRequestsFetch(k8s_client)
127+
repo=resource_requests_repo, fetch=ResourceRequestsFetch(k8s_client)
119128
)
120129
else:
121130
logger.warning("Resource request tracking is disabled!")
122131
resource_requests_recorder = NoopResourcesRequestRecorder()
123132

133+
notifications_repo = NotificationsRepository(
134+
session_maker=cfg.db.async_session_maker,
135+
# TODO: Make sure that this is the correct name
136+
alertmanager_webhook_role=os.environ.get("ALERTMANAGER_WEBHOOK_ROLE", "alertmanager-webhook"),
137+
)
138+
124139
kc_api: IKeycloakAPI
125140
if cfg.dummy_stores:
126141
dummy_users = [
@@ -149,4 +164,7 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
149164
session_tasks=session_tasks,
150165
capacity_reservation_tasks=capacity_reservation_tasks,
151166
resource_requests_recorder=resource_requests_recorder,
167+
k8s_client=k8s_client,
168+
notifications_repo=notifications_repo,
169+
resource_usage_service=resource_usage_service,
152170
)

bases/renku_data_services/data_tasks/task_defs.py

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727
from renku_data_services.base_models.metrics import MetricsEvent
2828
from renku_data_services.data_tasks.dependencies import DependencyManager
2929
from renku_data_services.data_tasks.taskman import TaskDefininions
30+
from renku_data_services.k8s.models import K8sObject, K8sObjectFilter
3031
from renku_data_services.namespace.models import NamespaceKind
32+
from renku_data_services.notebooks.constants import AMALTHEA_SESSION_GVK
33+
from renku_data_services.notifications.models import UnsavedAlert
3134
from renku_data_services.solr.solr_client import DefaultSolrClient
3235

3336
logger = logging.getLogger(__name__)
@@ -122,8 +125,8 @@ async def sync_user_namespaces(dm: DependencyManager) -> None:
122125
await dm.authz.client.WriteRelationships(authz_change.apply)
123126
num_authz += 1
124127
except Exception as err:
125-
# NOTE: We do not rollback the authz changes here because it is OK if something is in Authz DB
126-
# but not in the message queue but not vice-versa.
128+
# NOTE: We do not roll back the authz changes here because it is OK if something is in Authz DB
129+
# but not in the message queue but not vice versa.
127130
logger.error(f"Failed to sync user namespace {user_namespace} because {err}")
128131
await tx.rollback()
129132
else:
@@ -442,6 +445,84 @@ async def record_resource_requests(dm: DependencyManager) -> None:
442445
await asyncio.sleep(interval_seconds)
443446

444447

448+
def _extract_session_quota_metadata(session: K8sObject) -> tuple[str, str, int] | None:
    """Extract session name, user id and resource pool id from an AmaltheaSession object.

    Only sessions whose ``status.state`` equals "running" (compared case-insensitively) are
    considered.

    Args:
        session: The session object. Its ``manifest`` is read with mapping-style ``.get`` lookups
            and its ``name`` is returned unchanged.

    Returns:
        A ``(session_name, user_id, resource_pool_id)`` tuple, or ``None`` when the session is not
        running, has no ``renku.io/safe-username`` label, or has no ``renku.io/resource_pool_id``
        annotation.

    Raises:
        ValueError: If the resource pool annotation is present but is not a valid integer.
    """
    manifest = session.manifest

    # NOTE: `.get(key, {})` only covers a *missing* key. A key that is present with an explicit
    # null value yields None and would break the chained lookup, hence the `or {}` fallbacks.
    status = manifest.get("status") or {}
    if str(status.get("state") or "").lower() != "running":
        return None

    object_meta = manifest.get("metadata") or {}

    user_id = (object_meta.get("labels") or {}).get("renku.io/safe-username")
    if not user_id:
        return None

    resource_pool_id_raw = (object_meta.get("annotations") or {}).get("renku.io/resource_pool_id")
    if resource_pool_id_raw is None:
        return None

    # A malformed id is deliberately not swallowed here so that the caller can report it.
    return session.name, user_id, int(resource_pool_id_raw)
468+
469+
470+
async def check_session_quota_and_send_alerts(
    dm: DependencyManager, *, alert_threshold_percent: float = 75.0
) -> None:
    """Check all running sessions and send an alert when the user's quota usage is high.

    For every AmaltheaSession listed by the k8s client, the running-week usage of the session's
    user in the session's resource pool is compared against the available quota. An alert is
    created or updated for each session whose usage is at or above the threshold.

    A failure while handling one session is logged and does not stop the remaining sessions from
    being checked.

    Args:
        dm: The dependency manager providing the k8s client, the resource usage service and the
            notifications repository.
        alert_threshold_percent: Usage, as a percentage of the available quota, at or above which
            an alert is sent.
    """
    # TODO: Use a proper admin user here
    admin_user = InternalServiceAdmin(id=ServiceAdminId.k8s_watcher)
    session_filter = K8sObjectFilter(gvk=AMALTHEA_SESSION_GVK)

    async for session in dm.k8s_client.list(session_filter):
        try:
            metadata = _extract_session_quota_metadata(session)
            if metadata is None:
                continue

            session_name, user_id, resource_pool_id = metadata
            usage = await dm.resource_usage_service.get_running_week(
                resource_pool_id=resource_pool_id, user_id=user_id
            )
            if usage is None:
                continue

            # Prefer the per-user limit and fall back to the pool-wide limit when it is unset or zero.
            available_quota = usage.pool_limits.user_limit.value or usage.pool_limits.total_limit.value
            # Without a positive limit there is nothing to compare against. The explicit falsy
            # check also keeps an unset (None) limit from failing on the comparison below.
            if not available_quota or available_quota <= 0:
                continue

            usage_percentage = (usage.user_usage.cost.value / available_quota) * 100

            if usage_percentage < alert_threshold_percent:
                # TODO: Do we need to do any cleanup of old alerts when the quota is back to normal!?
                continue

            # TODO: Does this send an alert every time the check runs and the quota is low!?

            alert = UnsavedAlert(
                user_id=user_id,
                # TODO: Use proper event type here
                event_type="session_quota_low_remaining",
                session_name=session_name,
                title="Session quota running low",
                message=f"You have used {usage_percentage:.1f}% of your quota in resource pool {resource_pool_id}.",
            )
            await dm.notifications_repo.create_or_update_alert(user=admin_user, alert=alert)

            logger.info(f"Created quota alert for user {user_id}, session {session_name}")
        except Exception as e:
            # Best effort: one broken session must not prevent the others from being checked.
            logger.warning(f"Failed to check quota for session {session.name}: {e}")
513+
514+
515+
async def monitor_session_quota_and_send_alerts(dm: DependencyManager) -> None:
    """Periodically check session quotas and send alerts when the remaining quota is low.

    Runs `check_session_quota_and_send_alerts` in an endless loop, sleeping for
    `dm.config.session_quota_alert_check_interval_s` seconds after every completed check. The loop
    ends when the task is cancelled or interrupted while a check is in progress.

    Args:
        dm: The dependency manager providing the configuration and the services used by the check.
    """
    while True:
        try:
            await check_session_quota_and_send_alerts(dm)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Leave the loop: without this the cancellation is swallowed and the next check
            # starts immediately, so the task can never be stopped while a check is running.
            return
        else:
            await asyncio.sleep(dm.config.session_quota_alert_check_interval_s)
524+
525+
445526
def all_tasks(dm: DependencyManager) -> TaskDefininions:
446527
"""A dict of task factories to be managed in main."""
447528
# Impl. note: We pass the entire config to the coroutines, because
@@ -467,5 +548,6 @@ def all_tasks(dm: DependencyManager) -> TaskDefininions:
467548
"monitor_capacity_reservations": lambda: monitor_capacity_reservations(dm),
468549
"cleanup_orphaned_capacity_reservations": lambda: cleanup_orphaned_capacity_reservations(dm),
469550
"record_resource_requests": lambda: record_resource_requests(dm),
551+
"monitor_session_quota_and_send_alerts": lambda: monitor_session_quota_and_send_alerts(dm),
470552
}
471553
)

0 commit comments

Comments
 (0)