Skip to content

Commit a8a3326

Browse files
authored
refactor: make resource pool initialisation async (#490)
1 parent 6e473ec commit a8a3326

4 files changed

Lines changed: 17 additions & 14 deletions

File tree

bases/renku_data_services/background_jobs/core.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
from renku_data_services.namespace.models import NamespaceKind
2727

2828

29+
async def generate_user_namespaces(config: SyncConfig) -> None:
30+
"""Generate namespaces for users if there are none."""
31+
await config.group_repo.generate_user_namespaces()
32+
33+
2934
async def sync_user_namespaces(config: SyncConfig) -> None:
3035
"""Lists all user namespaces in the database and adds them to Authzed and the event queue."""
3136
authz = Authz(config.authz_config)

bases/renku_data_services/background_jobs/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from renku_data_services.background_jobs.core import (
1111
bootstrap_user_namespaces,
1212
fix_mismatched_project_namespace_ids,
13+
generate_user_namespaces,
1314
migrate_groups_make_all_public,
1415
migrate_storages_v2_to_data_connectors,
1516
migrate_user_namespaces_make_all_public,
@@ -27,6 +28,7 @@ async def short_period_sync() -> None:
2728

2829
await error_handler(
2930
[
31+
generate_user_namespaces(config),
3032
bootstrap_user_namespaces(config),
3133
config.syncer.events_sync(config.kc_api),
3234
sync_admins_from_keycloak(config.kc_api, Authz(config.authz_config)),

bases/renku_data_services/data_api/main.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def create_app() -> Sanic:
6969
# NOTE: in single process mode where we usually run schemathesis to get coverage the db migrations
7070
# specified below with the main_process_start decorator do not run.
7171
run_migrations_for_app("common")
72-
config.rp_repo.initialize(config.db.conn_url(async_client=False), config.default_resource_pool)
72+
asyncio.run(config.rp_repo.initialize(config.db.conn_url(async_client=False), config.default_resource_pool))
7373
asyncio.run(config.kc_user_repo.initialize(config.kc_api))
7474
asyncio.run(sync_admins_from_keycloak(config.kc_api, config.authz))
7575
if config.sentry.enabled:
@@ -122,10 +122,7 @@ async def setup_sentry(_: Sanic) -> None:
122122
async def do_migrations(_: Sanic) -> None:
123123
logger.info("running migrations")
124124
run_migrations_for_app("common")
125-
config.rp_repo.initialize(config.db.conn_url(async_client=False), config.default_resource_pool)
126-
await config.kc_user_repo.initialize(config.kc_api)
127-
await sync_admins_from_keycloak(config.kc_api, config.authz)
128-
await config.group_repo.generate_user_namespaces()
125+
await config.rp_repo.initialize(config.db.conn_url(async_client=False), config.default_resource_pool)
129126

130127
@app.before_server_start
131128
async def setup_rclone_validator(app: Sanic) -> None:

components/renku_data_services/crc/db.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
from functools import wraps
1313
from typing import Any, Concatenate, Optional, ParamSpec, TypeVar, cast
1414

15-
from sqlalchemy import NullPool, create_engine, delete, select
16-
from sqlalchemy.ext.asyncio import AsyncSession
17-
from sqlalchemy.orm import Session, selectinload, sessionmaker
15+
from sqlalchemy import NullPool, delete, select
16+
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
17+
from sqlalchemy.orm import selectinload
1818
from sqlalchemy.sql import Select, and_, not_, or_
1919
from sqlalchemy.sql.expression import false, true
2020

@@ -147,17 +147,16 @@ async def decorated_function(self: Any, *args: _P.args, **kwargs: _P.kwargs) ->
147147
class ResourcePoolRepository(_Base):
148148
"""The adapter used for accessing resource pools with SQLAlchemy."""
149149

150-
def initialize(self, sync_connection_url: str, rp: models.ResourcePool) -> None:
150+
async def initialize(self, async_connection_url: str, rp: models.ResourcePool) -> None:
151151
"""Add the default resource pool if it does not already exist."""
152-
engine = create_engine(sync_connection_url, poolclass=NullPool)
153-
session_maker = sessionmaker(
152+
engine = create_async_engine(async_connection_url, poolclass=NullPool)
153+
session_maker = async_sessionmaker(
154154
engine,
155-
class_=Session,
156155
expire_on_commit=True,
157156
)
158-
with session_maker() as session, session.begin():
157+
async with session_maker() as session, session.begin():
159158
stmt = select(schemas.ResourcePoolORM.default == true())
160-
res = session.execute(stmt)
159+
res = await session.execute(stmt)
161160
default_rp = res.scalars().first()
162161
if default_rp is None:
163162
orm = schemas.ResourcePoolORM.load(rp)

0 commit comments

Comments
 (0)