Skip to content

Commit 5a4dfa2

Browse files
authored
Merge branch 'main' into feat-add-new-sessions-apispec
2 parents c89d7fb + d118c3a commit 5a4dfa2

66 files changed

Lines changed: 1393 additions & 944 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/test_publish.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ jobs:
3333
type=ref,event=pr,prefix=cache-pr-,priority=600
3434
type=ref,event=branch,prefix=cache-,priority=500
3535
type=ref,event=tag,prefix=cache-,priority=500
36+
flavor: |
37+
latest=false
3638
- name: Extract Docker image name
3739
id: docker_image
3840
env:

bases/renku_data_services/background_jobs/core.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
SubjectFilter,
1212
WriteRelationshipsRequest,
1313
)
14+
from ulid import ULID
1415

1516
from renku_data_services.authz.authz import Authz, ResourceType, _AuthzConverter, _Relation
1617
from renku_data_services.authz.models import Scope
1718
from renku_data_services.background_jobs.config import SyncConfig
1819
from renku_data_services.base_models.core import InternalServiceAdmin, ServiceAdminId
20+
from renku_data_services.errors import errors
1921
from renku_data_services.message_queue.avro_models.io.renku.events import v2
2022
from renku_data_services.message_queue.converters import EventConverter
2123
from renku_data_services.namespace.models import NamespaceKind
@@ -97,7 +99,22 @@ async def fix_mismatched_project_namespace_ids(config: SyncConfig) -> None:
9799
async for rel in res:
98100
logging.info(f"Checking project namespace - group relation {rel} for correct group ID")
99101
project_id = rel.relationship.resource.object_id
100-
project = await config.project_repo.get_project(api_user, project_id)
102+
try:
103+
project = await config.project_repo.get_project(api_user, project_id)
104+
except errors.MissingResourceError:
105+
logging.info(f"Couldn't find project {project_id}, deleting relation")
106+
authz.client.WriteRelationships(
107+
WriteRelationshipsRequest(
108+
updates=[
109+
RelationshipUpdate(
110+
operation=RelationshipUpdate.OPERATION_DELETE,
111+
relationship=rel.relationship,
112+
),
113+
]
114+
)
115+
)
116+
continue
117+
101118
if project.namespace.kind != NamespaceKind.group:
102119
continue
103120
correct_group_id = project.namespace.underlying_resource_id
@@ -117,7 +134,7 @@ async def fix_mismatched_project_namespace_ids(config: SyncConfig) -> None:
117134
relation=rel.relationship.relation,
118135
subject=SubjectReference(
119136
object=ObjectReference(
120-
object_type=ResourceType.group.value, object_id=correct_group_id
137+
object_type=ResourceType.group.value, object_id=str(correct_group_id)
121138
)
122139
),
123140
),
@@ -169,7 +186,7 @@ async def migrate_groups_make_all_public(config: SyncConfig) -> None:
169186
all_users = SubjectReference(object=_AuthzConverter.all_users())
170187
all_anon_users = SubjectReference(object=_AuthzConverter.anonymous_users())
171188
for group_id in groups_to_process:
172-
group_res = _AuthzConverter.group(group_id)
189+
group_res = _AuthzConverter.group(ULID.from_str(group_id))
173190
all_users_are_viewers = Relationship(
174191
resource=group_res,
175192
relation=_Relation.public_viewer.value,
@@ -228,7 +245,7 @@ async def migrate_user_namespaces_make_all_public(config: SyncConfig) -> None:
228245
all_users = SubjectReference(object=_AuthzConverter.all_users())
229246
all_anon_users = SubjectReference(object=_AuthzConverter.anonymous_users())
230247
for ns_id in namespaces_to_process:
231-
namespace_res = _AuthzConverter.user_namespace(ns_id)
248+
namespace_res = _AuthzConverter.user_namespace(ULID.from_str(ns_id))
232249
all_users_are_viewers = Relationship(
233250
resource=namespace_res,
234251
relation=_Relation.public_viewer.value,

bases/renku_data_services/data_api/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
def register_all_handlers(app: Sanic, config: Config) -> Sanic:
2828
"""Register all handlers on the application."""
29-
app.router.register_pattern("ulid", ULID.from_str, r"^[0-9A-HJKMNP-TV-Z]{26}$")
29+
app.router.register_pattern("ulid", ULID.from_str, r"^[0-7][0-9A-HJKMNP-TV-Z]{25}$")
3030
app.router.register_pattern("renku_slug", str, r"^[a-zA-Z0-9][a-zA-Z0-9\-_.]*$")
3131

3232
url_prefix = "/api/data"

bases/renku_data_services/data_api/main.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import sentry_sdk
99
import uvloop
10-
from prometheus_sanic import monitor
1110
from sanic import Sanic
1211
from sanic.log import logger
1312
from sanic.worker.loader import AppLoader
@@ -17,6 +16,7 @@
1716
from renku_data_services.app_config import Config
1817
from renku_data_services.authz.admin_sync import sync_admins_from_keycloak
1918
from renku_data_services.data_api.app import register_all_handlers
19+
from renku_data_services.data_api.prometheus import collect_system_metrics, setup_app_metrics, setup_prometheus
2020
from renku_data_services.errors.errors import (
2121
ForbiddenError,
2222
MissingResourceError,
@@ -31,11 +31,14 @@
3131
import sentry_sdk._types
3232

3333

34-
async def _send_messages() -> None:
34+
async def _send_messages(app: Sanic) -> None:
3535
config = Config.from_env()
3636
while True:
3737
try:
3838
await config.event_repo.send_pending_events()
39+
# we need to collect metrics for this background process separately from the task we add to the
40+
# server processes
41+
await collect_system_metrics(app, "send_events_worker")
3942
await asyncio.sleep(1)
4043
except (asyncio.CancelledError, KeyboardInterrupt) as e:
4144
logger.warning(f"Exiting: {e}")
@@ -45,14 +48,15 @@ async def _send_messages() -> None:
4548
raise
4649

4750

48-
def send_pending_events() -> None:
51+
def send_pending_events(app_name: str) -> None:
4952
"""Send pending messages in case sending in a handler failed."""
50-
_ = Sanic("send_events") # we need a dummy app for logging to work.
53+
app = Sanic(app_name)
54+
setup_app_metrics(app)
5155

5256
logger.info("running events sending loop.")
5357

5458
asyncio.set_event_loop(uvloop.new_event_loop())
55-
asyncio.run(_send_messages())
59+
asyncio.run(_send_messages(app))
5660

5761

5862
def create_app() -> Sanic:
@@ -104,9 +108,7 @@ async def setup_sentry(_: Sanic) -> None:
104108
logger.info(f"REAL_IP_HEADER = {app.config.REAL_IP_HEADER}")
105109

106110
app = register_all_handlers(app, config)
107-
108-
# Setup prometheus
109-
monitor(app, endpoint_type="url", multiprocess_mode="all", is_middleware=True).expose_endpoint()
111+
setup_prometheus(app)
110112

111113
if environ.get("CORS_ALLOW_ALL_ORIGINS", "false").lower() == "true":
112114
from sanic_ext import Extend
@@ -134,7 +136,7 @@ async def setup_rclone_validator(app: Sanic) -> None:
134136
async def ready(app: Sanic) -> None:
135137
"""Application ready event handler."""
136138
logger.info("starting events background job.")
137-
app.manager.manage("SendEvents", send_pending_events, {}, transient=True)
139+
app.manager.manage("SendEvents", send_pending_events, {"app_name": config.app_name}, transient=True)
138140

139141
return app
140142

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""Prometheus Metrics."""
2+
3+
import asyncio
4+
import resource
5+
6+
import aiofiles
7+
from prometheus_client import Gauge
8+
from prometheus_sanic import monitor
9+
from prometheus_sanic.constants import BaseMetrics
10+
from prometheus_sanic.metrics import init
11+
from sanic import Sanic
12+
13+
_PAGESIZE = resource.getpagesize()
14+
PROMETHEUS_VIRTUAL_MEMORY = "sanic_process_virtual_memory_bytes"
15+
PROMETHEUS_RESIDENT_MEMORY = "sanic_process_resident_memory_bytes"
16+
PROMETHEUS_METRICS_LIST = [
17+
(
18+
PROMETHEUS_VIRTUAL_MEMORY,
19+
Gauge(PROMETHEUS_VIRTUAL_MEMORY, "Virtual memory size in bytes.", ["worker"]),
20+
),
21+
(
22+
PROMETHEUS_RESIDENT_MEMORY,
23+
Gauge(PROMETHEUS_RESIDENT_MEMORY, "Resident memory size in bytes.", ["worker"]),
24+
),
25+
]
26+
27+
28+
async def collect_system_metrics(app: Sanic, name: str) -> None:
29+
"""Collect prometheus system metrics in a background task.
30+
31+
This is similar to the official prometheus_client implementation, which doesn't support CPU/Mem metrics
32+
in multiprocess mode
33+
"""
34+
try:
35+
async with aiofiles.open("/proc/self/stat", "rb") as stat:
36+
content = await stat.read()
37+
parts = content.split(b")")[-1].split()
38+
app.ctx.metrics[PROMETHEUS_VIRTUAL_MEMORY].labels({name}).set(float(parts[20]))
39+
app.ctx.metrics[PROMETHEUS_RESIDENT_MEMORY].labels({name}).set(float(parts[21]) * _PAGESIZE)
40+
except OSError:
41+
pass
42+
43+
44+
async def collect_system_metrics_task(app: Sanic) -> None:
45+
"""Background task to collect metrics."""
46+
while True:
47+
await collect_system_metrics(app, app.m.name)
48+
await asyncio.sleep(5)
49+
50+
51+
def setup_prometheus(app: Sanic) -> None:
52+
"""Setup prometheus monitoring.
53+
54+
We add custom metrics collection wo sanic workers and to the send_messages background job, since
55+
prometheus does not collect cpy/memory metrics when in multiprocess mode.
56+
"""
57+
app.add_task(collect_system_metrics_task) # type:ignore[arg-type]
58+
monitor(
59+
app,
60+
endpoint_type="url",
61+
multiprocess_mode="all",
62+
is_middleware=True,
63+
metrics_list=PROMETHEUS_METRICS_LIST,
64+
).expose_endpoint()
65+
66+
67+
def setup_app_metrics(app: Sanic) -> None:
68+
"""Setup metrics for a Sanic app.
69+
70+
NOTE: this should only be called for manually created workers (with app.manager.manage(...))
71+
"""
72+
app.ctx.metrics = {}
73+
init(app, metrics_list=PROMETHEUS_METRICS_LIST, metrics=BaseMetrics)

0 commit comments

Comments
 (0)