Skip to content

Commit b881f8b

Browse files
committed
chore: fix mypy errors and formatting
1 parent 7780d15 commit b881f8b

4 files changed

Lines changed: 517 additions & 9 deletions

File tree

components/renku_data_services/metrics/db.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Repository for the metrics staging table."""
22

3+
import asyncio
34
from collections.abc import AsyncGenerator, Callable
45
from typing import Any
56

@@ -31,9 +32,36 @@ async def get_unprocessed_metrics(self) -> AsyncGenerator[MetricsORM, None]:
3132
async for metrics in result:
3233
yield metrics
3334

35+
async def delete_all_metrics(self) -> None:
    """Remove every metrics event currently held in the staging table."""
    async with self.session_maker() as session:
        async with session.begin():
            # An unfiltered DELETE wipes the whole table in a single statement.
            await session.execute(delete(MetricsORM))
3440
async def delete_processed_metrics(self, metrics_ids: list[ULID]) -> None:
    """Delete already-processed metrics events from the staging table.

    Args:
        metrics_ids: IDs of the events to remove; a no-op when empty.
    """
    if metrics_ids:
        # Build the statement up front, then run it inside one transaction.
        statement = delete(MetricsORM).where(MetricsORM.id.in_(metrics_ids))
        async with self.session_maker() as session, session.begin():
            await session.execute(statement)
46+
47+
async def wait_for_metrics(self, timeout: float = 5.0, poll_interval: float = 0.1) -> bool:
48+
"""Wait for metrics to be processed.
49+
50+
Polls for metrics events and returns when at least one event is found or timeout is reached.
51+
52+
Args:
53+
timeout: Maximum time to wait in seconds
54+
poll_interval: Time between polls in seconds
55+
56+
Returns:
57+
True if metrics were found, False if timeout reached
58+
"""
59+
import time
60+
61+
start_time = time.monotonic()
62+
while time.monotonic() - start_time < timeout:
63+
metrics = [m async for m in self.get_unprocessed_metrics()]
64+
if metrics:
65+
return True
66+
await asyncio.sleep(poll_interval)
67+
return False

components/renku_data_services/notebooks/core_sessions.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,16 +1111,16 @@ async def patch_session(
11111111
)
11121112
rp = await rp_repo.get_resource_pool_from_class(user, body.resource_class_id)
11131113
rc = rp.get_resource_class(body.resource_class_id)
1114-
if not rc:
1115-
raise errors.MissingResourceError(
1116-
message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
1117-
)
11181114
# Resource class is not being changed but we still need to get the resource pool and class for patching
11191115
# in case they changed since the session was created
11201116
else:
11211117
rp = await rp_repo.get_resource_pool_from_class(user, session.resource_class_id())
11221118
rc = rp.get_resource_class(session.resource_class_id())
11231119

1120+
if not rc:
1121+
raise errors.MissingResourceError(
1122+
message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
1123+
)
11241124
# If the session is being hibernated we do not need to patch anything else that is
11251125
# not specifically called for in the request body, we can refresh things when the user resumes.
11261126
if is_getting_hibernated:
@@ -1161,7 +1161,7 @@ async def patch_session(
11611161
"resource_pool_id": str(rp.id) or "",
11621162
"resource_class_name": f"{rp.name}.{rc.name}",
11631163
"session_id": session_id,
1164-
}
1164+
},
11651165
)
11661166

11671167
patch.spec.culling = get_culling_patch(user, rp, nb_config, body.lastInteraction)

test/bases/renku_data_services/data_api/test_metrics.py

Lines changed: 267 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import re
2+
import subprocess
23
from collections.abc import AsyncGenerator
34
from typing import cast
45
from unittest.mock import AsyncMock, MagicMock
@@ -7,6 +8,16 @@
78
import pytest_asyncio
89
from sanic_testing.testing import SanicASGITestClient
910

11+
12+
def _has_docker() -> bool:
13+
"""Check if docker is available."""
14+
try:
15+
result = subprocess.run(["docker", "info"], capture_output=True, timeout=5)
16+
return result.returncode == 0
17+
except (FileNotFoundError, subprocess.TimeoutExpired):
18+
return False
19+
20+
1021
from renku_data_services.base_models.metrics import ProjectCreationType
1122
from renku_data_services.metrics.core import StagingMetricsService
1223

@@ -15,17 +26,30 @@
1526
async def sanic_metrics_client(monkeypatch, app_manager, sanic_client) -> AsyncGenerator[SanicASGITestClient, None]:
    monkeypatch.setenv("POSTHOG_ENABLED", "true")

    # NOTE: Replace the metrics methods with actual implementations to store metrics in the database.
    metrics = StagingMetricsService(enabled=True, metrics_repo=app_manager.metrics_repo)
    metrics_mock = cast(MagicMock, app_manager.metrics)
    method_names = (
        "project_created",
        "session_launcher_created",
        "session_started",
        "session_resumed",
        "session_stopped",
        "session_hibernated",
        "user_requested_session_resume",
    )
    metrics_mock.configure_mock(**{name: getattr(metrics, name) for name in method_names})

    yield sanic_client

    # Restore plain async mocks after the test so other tests are unaffected.
    metrics_mock.configure_mock(**{name: AsyncMock() for name in method_names})
2953

3054

3155
@pytest.mark.asyncio
@@ -47,3 +71,242 @@ async def test_metrics_are_stored(sanic_metrics_client, app_manager, create_proj
4771
assert re.match(r"^[0-7][0-9A-HJKMNP-TV-Z]{25}$", str(session_launcher_created.id))
4872
assert session_launcher_created.event == "session_launcher_created"
4973
assert session_launcher_created.metadata_ == {"environment_image_source": "image", "environment_kind": "CUSTOM"}
74+
75+
76+
@pytest.mark.asyncio
@pytest.mark.skipif(not _has_docker(), reason="docker is not available - kind cannot create clusters")
async def test_session_metrics_are_stored(
    sanic_metrics_client, app_manager, create_project, create_session_launcher, create_resource_pool, user_headers
) -> None:
    """Test that session lifecycle metrics with metadata are stored correctly.

    Note: This test requires kind cluster to be available.
    """
    # All simulated lifecycle events below are emitted for the same fake user,
    # so build it once instead of repeating the type(...) construction.
    fake_user = type("APIUser", (), {"id": user_headers["X-API-Token"]})()
    # Metadata keys every resource-carrying session event must include.
    resource_metadata_fields = (
        "resource_class_id",
        "resource_pool_id",
        "resource_class_name",
        "cpu",
        "memory",
        "gpu",
    )

    async def events_named(event_name: str) -> list:
        """Reload stored metrics events with the given name, oldest first."""
        events = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
        events.sort(key=lambda e: e.timestamp)
        return [e for e in events if e.event == event_name]

    project = await create_project(name="Project", sanic_client=sanic_metrics_client)
    # Create a resource pool with a resource class to use in the session launcher
    resource_pool = await create_resource_pool(admin=True)
    resource_class_id = resource_pool["classes"][0]["id"]

    # Create a session launcher with a resource_class_id
    session_launcher = await create_session_launcher(
        "Launcher 1", project_id=project["id"], resource_class_id=resource_class_id
    )

    # Start a session to trigger the session_started metric
    _, res = await sanic_metrics_client.post(
        "/api/data/sessions",
        headers=user_headers,
        json={
            "launcher_id": session_launcher["id"],
            "name": "Test Session",
            "resource_class_id": resource_class_id,
        },
    )
    assert res.status_code == 201
    session_id = res.json["id"]

    # Verify session_started has the required metadata fields
    session_started_events = await events_named("session_started")
    assert len(session_started_events) >= 1
    metadata = session_started_events[0].metadata_
    assert metadata["session_id"] == session_id
    for field in (*resource_metadata_fields, "storage"):
        assert field in metadata

    # Simulate a session_resumed event.
    # NOTE: StagingMetricsService is imported at module level; no local re-import needed.
    metrics = StagingMetricsService(enabled=True, metrics_repo=app_manager.metrics_repo)
    await metrics.session_resumed(
        user=fake_user,
        metadata={
            "session_id": session_id,
            "resource_class_id": "1",
            "resource_pool_id": "pool-1",
            "resource_class_name": "test-pool.test-class",
            "cpu": 2000,
            "memory": 4096,
            "gpu": 0,
            "storage": 10000,
        },
    )

    # Verify session_resumed has the required metadata fields
    session_resumed_events = await events_named("session_resumed")
    assert len(session_resumed_events) >= 1
    metadata = session_resumed_events[-1].metadata_
    assert metadata["session_id"] == session_id
    for field in (*resource_metadata_fields, "storage"):
        assert field in metadata

    # Simulate a user_requested_session_resume event (this event carries no "storage").
    await metrics.user_requested_session_resume(
        user=fake_user,
        metadata={
            "session_id": session_id,
            "resource_class_id": "1",
            "resource_pool_id": "pool-1",
            "resource_class_name": "test-pool.test-class",
            "cpu": 2000,
            "memory": 4096,
            "gpu": 0,
        },
    )

    # Verify user_requested_session_resume has the required metadata fields
    resume_request_events = await events_named("user_requested_session_resume")
    assert len(resume_request_events) >= 1
    metadata = resume_request_events[-1].metadata_
    assert metadata["session_id"] == session_id
    for field in resource_metadata_fields:
        assert field in metadata
    assert "storage" not in metadata  # This event doesn't include storage

    # Also verify the session_hibernated metric
    await metrics.session_hibernated(user=fake_user, metadata={"session_id": session_id})

    hibernated_events = await events_named("session_hibernated")
    assert len(hibernated_events) >= 1
    assert hibernated_events[-1].metadata_["session_id"] == session_id
214+
215+
216+
@pytest.mark.asyncio
async def test_session_metrics_metadata_structure(app_manager) -> None:
    """Test that session metrics store metadata with correct structure."""
    # Clear existing metrics before running the test
    await app_manager.metrics_repo.delete_all_metrics()

    # NOTE: StagingMetricsService is imported at module level; no local re-import needed.
    metrics = StagingMetricsService(enabled=True, metrics_repo=app_manager.metrics_repo)

    # Create mock user
    mock_user = type("APIUser", (), {"id": "test-user-123", "is_authenticated": True})()

    # Each case: (metric method, stored event name, metadata payload). The
    # stored event must preserve every submitted field verbatim.
    cases = [
        (
            metrics.session_started,
            "session_started",
            {
                "session_id": "session-456",
                "resource_class_id": 5,
                "resource_pool_id": "pool-abc",
                "resource_class_name": "test-pool.test-class",
                "cpu": 2000,
                "memory": 4096,
                "gpu": 1,
                "storage": 10000,
            },
        ),
        (
            metrics.session_resumed,
            "session_resumed",
            {
                "session_id": "session-789",
                "resource_class_id": 10,
                "resource_pool_id": "pool-def",
                "resource_class_name": "different-pool.different-class",
                "cpu": 4000,
                "memory": 8192,
                "gpu": 2,
                "storage": 20000,
            },
        ),
        (
            metrics.user_requested_session_resume,
            "user_requested_session_resume",
            {
                "session_id": "session-999",
                "resource_class_id": 3,
                "resource_pool_id": "pool-xyz",
                "resource_class_name": "another-pool.another-class",
                "cpu": 1000,
                "memory": 2048,
                "gpu": 0,
            },
        ),
    ]

    for method, event_name, payload in cases:
        await method(user=mock_user, metadata=payload)
        events = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
        stored = [e for e in events if e.event == event_name][0]
        # Verify all metadata fields are preserved
        for key, value in payload.items():
            assert stored.metadata_[key] == value
        if event_name == "user_requested_session_resume":
            # This event type is emitted without a storage figure.
            assert "storage" not in stored.metadata_

    # Verify the event count
    all_events = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
    assert len(all_events) == 3  # session_started, session_resumed, user_requested_session_resume

0 commit comments

Comments
 (0)