Skip to content

Commit ea79096

Browse files
authored
Rolling deployments for repo updates (#2853)
- Support rolling deployments for services when the run repo is updated: new commits are added, the branch is changed, uncommitted files are updated, etc. Switching from one repo to another is not yet supported.

  > **Note**: A side effect of this change is that if the run configuration file is stored in the same repo that is used for the run, any changes to the configuration file will also be considered a change to the repo (`repo_code_hash`), and hence require a rolling deployment for services or a full restart for tasks and dev environments. Previously, this was not the case, since changes to `repo_code_hash` were ignored for existing jobs. This new behavior makes it more difficult to avoid redeployment when changing some configuration properties, namely `priority`, `inactivity_duration`, `replicas`, and `scaling`. However, we consider this acceptable, since changing these properties in-place is an advanced use case and can still be achieved by moving the configuration file out of the repo.

- Improve run plan output in `dstack apply` when attempting an in-place update:

  - Show not only the list of changed configuration properties, but also other changes from the run spec, such as repo-related changes.

    ```shell
    $ dstack apply -f test-service.dstack.yml
    Active run test-service already exists. Detected changes that can be updated in-place:
    - Repo state (branch, commit, or other)
    - Repo files
    - Configuration properties:
      - env
    Update the run? [y/n]:
    ```

  - Show the list of changes not only when in-place update is possible, but also when it is not. This will help users understand why a run cannot be updated in-place.

    ```shell
    $ dstack apply -f test-service.dstack.yml
    Active run test-service already exists. Detected changes that cannot be updated in-place:
    - Repo files
    - Configuration properties:
      - gateway
    Stop and override the run? [y/n]:
    ```

  Currently, all detected changes are listed together. An area for future improvement is highlighting the changes that prevent an in-place update.
1 parent e008245 commit ea79096

File tree

17 files changed

+367
-41
lines changed

17 files changed

+367
-41
lines changed

runner/internal/executor/executor.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,14 @@ func (ex *RunExecutor) SetRunnerState(state string) {
276276
ex.state = state
277277
}
278278

279+
func (ex *RunExecutor) getRepoData() schemas.RepoData {
280+
if ex.jobSpec.RepoData == nil {
281+
// jobs submitted before 0.19.17 do not have jobSpec.RepoData
282+
return ex.run.RunSpec.RepoData
283+
}
284+
return *ex.jobSpec.RepoData
285+
}
286+
279287
func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error {
280288
node_rank := ex.jobSpec.JobNum
281289
nodes_num := ex.jobSpec.JobsPerReplica

runner/internal/executor/executor_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func TestExecutor_RemoteRepo(t *testing.T) {
132132

133133
var b bytes.Buffer
134134
ex := makeTestExecutor(t)
135-
ex.run.RunSpec.RepoData = schemas.RepoData{
135+
ex.jobSpec.RepoData = &schemas.RepoData{
136136
RepoType: "remote",
137137
RepoBranch: "main",
138138
RepoHash: "2b83592e506ed6fe8e49f4eaa97c3866bc9402b1",
@@ -148,7 +148,7 @@ func TestExecutor_RemoteRepo(t *testing.T) {
148148

149149
err = ex.execJob(context.TODO(), io.Writer(&b))
150150
assert.NoError(t, err)
151-
expected := fmt.Sprintf("%s\r\n%s\r\n%s\r\n", ex.run.RunSpec.RepoData.RepoHash, ex.run.RunSpec.RepoData.RepoConfigName, ex.run.RunSpec.RepoData.RepoConfigEmail)
151+
expected := fmt.Sprintf("%s\r\n%s\r\n%s\r\n", ex.getRepoData().RepoHash, ex.getRepoData().RepoConfigName, ex.getRepoData().RepoConfigEmail)
152152
assert.Equal(t, expected, b.String())
153153
}
154154

@@ -178,6 +178,7 @@ func makeTestExecutor(t *testing.T) *RunExecutor {
178178
Env: make(map[string]string),
179179
MaxDuration: 0, // no timeout
180180
WorkingDir: &workingDir,
181+
RepoData: &schemas.RepoData{RepoType: "local"},
181182
},
182183
Secrets: make(map[string]string),
183184
RepoCredentials: &schemas.RepoCredentials{

runner/internal/executor/repo.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (ex *RunExecutor) setupRepo(ctx context.Context) error {
4040
err = gerrors.Wrap(err_)
4141
}
4242
}()
43-
switch ex.run.RunSpec.RepoData.RepoType {
43+
switch ex.getRepoData().RepoType {
4444
case "remote":
4545
log.Trace(ctx, "Fetching git repository")
4646
if err := ex.prepareGit(ctx); err != nil {
@@ -52,7 +52,7 @@ func (ex *RunExecutor) setupRepo(ctx context.Context) error {
5252
return gerrors.Wrap(err)
5353
}
5454
default:
55-
return gerrors.Newf("unknown RepoType: %s", ex.run.RunSpec.RepoData.RepoType)
55+
return gerrors.Newf("unknown RepoType: %s", ex.getRepoData().RepoType)
5656
}
5757
return err
5858
}
@@ -61,8 +61,8 @@ func (ex *RunExecutor) prepareGit(ctx context.Context) error {
6161
repoManager := repo.NewManager(
6262
ctx,
6363
ex.repoCredentials.CloneURL,
64-
ex.run.RunSpec.RepoData.RepoBranch,
65-
ex.run.RunSpec.RepoData.RepoHash,
64+
ex.getRepoData().RepoBranch,
65+
ex.getRepoData().RepoHash,
6666
ex.jobSpec.SingleBranch,
6767
).WithLocalPath(ex.workingDir)
6868
if ex.repoCredentials != nil {
@@ -92,7 +92,7 @@ func (ex *RunExecutor) prepareGit(ctx context.Context) error {
9292
if err := repoManager.Checkout(); err != nil {
9393
return gerrors.Wrap(err)
9494
}
95-
if err := repoManager.SetConfig(ex.run.RunSpec.RepoData.RepoConfigName, ex.run.RunSpec.RepoData.RepoConfigEmail); err != nil {
95+
if err := repoManager.SetConfig(ex.getRepoData().RepoConfigName, ex.getRepoData().RepoConfigEmail); err != nil {
9696
return gerrors.Wrap(err)
9797
}
9898

runner/internal/schemas/schemas.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ type JobSpec struct {
6868
MaxDuration int `json:"max_duration"`
6969
SSHKey *SSHKey `json:"ssh_key"`
7070
WorkingDir *string `json:"working_dir"`
71+
// `RepoData` is optional for compatibility with jobs submitted before 0.19.17.
72+
// Use `RunExecutor.getRepoData()` to get non-nil `RepoData`.
73+
// TODO: make required when supporting jobs submitted before 0.19.17 is no longer relevant.
74+
RepoData *RepoData `json:"repo_data"`
7175
}
7276

7377
type ClusterInfo struct {

src/dstack/_internal/cli/services/configurators/run.py

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@
4141
)
4242
from dstack._internal.core.models.repos.base import Repo
4343
from dstack._internal.core.models.resources import CPUSpec
44-
from dstack._internal.core.models.runs import JobStatus, JobSubmission, RunStatus
44+
from dstack._internal.core.models.runs import JobStatus, JobSubmission, RunSpec, RunStatus
4545
from dstack._internal.core.services.configs import ConfigManager
4646
from dstack._internal.core.services.diff import diff_models
4747
from dstack._internal.utils.common import local_time
4848
from dstack._internal.utils.interpolator import InterpolatorError, VariablesInterpolator
4949
from dstack._internal.utils.logging import get_logger
50+
from dstack._internal.utils.nested_list import NestedList, NestedListItem
5051
from dstack.api._public.repos import get_ssh_keypair
5152
from dstack.api._public.runs import Run
5253
from dstack.api.utils import load_profile
@@ -102,25 +103,20 @@ def apply_configuration(
102103
confirm_message = f"Submit the run [code]{conf.name}[/]?"
103104
stop_run_name = None
104105
if run_plan.current_resource is not None:
105-
changed_fields = []
106-
if run_plan.action == ApplyAction.UPDATE:
107-
diff = diff_models(
108-
run_plan.get_effective_run_spec().configuration,
109-
run_plan.current_resource.run_spec.configuration,
110-
)
111-
changed_fields = list(diff.keys())
112-
if run_plan.action == ApplyAction.UPDATE and len(changed_fields) > 0:
106+
diff = render_run_spec_diff(
107+
run_plan.get_effective_run_spec(),
108+
run_plan.current_resource.run_spec,
109+
)
110+
if run_plan.action == ApplyAction.UPDATE and diff is not None:
113111
console.print(
114112
f"Active run [code]{conf.name}[/] already exists."
115-
" Detected configuration changes that can be updated in-place:"
116-
f" {changed_fields}"
113+
f" Detected changes that [code]can[/] be updated in-place:\n{diff}"
117114
)
118115
confirm_message = "Update the run?"
119-
elif run_plan.action == ApplyAction.UPDATE and len(changed_fields) == 0:
116+
elif run_plan.action == ApplyAction.UPDATE and diff is None:
120117
stop_run_name = run_plan.current_resource.run_spec.run_name
121118
console.print(
122-
f"Active run [code]{conf.name}[/] already exists."
123-
" Detected no configuration changes."
119+
f"Active run [code]{conf.name}[/] already exists. Detected no changes."
124120
)
125121
if command_args.yes and not command_args.force:
126122
console.print("Use --force to apply anyway.")
@@ -129,7 +125,8 @@ def apply_configuration(
129125
elif not run_plan.current_resource.status.is_finished():
130126
stop_run_name = run_plan.current_resource.run_spec.run_name
131127
console.print(
132-
f"Active run [code]{conf.name}[/] already exists and cannot be updated in-place."
128+
f"Active run [code]{conf.name}[/] already exists."
129+
f" Detected changes that [error]cannot[/] be updated in-place:\n{diff}"
133130
)
134131
confirm_message = "Stop and override the run?"
135132

@@ -611,3 +608,47 @@ def _run_resubmitted(run: Run, current_job_submission: Optional[JobSubmission])
611608
not run.status.is_finished()
612609
and run._run.latest_job_submission.submitted_at > current_job_submission.submitted_at
613610
)
611+
612+
613+
def render_run_spec_diff(old_spec: RunSpec, new_spec: RunSpec) -> Optional[str]:
    """
    Renders the differences between two run specs as a nested list.

    Each changed top-level run spec field becomes one list item. For `configuration`
    and `profile`, the changed properties are listed as child items, unless the
    object type itself differs between the two specs, in which case a single item
    is shown. A few fields get friendly names; all others are derived from the
    field name. `merged_profile` is never listed.

    Returns:
        The rendered list, or `None` if `diff_models` reports no changed fields.
    """
    changed = list(diff_models(old_spec, new_spec))
    if len(changed) == 0:
        return None

    labels = {
        "repo_id": "Repo ID",
        "repo_code_hash": "Repo files",
        "repo_data": "Repo state (branch, commit, or other)",
        "ssh_key_pub": "Public SSH key",
    }

    # NOTE(review): if `merged_profile` is the only changed field, the result is the
    # rendering of an empty list rather than `None` - confirm whether this can happen.
    result = NestedList()
    for name in changed:
        if name == "merged_profile":
            continue
        if name == "configuration":
            if type(old_spec.configuration) is type(new_spec.configuration):
                properties = diff_models(old_spec.configuration, new_spec.configuration)
                entry = NestedListItem(
                    "Configuration properties:",
                    children=[NestedListItem(prop) for prop in properties],
                )
            else:
                entry = NestedListItem("Configuration type")
        elif name == "profile":
            if type(old_spec.profile) is type(new_spec.profile):
                properties = diff_models(old_spec.profile, new_spec.profile)
                entry = NestedListItem(
                    "Profile properties:",
                    children=[NestedListItem(prop) for prop in properties],
                )
            else:
                entry = NestedListItem("Profile")
        else:
            label = labels.get(name)
            if label is None:
                # No friendly name: turn e.g. `some_field` into "Some field".
                label = name.replace("_", " ").capitalize()
            entry = NestedListItem(label)
        result.children.append(entry)
    return result.render()

src/dstack/_internal/core/compatibility/runs.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import Any, Dict, Optional
22

33
from dstack._internal.core.models.configurations import ServiceConfiguration
4-
from dstack._internal.core.models.runs import ApplyRunPlanInput, JobSubmission, RunSpec
4+
from dstack._internal.core.models.runs import ApplyRunPlanInput, JobSpec, JobSubmission, RunSpec
55
from dstack._internal.server.schemas.runs import GetRunPlanRequest
66

77

@@ -25,7 +25,10 @@ def get_apply_plan_excludes(plan: ApplyRunPlanInput) -> Optional[Dict]:
2525
current_resource_excludes["run_spec"] = get_run_spec_excludes(current_resource.run_spec)
2626
job_submissions_excludes = {}
2727
current_resource_excludes["jobs"] = {
28-
"__all__": {"job_submissions": {"__all__": job_submissions_excludes}}
28+
"__all__": {
29+
"job_spec": get_job_spec_excludes([job.job_spec for job in current_resource.jobs]),
30+
"job_submissions": {"__all__": job_submissions_excludes},
31+
}
2932
}
3033
job_submissions = [js for j in current_resource.jobs for js in j.job_submissions]
3134
if all(map(_should_exclude_job_submission_jpd_cpu_arch, job_submissions)):
@@ -123,6 +126,24 @@ def get_run_spec_excludes(run_spec: RunSpec) -> Optional[Dict]:
123126
return None
124127

125128

129+
def get_job_spec_excludes(job_specs: list[JobSpec]) -> Optional[dict]:
    """
    Builds the `job_spec` exclude mapping used to drop certain fields from the request.

    A field is excluded only if it is `None` in every given job spec. Use this to
    leave out new fields when they are not set, so that clients stay
    backward-compatible with older servers.

    Returns:
        A mapping of field names to `True`, or `None` if nothing should be excluded.
    """
    excludes: dict[str, Any] = {}
    for field_name in ("repo_code_hash", "repo_data"):
        is_set_somewhere = any(getattr(spec, field_name) is not None for spec in job_specs)
        if not is_set_somewhere:
            excludes[field_name] = True
    return excludes or None
145+
146+
126147
def _should_exclude_job_submission_jpd_cpu_arch(job_submission: JobSubmission) -> bool:
127148
try:
128149
return job_submission.job_provisioning_data.instance_type.resources.cpu_arch is None

src/dstack/_internal/core/models/runs.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,14 @@ class JobSpec(CoreModel):
218218
volumes: Optional[List[MountPoint]] = None
219219
ssh_key: Optional[JobSSHKey] = None
220220
working_dir: Optional[str]
221+
# `repo_data` is optional for client compatibility with pre-0.19.17 servers and for compatibility
222+
# with jobs submitted before 0.19.17. All new jobs are expected to have non-None `repo_data`.
223+
# For --no-repo runs, `repo_data` is `VirtualRunRepoData()`.
224+
repo_data: Annotated[Optional[AnyRunRepoData], Field(discriminator="repo_type")] = None
225+
# `repo_code_hash` can be None because it is not used for the repo or because the job was
226+
# submitted before 0.19.17. See `_get_repo_code_hash` on how to get the correct `repo_code_hash`
227+
# TODO: drop this comment when supporting jobs submitted before 0.19.17 is no longer relevant.
228+
repo_code_hash: Optional[str] = None
221229

222230

223231
class JobProvisioningData(CoreModel):

src/dstack/_internal/server/background/tasks/process_running_jobs.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
251251
session=session,
252252
project=project,
253253
repo=repo_model,
254-
code_hash=run.run_spec.repo_code_hash,
254+
code_hash=_get_repo_code_hash(run, job),
255255
)
256256

257257
success = await common_utils.run_async(
@@ -303,7 +303,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
303303
session=session,
304304
project=project,
305305
repo=repo_model,
306-
code_hash=run.run_spec.repo_code_hash,
306+
code_hash=_get_repo_code_hash(run, job),
307307
)
308308
success = await common_utils.run_async(
309309
_process_pulling_with_shim,
@@ -857,6 +857,19 @@ def _get_cluster_info(
857857
return cluster_info
858858

859859

860+
def _get_repo_code_hash(run: Run, job: Job) -> Optional[str]:
    """
    Returns the repo code hash to use for the job.

    The job spec's `repo_code_hash` is used when it is set. Otherwise, the run spec's
    `repo_code_hash` is used, but only if it is set and the job's latest submission
    has the same `deployment_num` as the run. In all other cases the result is `None`.
    """
    # TODO: drop this function when supporting jobs submitted before 0.19.17 is no longer relevant.
    job_hash = job.job_spec.repo_code_hash
    if job_hash is not None:
        return job_hash
    run_hash = run.run_spec.repo_code_hash
    if run_hash is None:
        return None
    if job.job_submissions[-1].deployment_num != run.deployment_num:
        return None
    # The job spec does not have `repo_code_hash`, because it was submitted before 0.19.17.
    # Use `repo_code_hash` from the run.
    return run_hash
871+
872+
860873
async def _get_job_code(
861874
session: AsyncSession, project: ProjectModel, repo: RepoModel, code_hash: Optional[str]
862875
) -> bytes:

src/dstack/_internal/server/schemas/runner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class SubmitBody(CoreModel):
7878
"max_duration",
7979
"ssh_key",
8080
"working_dir",
81+
"repo_data",
8182
}
8283
),
8384
]

src/dstack/_internal/server/services/jobs/configurators/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ async def _get_job_spec(
149149
working_dir=self._working_dir(),
150150
volumes=self._volumes(job_num),
151151
ssh_key=self._ssh_key(jobs_per_replica),
152+
repo_data=self.run_spec.repo_data,
153+
repo_code_hash=self.run_spec.repo_code_hash,
152154
)
153155
return job_spec
154156

0 commit comments

Comments
 (0)