Skip to content

Commit cdd7adc

Browse files
committed
chore: fix mypy errors and formatting
1 parent 7780d15 commit cdd7adc

4 files changed

Lines changed: 514 additions & 9 deletions

File tree

components/renku_data_services/metrics/db.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Repository for the metrics staging table."""
22

3+
import asyncio
34
from collections.abc import AsyncGenerator, Callable
45
from typing import Any
56

@@ -31,9 +32,36 @@ async def get_unprocessed_metrics(self) -> AsyncGenerator[MetricsORM, None]:
3132
async for metrics in result:
3233
yield metrics
3334

35+
async def delete_all_metrics(self) -> None:
    """Remove every metrics event from the staging table."""
    async with self.session_maker() as session:
        async with session.begin():
            await session.execute(delete(MetricsORM))
39+
3440
async def delete_processed_metrics(self, metrics_ids: list[ULID]) -> None:
    """Delete the given metrics events from the staging table.

    A no-op when ``metrics_ids`` is empty.
    """
    if metrics_ids:
        # Build the statement up front; it is a pure expression with no side effects.
        stmt = delete(MetricsORM).where(MetricsORM.id.in_(metrics_ids))
        async with self.session_maker() as session, session.begin():
            await session.execute(stmt)
46+
47+
async def wait_for_metrics(self, timeout: float = 5.0, poll_interval: float = 0.1) -> bool:
    """Wait for unprocessed metrics events to appear in the staging table.

    Polls ``get_unprocessed_metrics`` until at least one event is found or
    ``timeout`` elapses. The table is always checked at least once, so a
    non-positive ``timeout`` still performs a single poll.

    Args:
        timeout: Maximum time to wait in seconds.
        poll_interval: Time between polls in seconds.

    Returns:
        True if at least one metrics event was found, False if the timeout
        was reached first.
    """
    # Use the event loop's monotonic clock instead of a function-local
    # `import time`; asyncio is already imported at module level.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        # Any single unprocessed event is enough — stop consuming the
        # generator on the first hit instead of materializing every row.
        async for _ in self.get_unprocessed_metrics():
            return True
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(poll_interval)

components/renku_data_services/notebooks/core_sessions.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,16 +1111,16 @@ async def patch_session(
11111111
)
11121112
rp = await rp_repo.get_resource_pool_from_class(user, body.resource_class_id)
11131113
rc = rp.get_resource_class(body.resource_class_id)
1114-
if not rc:
1115-
raise errors.MissingResourceError(
1116-
message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
1117-
)
11181114
# Resource class is not being changed but we still need to get the resource pool and class for patching
11191115
# in case they changed since the session was created
11201116
else:
11211117
rp = await rp_repo.get_resource_pool_from_class(user, session.resource_class_id())
11221118
rc = rp.get_resource_class(session.resource_class_id())
11231119

1120+
if not rc:
1121+
raise errors.MissingResourceError(
1122+
message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
1123+
)
11241124
# If the session is being hibernated we do not need to patch anything else that is
11251125
# not specifically called for in the request body, we can refresh things when the user resumes.
11261126
if is_getting_hibernated:
@@ -1161,7 +1161,7 @@ async def patch_session(
11611161
"resource_pool_id": str(rp.id) or "",
11621162
"resource_class_name": f"{rp.name}.{rc.name}",
11631163
"session_id": session_id,
1164-
}
1164+
},
11651165
)
11661166

11671167
patch.spec.culling = get_culling_patch(user, rp, nb_config, body.lastInteraction)

test/bases/renku_data_services/data_api/test_metrics.py

Lines changed: 264 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import re
2+
import subprocess
23
from collections.abc import AsyncGenerator
34
from typing import cast
45
from unittest.mock import AsyncMock, MagicMock
@@ -11,21 +12,45 @@
1112
from renku_data_services.metrics.core import StagingMetricsService
1213

1314

15+
def _has_docker() -> bool:
16+
"""Check if docker is available."""
17+
try:
18+
result = subprocess.run(["docker", "info"], capture_output=True, timeout=5)
19+
return result.returncode == 0
20+
except (FileNotFoundError, subprocess.TimeoutExpired):
21+
return False
22+
23+
24+
25+
1426
@pytest_asyncio.fixture
async def sanic_metrics_client(monkeypatch, app_manager, sanic_client) -> AsyncGenerator[SanicASGITestClient, None]:
    """Sanic test client whose metrics callbacks write real events to the staging table."""
    monkeypatch.setenv("POSTHOG_ENABLED", "true")

    patched_methods = (
        "project_created",
        "session_launcher_created",
        "session_started",
        "session_resumed",
        "session_stopped",
        "session_hibernated",
        "user_requested_session_resume",
    )

    # NOTE: Replace the metrics methods with actual implementations to store metrics in the database.
    staging_service = StagingMetricsService(enabled=True, metrics_repo=app_manager.metrics_repo)
    metrics_mock = cast(MagicMock, app_manager.metrics)
    metrics_mock.configure_mock(**{name: getattr(staging_service, name) for name in patched_methods})

    yield sanic_client

    # Restore plain async mocks so later tests do not keep writing to the staging table.
    metrics_mock.configure_mock(**{name: AsyncMock() for name in patched_methods})
2954

3055

3156
@pytest.mark.asyncio
@@ -47,3 +72,238 @@ async def test_metrics_are_stored(sanic_metrics_client, app_manager, create_proj
4772
assert re.match(r"^[0-7][0-9A-HJKMNP-TV-Z]{25}$", str(session_launcher_created.id))
4873
assert session_launcher_created.event == "session_launcher_created"
4974
assert session_launcher_created.metadata_ == {"environment_image_source": "image", "environment_kind": "CUSTOM"}
75+
76+
77+
@pytest.mark.asyncio
@pytest.mark.skipif(not _has_docker(), reason="docker is not available - kind cannot create clusters")
async def test_session_metrics_are_stored(
    sanic_metrics_client, app_manager, create_project, create_session_launcher, create_resource_pool, user_headers
) -> None:
    """Test that session lifecycle metrics with metadata are stored correctly.

    Exercises session_started (via a real POST to the sessions API), then
    session_resumed, user_requested_session_resume, and session_hibernated
    (simulated directly through StagingMetricsService), asserting each event
    lands in the staging table with the expected metadata keys.

    Note: This test requires kind cluster to be available.
    """

    project = await create_project(name="Project", sanic_client=sanic_metrics_client)
    # Create a resource pool with a resource class to use in the session launcher
    resource_pool = await create_resource_pool(admin=True)
    resource_class_id = resource_pool["classes"][0]["id"]

    # Create a session launcher with a resource_class_id
    session_launcher = await create_session_launcher(
        "Launcher 1", project_id=project["id"], resource_class_id=resource_class_id
    )

    # Start a session to trigger session_started metric
    _, res = await sanic_metrics_client.post(
        "/api/data/sessions",
        headers=user_headers,
        json={
            "launcher_id": session_launcher["id"],
            "name": "Test Session",
            "resource_class_id": resource_class_id,
        },
    )
    assert res.status_code == 201
    session_data = res.json
    session_id = session_data["id"]

    # Pull all staged events; sorting by timestamp makes index 0 the earliest.
    events = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
    events.sort(key=lambda e: e.timestamp)

    # Find session_started event
    session_started_events = [e for e in events if e.event == "session_started"]
    assert len(session_started_events) >= 1
    session_started_event = session_started_events[0]

    # Verify session_started has required metadata fields
    metadata = session_started_event.metadata_
    assert metadata["session_id"] == session_id
    assert "resource_class_id" in metadata
    assert "resource_pool_id" in metadata
    assert "resource_class_name" in metadata
    assert "cpu" in metadata
    assert "memory" in metadata
    assert "gpu" in metadata
    assert "storage" in metadata

    # Simulate session resumed event
    # NOTE(review): the ad-hoc type(...) builds a minimal stand-in user whose
    # `id` is the raw API token header — presumably only `.id` is read by the
    # metrics service; verify against StagingMetricsService.
    metrics = StagingMetricsService(enabled=True, metrics_repo=app_manager.metrics_repo)
    await metrics.session_resumed(
        user=type("APIUser", (), {"id": user_headers["X-API-Token"]})(),
        metadata={
            "session_id": session_id,
            "resource_class_id": "1",
            "resource_pool_id": "pool-1",
            "resource_class_name": "test-pool.test-class",
            "cpu": 2000,
            "memory": 4096,
            "gpu": 0,
            "storage": 10000,
        },
    )

    # Reload events
    events = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
    events.sort(key=lambda e: e.timestamp)

    # Find session_resumed event (newest occurrence, hence index -1)
    session_resumed_events = [e for e in events if e.event == "session_resumed"]
    assert len(session_resumed_events) >= 1
    session_resumed_event = session_resumed_events[-1]

    # Verify session_resumed has required metadata fields
    metadata = session_resumed_event.metadata_
    assert metadata["session_id"] == session_id
    assert "resource_class_id" in metadata
    assert "resource_pool_id" in metadata
    assert "resource_class_name" in metadata
    assert "cpu" in metadata
    assert "memory" in metadata
    assert "gpu" in metadata
    assert "storage" in metadata

    # Simulate user_requested_session_resume event
    await metrics.user_requested_session_resume(
        user=type("APIUser", (), {"id": user_headers["X-API-Token"]})(),
        metadata={
            "session_id": session_id,
            "resource_class_id": "1",
            "resource_pool_id": "pool-1",
            "resource_class_name": "test-pool.test-class",
            "cpu": 2000,
            "memory": 4096,
            "gpu": 0,
        },
    )

    # Reload events
    events = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
    events.sort(key=lambda e: e.timestamp)

    # Find user_requested_session_resume event
    resume_request_events = [e for e in events if e.event == "user_requested_session_resume"]
    assert len(resume_request_events) >= 1
    resume_request_event = resume_request_events[-1]

    # Verify user_requested_session_resume has required metadata fields
    metadata = resume_request_event.metadata_
    assert metadata["session_id"] == session_id
    assert "resource_class_id" in metadata
    assert "resource_pool_id" in metadata
    assert "resource_class_name" in metadata
    assert "cpu" in metadata
    assert "memory" in metadata
    assert "gpu" in metadata
    assert "storage" not in metadata  # This event doesn't include storage

    # Also verify session_hibernated metric
    await metrics.session_hibernated(
        user=type("APIUser", (), {"id": user_headers["X-API-Token"]})(),
        metadata={"session_id": session_id},
    )

    events = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
    events.sort(key=lambda e: e.timestamp)

    hibernated_events = [e for e in events if e.event == "session_hibernated"]
    assert len(hibernated_events) >= 1
    hibernated_event = hibernated_events[-1]
    assert hibernated_event.metadata_["session_id"] == session_id
213+
214+
215+
@pytest.mark.asyncio
async def test_session_metrics_metadata_structure(app_manager) -> None:
    """Test that session metrics store metadata with correct structure."""
    # Start from an empty staging table so the final event count is deterministic.
    await app_manager.metrics_repo.delete_all_metrics()

    metrics = StagingMetricsService(enabled=True, metrics_repo=app_manager.metrics_repo)

    # Minimal stand-in for an authenticated API user.
    mock_user = type("APIUser", (), {"id": "test-user-123", "is_authenticated": True})()

    async def _first_event(event_name):
        """Return the first staged event with the given name."""
        matching = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics() if e.event == event_name]
        return matching[0]

    # Test session_started metadata: every field should round-trip unchanged.
    started_meta = {
        "session_id": "session-456",
        "resource_class_id": 5,
        "resource_pool_id": "pool-abc",
        "resource_class_name": "test-pool.test-class",
        "cpu": 2000,
        "memory": 4096,
        "gpu": 1,
        "storage": 10000,
    }
    await metrics.session_started(user=mock_user, metadata=dict(started_meta))
    started_event = await _first_event("session_started")
    for key, expected in started_meta.items():
        assert started_event.metadata_[key] == expected

    # Test session_resumed metadata
    resumed_meta = {
        "session_id": "session-789",
        "resource_class_id": 10,
        "resource_pool_id": "pool-def",
        "resource_class_name": "different-pool.different-class",
        "cpu": 4000,
        "memory": 8192,
        "gpu": 2,
        "storage": 20000,
    }
    await metrics.session_resumed(user=mock_user, metadata=dict(resumed_meta))
    resumed_event = await _first_event("session_resumed")
    for key, expected in resumed_meta.items():
        assert resumed_event.metadata_[key] == expected

    # Test user_requested_session_resume metadata (no storage field)
    resume_request_meta = {
        "session_id": "session-999",
        "resource_class_id": 3,
        "resource_pool_id": "pool-xyz",
        "resource_class_name": "another-pool.another-class",
        "cpu": 1000,
        "memory": 2048,
        "gpu": 0,
    }
    await metrics.user_requested_session_resume(user=mock_user, metadata=dict(resume_request_meta))
    resume_event = await _first_event("user_requested_session_resume")
    for key, expected in resume_request_meta.items():
        assert resume_event.metadata_[key] == expected
    assert "storage" not in resume_event.metadata_

    # Verify the event count
    all_events = [e async for e in app_manager.metrics_repo.get_unprocessed_metrics()]
    assert len(all_events) == 3  # session_started, session_resumed, user_requested_session_resume

0 commit comments

Comments
 (0)