Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions bases/renku_data_services/data_tasks/task_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,24 +375,43 @@ async def migrate_user_namespaces_make_all_public(dm: DependencyManager) -> None
await asyncio.sleep(dm.config.short_task_period_s)


# async def events_sync_from_keycloak(dm: DependencyManager) -> None:
# """Sync all users from keycloak."""
# while True:
# try:
# await dm.syncer.events_sync(dm.kc_api)
# except (asyncio.CancelledError, KeyboardInterrupt) as e:
# logger.warning(f"Exiting: {e}")
# await asyncio.sleep(10)


async def admin_events_sync(dm: DependencyManager) -> None:
    """Update the authorization database using the Keycloak admin events API.

    Runs forever: each pass awaits ``dm.syncer.admin_events_sync(dm.kc_api)`` and,
    when the pass completes without raising, sleeps for
    ``dm.config.x_short_task_period_s`` seconds before the next one.

    The task stops when the sync call is cancelled or interrupted.
    """
    while True:
        try:
            await dm.syncer.admin_events_sync(dm.kc_api)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Leave the loop for real. Without this the cancellation request is
            # swallowed and the sync is restarted immediately (no sleep on this
            # path), so the task can never be stopped while a sync is running.
            return
        else:
            await asyncio.sleep(dm.config.x_short_task_period_s)


async def users_sync(dm: DependencyManager) -> None:
    """Sync all users from keycloak.

    Runs forever: each pass awaits ``dm.syncer.users_sync(dm.kc_api)`` and, when
    the pass completes without raising, sleeps for
    ``dm.config.long_task_period_s`` seconds before the next one.

    The task stops when the sync call is cancelled or interrupted.
    """
    while True:
        try:
            await dm.syncer.users_sync(dm.kc_api)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Leave the loop for real. Without this the cancellation request is
            # swallowed and the sync is restarted immediately (no sleep on this
            # path), so the task can never be stopped while a sync is running.
            return
        else:
            await asyncio.sleep(dm.config.long_task_period_s)


async def sync_admins_from_keycloak(dm: DependencyManager) -> None:
"""Sync all users from keycloak."""
"""Sync users with the admin role from keycloak."""
while True:
try:
await admin_sync.sync_admins_from_keycloak(dm.kc_api, dm.authz)

except (asyncio.CancelledError, KeyboardInterrupt) as e:
logger.warning(f"Exiting: {e}")
else:
Expand Down Expand Up @@ -590,6 +609,8 @@ def all_tasks(dm: DependencyManager) -> TaskDefininions:
"fix_mismatched_project_namespace_ids": lambda: fix_mismatched_project_namespace_ids(dm),
"migrate_groups_make_all_public": lambda: migrate_groups_make_all_public(dm),
"migrate_user_namespaces_make_all_public": lambda: migrate_user_namespaces_make_all_public(dm),
"admin_events_sync": lambda: admin_events_sync(dm),
# "events_sync_from_keycloak": lambda: events_sync_from_keycloak(dm),
"users_sync": lambda: users_sync(dm),
"sync_admins_from_keycloak": lambda: sync_admins_from_keycloak(dm),
"initialize_session_environments": lambda: initialize_session_environments(dm),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Update events sync table

Revision ID: cc3b595b7423
Revises: a8f0e7b3c2d1
Create Date: 2026-06-01 09:16:03.862495

