88from renku_data_services .base_models .core import APIUser
99from renku_data_services .data_connectors .db import DataConnectorRepository
1010from renku_data_services .data_connectors .models import DataConnector , GlobalDataConnector
11+ from renku_data_services .errors .errors import ForbiddenError
1112from renku_data_services .message_queue .db import ReprovisioningRepository
1213from renku_data_services .message_queue .models import Reprovisioning
1314from renku_data_services .namespace .db import GroupRepository
@@ -43,10 +44,10 @@ def __init__(
4344 self ._project_repo = project_repo
4445 self ._data_connector_repo = data_connector_repo
4546
46- async def run_reprovision (self , requested_by : APIUser ) -> int :
47+ async def run_reprovision (self , admin : APIUser ) -> int :
4748 """Start a reprovisioning if not already running."""
4849 reprovision = await self .acquire_reprovision ()
49- return await self .init_reprovision (requested_by , reprovision )
50+ return await self .init_reprovision (admin , reprovision )
5051
5152 async def acquire_reprovision (self ) -> Reprovisioning :
5253 """Acquire a reprovisioning slot. Throws if already taken."""
@@ -74,7 +75,7 @@ async def _get_all_data_connectors(
7475 for dc in result [0 ]:
7576 yield dc
7677
77- async def init_reprovision (self , requested_by : APIUser , reprovisioning : Reprovisioning ) -> int :
78+ async def init_reprovision (self , admin : APIUser , reprovisioning : Reprovisioning ) -> int :
7879 """Initiates reprovisioning by inserting documents into the staging table.
7980
8081 Deletes all renku entities in the solr core. Then it goes
@@ -84,6 +85,9 @@ async def init_reprovision(self, requested_by: APIUser, reprovisioning: Reprovis
8485 solr with these entries.
8586 """
8687
88+ if not admin .is_admin :
89+ raise ForbiddenError (message = "Only Renku administrators are allowed to start search reprovisioning." )
90+
8791 def log_counter (c : int ) -> None :
8892 if c % 50 == 0 :
8993 logger .info (f"Inserted { c } . entities into staging table..." )
@@ -96,20 +100,21 @@ def log_counter(c: int) -> None:
96100 async with DefaultSolrClient (self ._solr_config ) as client :
97101 await client .delete ("_type:*" )
98102
99- all_users = self ._user_repo .get_all_users (requested_by = requested_by )
103+ all_users = self ._user_repo .get_all_users (requested_by = admin )
100104 counter = await self .__update_entities (all_users , "user" , started , counter , log_counter )
101- logger .info ("Done adding user entities to search_updates table." )
105+ logger .info (f "Done adding user entities to search_updates table. Record count: { counter } ." )
102106
103- all_groups = self ._group_repo .get_all_groups (requested_by = requested_by )
107+ all_groups = self ._group_repo .get_all_groups (requested_by = admin )
104108 counter = await self .__update_entities (all_groups , "group" , started , counter , log_counter )
105- logger .info ("Done adding group entities to search_updates table." )
109+ logger .info (f "Done adding group entities to search_updates table. Record count: { counter } " )
106110
107- all_projects = self ._project_repo .get_all_projects (requested_by = requested_by )
111+ all_projects = self ._project_repo .get_all_projects (requested_by = admin )
108112 counter = await self .__update_entities (all_projects , "project" , started , counter , log_counter )
109- logger .info ("Done adding project entities to search_updates table." )
113+ logger .info (f "Done adding project entities to search_updates table. Record count: { counter } " )
110114
111- all_dcs = self ._get_all_data_connectors (requested_by , per_page = 20 )
115+ all_dcs = self ._get_all_data_connectors (admin , per_page = 20 )
112116 counter = await self .__update_entities (all_dcs , "data connector" , started , counter , log_counter )
117+ logger .info (f"Done adding dataconnector entities to search_updates table. Record count: { counter } " )
113118
114119 logger .info (f"Inserted { counter } entities into the staging table." )
115120 except Exception as e :
0 commit comments