Skip to content

Commit 034ae29

Browse files
authored
Add/remove SSH instances via in-place update (#2884)
It's now possible to add/remove instances to/from SSH fleets applying the fleet configuration with adjusted `ssh_config.hosts`. Currently, the old and the new configurations must be equal except for the `identity_file` fields (ignored) and the order of hosts in the `ssh.config` list (but each host must remain the same or be removed). Closes: #2645
1 parent a8ee724 commit 034ae29

File tree

8 files changed

+841
-102
lines changed

8 files changed

+841
-102
lines changed

src/dstack/_internal/cli/services/configurators/fleet.py

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
ServerClientError,
2626
URLNotFoundError,
2727
)
28+
from dstack._internal.core.models.common import ApplyAction
2829
from dstack._internal.core.models.configurations import ApplyConfigurationType
2930
from dstack._internal.core.models.fleets import (
3031
Fleet,
@@ -72,7 +73,104 @@ def apply_configuration(
7273
spec=spec,
7374
)
7475
_print_plan_header(plan)
76+
if plan.action is not None:
77+
self._apply_plan(plan, command_args)
78+
else:
79+
# Old servers don't support spec update
80+
self._apply_plan_on_old_server(plan, command_args)
81+
82+
def _apply_plan(self, plan: FleetPlan, command_args: argparse.Namespace):
83+
delete_fleet_name: Optional[str] = None
84+
action_message = ""
85+
confirm_message = ""
86+
if plan.current_resource is None:
87+
if plan.spec.configuration.name is not None:
88+
action_message += (
89+
f"Fleet [code]{plan.spec.configuration.name}[/] does not exist yet."
90+
)
91+
confirm_message += "Create the fleet?"
92+
else:
93+
action_message += f"Found fleet [code]{plan.spec.configuration.name}[/]."
94+
if plan.action == ApplyAction.CREATE:
95+
delete_fleet_name = plan.current_resource.name
96+
action_message += (
97+
" Configuration changes detected. Cannot update the fleet in-place"
98+
)
99+
confirm_message += "Re-create the fleet?"
100+
elif plan.current_resource.spec == plan.effective_spec:
101+
if command_args.yes and not command_args.force:
102+
# --force is required only with --yes,
103+
# otherwise we may ask for force apply interactively.
104+
console.print(
105+
"No configuration changes detected. Use --force to apply anyway."
106+
)
107+
return
108+
delete_fleet_name = plan.current_resource.name
109+
action_message += " No configuration changes detected."
110+
confirm_message += "Re-create the fleet?"
111+
else:
112+
action_message += " Configuration changes detected."
113+
confirm_message += "Update the fleet in-place?"
114+
115+
console.print(action_message)
116+
if not command_args.yes and not confirm_ask(confirm_message):
117+
console.print("\nExiting...")
118+
return
119+
120+
if delete_fleet_name is not None:
121+
with console.status("Deleting existing fleet..."):
122+
self.api.client.fleets.delete(
123+
project_name=self.api.project, names=[delete_fleet_name]
124+
)
125+
# Fleet deletion is async. Wait for fleet to be deleted.
126+
while True:
127+
try:
128+
self.api.client.fleets.get(
129+
project_name=self.api.project, name=delete_fleet_name
130+
)
131+
except ResourceNotExistsError:
132+
break
133+
else:
134+
time.sleep(1)
135+
136+
try:
137+
with console.status("Applying plan..."):
138+
fleet = self.api.client.fleets.apply_plan(project_name=self.api.project, plan=plan)
139+
except ServerClientError as e:
140+
raise CLIError(e.msg)
141+
if command_args.detach:
142+
console.print("Fleet configuration submitted. Exiting...")
143+
return
144+
try:
145+
with MultiItemStatus(
146+
f"Provisioning [code]{fleet.name}[/]...", console=console
147+
) as live:
148+
while not _finished_provisioning(fleet):
149+
table = get_fleets_table([fleet])
150+
live.update(table)
151+
time.sleep(LIVE_TABLE_PROVISION_INTERVAL_SECS)
152+
fleet = self.api.client.fleets.get(self.api.project, fleet.name)
153+
except KeyboardInterrupt:
154+
if confirm_ask("Delete the fleet before exiting?"):
155+
with console.status("Deleting fleet..."):
156+
self.api.client.fleets.delete(
157+
project_name=self.api.project, names=[fleet.name]
158+
)
159+
else:
160+
console.print("Exiting... Fleet provisioning will continue in the background.")
161+
return
162+
console.print(
163+
get_fleets_table(
164+
[fleet],
165+
verbose=_failed_provisioning(fleet),
166+
format_date=local_time,
167+
)
168+
)
169+
if _failed_provisioning(fleet):
170+
console.print("\n[error]Some instances failed. Check the table above for errors.[/]")
171+
exit(1)
75172

173+
def _apply_plan_on_old_server(self, plan: FleetPlan, command_args: argparse.Namespace):
76174
action_message = ""
77175
confirm_message = ""
78176
if plan.current_resource is None:
@@ -86,7 +184,7 @@ def apply_configuration(
86184
diff = diff_models(
87185
old=plan.current_resource.spec.configuration,
88186
new=plan.spec.configuration,
89-
ignore={
187+
reset={
90188
"ssh_config": {
91189
"ssh_key": True,
92190
"proxy_jump": {"ssh_key"},

src/dstack/_internal/core/models/fleets.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from typing_extensions import Annotated, Literal
99

1010
from dstack._internal.core.models.backends.base import BackendType
11-
from dstack._internal.core.models.common import CoreModel
11+
from dstack._internal.core.models.common import ApplyAction, CoreModel
1212
from dstack._internal.core.models.envs import Env
1313
from dstack._internal.core.models.instances import Instance, InstanceOfferWithAvailability, SSHKey
1414
from dstack._internal.core.models.profiles import (
@@ -324,6 +324,7 @@ class FleetPlan(CoreModel):
324324
offers: List[InstanceOfferWithAvailability]
325325
total_offers: int
326326
max_offer_price: Optional[float] = None
327+
action: Optional[ApplyAction] = None # default value for backward compatibility
327328

328329
def get_effective_spec(self) -> FleetSpec:
329330
if self.effective_spec is not None:

src/dstack/_internal/core/services/diff.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, Optional, TypedDict
1+
from typing import Any, Optional, TypedDict, TypeVar
22

33
from pydantic import BaseModel
44

@@ -15,20 +15,19 @@ class ModelFieldDiff(TypedDict):
1515

1616
# TODO: calculate nested diffs
1717
def diff_models(
18-
old: BaseModel, new: BaseModel, ignore: Optional[IncludeExcludeType] = None
18+
old: BaseModel, new: BaseModel, reset: Optional[IncludeExcludeType] = None
1919
) -> ModelDiff:
2020
"""
2121
Returns a diff of model instances fields.
2222
23-
NOTE: `ignore` is implemented as `BaseModel.parse_obj(BaseModel.dict(exclude=ignore))`,
24-
that is, the "ignored" fields are actually not ignored but reset to the default values
25-
before comparison, meaning that 1) any field in `ignore` must have a default value,
26-
2) the default value must be equal to itself (e.g. `math.nan` != `math.nan`).
23+
The fields specified in the `reset` option are reset to their default values, effectively
24+
excluding them from comparison (assuming that the default value is equal to itself, e.g,
25+
`None == None`, `"task" == "task"`, but `math.nan != math.nan`).
2726
2827
Args:
2928
old: The "old" model instance.
3029
new: The "new" model instance.
31-
ignore: Optional fields to ignore.
30+
reset: Fields to reset to their default values before comparison.
3231
3332
Returns:
3433
A dict of changed fields in the form of
@@ -37,9 +36,9 @@ def diff_models(
3736
if type(old) is not type(new):
3837
raise TypeError("Both instances must be of the same Pydantic model class.")
3938

40-
if ignore is not None:
41-
old = type(old).parse_obj(old.dict(exclude=ignore))
42-
new = type(new).parse_obj(new.dict(exclude=ignore))
39+
if reset is not None:
40+
old = copy_model(old, reset=reset)
41+
new = copy_model(new, reset=reset)
4342

4443
changes: ModelDiff = {}
4544
for field in old.__fields__:
@@ -49,3 +48,24 @@ def diff_models(
4948
changes[field] = {"old": old_value, "new": new_value}
5049

5150
return changes
51+
52+
53+
M = TypeVar("M", bound=BaseModel)
54+
55+
56+
def copy_model(model: M, reset: Optional[IncludeExcludeType] = None) -> M:
57+
"""
58+
Returns a deep copy of the model instance.
59+
60+
Implemented as `BaseModel.parse_obj(BaseModel.dict())`, thus,
61+
unlike `BaseModel.copy(deep=True)`, runs all validations.
62+
63+
The fields specified in the `reset` option are reset to their default values.
64+
65+
Args:
66+
reset: Fields to reset to their default values.
67+
68+
Returns:
69+
A deep copy of the model instance.
70+
"""
71+
return type(model).parse_obj(model.dict(exclude=reset))

0 commit comments

Comments
 (0)