Skip to content

Commit 75e96ae

Browse files
feat: task to check session quota and send alerts (#1252)
1 parent 0d0e2bd commit 75e96ae

8 files changed

Lines changed: 161 additions & 11 deletions

File tree

bases/renku_data_services/data_tasks/config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ class Config:
5252
short_task_period_s: int
5353
long_task_period_s: int
5454
enable_resource_request_tracking: bool
55+
session_quota_alert_check_interval_s: int
56+
session_quota_alert_remaining_threshold_p: int
57+
session_quota_alert_critical_m: int
58+
alertmanager_webhook_role: str
5559

5660
@classmethod
5761
def from_env(cls) -> Config:
@@ -69,6 +73,9 @@ def from_env(cls) -> Config:
6973
x_short_task_period = int(os.environ.get("X_SHORT_TASK_PERIOD_S", 30))
7074
short_task_period = int(os.environ.get("SHORT_TASK_PERIOD_S", 2 * 60))
7175
long_task_period = int(os.environ.get("LONG_TASK_PERIOD_S", 3 * 60 * 60))
76+
session_quota_alert_check_interval = int(os.environ.get("SESSION_QUOTA_ALERT_CHECK_INTERVAL_S", 5 * 60))
77+
session_quota_alert_remaining_threshold = int(os.environ.get("SESSION_QUOTA_ALERT_REMAINING_THRESHOLD_P", 20))
78+
session_quota_alert_critical = int(os.environ.get("SESSION_QUOTA_ALERT_CRITICAL_M", 10))
7279

7380
k8s_config_root = os.environ.get("K8S_CONFIG_ROOT", "/secrets/kube_configs")
7481

@@ -92,4 +99,8 @@ def from_env(cls) -> Config:
9299
long_task_period_s=long_task_period,
93100
dummy_stores=dummy_stores,
94101
enable_resource_request_tracking=enable_resource_request_tracking,
102+
session_quota_alert_check_interval_s=session_quota_alert_check_interval,
103+
session_quota_alert_remaining_threshold_p=session_quota_alert_remaining_threshold,
104+
session_quota_alert_critical_m=session_quota_alert_critical,
105+
alertmanager_webhook_role=os.environ.get("ALERTMANAGER_WEBHOOK_ROLE", "alertmanager-webhook"),
95106
)

bases/renku_data_services/data_tasks/dependencies.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
from renku_data_services.metrics.db import MetricsRepository
1717
from renku_data_services.namespace.db import GroupRepository
1818
from renku_data_services.notebooks.constants import AMALTHEA_SESSION_GVK
19+
from renku_data_services.notifications.db import NotificationsRepository
1920
from renku_data_services.project.db import ProjectRepository
2021
from renku_data_services.resource_usage.core import (
2122
DefaultResourcesRequestRecorder,
2223
NoopResourcesRequestRecorder,
2324
ResourceRequestsFetch,
2425
ResourcesRequestRecorder,
26+
ResourceUsageService,
2527
)
2628
from renku_data_services.resource_usage.db import ResourceRequestsRepo
2729
from renku_data_services.search.db import SearchUpdatesRepo
@@ -50,6 +52,10 @@ class DependencyManager:
5052
session_tasks: SessionTasks
5153
capacity_reservation_tasks: CapacityReservationTasks
5254
resource_requests_recorder: ResourcesRequestRecorder
55+
k8s_client: K8sClusterClientsPool
56+
notifications_repo: NotificationsRepository
57+
resource_usage_service: ResourceUsageService
58+
resource_requests_repo: ResourceRequestsRepo
5359

5460
@classmethod
5561
def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
@@ -112,15 +118,23 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
112118
k8s_client=cr_k8s_client,
113119
)
114120

121+
resource_requests_repo = ResourceRequestsRepo(cfg.db.async_session_maker)
122+
resource_usage_service = ResourceUsageService(repo=resource_requests_repo)
123+
115124
resource_requests_recorder: ResourcesRequestRecorder
116125
if cfg.enable_resource_request_tracking:
117126
resource_requests_recorder = DefaultResourcesRequestRecorder(
118-
repo=ResourceRequestsRepo(cfg.db.async_session_maker), fetch=ResourceRequestsFetch(k8s_client)
127+
repo=resource_requests_repo, fetch=ResourceRequestsFetch(k8s_client)
119128
)
120129
else:
121130
logger.warning("Resource request tracking is disabled!")
122131
resource_requests_recorder = NoopResourcesRequestRecorder()
123132