"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "cc3b595b7423"
down_revision = "a8f0e7b3c2d1"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Replace the timestamp table with one keyed by Keycloak event source.

    Drops ``users.last_keycloak_event_timestamp`` and creates
    ``users.last_keycloak_event_timestamp_v2``. The primary key ``id`` of the
    new table is an enum named ``keycloakeventsource`` with the values
    ``realm_events`` and ``realm_admin_events``; ``timestamp_utc`` is a
    timezone-aware datetime. Rows of the old table are not carried over.
    """
    schema = "users"

    op.drop_table("last_keycloak_event_timestamp", schema=schema)

    event_source_type = sa.Enum("realm_events", "realm_admin_events", name="keycloakeventsource", create_type=True)
    table_elements = (
        sa.Column("id", event_source_type, nullable=False),
        sa.Column("timestamp_utc", sa.DateTime(timezone=True), nullable=False),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table("last_keycloak_event_timestamp_v2", *table_elements, schema=schema)


def downgrade() -> None:
    """Restore the original single-timestamp table.

    Drops ``users.last_keycloak_event_timestamp_v2`` and the
    ``keycloakeventsource`` enum type, then recreates
    ``users.last_keycloak_event_timestamp`` with an integer identity primary
    key and a ``timestamp_utc`` column. Rows of the v2 table are not carried
    over.
    """
    schema = "users"

    # The table has to go before the enum type that its "id" column uses.
    op.drop_table("last_keycloak_event_timestamp_v2", schema=schema)
    op.execute("DROP TYPE keycloakeventsource")

    id_column = sa.Column(
        "id",
        sa.INTEGER(),
        sa.Identity(always=True, start=1, increment=1, minvalue=1, maxvalue=2147483647, cycle=False, cache=1),
        autoincrement=True,
        nullable=False,
    )
    timestamp_column = sa.Column("timestamp_utc", postgresql.TIMESTAMP(), autoincrement=False, nullable=False)
    primary_key = sa.PrimaryKeyConstraint("id", name="last_keycloak_event_timestamp_pkey")

    op.create_table(
        "last_keycloak_event_timestamp",
        id_column,
        timestamp_column,
        primary_key,
        schema=schema,
    )
147 changes: 80 additions & 67 deletions components/renku_data_services/users/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,16 @@
from abc import abstractmethod
from collections.abc import AsyncGenerator, Callable, Mapping
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any, Protocol, cast

from cryptography.hazmat.primitives.asymmetric import rsa
from sqlalchemy import delete, func, select
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession

from renku_data_services import base_models
from renku_data_services.app_config import logging
from renku_data_services.authz.authz import Authz, AuthzOperation, ResourceType
from renku_data_services.base_api.auth import APIUser, only_authenticated
from renku_data_services.base_models.core import InternalServiceAdmin, ServiceAdminId
from renku_data_services.base_models.metrics import MetricsService, UserIdentity
from renku_data_services.base_models.nel import Nel
from renku_data_services.errors import errors
Expand All @@ -29,11 +27,10 @@
from renku_data_services.users.kc_api import IKeycloakAPI
from renku_data_services.users.models import (
DeletedUser,
KeycloakAdminEvent,
KeycloakEventSource,
PinnedProjects,
UnsavedUserInfo,
UserInfo,
UserInfoFieldUpdate,
UserInfoUpdate,
UserPatch,
UserPreferences,
Expand Down Expand Up @@ -404,72 +401,88 @@ async def _do_update(raw_kc_user: dict[str, Any]) -> None:
for user in kc_users:
await _do_update(user)

async def events_sync(self, kc_api: IKeycloakAPI) -> None:
"""Use the events from Keycloak to update the users database."""
async def admin_events_sync(self, kc_api: IKeycloakAPI) -> None:
"""Update the authorization database using the Keycloak admin events API."""
logger.info("Starting admin events sync")
async with self.session_maker() as session, session.begin():
res_count = await session.execute(select(func.count()).select_from(UserORM))
count = res_count.scalar() or 0
if count == 0:
await self.users_sync(kc_api)
logger.info("Starting periodic event sync.")
stmt = select(LastKeycloakEventTimestamp)
latest_utc_timestamp_orm = (await session.execute(stmt)).scalar_one_or_none()
stmt = select(LastKeycloakEventTimestamp).where(
LastKeycloakEventTimestamp.id == KeycloakEventSource.realm_admin_events
)
latest_utc_timestamp_orm = await session.scalar(stmt)
previous_sync_latest_utc_timestamp = (
latest_utc_timestamp_orm.timestamp_utc if latest_utc_timestamp_orm is not None else None
latest_utc_timestamp_orm.timestamp_utc if latest_utc_timestamp_orm else None
)
logger.info(f"The previous sync latest event is {previous_sync_latest_utc_timestamp} UTC")
now_utc = datetime.now(tz=UTC)
start_date = now_utc.date() - timedelta(days=1)
logger.info(f"Pulling events with a start date of {start_date} UTC")
user_events = kc_api.get_user_events(start_date=start_date)
update_admin_events = kc_api.get_admin_events(
start_date=start_date, event_types=[KeycloakAdminEvent.CREATE, KeycloakAdminEvent.UPDATE]
)
delete_admin_events = kc_api.get_admin_events(
start_date=start_date, event_types=[KeycloakAdminEvent.DELETE]
)
parsed_updates = UserInfoFieldUpdate.from_json_admin_events(update_admin_events)
parsed_updates.extend(UserInfoFieldUpdate.from_json_user_events(user_events))
parsed_deletions = UserInfoFieldUpdate.from_json_admin_events(delete_admin_events)
parsed_updates = sorted(parsed_updates, key=lambda x: x.timestamp_utc)
parsed_deletions = sorted(parsed_deletions, key=lambda x: x.timestamp_utc)
if previous_sync_latest_utc_timestamp is not None:
# Some events have already been processed - filter out old events we have seen
logger.info(f"Filtering events older than {previous_sync_latest_utc_timestamp}")
parsed_updates = [u for u in parsed_updates if u.timestamp_utc > previous_sync_latest_utc_timestamp]
parsed_deletions = [u for u in parsed_deletions if u.timestamp_utc > previous_sync_latest_utc_timestamp]
latest_update_timestamp = None
latest_delete_timestamp = None
for update in parsed_updates:
logger.info(f"Processing update event {update}")
# TODO: add typing to `update.field_name` for safer updates
await self.update_or_insert_user(
user=UnsavedUserInfo(id=update.user_id, **{update.field_name: update.new_value})
)
latest_update_timestamp = update.timestamp_utc
for deletion in parsed_deletions:
logger.info(f"Processing deletion event {deletion}")
await self.user_repo.remove_user(
requested_by=InternalServiceAdmin(id=ServiceAdminId.migrations), user_id=deletion.user_id
)
latest_delete_timestamp = deletion.timestamp_utc
# Update the latest processed event timestamp
current_sync_latest_utc_timestamp = latest_update_timestamp
if latest_delete_timestamp is not None and (
current_sync_latest_utc_timestamp is None or current_sync_latest_utc_timestamp < latest_delete_timestamp
):
current_sync_latest_utc_timestamp = latest_delete_timestamp
if current_sync_latest_utc_timestamp is not None:
if latest_utc_timestamp_orm is None:
session.add(LastKeycloakEventTimestamp(current_sync_latest_utc_timestamp))
logger.info(
f"Inserted the latest sync event timestamp in the database: {current_sync_latest_utc_timestamp}"
)
else:
latest_utc_timestamp_orm.timestamp_utc = current_sync_latest_utc_timestamp
logger.info(
f"Updated the latest sync event timestamp in the database: {current_sync_latest_utc_timestamp}"
)

pass

# async def events_sync(self, kc_api: IKeycloakAPI) -> None:
# """Use the events from Keycloak to update the users database."""
# async with self.session_maker() as session, session.begin():
# res_count = await session.execute(select(func.count()).select_from(UserORM))
# count = res_count.scalar() or 0
# if count == 0:
# await self.users_sync(kc_api)
# logger.info("Starting periodic event sync.")
# stmt = select(LastKeycloakEventTimestamp)
# latest_utc_timestamp_orm = (await session.execute(stmt)).scalar_one_or_none()
# previous_sync_latest_utc_timestamp = (
# latest_utc_timestamp_orm.timestamp_utc if latest_utc_timestamp_orm is not None else None
# )
# logger.info(f"The previous sync latest event is {previous_sync_latest_utc_timestamp} UTC")
# now_utc = datetime.now(tz=UTC)
# # start_date = now_utc.date() - timedelta(days=1)
# start_date = now_utc.date() - timedelta(minutes=1)
# logger.info(f"Pulling events with a start date of {start_date} UTC")
# user_events = kc_api.get_user_events(start_date=start_date)
# update_admin_events = kc_api.get_admin_events(
# start_date=start_date, event_types=[KeycloakAdminEvent.CREATE, KeycloakAdminEvent.UPDATE]
# )
# # delete_admin_events = kc_api.get_admin_events(
# # start_date=start_date, event_types=[KeycloakAdminEvent.DELETE]
# # )
# parsed_updates = UserInfoFieldUpdate.from_json_admin_events(update_admin_events)
# parsed_updates.extend(UserInfoFieldUpdate.from_json_user_events(user_events))
# # parsed_deletions = UserInfoFieldUpdate.from_json_admin_events(delete_admin_events)
# parsed_updates = sorted(parsed_updates, key=lambda x: x.timestamp_utc)
# # parsed_deletions = sorted(parsed_deletions, key=lambda x: x.timestamp_utc)
# if previous_sync_latest_utc_timestamp is not None:
# # Some events have already been processed - filter out old events we have seen
# logger.info(f"Filtering events older than {previous_sync_latest_utc_timestamp}")
# parsed_updates = [u for u in parsed_updates if u.timestamp_utc > previous_sync_latest_utc_timestamp]
# # parsed_deletions = [u for u in parsed_deletions if u.timestamp_utc > previous_sync_latest_utc_timestamp] # noqa: E501
# latest_update_timestamp = None
# # latest_delete_timestamp = None
# for update in parsed_updates:
# logger.info(f"Processing update event {update}")
# # TODO: add typing to `update.field_name` for safer updates
# await self.update_or_insert_user(
# user=UnsavedUserInfo(id=update.user_id, **{update.field_name: update.new_value})
# )
# latest_update_timestamp = update.timestamp_utc
# # for deletion in parsed_deletions:
# # logger.info(f"Processing deletion event {deletion}")
# # await self.user_repo.remove_user(
# # requested_by=InternalServiceAdmin(id=ServiceAdminId.migrations), user_id=deletion.user_id
# # )
# # latest_delete_timestamp = deletion.timestamp_utc
# # Update the latest processed event timestamp
# current_sync_latest_utc_timestamp = latest_update_timestamp
# # if latest_delete_timestamp is not None and (
# # current_sync_latest_utc_timestamp is None or current_sync_latest_utc_timestamp < latest_delete_timestamp # noqa: E501
# # ):
# # current_sync_latest_utc_timestamp = latest_delete_timestamp
# if current_sync_latest_utc_timestamp is not None:
# if latest_utc_timestamp_orm is None:
# session.add(LastKeycloakEventTimestamp(current_sync_latest_utc_timestamp))
# logger.info(
# f"Inserted the latest sync event timestamp in the database: {current_sync_latest_utc_timestamp}"
# )
# else:
# latest_utc_timestamp_orm.timestamp_utc = current_sync_latest_utc_timestamp
# logger.info(
# f"Updated the latest sync event timestamp in the database: {current_sync_latest_utc_timestamp}"
# )


@dataclass
Expand Down
13 changes: 12 additions & 1 deletion components/renku_data_services/users/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import Enum
from enum import Enum, StrEnum
from typing import Any, NamedTuple

from pydantic import BaseModel, Field
Expand All @@ -19,6 +19,17 @@
logger = logging.getLogger(__name__)


class KeycloakEventSource(StrEnum):
"""The Keycloak event source; can be either "realm_events" or "realm_admin_events".

