Skip to content

Commit 31b2539

Browse files
committed
chore: fix mypy errors and formatting
1 parent 7780d15 commit 31b2539

4 files changed

Lines changed: 512 additions & 9 deletions

File tree

components/renku_data_services/metrics/db.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Repository for the metrics staging table."""
22

3+
import asyncio
34
from collections.abc import AsyncGenerator, Callable
45
from typing import Any
56

@@ -31,9 +32,36 @@ async def get_unprocessed_metrics(self) -> AsyncGenerator[MetricsORM, None]:
3132
async for metrics in result:
3233
yield metrics
3334

35+
async def delete_all_metrics(self) -> None:
    """Remove every row from the metrics staging table.

    The unconditional delete statement is executed inside a transaction opened with
    ``session.begin()`` on a session obtained from ``self.session_maker``.
    """
    purge_stmt = delete(MetricsORM)
    async with self.session_maker() as session:
        async with session.begin():
            await session.execute(purge_stmt)
39+
3440
async def delete_processed_metrics(self, metrics_ids: list[ULID]) -> None:
    """Remove the staging-table rows whose IDs are listed in ``metrics_ids``.

    Args:
        metrics_ids: IDs of the metrics events to delete. When empty, the method returns
            immediately without opening a database session.
    """
    if not metrics_ids:
        # Nothing to delete, avoid a pointless round trip to the database.
        return

    delete_stmt = delete(MetricsORM).where(MetricsORM.id.in_(metrics_ids))
    async with self.session_maker() as session:
        async with session.begin():
            await session.execute(delete_stmt)
46+
47+
async def wait_for_metrics(self, timeout: float = 5.0, poll_interval: float = 0.1) -> bool:
    """Wait until at least one unprocessed metrics event is present in the staging table.

    The staging table is always polled at least once, even when ``timeout`` is zero or negative.
    After an empty poll the method sleeps for ``poll_interval`` seconds (never longer than the
    time left until the deadline) and polls again.

    Args:
        timeout: Maximum time to wait in seconds
        poll_interval: Time between polls in seconds

    Returns:
        True if at least one unprocessed metrics event was found, False if the timeout was reached
    """
    loop = asyncio.get_running_loop()
    # The event loop clock is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
    deadline = loop.time() + timeout
    while True:
        pending = self.get_unprocessed_metrics()
        try:
            # A single row is enough to answer the question, do not load the whole table.
            async for _ in pending:
                return True
        finally:
            # Finalize the generator explicitly so that whatever it holds open is released
            # right away instead of whenever the garbage collector gets to it.
            await pending.aclose()

        remaining = deadline - loop.time()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        await asyncio.sleep(min(poll_interval, remaining))

components/renku_data_services/notebooks/core_sessions.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,16 +1111,16 @@ async def patch_session(
11111111
)
11121112
rp = await rp_repo.get_resource_pool_from_class(user, body.resource_class_id)
11131113
rc = rp.get_resource_class(body.resource_class_id)
1114-
if not rc:
1115-
raise errors.MissingResourceError(
1116-
message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
1117-
)
11181114
# Resource class is not being changed but we still need to get the resource pool and class for patching
11191115
# in case they changed since the session was created
11201116
else:
11211117
rp = await rp_repo.get_resource_pool_from_class(user, session.resource_class_id())
11221118
rc = rp.get_resource_class(session.resource_class_id())
11231119

1120+
if not rc:
1121+
raise errors.MissingResourceError(
1122+
message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
1123+
)
11241124
# If the session is being hibernated we do not need to patch anything else that is
11251125
# not specifically called for in the request body, we can refresh things when the user resumes.
11261126
if is_getting_hibernated:
@@ -1161,7 +1161,7 @@ async def patch_session(
11611161
"resource_pool_id": str(rp.id) or "",
11621162
"resource_class_name": f"{rp.name}.{rc.name}",
11631163
"session_id": session_id,
1164-
}
1164+
},
11651165
)
11661166

11671167
patch.spec.culling = get_culling_patch(user, rp, nb_config, body.lastInteraction)

test/bases/renku_data_services/data_api/test_metrics.py

Lines changed: 262 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import re
2+
import subprocess
23
from collections.abc import AsyncGenerator
34
from typing import cast
45
from unittest.mock import AsyncMock, MagicMock
@@ -11,21 +12,43 @@
1112
from renku_data_services.metrics.core import StagingMetricsService
1213

1314

15+
def _has_docker() -> bool:
16+
"""Check if docker is available."""
17+
try:
18+
result = subprocess.run(["docker", "info"], capture_output=True, timeout=5)
19+
return result.returncode == 0
20+
except (FileNotFoundError, subprocess.TimeoutExpired):
21+
return False
22+
23+
1424
@pytest_asyncio.fixture
async def sanic_metrics_client(monkeypatch, app_manager, sanic_client) -> AsyncGenerator[SanicASGITestClient, None]:
    """Yield ``sanic_client`` with real staging-metrics methods wired into the mocked metrics service.

    ``POSTHOG_ENABLED`` is set to ``"true"`` and the listed methods of the ``app_manager.metrics``
    mock are replaced with the bound methods of a ``StagingMetricsService`` backed by
    ``app_manager.metrics_repo``. After the test the same attributes are reset to fresh
    ``AsyncMock`` instances.
    """
    monkeypatch.setenv("POSTHOG_ENABLED", "true")

    patched_methods = (
        "project_created",
        "session_launcher_created",
        "session_started",
        "session_resumed",
        "session_stopped",
        "session_hibernated",
        "user_requested_session_resume",
    )

    # NOTE: Replace the metrics methods with actual implementations to store metrics in the database.
    real_service = StagingMetricsService(enabled=True, metrics_repo=app_manager.metrics_repo)
    mocked_service = cast(MagicMock, app_manager.metrics)
    mocked_service.configure_mock(**{name: getattr(real_service, name) for name in patched_methods})

    yield sanic_client

    mocked_service.configure_mock(**{name: AsyncMock() for name in patched_methods})
2952

3053

3154
@pytest.mark.asyncio
@@ -47,3 +70,238 @@ async def test_metrics_are_stored(sanic_metrics_client, app_manager, create_proj
4770
assert re.match(r"^[0-7][0-9A-HJKMNP-TV-Z]{25}$", str(session_launcher_created.id))
4871
assert session_launcher_created.event == "session_launcher_created"
4972
assert session_launcher_created.metadata_ == {"environment_image_source": "image", "environment_kind": "CUSTOM"}
73+
74+
75+
@pytest.mark.asyncio
@pytest.mark.skipif(not _has_docker(), reason="docker is not available - kind cannot create clusters")
async def test_session_metrics_are_stored(
    sanic_metrics_client, app_manager, create_project, create_session_launcher, create_resource_pool, user_headers
) -> None:
    """Check that session lifecycle events end up in the staging table together with their metadata.

    A real session is started through the API to produce ``session_started``; the remaining
    events are emitted directly through a ``StagingMetricsService``.

    Note: This test requires kind cluster to be available.
    """
    # Metadata keys describing the resources of a session, shared by several events.
    resource_keys = ("resource_class_id", "resource_pool_id", "resource_class_name", "cpu", "memory", "gpu")

    async def stored_events(event_name: str) -> list:
        """Return the unprocessed events named ``event_name`` ordered by timestamp."""
        unprocessed = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
        unprocessed.sort(key=lambda e: e.timestamp)
        return [e for e in unprocessed if e.event == event_name]

    def assert_session_metadata(stored: dict, present: tuple, absent: tuple = ()) -> None:
        """Check the session ID and the presence/absence of the given metadata keys."""
        assert stored["session_id"] == session_id
        for key in present:
            assert key in stored
        for key in absent:
            assert key not in stored

    project = await create_project(name="Project", sanic_client=sanic_metrics_client)
    # The session launcher needs a resource class, which in turn lives in a resource pool
    resource_pool = await create_resource_pool(admin=True)
    resource_class_id = resource_pool["classes"][0]["id"]
    session_launcher = await create_session_launcher(
        "Launcher 1", project_id=project["id"], resource_class_id=resource_class_id
    )

    # Starting a session triggers the session_started metric
    start_payload = {
        "launcher_id": session_launcher["id"],
        "name": "Test Session",
        "resource_class_id": resource_class_id,
    }
    _, res = await sanic_metrics_client.post("/api/data/sessions", headers=user_headers, json=start_payload)
    assert res.status_code == 201
    session_id = res.json["id"]

    started = await stored_events("session_started")
    assert len(started) >= 1
    assert_session_metadata(started[0].metadata_, present=(*resource_keys, "storage"))

    # The remaining events are simulated by calling the metrics service directly
    metrics = StagingMetricsService(enabled=True, metrics_repo=app_manager.metrics_repo)
    fake_user = type("APIUser", (), {"id": user_headers["X-API-Token"]})()
    resource_metadata = {
        "session_id": session_id,
        "resource_class_id": "1",
        "resource_pool_id": "pool-1",
        "resource_class_name": "test-pool.test-class",
        "cpu": 2000,
        "memory": 4096,
        "gpu": 0,
    }

    # session_resumed carries the storage on top of the common resource metadata
    await metrics.session_resumed(user=fake_user, metadata={**resource_metadata, "storage": 10000})
    resumed = await stored_events("session_resumed")
    assert len(resumed) >= 1
    assert_session_metadata(resumed[-1].metadata_, present=(*resource_keys, "storage"))

    # user_requested_session_resume doesn't include storage
    await metrics.user_requested_session_resume(user=fake_user, metadata=dict(resource_metadata))
    resume_requests = await stored_events("user_requested_session_resume")
    assert len(resume_requests) >= 1
    assert_session_metadata(resume_requests[-1].metadata_, present=resource_keys, absent=("storage",))

    # session_hibernated only carries the session ID
    await metrics.session_hibernated(user=fake_user, metadata={"session_id": session_id})
    hibernated = await stored_events("session_hibernated")
    assert len(hibernated) >= 1
    assert hibernated[-1].metadata_["session_id"] == session_id
211+
212+
213+
@pytest.mark.asyncio
async def test_session_metrics_metadata_structure(app_manager) -> None:
    """Check that the metadata of session metrics is stored field by field as it was submitted."""
    # Start from an empty staging table so the final event count is exact
    await app_manager.metrics_repo.delete_all_metrics()

    metrics = StagingMetricsService(enabled=True, metrics_repo=app_manager.metrics_repo)
    mock_user = type("APIUser", (), {"id": "test-user-123", "is_authenticated": True})()

    async def emit_and_load(emit, event_name: str, submitted: dict):
        """Emit one event, then return the first stored event named ``event_name`` after checking its fields."""
        # A copy is submitted so the expected values cannot be affected by the call
        await emit(user=mock_user, metadata=dict(submitted))
        events = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
        stored = [e for e in events if e.event == event_name][0]
        for field, expected in submitted.items():
            assert stored.metadata_[field] == expected
        return stored

    await emit_and_load(
        metrics.session_started,
        "session_started",
        {
            "session_id": "session-456",
            "resource_class_id": 5,
            "resource_pool_id": "pool-abc",
            "resource_class_name": "test-pool.test-class",
            "cpu": 2000,
            "memory": 4096,
            "gpu": 1,
            "storage": 10000,
        },
    )

    await emit_and_load(
        metrics.session_resumed,
        "session_resumed",
        {
            "session_id": "session-789",
            "resource_class_id": 10,
            "resource_pool_id": "pool-def",
            "resource_class_name": "different-pool.different-class",
            "cpu": 4000,
            "memory": 8192,
            "gpu": 2,
            "storage": 20000,
        },
    )

    resume_event = await emit_and_load(
        metrics.user_requested_session_resume,
        "user_requested_session_resume",
        {
            "session_id": "session-999",
            "resource_class_id": 3,
            "resource_pool_id": "pool-xyz",
            "resource_class_name": "another-pool.another-class",
            "cpu": 1000,
            "memory": 2048,
            "gpu": 0,
        },
    )
    # This event is submitted without storage, so none must appear in the stored metadata
    assert "storage" not in resume_event.metadata_

    # Exactly one event per call: session_started, session_resumed, user_requested_session_resume
    all_events = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
    assert len(all_events) == 3

0 commit comments

Comments
 (0)