Skip to content

Commit ecc591d

Browse files
authored
Generate unique fleet name for autocreated fleets (#3085)
* Generate unique fleet name for autocreated fleets * Document Lock unique names
1 parent d804ee0 commit ecc591d

File tree

2 files changed

+46
-7
lines changed

2 files changed

+46
-7
lines changed

contributing/LOCKING.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ await session.commit()
5050
select ...
5151
```
5252

53-
> SQLite exhibits "snapshot isolation". When a read transaction starts, that reader continues to see an unchanging "snapshot" of the database file as it existed at the moment in time when the read transaction started. Any write transactions that commit while the read transaction is active are still invisible to the read transaction, because the reader is seeing a snapshot of database file from a prior moment in time. Source: https://www.sqlite.org/isolation.html
53+
> SQLite exhibits Snapshot Isolation. When a read transaction starts, that reader continues to see an unchanging "snapshot" of the database file as it existed at the moment in time when the read transaction started. Any write transactions that commit while the read transaction is active are still invisible to the read transaction, because the reader is seeing a snapshot of database file from a prior moment in time. Source: https://www.sqlite.org/isolation.html
5454
5555
Thus, if a new transaction is not started, you won't see changes that concurrent transactions made before you acquired the lock.
5656

@@ -82,3 +82,29 @@ In fact, using `joinedload` and `.with_for_update()` will trigger an error becau
8282
**Always use `.with_for_update(key_share=True)` unless you plan to delete rows or update a primary key column**
8383

8484
If you `SELECT FOR UPDATE` from a table that is referenced in a child table via a foreign key, it can lead to deadlocks if the child table is updated because Postgres will issue a `FOR KEY SHARE` lock on the parent table rows to ensure valid foreign keys. For this reason, you should always do `SELECT FOR NO KEY UPDATE` (`.with_for_update(key_share=True)`) if primary key columns are not modified. `SELECT FOR NO KEY UPDATE` is not blocked by a `FOR KEY SHARE` lock, so no deadlock occurs.
85+
86+
87+
**Lock unique names**
88+
89+
The following pattern can be used to lock a unique name of some resource type:
90+
91+
```python
92+
lock_namespace = f"fleet_names_{project.name}"
93+
if get_db().dialect_name == "sqlite":
94+
# Start new transaction to see committed changes after lock
95+
await session.commit()
96+
elif get_db().dialect_name == "postgresql":
97+
await session.execute(
98+
select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace)))
99+
)
100+
101+
lock, _ = get_locker(get_db().dialect_name).get_lockset(lock_namespace)
102+
async with lock:
103+
# ... select taken names, use a unique name
104+
await session.commit()
105+
```
106+
107+
Note that:
108+
109+
* This pattern works assuming that Postgres is using the default isolation level, Read Committed. By the time a transaction acquires the advisory lock, all other transactions that could have taken the name have committed, so their changes are visible and a unique name can be chosen.
110+
* SQLite needs a commit before selecting taken names due to Snapshot Isolation as noted above.

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from datetime import datetime, timedelta
66
from typing import List, Optional, Tuple
77

8-
from sqlalchemy import and_, not_, or_, select
8+
from sqlalchemy import and_, func, not_, or_, select
99
from sqlalchemy.ext.asyncio import AsyncSession
1010
from sqlalchemy.orm import contains_eager, joinedload, load_only, noload, selectinload
1111

@@ -54,6 +54,7 @@
5454
from dstack._internal.server.services.backends import get_project_backend_by_type_or_error
5555
from dstack._internal.server.services.fleets import (
5656
fleet_model_to_fleet,
57+
generate_fleet_name,
5758
get_fleet_requirements,
5859
get_next_instance_num,
5960
)
@@ -71,7 +72,7 @@
7172
get_job_configured_volumes,
7273
get_job_runtime_data,
7374
)
74-
from dstack._internal.server.services.locking import get_locker
75+
from dstack._internal.server.services.locking import get_locker, string_to_lock_id
7576
from dstack._internal.server.services.logging import fmt
7677
from dstack._internal.server.services.offers import get_offers_by_requirements
7778
from dstack._internal.server.services.requirements.combine import (
@@ -363,7 +364,8 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
363364
job_model.job_provisioning_data = job_provisioning_data.json()
364365
job_model.status = JobStatus.PROVISIONING
365366
if fleet_model is None:
366-
fleet_model = _create_fleet_model_for_job(
367+
fleet_model = await _create_fleet_model_for_job(
368+
session=session,
367369
project=project,
368370
run=run,
369371
)
@@ -752,17 +754,28 @@ def _check_can_create_new_instance_in_fleet(fleet: Fleet) -> bool:
752754
return True
753755

754756

755-
def _create_fleet_model_for_job(
757+
async def _create_fleet_model_for_job(
758+
session: AsyncSession,
756759
project: ProjectModel,
757760
run: Run,
758761
) -> FleetModel:
759762
placement = InstanceGroupPlacement.ANY
760763
if run.run_spec.configuration.type == "task" and run.run_spec.configuration.nodes > 1:
761764
placement = InstanceGroupPlacement.CLUSTER
762765
nodes = _get_nodes_required_num_for_run(run.run_spec)
766+
767+
lock_namespace = f"fleet_names_{project.name}"
768+
# TODO: Lock fleet names on SQLite.
769+
# Needs some refactoring so that the lock is released after commit.
770+
if get_db().dialect_name == "postgresql":
771+
await session.execute(
772+
select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace)))
773+
)
774+
fleet_name = await generate_fleet_name(session=session, project=project)
775+
763776
spec = FleetSpec(
764777
configuration=FleetConfiguration(
765-
name=run.run_spec.run_name,
778+
name=fleet_name,
766779
placement=placement,
767780
reservation=run.run_spec.configuration.reservation,
768781
nodes=FleetNodesSpec(
@@ -776,7 +789,7 @@ def _create_fleet_model_for_job(
776789
)
777790
fleet_model = FleetModel(
778791
id=uuid.uuid4(),
779-
name=run.run_spec.run_name,
792+
name=fleet_name,
780793
project=project,
781794
status=FleetStatus.ACTIVE,
782795
spec=spec.json(),

0 commit comments

Comments
 (0)