diff --git a/bases/renku_data_services/data_tasks/task_defs.py b/bases/renku_data_services/data_tasks/task_defs.py index 8f080e254..d40e0c6ab 100644 --- a/bases/renku_data_services/data_tasks/task_defs.py +++ b/bases/renku_data_services/data_tasks/task_defs.py @@ -375,12 +375,32 @@ async def migrate_user_namespaces_make_all_public(dm: DependencyManager) -> None await asyncio.sleep(dm.config.short_task_period_s) +# async def events_sync_from_keycloak(dm: DependencyManager) -> None: +# """Sync all users from keycloak.""" +# while True: +# try: +# await dm.syncer.events_sync(dm.kc_api) +# except (asyncio.CancelledError, KeyboardInterrupt) as e: +# logger.warning(f"Exiting: {e}") +# await asyncio.sleep(10) + + +async def admin_events_sync(dm: DependencyManager) -> None: + """Update the authorization database using the Keycloak admin events API.""" + while True: + try: + await dm.syncer.admin_events_sync(dm.kc_api) + except (asyncio.CancelledError, KeyboardInterrupt) as e: + logger.warning(f"Exiting: {e}") + else: + await asyncio.sleep(dm.config.x_short_task_period_s) + + async def users_sync(dm: DependencyManager) -> None: """Sync all users from keycloak.""" while True: try: await dm.syncer.users_sync(dm.kc_api) - except (asyncio.CancelledError, KeyboardInterrupt) as e: logger.warning(f"Exiting: {e}") else: @@ -388,11 +408,10 @@ async def users_sync(dm: DependencyManager) -> None: async def sync_admins_from_keycloak(dm: DependencyManager) -> None: - """Sync all users from keycloak.""" + """Sync users with the admin role from keycloak.""" while True: try: await admin_sync.sync_admins_from_keycloak(dm.kc_api, dm.authz) - except (asyncio.CancelledError, KeyboardInterrupt) as e: logger.warning(f"Exiting: {e}") else: @@ -590,6 +609,8 @@ def all_tasks(dm: DependencyManager) -> TaskDefininions: "fix_mismatched_project_namespace_ids": lambda: fix_mismatched_project_namespace_ids(dm), "migrate_groups_make_all_public": lambda: migrate_groups_make_all_public(dm), "migrate_user_namespaces_make_all_public": lambda: migrate_user_namespaces_make_all_public(dm), + "admin_events_sync": lambda: admin_events_sync(dm), + # "events_sync_from_keycloak": lambda: events_sync_from_keycloak(dm), "users_sync": lambda: users_sync(dm), "sync_admins_from_keycloak": lambda: sync_admins_from_keycloak(dm), "initialize_session_environments": lambda: initialize_session_environments(dm), diff --git a/components/renku_data_services/migrations/versions/cc3b595b7423_update_events_sync_table.py b/components/renku_data_services/migrations/versions/cc3b595b7423_update_events_sync_table.py new file mode 100644 index 000000000..2eecdfd7d --- /dev/null +++ b/components/renku_data_services/migrations/versions/cc3b595b7423_update_events_sync_table.py @@ -0,0 +1,50 @@ +"""Update events sync table + +Revision ID: cc3b595b7423 +Revises: a8f0e7b3c2d1 +Create Date: 2026-06-01 09:16:03.862495 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "cc3b595b7423" +down_revision = "a8f0e7b3c2d1" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.drop_table("last_keycloak_event_timestamp", schema="users") + op.create_table( + "last_keycloak_event_timestamp_v2", + sa.Column( + "id", + sa.Enum("realm_events", "realm_admin_events", name="keycloakeventsource", create_type=True), + nullable=False, + ), + sa.Column("timestamp_utc", sa.DateTime(timezone=True), nullable=False), + sa.PrimaryKeyConstraint("id"), + schema="users", + ) + + +def downgrade() -> None: + op.drop_table("last_keycloak_event_timestamp_v2", schema="users") + op.execute("DROP TYPE keycloakeventsource") + op.create_table( + "last_keycloak_event_timestamp", + sa.Column( + "id", + sa.INTEGER(), + sa.Identity(always=True, start=1, increment=1, minvalue=1, maxvalue=2147483647, cycle=False, cache=1), + autoincrement=True, + nullable=False, + ), + sa.Column("timestamp_utc", postgresql.TIMESTAMP(), autoincrement=False, nullable=False), + sa.PrimaryKeyConstraint("id", name="last_keycloak_event_timestamp_pkey"), + schema="users", + ) diff --git a/components/renku_data_services/users/db.py b/components/renku_data_services/users/db.py index 7441de214..96fc39c71 100644 --- a/components/renku_data_services/users/db.py +++ b/components/renku_data_services/users/db.py @@ -6,18 +6,16 @@ from abc import abstractmethod from collections.abc import AsyncGenerator, Callable, Mapping from dataclasses import dataclass, field -from datetime import UTC, datetime, timedelta from typing import Any, Protocol, cast from cryptography.hazmat.primitives.asymmetric import rsa -from sqlalchemy import delete, func, select +from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession from renku_data_services import base_models from renku_data_services.app_config import logging from renku_data_services.authz.authz import Authz, AuthzOperation, ResourceType from renku_data_services.base_api.auth import APIUser, only_authenticated -from renku_data_services.base_models.core import InternalServiceAdmin, ServiceAdminId from renku_data_services.base_models.metrics import MetricsService, UserIdentity from renku_data_services.base_models.nel import Nel from renku_data_services.errors import errors @@ -29,11 +27,10 @@ from renku_data_services.users.kc_api import IKeycloakAPI from renku_data_services.users.models import ( DeletedUser, - KeycloakAdminEvent, + KeycloakEventSource, PinnedProjects, UnsavedUserInfo, UserInfo, - UserInfoFieldUpdate, UserInfoUpdate, UserPatch, UserPreferences, @@ -404,72 +401,88 @@ async def _do_update(raw_kc_user: dict[str, Any]) -> None: for user in kc_users: await _do_update(user) - async def events_sync(self, kc_api: IKeycloakAPI) -> None: - """Use the events from Keycloak to update the users database.""" + async def admin_events_sync(self, kc_api: IKeycloakAPI) -> None: + """Update the authorization database using the Keycloak admin events API.""" + logger.info("Starting admin events sync") async with self.session_maker() as session, session.begin(): - res_count = await session.execute(select(func.count()).select_from(UserORM)) - count = res_count.scalar() or 0 - if count == 0: - await self.users_sync(kc_api) - logger.info("Starting periodic event sync.") - stmt = select(LastKeycloakEventTimestamp) - latest_utc_timestamp_orm = (await session.execute(stmt)).scalar_one_or_none() + stmt = select(LastKeycloakEventTimestamp).where( + LastKeycloakEventTimestamp.id == KeycloakEventSource.realm_admin_events + ) + latest_utc_timestamp_orm = await session.scalar(stmt) previous_sync_latest_utc_timestamp = ( - latest_utc_timestamp_orm.timestamp_utc if latest_utc_timestamp_orm is not None else None + latest_utc_timestamp_orm.timestamp_utc if latest_utc_timestamp_orm else None ) logger.info(f"The previous sync latest event is {previous_sync_latest_utc_timestamp} UTC") - now_utc = datetime.now(tz=UTC) - start_date = now_utc.date() - timedelta(days=1) - logger.info(f"Pulling events with a start date of {start_date} UTC") - user_events = kc_api.get_user_events(start_date=start_date) - update_admin_events = kc_api.get_admin_events( - start_date=start_date, event_types=[KeycloakAdminEvent.CREATE, KeycloakAdminEvent.UPDATE] - ) - delete_admin_events = kc_api.get_admin_events( - start_date=start_date, event_types=[KeycloakAdminEvent.DELETE] - ) - parsed_updates = UserInfoFieldUpdate.from_json_admin_events(update_admin_events) - parsed_updates.extend(UserInfoFieldUpdate.from_json_user_events(user_events)) - parsed_deletions = UserInfoFieldUpdate.from_json_admin_events(delete_admin_events) - parsed_updates = sorted(parsed_updates, key=lambda x: x.timestamp_utc) - parsed_deletions = sorted(parsed_deletions, key=lambda x: x.timestamp_utc) - if previous_sync_latest_utc_timestamp is not None: - # Some events have already been processed - filter out old events we have seen - logger.info(f"Filtering events older than {previous_sync_latest_utc_timestamp}") - parsed_updates = [u for u in parsed_updates if u.timestamp_utc > previous_sync_latest_utc_timestamp] - parsed_deletions = [u for u in parsed_deletions if u.timestamp_utc > previous_sync_latest_utc_timestamp] - latest_update_timestamp = None - latest_delete_timestamp = None - for update in parsed_updates: - logger.info(f"Processing update event {update}") - # TODO: add typing to `update.field_name` for safer updates - await self.update_or_insert_user( - user=UnsavedUserInfo(id=update.user_id, **{update.field_name: update.new_value}) - ) - latest_update_timestamp = update.timestamp_utc - for deletion in parsed_deletions: - logger.info(f"Processing deletion event {deletion}") - await self.user_repo.remove_user( - requested_by=InternalServiceAdmin(id=ServiceAdminId.migrations), user_id=deletion.user_id - ) - latest_delete_timestamp = deletion.timestamp_utc - # Update the latest processed event timestamp - current_sync_latest_utc_timestamp = latest_update_timestamp - if latest_delete_timestamp is not None and ( - current_sync_latest_utc_timestamp is None or current_sync_latest_utc_timestamp < latest_delete_timestamp - ): - current_sync_latest_utc_timestamp = latest_delete_timestamp - if current_sync_latest_utc_timestamp is not None: - if latest_utc_timestamp_orm is None: - session.add(LastKeycloakEventTimestamp(current_sync_latest_utc_timestamp)) - logger.info( - f"Inserted the latest sync event timestamp in the database: {current_sync_latest_utc_timestamp}" - ) - else: - latest_utc_timestamp_orm.timestamp_utc = current_sync_latest_utc_timestamp - logger.info( - f"Updated the latest sync event timestamp in the database: {current_sync_latest_utc_timestamp}" - ) + + pass + + # async def events_sync(self, kc_api: IKeycloakAPI) -> None: + # """Use the events from Keycloak to update the users database.""" + # async with self.session_maker() as session, session.begin(): + # res_count = await session.execute(select(func.count()).select_from(UserORM)) + # count = res_count.scalar() or 0 + # if count == 0: + # await self.users_sync(kc_api) + # logger.info("Starting periodic event sync.") + # stmt = select(LastKeycloakEventTimestamp) + # latest_utc_timestamp_orm = (await session.execute(stmt)).scalar_one_or_none() + # previous_sync_latest_utc_timestamp = ( + # latest_utc_timestamp_orm.timestamp_utc if latest_utc_timestamp_orm is not None else None + # ) + # logger.info(f"The previous sync latest event is {previous_sync_latest_utc_timestamp} UTC") + # now_utc = datetime.now(tz=UTC) + # # start_date = now_utc.date() - timedelta(days=1) + # start_date = now_utc.date() - timedelta(minutes=1) + # logger.info(f"Pulling events with a start date of {start_date} UTC") + # user_events = kc_api.get_user_events(start_date=start_date) + # update_admin_events = kc_api.get_admin_events( + # start_date=start_date, event_types=[KeycloakAdminEvent.CREATE, KeycloakAdminEvent.UPDATE] + # ) + # # delete_admin_events = kc_api.get_admin_events( + # # start_date=start_date, event_types=[KeycloakAdminEvent.DELETE] + # # ) + # parsed_updates = UserInfoFieldUpdate.from_json_admin_events(update_admin_events) + # parsed_updates.extend(UserInfoFieldUpdate.from_json_user_events(user_events)) + # # parsed_deletions = UserInfoFieldUpdate.from_json_admin_events(delete_admin_events) + # parsed_updates = sorted(parsed_updates, key=lambda x: x.timestamp_utc) + # # parsed_deletions = sorted(parsed_deletions, key=lambda x: x.timestamp_utc) + # if previous_sync_latest_utc_timestamp is not None: + # # Some events have already been processed - filter out old events we have seen + # logger.info(f"Filtering events older than {previous_sync_latest_utc_timestamp}") + # parsed_updates = [u for u in parsed_updates if u.timestamp_utc > previous_sync_latest_utc_timestamp] + # # parsed_deletions = [u for u in parsed_deletions if u.timestamp_utc > previous_sync_latest_utc_timestamp] # noqa: E501 + # latest_update_timestamp = None + # # latest_delete_timestamp = None + # for update in parsed_updates: + # logger.info(f"Processing update event {update}") + # # TODO: add typing to `update.field_name` for safer updates + # await self.update_or_insert_user( + # user=UnsavedUserInfo(id=update.user_id, **{update.field_name: update.new_value}) + # ) + # latest_update_timestamp = update.timestamp_utc + # # for deletion in parsed_deletions: + # # logger.info(f"Processing deletion event {deletion}") + # # await self.user_repo.remove_user( + # # requested_by=InternalServiceAdmin(id=ServiceAdminId.migrations), user_id=deletion.user_id + # # ) + # # latest_delete_timestamp = deletion.timestamp_utc + # # Update the latest processed event timestamp + # current_sync_latest_utc_timestamp = latest_update_timestamp + # # if latest_delete_timestamp is not None and ( + # # current_sync_latest_utc_timestamp is None or current_sync_latest_utc_timestamp < latest_delete_timestamp # noqa: E501 + # # ): + # # current_sync_latest_utc_timestamp = latest_delete_timestamp + # if current_sync_latest_utc_timestamp is not None: + # if latest_utc_timestamp_orm is None: + # session.add(LastKeycloakEventTimestamp(current_sync_latest_utc_timestamp)) + # logger.info( + # f"Inserted the latest sync event timestamp in the database: {current_sync_latest_utc_timestamp}" + # ) + # else: + # latest_utc_timestamp_orm.timestamp_utc = current_sync_latest_utc_timestamp + # logger.info( + # f"Updated the latest sync event timestamp in the database: {current_sync_latest_utc_timestamp}" + # ) @dataclass diff --git a/components/renku_data_services/users/models.py b/components/renku_data_services/users/models.py index c8cfabaa7..5636d6e91 100644 --- a/components/renku_data_services/users/models.py +++ b/components/renku_data_services/users/models.py @@ -7,7 +7,7 @@ from collections.abc import Iterable from dataclasses import dataclass from datetime import UTC, datetime -from enum import Enum +from enum import Enum, StrEnum from typing import Any, NamedTuple from pydantic import BaseModel, Field @@ -19,6 +19,17 @@ logger = logging.getLogger(__name__) +class KeycloakEventSource(StrEnum): + """The Keycloak event source; can be either "realm_events" or "realm_admin_events". + + - "realm_events" represents events from the `GET /admin/realms/{realm}/events` API endpoint + - "realm_admin_events" represents events from the `GET /admin/realms/{realm}/admin-events` API endpoint + """ + + realm_events = "realm_events" + realm_admin_events = "realm_admin_events" + + class KeycloakEvent(Enum): """The Keycloak user events that result from the user registering or updating their personal information.""" diff --git a/components/renku_data_services/users/orm.py b/components/renku_data_services/users/orm.py index cd4b4587f..d6751ee8e 100644 --- a/components/renku_data_services/users/orm.py +++ b/components/renku_data_services/users/orm.py @@ -10,7 +10,7 @@ from sqlalchemy.orm import DeclarativeBase, Mapped, MappedAsDataclass, mapped_column, relationship from renku_data_services.base_orm.registry import COMMON_ORM_REGISTRY -from renku_data_services.users.models import PinnedProjects, UserInfo, UserPreferences +from renku_data_services.users.models import KeycloakEventSource, PinnedProjects, UserInfo, UserPreferences if TYPE_CHECKING: from renku_data_services.namespace.orm import NamespaceORM @@ -72,11 +72,12 @@ class UserMetricsORM(BaseORM): class LastKeycloakEventTimestamp(BaseORM): - """The latest event timestamp processed from Keycloak.""" + """The latest event timestamp processed from Keycloak, per event source.""" - __tablename__ = "last_keycloak_event_timestamp" - id: Mapped[int] = mapped_column(Integer, Identity(always=True), primary_key=True, init=False) - timestamp_utc: Mapped[datetime] = mapped_column(DateTime(timezone=False), default_factory=datetime.utcnow) + __tablename__ = "last_keycloak_event_timestamp_v2" + + id: Mapped[KeycloakEventSource] = mapped_column("id", primary_key=True) + timestamp_utc: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) class UserPreferencesORM(BaseORM):