Skip to content

Commit 44270da

Browse files
authored
Set DSTACK_RUN_ID and DSTACK_JOB_ID (#2622)
* Pass run and job ids to runner
* Set DSTACK_RUN_ID and DSTACK_JOB_ID
* Comment on container_.NetworkSettings.Ports
* Document new env vars
* Minor fixes
1 parent 1f13a57 commit 44270da

10 files changed

Lines changed: 94 additions & 37 deletions

File tree

docs/docs/reference/environment-variables.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ tasks, and services:
1919
2020
If `name` is not set in the configuration, it is assigned a random name (e.g. `wet-mangust-1`).
2121

22+
- `DSTACK_RUN_ID`{ #DSTACK_RUN_ID } – The UUID of the run.
23+
- `DSTACK_JOB_ID`{ #DSTACK_JOB_ID } – The UUID of the job submission.
2224
- `DSTACK_REPO_ID`{ #DSTACK_REPO_ID } – The ID of the repo.
2325
- `DSTACK_GPUS_NUM`{ #DSTACK_GPUS_NUM } – The total number of GPUs in the run.
2426

runner/internal/executor/executor.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ type RunExecutor struct {
3434
sshPort int
3535
uid uint32
3636

37-
run schemas.RunSpec
37+
run schemas.Run
3838
jobSpec schemas.JobSpec
39+
jobSubmission schemas.JobSubmission
3940
clusterInfo schemas.ClusterInfo
4041
secrets map[string]string
4142
repoCredentials *schemas.RepoCredentials
@@ -195,7 +196,8 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
195196
}
196197

197198
func (ex *RunExecutor) SetJob(body schemas.SubmitBody) {
198-
ex.run = body.RunSpec
199+
ex.run = body.Run
200+
ex.jobSubmission = body.JobSubmission
199201
ex.jobSpec = body.JobSpec
200202
ex.clusterInfo = body.ClusterInfo
201203
ex.secrets = body.Secrets
@@ -256,8 +258,10 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
256258
gpus_num := nodes_num * gpus_per_node_num
257259

258260
jobEnvs := map[string]string{
259-
"DSTACK_RUN_NAME": ex.run.RunName,
260-
"DSTACK_REPO_ID": ex.run.RepoId,
261+
"DSTACK_RUN_ID": ex.run.Id,
262+
"DSTACK_JOB_ID": ex.jobSubmission.Id,
263+
"DSTACK_RUN_NAME": ex.run.RunSpec.RunName,
264+
"DSTACK_REPO_ID": ex.run.RunSpec.RepoId,
261265
"DSTACK_NODES_IPS": strings.Join(ex.clusterInfo.JobIPs, "\n"),
262266
"DSTACK_MASTER_NODE_IP": ex.clusterInfo.MasterJobIP,
263267
"DSTACK_NODE_RANK": strconv.Itoa(node_rank),

runner/internal/executor/executor_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func TestExecutor_RemoteRepo(t *testing.T) {
132132

133133
var b bytes.Buffer
134134
ex := makeTestExecutor(t)
135-
ex.run.RepoData = schemas.RepoData{
135+
ex.run.RunSpec.RepoData = schemas.RepoData{
136136
RepoType: "remote",
137137
RepoBranch: "main",
138138
RepoHash: "2b83592e506ed6fe8e49f4eaa97c3866bc9402b1",
@@ -148,7 +148,7 @@ func TestExecutor_RemoteRepo(t *testing.T) {
148148

149149
err = ex.execJob(context.TODO(), io.Writer(&b))
150150
assert.NoError(t, err)
151-
expected := fmt.Sprintf("%s\r\n%s\r\n%s\r\n", ex.run.RepoData.RepoHash, ex.run.RepoData.RepoConfigName, ex.run.RepoData.RepoConfigEmail)
151+
expected := fmt.Sprintf("%s\r\n%s\r\n%s\r\n", ex.run.RunSpec.RepoData.RepoHash, ex.run.RunSpec.RepoData.RepoConfigName, ex.run.RunSpec.RepoData.RepoConfigEmail)
152152
assert.Equal(t, expected, b.String())
153153
}
154154

@@ -161,14 +161,17 @@ func makeTestExecutor(t *testing.T) *RunExecutor {
161161
require.NoError(t, err)
162162

163163
body := schemas.SubmitBody{
164-
RunSpec: schemas.RunSpec{
165-
RunName: "red-turtle-1",
166-
RepoId: "test-000000",
167-
RepoData: schemas.RepoData{RepoType: "local"},
168-
Configuration: schemas.Configuration{
169-
Type: "task",
164+
Run: schemas.Run{
165+
Id: "12346",
166+
RunSpec: schemas.RunSpec{
167+
RunName: "red-turtle-1",
168+
RepoId: "test-000000",
169+
RepoData: schemas.RepoData{RepoType: "local"},
170+
Configuration: schemas.Configuration{
171+
Type: "task",
172+
},
173+
ConfigurationPath: ".dstack.yml",
170174
},
171-
ConfigurationPath: ".dstack.yml",
172175
},
173176
JobSpec: schemas.JobSpec{
174177
Commands: []string{"/bin/bash", "-c"},

runner/internal/executor/repo.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (ex *RunExecutor) setupRepo(ctx context.Context) error {
4040
err = gerrors.Wrap(err_)
4141
}
4242
}()
43-
switch ex.run.RepoData.RepoType {
43+
switch ex.run.RunSpec.RepoData.RepoType {
4444
case "remote":
4545
log.Trace(ctx, "Fetching git repository")
4646
if err := ex.prepareGit(ctx); err != nil {
@@ -52,7 +52,7 @@ func (ex *RunExecutor) setupRepo(ctx context.Context) error {
5252
return gerrors.Wrap(err)
5353
}
5454
default:
55-
return gerrors.Newf("unknown RepoType: %s", ex.run.RepoData.RepoType)
55+
return gerrors.Newf("unknown RepoType: %s", ex.run.RunSpec.RepoData.RepoType)
5656
}
5757
return err
5858
}
@@ -61,8 +61,8 @@ func (ex *RunExecutor) prepareGit(ctx context.Context) error {
6161
repoManager := repo.NewManager(
6262
ctx,
6363
ex.repoCredentials.CloneURL,
64-
ex.run.RepoData.RepoBranch,
65-
ex.run.RepoData.RepoHash,
64+
ex.run.RunSpec.RepoData.RepoBranch,
65+
ex.run.RunSpec.RepoData.RepoHash,
6666
ex.jobSpec.SingleBranch,
6767
).WithLocalPath(ex.workingDir)
6868
if ex.repoCredentials != nil {
@@ -92,7 +92,7 @@ func (ex *RunExecutor) prepareGit(ctx context.Context) error {
9292
if err := repoManager.Checkout(); err != nil {
9393
return gerrors.Wrap(err)
9494
}
95-
if err := repoManager.SetConfig(ex.run.RepoData.RepoConfigName, ex.run.RepoData.RepoConfigEmail); err != nil {
95+
if err := repoManager.SetConfig(ex.run.RunSpec.RepoData.RepoConfigName, ex.run.RunSpec.RepoData.RepoConfigEmail); err != nil {
9696
return gerrors.Wrap(err)
9797
}
9898

runner/internal/schemas/schemas.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ type LogEvent struct {
2020
}
2121

2222
type SubmitBody struct {
23-
RunSpec RunSpec `json:"run_spec"`
23+
Run Run `json:"run"`
2424
JobSpec JobSpec `json:"job_spec"`
25+
JobSubmission JobSubmission `json:"job_submission"`
2526
ClusterInfo ClusterInfo `json:"cluster_info"`
2627
Secrets map[string]string `json:"secrets"`
2728
RepoCredentials *RepoCredentials `json:"repo_credentials"`
@@ -37,6 +38,11 @@ type PullResponse struct {
3738
// todo Result
3839
}
3940

41+
type Run struct {
42+
Id string `json:"id"`
43+
RunSpec RunSpec `json:"run_spec"`
44+
}
45+
4046
type RunSpec struct {
4147
RunName string `json:"run_name"`
4248
RepoId string `json:"repo_id"`
@@ -45,6 +51,10 @@ type RunSpec struct {
4551
ConfigurationPath string `json:"configuration_path"`
4652
}
4753

54+
type JobSubmission struct {
55+
Id string `json:"id"`
56+
}
57+
4858
type JobSpec struct {
4959
ReplicaNum int `json:"replica_num"`
5060
JobNum int `json:"job_num"`

runner/internal/shim/docker.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,9 @@ func (d *DockerRunner) startContainer(ctx context.Context, task *Task) error {
852852
if err != nil {
853853
return tracerr.Wrap(err)
854854
}
855+
// FIXME: container_.NetworkSettings.Ports values (bindings) are not immediately available
856+
// on macOS, so ports can be empty with local backend.
857+
// Workaround: restart shim after submitting the run.
855858
task.ports = extractPorts(ctx, container_.NetworkSettings.Ports)
856859
return nil
857860
}

src/dstack/_internal/core/models/runs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,9 @@ class JobRuntimeData(CoreModel):
262262
# or not applicable (container-based backends)
263263
ports: Optional[dict[int, int]] = None
264264
# List of volumes used by the job
265-
volume_names: Optional[list[str]] = None # None for backward compalibility
265+
volume_names: Optional[list[str]] = None # None for backward compatibility
266266
# Virtual shared offer
267-
offer: Optional[InstanceOfferWithAvailability] = None # None for backward compalibility
267+
offer: Optional[InstanceOfferWithAvailability] = None # None for backward compatibility
268268

269269

270270
class ClusterInfo(CoreModel):

src/dstack/_internal/server/background/tasks/process_running_jobs.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ def _process_pulling_with_shim(
543543
if shim_client.is_api_v2_supported(): # raises error if shim is down, causes retry
544544
task = shim_client.get_task(job_model.id)
545545

546-
# If task goes to terminated before the job is submitted to runner, then an error occured
546+
# If task goes to terminated before the job is submitted to runner, then an error occurred
547547
if task.status == TaskStatus.TERMINATED:
548548
logger.warning(
549549
"shim failed to execute job %s: %s (%s)",
@@ -572,7 +572,7 @@ def _process_pulling_with_shim(
572572
else:
573573
shim_status = shim_client.pull() # raises error if shim is down, causes retry
574574

575-
# If shim goes to pending before the job is submitted to runner, then an error occured
575+
# If shim goes to pending before the job is submitted to runner, then an error occurred
576576
if (
577577
shim_status.state == "pending"
578578
and shim_status.result is not None
@@ -822,8 +822,8 @@ def _submit_job_to_runner(
822822
return success_if_not_available
823823

824824
runner_client.submit_job(
825-
run_spec=run.run_spec,
826-
job_spec=job.job_spec,
825+
run=run,
826+
job=job,
827827
cluster_info=cluster_info,
828828
secrets=secrets,
829829
repo_credentials=repo_credentials,

src/dstack/_internal/server/schemas/runner.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,14 @@
77

88
from dstack._internal.core.models.common import CoreModel, NetworkMode
99
from dstack._internal.core.models.repos.remote import RemoteRepoCreds
10-
from dstack._internal.core.models.runs import ClusterInfo, JobSpec, JobStatus, RunSpec
10+
from dstack._internal.core.models.runs import (
11+
ClusterInfo,
12+
JobSpec,
13+
JobStatus,
14+
JobSubmission,
15+
Run,
16+
RunSpec,
17+
)
1118
from dstack._internal.core.models.volumes import InstanceMountPoint, VolumeMountPoint
1219

1320

@@ -39,15 +46,18 @@ class PullResponse(CoreModel):
3946

4047

4148
class SubmitBody(CoreModel):
42-
run_spec: Annotated[
43-
RunSpec,
49+
run: Annotated[
50+
Run,
4451
Field(
4552
include={
46-
"run_name",
47-
"repo_id",
48-
"repo_data",
49-
"configuration",
50-
"configuration_path",
53+
"id": True,
54+
"run_spec": {
55+
"run_name",
56+
"repo_id",
57+
"repo_data",
58+
"configuration",
59+
"configuration_path",
60+
},
5161
}
5262
),
5363
]
@@ -70,9 +80,31 @@ class SubmitBody(CoreModel):
7080
}
7181
),
7282
]
83+
job_submission: Annotated[
84+
JobSubmission,
85+
Field(
86+
include={
87+
"id",
88+
}
89+
),
90+
]
7391
cluster_info: Annotated[Optional[ClusterInfo], Field(include=True)]
7492
secrets: Annotated[Optional[Dict[str, str]], Field(include=True)]
7593
repo_credentials: Annotated[Optional[RemoteRepoCreds], Field(include=True)]
94+
# run_spec is deprecated in favor of run.run_spec
95+
# TODO: Remove once we no longer support instances deployed with 0.19.8 or earlier.
96+
run_spec: Annotated[
97+
RunSpec,
98+
Field(
99+
include={
100+
"run_name",
101+
"repo_id",
102+
"repo_data",
103+
"configuration",
104+
"configuration_path",
105+
},
106+
),
107+
]
76108

77109

78110
class HealthcheckResponse(CoreModel):

src/dstack/_internal/server/services/runner/client.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from dstack._internal.core.models.envs import Env
1313
from dstack._internal.core.models.repos.remote import RemoteRepoCreds
1414
from dstack._internal.core.models.resources import Memory
15-
from dstack._internal.core.models.runs import ClusterInfo, JobSpec, RunSpec
15+
from dstack._internal.core.models.runs import ClusterInfo, Job, Run
1616
from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint
1717
from dstack._internal.server.schemas.runner import (
1818
GPUDevice,
@@ -72,15 +72,16 @@ def get_metrics(self) -> Optional[MetricsResponse]:
7272

7373
def submit_job(
7474
self,
75-
run_spec: RunSpec,
76-
job_spec: JobSpec,
75+
run: Run,
76+
job: Job,
7777
cluster_info: ClusterInfo,
7878
secrets: Dict[str, str],
7979
repo_credentials: Optional[RemoteRepoCreds],
8080
instance_env: Optional[Union[Env, Dict[str, str]]] = None,
8181
):
8282
# XXX: This is a quick-and-dirty hack to deliver InstanceModel-specific environment
8383
# variables to the runner without runner API modification.
84+
job_spec = job.job_spec
8485
if instance_env is not None:
8586
if isinstance(instance_env, Env):
8687
merged_env = instance_env.as_dict()
@@ -90,11 +91,13 @@ def submit_job(
9091
job_spec = job_spec.copy(deep=True)
9192
job_spec.env = merged_env
9293
body = SubmitBody(
93-
run_spec=run_spec,
94+
run=run,
9495
job_spec=job_spec,
96+
job_submission=job.job_submissions[-1],
9597
cluster_info=cluster_info,
9698
secrets=secrets,
9799
repo_credentials=repo_credentials,
100+
run_spec=run.run_spec,
98101
)
99102
resp = requests.post(
100103
# use .json() to encode enums

0 commit comments

Comments
 (0)