133+
notifications_repo = NotificationsRepository(
134+
session_maker=cfg.db.async_session_maker,
135+
alertmanager_webhook_role=cfg.alertmanager_webhook_role,
136+
)
137+
124138
kc_api: IKeycloakAPI
125139
if cfg.dummy_stores:
126140
dummy_users = [
@@ -149,4 +163,8 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
149163
session_tasks=session_tasks,
150164
capacity_reservation_tasks=capacity_reservation_tasks,
151165
resource_requests_recorder=resource_requests_recorder,
166+
k8s_client=k8s_client,
167+
notifications_repo=notifications_repo,
168+
resource_usage_service=resource_usage_service,
169+
resource_requests_repo=resource_requests_repo,
152170
)

bases/renku_data_services/data_tasks/task_defs.py

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@
2727
from renku_data_services.base_models.metrics import MetricsEvent
2828
from renku_data_services.data_tasks.dependencies import DependencyManager
2929
from renku_data_services.data_tasks.taskman import TaskDefininions
30+
from renku_data_services.k8s.models import K8sObject, K8sObjectFilter
3031
from renku_data_services.namespace.models import NamespaceKind
32+
from renku_data_services.notebooks.constants import AMALTHEA_SESSION_GVK
33+
from renku_data_services.notifications.models import UnsavedAlert
3134
from renku_data_services.solr.solr_client import DefaultSolrClient
35+
from renku_data_services.utils.core import get_nonzero_minimum
3236

3337
logger = logging.getLogger(__name__)
3438

@@ -122,8 +126,8 @@ async def sync_user_namespaces(dm: DependencyManager) -> None:
122126
await dm.authz.client.WriteRelationships(authz_change.apply)
123127
num_authz += 1
124128
except Exception as err:
125-
# NOTE: We do not rollback the authz changes here because it is OK if something is in Authz DB
126-
# but not in the message queue but not vice-versa.
129+
# NOTE: We do not roll back the authz changes here because it is OK if something is in Authz DB
130+
# but not in the message queue but not vice versa.
127131
logger.error(f"Failed to sync user namespace {user_namespace} because {err}")
128132
await tx.rollback()
129133
else:
@@ -442,6 +446,121 @@ async def record_resource_requests(dm: DependencyManager) -> None:
442446
await asyncio.sleep(interval_seconds)
443447

444448

449+
def _extract_session_quota_metadata(session: K8sObject) -> tuple[str, str, int, int | None] | None:
    """Extract the identifiers needed for a quota check from an AmaltheaSession object.

    Args:
        session: The Kubernetes object of the session; only its ``manifest`` and ``name`` are read.

    Returns:
        A ``(session name, user id, resource pool id, resource class id)`` tuple, or ``None`` when
        the session state is not ``running``, the ``renku.io/safe-username`` label is missing or
        empty, or the ``renku.io/resource_pool_id`` annotation is missing. The resource class id is
        ``None`` when the ``renku.io/resource_class_id`` annotation is missing or empty.

    Raises:
        ValueError: If an id annotation is present but cannot be converted to an integer.
    """
    manifest = session.manifest

    # NOTE: `or {}` also covers keys that are present but explicitly null; a `.get(key, {})` default
    # would hand back None in that case and break the chained lookup with an AttributeError.
    status = manifest.get("status") or {}
    if str(status.get("state", "")).lower() != "running":
        return None

    metadata = manifest.get("metadata") or {}

    labels = metadata.get("labels") or {}
    user_id = labels.get("renku.io/safe-username")
    if not user_id:
        return None

    annotations = metadata.get("annotations") or {}
    resource_pool_id_raw = annotations.get("renku.io/resource_pool_id")
    if resource_pool_id_raw is None:
        return None
    resource_pool_id = int(resource_pool_id_raw)

    # The resource class is optional: callers can still evaluate the pool usage without it.
    resource_class_id_raw = annotations.get("renku.io/resource_class_id")
    resource_class_id = int(resource_class_id_raw) if resource_class_id_raw else None

    return session.name, user_id, resource_pool_id, resource_class_id
