|
5 | 5 | from datetime import datetime, timedelta |
6 | 6 | from typing import List, Optional, Tuple |
7 | 7 |
|
8 | | -from sqlalchemy import and_, not_, or_, select |
| 8 | +from sqlalchemy import and_, func, not_, or_, select |
9 | 9 | from sqlalchemy.ext.asyncio import AsyncSession |
10 | 10 | from sqlalchemy.orm import contains_eager, joinedload, load_only, noload, selectinload |
11 | 11 |
|
@@ -365,6 +365,10 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): |
365 | 365 | project=project, |
366 | 366 | run=run, |
367 | 367 | ) |
| 368 | + # FIXME: Fleet is not locked which may lead to duplicate instance_num. |
| 369 | + # This is currently hard to fix without locking the fleet for entire provisioning duration. |
| 370 | + # Processing should be done in multiple steps so that |
| 371 | + # InstanceModel is created before provisioning. |
368 | 372 | instance_num = await _get_next_instance_num( |
369 | 373 | session=session, |
370 | 374 | fleet_model=fleet_model, |
@@ -773,25 +777,11 @@ def _create_fleet_model_for_job( |
773 | 777 |
|
774 | 778 |
|
775 | 779 | async def _get_next_instance_num(session: AsyncSession, fleet_model: FleetModel) -> int: |
776 | | - if len(fleet_model.instances) == 0: |
777 | | - # No instances means the fleet is not in the db yet, so don't lock. |
778 | | - return 0 |
779 | | - async with get_locker(get_db().dialect_name).lock_ctx( |
780 | | - FleetModel.__tablename__, [fleet_model.id] |
781 | | - ): |
782 | | - fleet_model = ( |
783 | | - ( |
784 | | - await session.execute( |
785 | | - select(FleetModel) |
786 | | - .where(FleetModel.id == fleet_model.id) |
787 | | - .options(joinedload(FleetModel.instances)) |
788 | | - .execution_options(populate_existing=True) |
789 | | - ) |
790 | | - ) |
791 | | - .unique() |
792 | | - .scalar_one() |
793 | | - ) |
794 | | - return len(fleet_model.instances) |
| 780 | + res = await session.execute( |
| 781 | + select(func.count(InstanceModel.id)).where(InstanceModel.fleet_id == fleet_model.id) |
| 782 | + ) |
| 783 | + instance_count = res.scalar_one() |
| 784 | + return instance_count |
795 | 785 |
|
796 | 786 |
|
797 | 787 | def _create_instance_model_for_job( |
|
0 commit comments