11"""Service-router replica pipeline: detect router groups and ensure sync table rows."""
22
33import uuid
4+ from datetime import datetime
5+ from typing import Optional , TypedDict
46
57from sqlalchemy import select , update
68from sqlalchemy .ext .asyncio import AsyncSession
1113from dstack ._internal .server .models import RunModel , ServiceRouterWorkerSyncModel
1214
1315
16+ class _SyncRowUpdateMap (TypedDict , total = False ):
17+ deleted : bool
18+ last_processed_at : datetime
19+ lock_expires_at : Optional [datetime ]
20+ lock_token : Optional [uuid .UUID ]
21+ lock_owner : Optional [str ]
22+
23+
24+ def _reactivate_sync_row_update_map (* , now : datetime ) -> _SyncRowUpdateMap :
25+ return {
26+ "deleted" : False ,
27+ "last_processed_at" : now ,
28+ "lock_expires_at" : None ,
29+ "lock_token" : None ,
30+ "lock_owner" : None ,
31+ }
32+
33+
1434def run_spec_has_router_replica_group (run_spec : RunSpec ) -> bool :
1535 if run_spec .configuration .type != "service" :
1636 return False
@@ -36,20 +56,13 @@ async def ensure_service_router_worker_sync_row(
3656 now = common_utils .get_current_datetime ()
3757 if sync_row is not None :
3858 if sync_row .deleted :
39- # If the router replica group is reintroduced (e.g. via re-apply), reactivate the
40- # existing sync row so the background pipeline resumes syncing router workers.
41- # Do not import pipeline_tasks.base here: loading that package runs
42- # pipeline_tasks/__init__.py -> jobs_running -> runs and causes a circular import.
59+ # If the router replica group is reintroduced in service configuration (via re-apply),
60+ # reactivate the existing sync row so the background pipeline resumes syncing router workers.
61+ update_map = _reactivate_sync_row_update_map (now = now )
4362 await session .execute (
4463 update (ServiceRouterWorkerSyncModel )
4564 .where (ServiceRouterWorkerSyncModel .id == sync_row .id )
46- .values (
47- deleted = False ,
48- last_processed_at = now ,
49- lock_expires_at = None ,
50- lock_token = None ,
51- lock_owner = None ,
52- )
65+ .values (** update_map )
5366 )
5467 return
5568 session .add (
0 commit comments