472+
473+
474+
async def _check_session_quota_and_send_alerts(dm: DependencyManager) -> None:
    """Check all running sessions and send alerts if the remaining user quota is below threshold.

    For every AmaltheaSession returned by the k8s client, the usage of the session's user in the
    session's resource pool for the running week is compared against the quota. At most one alert
    is created or updated per session and pass, in this order of precedence:

    1. ``session_quota_exhausted`` when the usage has reached the quota,
    2. ``session_quota_critically_low`` when the estimated remaining runtime is below
       ``session_quota_alert_critical_m`` minutes (needs the resource class of the session),
    3. ``session_quota_low`` when the used share of the quota exceeds
       ``100 - session_quota_alert_remaining_threshold_p`` percent.

    Failures while processing a single session are logged and do not stop the remaining sessions
    from being checked.

    Args:
        dm: The dependency manager providing the k8s client, usage service, repositories and config.
    """
    # NOTE(review): this reuses the capacity_reservation service admin id for writing alerts;
    # confirm that this is intended rather than a dedicated id.
    admin_user = InternalServiceAdmin(id=ServiceAdminId.capacity_reservation)
    session_filter = K8sObjectFilter(gvk=AMALTHEA_SESSION_GVK)
    # Usage percentage above which an alert is raised; it does not change between sessions.
    usage_threshold_p = 100 - dm.config.session_quota_alert_remaining_threshold_p

    async for session in dm.k8s_client.list(session_filter):
        try:
            metadata = _extract_session_quota_metadata(session)
            if not metadata:
                continue

            session_name, user_id, resource_pool_id, resource_class_id = metadata

            usage = await dm.resource_usage_service.get_running_week(resource_pool_id=resource_pool_id, user_id=user_id)
            if not usage:
                continue

            total_quota = get_nonzero_minimum(usage.pool_limits.user_limit.value, usage.pool_limits.total_limit.value)
            # Presumably a quota of zero means that no limit is configured; nothing to alert on then.
            if total_quota <= 0:
                continue

            used_credits = usage.user_usage.cost.value
            usage_p = (used_credits / total_quota) * 100
            if usage_p <= usage_threshold_p:
                continue

            # NOTE: Exhaustion only depends on usage and quota, so it is checked regardless of whether
            # the resource class is known. Otherwise sessions without a resource class would only
            # ever receive the "running low" alert, even at 100% usage or more.
            if used_credits >= total_quota:
                message = f"Your session {session_name} in resource pool {resource_pool_id} has exhausted its quota"
                log_message = (
                    f"Session {session_name} for user {user_id} has exhausted its quota in resource pool "
                    f"{resource_pool_id}"
                )
                hibernation_alert = UnsavedAlert(
                    user_id=user_id,
                    event_type="session_quota_exhausted",
                    session_name=session_name,
                    title="Session paused due to quota exhaustion",
                    message=message,
                )
                await dm.notifications_repo.create_or_update_alert(user=admin_user, alert=hibernation_alert)
                logger.info(log_message)
                continue

            # NOTE: Without the resource_class_id, we cannot calculate the remaining time
            if resource_class_id is not None:
                resource_class_cost = await dm.resource_requests_repo.find_resource_class_costs(
                    resource_pool_id, resource_class_id
                )

                if resource_class_cost and resource_class_cost.cost.value > 0:
                    remaining_credits = total_quota - used_credits
                    # Assumes the resource class cost is expressed in credits per hour - TODO confirm.
                    remaining_minutes = (remaining_credits / resource_class_cost.cost.value) * 60
                    if remaining_minutes < dm.config.session_quota_alert_critical_m:
                        critical_alert = UnsavedAlert(
                            user_id=user_id,
                            event_type="session_quota_critically_low",
                            session_name=session_name,
                            title="Session quota expiring soon",
                            message=f"Your session in resource pool {resource_pool_id} will run out of quota in "
                            + f"approximately {remaining_minutes:.0f} minutes.",
                        )
                        await dm.notifications_repo.create_or_update_alert(user=admin_user, alert=critical_alert)
                        logger.info(f"Created critical quota alert for user {user_id}, session {session_name}")
                        continue

            alert = UnsavedAlert(
                user_id=user_id,
                event_type="session_quota_low",
                session_name=session_name,
                title="Session quota running low",
                message=f"You have used {usage_p:.1f}% of your quota in resource pool {resource_pool_id}.",
            )
            await dm.notifications_repo.create_or_update_alert(user=admin_user, alert=alert)
            logger.info(f"Created quota alert for user {user_id}, session {session_name}")
        except Exception as e:
            # Best effort: one broken session must not prevent the others from being checked.
            logger.warning(f"Failed to check quota for session {session.name}: {e}")
            continue