- "realm_events" represents events from the `GET /admin/realms/{realm}/events` API endpoint
- "realm_admin_events" represents events from the `GET /admin/realms/{realm}/admin-events` API endpoint
"""

realm_events = "realm_events"
realm_admin_events = "realm_admin_events"


class KeycloakEvent(Enum):
"""The Keycloak user events that result from the user registering or updating their personal information."""

Expand Down
11 changes: 6 additions & 5 deletions components/renku_data_services/users/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from sqlalchemy.orm import DeclarativeBase, Mapped, MappedAsDataclass, mapped_column, relationship

from renku_data_services.base_orm.registry import COMMON_ORM_REGISTRY
from renku_data_services.users.models import PinnedProjects, UserInfo, UserPreferences
from renku_data_services.users.models import KeycloakEventSource, PinnedProjects, UserInfo, UserPreferences

if TYPE_CHECKING:
from renku_data_services.namespace.orm import NamespaceORM
Expand Down Expand Up @@ -72,11 +72,12 @@ class UserMetricsORM(BaseORM):


class LastKeycloakEventTimestamp(BaseORM):
"""The latest event timestamp processed from Keycloak."""
"""The latest event timestamp processed from Keycloak, per event source."""

__tablename__ = "last_keycloak_event_timestamp"
id: Mapped[int] = mapped_column(Integer, Identity(always=True), primary_key=True, init=False)
timestamp_utc: Mapped[datetime] = mapped_column(DateTime(timezone=False), default_factory=datetime.utcnow)
__tablename__ = "last_keycloak_event_timestamp_v2"

id: Mapped[KeycloakEventSource] = mapped_column("id", primary_key=True)
timestamp_utc: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)


class UserPreferencesORM(BaseORM):
Expand Down
Loading