551+
552+
553+
async def monitor_session_quota_and_send_alerts(dm: DependencyManager) -> None:
    """Periodically check session quotas and send alerts when the remaining quota is low.

    Runs one check, then sleeps for ``session_quota_alert_check_interval_s`` seconds, until the
    task is cancelled or interrupted.

    Args:
        dm: The dependency manager providing the config and the services used by the check.

    Raises:
        asyncio.CancelledError: When the task is cancelled, during a check or while sleeping.
        KeyboardInterrupt: When the process is interrupted during a check.
    """
    while True:
        try:
            await _check_session_quota_and_send_alerts(dm)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Propagate the signal: without this the loop would start the next check right away,
            # and a cancellation received during a check would be swallowed.
            raise
        else:
            await asyncio.sleep(dm.config.session_quota_alert_check_interval_s)
562+
563+
445564
def all_tasks(dm: DependencyManager) -> TaskDefininions:
446565
"""A dict of task factories to be managed in main."""
447566
# Impl. note: We pass the entire config to the coroutines, because
@@ -467,5 +586,6 @@ def all_tasks(dm: DependencyManager) -> TaskDefininions:
467586
"monitor_capacity_reservations": lambda: monitor_capacity_reservations(dm),
468587
"cleanup_orphaned_capacity_reservations": lambda: cleanup_orphaned_capacity_reservations(dm),
469588
"record_resource_requests": lambda: record_resource_requests(dm),
589+
"monitor_session_quota_and_send_alerts": lambda: monitor_session_quota_and_send_alerts(dm),
470590
}
471591
)

components/renku_data_services/crc/db.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,14 +264,10 @@ async def filter_resource_pools(
264264
rp: schemas.ResourcePoolORM
265265
for rp in res.scalars().all():
266266
quota = await self.quotas_repo.get_quota(rp.quota, rp.get_cluster_id())
267-
268-
# TODO: Do we need to calculate usage/remaining for admins!?
269267
credits_used = None
270268
if self.resource_usage_service and api_user.is_authenticated:
271269
usage_summary = await self.resource_usage_service.usage_of_running_week(rp.id, api_user.id)
272270
if not usage_summary.is_empty():
273-
# TODO: Should we use the cost_raw and runtime_hours here instead?
274-
# TODO: Should we calculate usage for individual entries?
275271
credits_used = usage_summary.cost
276272

277273
rp_model = rp.dump(quota, criteria, credits_used)

components/renku_data_services/notifications/db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ async def update_alert(self, user: base_models.APIUser, alert_id: ULID, patch: m
103103
return alert_orm.dump()
104104

105105
def __update_alert(self, alert: schemas.AlertORM, update: models.AlertPatch) -> None:
106-
if update.resolved is True:
106+
if update.resolved:
107107
alert.resolved_date = datetime.now(UTC)
108108

109109
async def get_alerts_by_properties(

components/renku_data_services/resource_usage/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ async def usage_of_running_week(
175175
"""Return the resource usage for the given pool of the currently running week.
176176
177177
The week start is Monday 0:00 UTC. Resource usage is returned in 'credits'. When a user_id
178-
is given, the results represent the usage of only that user. Otherwise the overall pool usage
178+
is given, the results represent the usage of only that user. Otherwise, the overall pool usage
179179
is returned. The running week is calculated from the `current_time` argument, which is the current
180180
time if not specified.
181181
"""

components/renku_data_services/utils/core.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,3 +236,8 @@ async def get_openbis_pat(
236236
"from creating a personal access token."
237237
)
238238
return json2["result"][0]["permId"], valid_to
239+
240+
241+
def get_nonzero_minimum(a: int, b: int) -> int:
    """Return the smaller of two non-negative values, treating zero as "not set".

    If one value is zero the other one is returned, so the result is zero only when both are zero.
    """
    if b == 0:
        return a
    if a == 0:
        return b
    return min(a, b)

test/bases/renku_data_services/data_api/test_repositories.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ async def test_get_repository_without_connection(
111111

112112
@pytest.mark.asyncio
113113
async def test_get_one_repository(oauth2_test_client: SanicASGITestClient, user_headers, create_oauth2_connection):
114-
connection = await create_oauth2_connection("provider_1")
114+
connection = await create_oauth2_connection("provider_2")
115115
repository_url = "https://example.org/username/my_repo.git"
116116

117117
_, res = await oauth2_test_client.get(
@@ -122,7 +122,7 @@ async def test_get_one_repository(oauth2_test_client: SanicASGITestClient, user_
122122
assert res.json is not None
123123
result = res.json
124124
assert result.get("connection", {}).get("id") == connection["id"]
125-
assert result.get("provider", {}).get("id") == "provider_1"
125+
assert result.get("provider", {}).get("id") == "provider_2"
126126
assert result.get("metadata") is not None
127127
repository_metadata = result["metadata"]
128128
assert repository_metadata.get("git_url") == repository_url

0 commit comments

Comments
 (0)