Skip to content

Commit 84e2c70

Browse files
Report runtime working_dir and username from runner via JobRuntimeData (#3617)
* Report working_dir and username from runner to server via JobRuntimeData The runner now reports the resolved working directory and OS username back to the server through the PullResponse. The server persists these in JobRuntimeData (write-once), and the frontend uses job_runtime_data.working_dir to construct correct IDE deep-link URLs instead of the hardcoded legacy /workflow path. Made-with: Cursor * Return new fields in /api/run response /api/pull is too late, we need these fields as soon as the job state is switched to RUNNING * [UI] In the run details page, for the dev environment, expect `latestSubmission?.job_runtime_data?.working_dir` after `run.status === 'running'`; otherwise, fallback to `/`. --------- Co-authored-by: Dmitry Meyer <me@undef.im>
1 parent 1ee8cd0 commit 84e2c70

File tree

13 files changed

+162
-41
lines changed

13 files changed

+162
-41
lines changed

frontend/src/pages/Runs/Details/RunDetails/ConnectToRunWithDevEnvConfiguration/index.tsx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ export const ConnectToRunWithDevEnvConfiguration: FC<{ run: IRun }> = ({ run })
5454
const [sshCommand, copySSHCommand] = getSSHCommand(run);
5555

5656
const configuration = run.run_spec.configuration as TDevEnvironmentConfiguration;
57-
const openInIDEUrl = `${configuration.ide}://vscode-remote/ssh-remote+${run.run_spec.run_name}/${run.run_spec.working_dir || 'workflow'}`;
57+
const latestSubmission = run.jobs[0]?.job_submissions?.slice(-1)[0];
58+
const workingDir = latestSubmission?.job_runtime_data?.working_dir ?? '/';
59+
const openInIDEUrl = `${configuration.ide}://vscode-remote/ssh-remote+${run.run_spec.run_name}${workingDir}`;
5860
const ideDisplayName = getIDEDisplayName(configuration.ide);
5961

6062
const [configCliCommand, copyCliCommand] = useConfigProjectCliCommand({ projectName: run.project_name });

frontend/src/types/run.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,15 @@ declare interface IJobProvisioningData {
293293
backend_data?: string;
294294
}
295295

296+
declare interface IJobRuntimeData {
297+
working_dir?: string | null;
298+
username?: string | null;
299+
}
300+
296301
declare interface IJobSubmission {
297302
id: string;
298303
job_provisioning_data?: IJobProvisioningData | null;
304+
job_runtime_data?: IJobRuntimeData | null;
299305
error_code?: TJobErrorCode | null;
300306
submission_num: number;
301307
status: TJobStatus;

runner/internal/executor/base.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type Executor interface {
1212
GetHistory(timestamp int64) *schemas.PullResponse
1313
GetJobWsLogsHistory() []schemas.LogEvent
1414
GetRunnerState() string
15+
GetJobInfo(ctx context.Context) (username string, workingDir string, err error)
1516
Run(ctx context.Context) error
1617
SetJob(job schemas.SubmitBody)
1718
SetJobState(ctx context.Context, state types.JobState)

runner/internal/executor/executor.go

Lines changed: 85 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/creack/pty"
2222
"github.com/dstackai/ansistrip"
2323
"github.com/prometheus/procfs"
24+
"github.com/sirupsen/logrus"
2425
"golang.org/x/sys/unix"
2526

2627
"github.com/dstackai/dstack/runner/consts"
@@ -61,6 +62,10 @@ type RunExecutor struct {
6162
fileArchiveDir string
6263
repoBlobDir string
6364

65+
runnerLogFile *os.File
66+
runnerLogStripper *ansistrip.Writer
67+
runnerLogger *logrus.Entry
68+
6469
run schemas.Run
6570
jobSpec schemas.JobSpec
6671
jobSubmission schemas.JobSubmission
@@ -136,14 +141,26 @@ func NewRunExecutor(tempDir string, dstackDir string, currentUser linuxuser.User
136141
}, nil
137142
}
138143

144+
// GetJobInfo must be called after SetJob
145+
func (ex *RunExecutor) GetJobInfo(ctx context.Context) (string, string, error) {
146+
// preRun() sets ex.jobUser and ex.jobWorkingDir
147+
if err := ex.preRun(ctx); err != nil {
148+
return "", "", err
149+
}
150+
return ex.jobUser.Username, ex.jobWorkingDir, nil
151+
}
152+
139153
// Run must be called after SetJob and WriteRepoBlob
140154
func (ex *RunExecutor) Run(ctx context.Context) (err error) {
141-
runnerLogFile, err := log.CreateAppendFile(filepath.Join(ex.tempDir, consts.RunnerLogFileName))
142-
if err != nil {
143-
ex.SetJobState(ctx, types.JobStateFailed)
144-
return fmt.Errorf("create runner log file: %w", err)
155+
// If jobStateHistory is not empty, either Run() has already been called or
156+
// preRun() has already been called via GetJobInfo() and failed
157+
if len(ex.jobStateHistory) > 0 {
158+
return errors.New("already running or finished")
159+
}
160+
if err := ex.preRun(ctx); err != nil {
161+
return err
145162
}
146-
defer func() { _ = runnerLogFile.Close() }()
163+
defer ex.postRun(ctx)
147164

148165
jobLogFile, err := log.CreateAppendFile(filepath.Join(ex.tempDir, consts.RunnerJobLogFileName))
149166
if err != nil {
@@ -153,7 +170,7 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
153170
defer func() { _ = jobLogFile.Close() }()
154171

155172
defer func() {
156-
// recover goes after runnerLogFile.Close() to keep the log
173+
// recover goes after postRun(), which closes runnerLogFile, to keep the log
157174
if r := recover(); r != nil {
158175
log.Error(ctx, "Executor PANIC", "err", r)
159176
ex.SetJobState(ctx, types.JobStateFailed)
@@ -171,21 +188,8 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
171188
}
172189
}()
173190

174-
stripper := ansistrip.NewWriter(ex.runnerLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize)
175-
defer func() { _ = stripper.Close() }()
176-
logger := io.MultiWriter(runnerLogFile, os.Stdout, stripper)
177-
ctx = log.WithLogger(ctx, log.NewEntry(logger, int(log.DefaultEntry.Logger.Level))) // todo loglevel
178-
log.Info(ctx, "Run job", "log_level", log.GetLogger(ctx).Logger.Level.String())
179-
180-
if err := ex.setJobUser(ctx); err != nil {
181-
ex.SetJobStateWithTerminationReason(
182-
ctx,
183-
types.JobStateFailed,
184-
types.TerminationReasonExecutorError,
185-
fmt.Sprintf("Failed to set job user (%s)", err),
186-
)
187-
return fmt.Errorf("set job user: %w", err)
188-
}
191+
ctx = log.WithLogger(ctx, ex.runnerLogger)
192+
log.Info(ctx, "Run job")
189193

190194
// setJobUser sets User.HomeDir to "/" if the original home dir is not set or not accessible,
191195
// in that case we skip home dir provisioning
@@ -204,16 +208,6 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
204208
}
205209
}
206210

207-
if err := ex.setJobWorkingDir(ctx); err != nil {
208-
ex.SetJobStateWithTerminationReason(
209-
ctx,
210-
types.JobStateFailed,
211-
types.TerminationReasonExecutorError,
212-
fmt.Sprintf("Failed to set job working dir (%s)", err),
213-
)
214-
return fmt.Errorf("set job working dir: %w", err)
215-
}
216-
217211
if err := ex.setupRepo(ctx); err != nil {
218212
ex.SetJobStateWithTerminationReason(
219213
ctx,
@@ -336,6 +330,66 @@ func (ex *RunExecutor) SetRunnerState(state string) {
336330
ex.state = state
337331
}
338332

333+
// preRun performs actions that were once part of Run() but were moved to a separate function
334+
// to implement GetJobInfo()
335+
// preRun must not execute long-running operations, as GetJobInfo() is called synchronously
336+
// in the /api/run method
337+
func (ex *RunExecutor) preRun(ctx context.Context) error {
338+
// Already called once
339+
if ex.runnerLogFile != nil {
340+
return nil
341+
}
342+
343+
// logging is required for the subsequent setJob{User,WorkingDir} calls
344+
runnerLogFile, err := log.CreateAppendFile(filepath.Join(ex.tempDir, consts.RunnerLogFileName))
345+
if err != nil {
346+
ex.SetJobState(ctx, types.JobStateFailed)
347+
return fmt.Errorf("create runner log file: %w", err)
348+
}
349+
ex.runnerLogFile = runnerLogFile
350+
ex.runnerLogStripper = ansistrip.NewWriter(ex.runnerLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize)
351+
runnerLogWriter := io.MultiWriter(ex.runnerLogFile, os.Stdout, ex.runnerLogStripper)
352+
runnerLogLevel := log.DefaultEntry.Logger.Level
353+
ex.runnerLogger = log.NewEntry(runnerLogWriter, int(runnerLogLevel))
354+
ctx = log.WithLogger(ctx, ex.runnerLogger)
355+
log.Info(ctx, "Logging configured", "log_level", runnerLogLevel.String())
356+
357+
// jobUser and jobWorkingDir are required for GetJobInfo()
358+
if err := ex.setJobUser(ctx); err != nil {
359+
ex.SetJobStateWithTerminationReason(
360+
ctx,
361+
types.JobStateFailed,
362+
types.TerminationReasonExecutorError,
363+
fmt.Sprintf("Failed to set job user (%s)", err),
364+
)
365+
return fmt.Errorf("set job user: %w", err)
366+
}
367+
if err := ex.setJobWorkingDir(ctx); err != nil {
368+
ex.SetJobStateWithTerminationReason(
369+
ctx,
370+
types.JobStateFailed,
371+
types.TerminationReasonExecutorError,
372+
fmt.Sprintf("Failed to set job working dir (%s)", err),
373+
)
374+
return fmt.Errorf("set job working dir: %w", err)
375+
}
376+
377+
return nil
378+
}
379+
380+
func (ex *RunExecutor) postRun(ctx context.Context) {
381+
if ex.runnerLogFile != nil {
382+
if err := ex.runnerLogFile.Close(); err != nil {
383+
log.Error(ctx, "Failed to close runnerLogFile", "err", err)
384+
}
385+
}
386+
if ex.runnerLogStripper != nil {
387+
if err := ex.runnerLogStripper.Close(); err != nil {
388+
log.Error(ctx, "Failed to close runnerLogStripper", "err", err)
389+
}
390+
}
391+
}
392+
339393
// setJobWorkingDir must be called from Run after setJobUser
340394
func (ex *RunExecutor) setJobWorkingDir(ctx context.Context) error {
341395
var err error

runner/internal/runner/api/http.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,18 +146,27 @@ func (s *Server) uploadCodePostHandler(w http.ResponseWriter, r *http.Request) (
146146

147147
func (s *Server) runPostHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) {
148148
s.executor.Lock()
149-
defer s.executor.Unlock()
150149
if s.executor.GetRunnerState() != executor.WaitRun {
150+
s.executor.Unlock()
151151
return nil, &api.Error{Status: http.StatusConflict}
152152
}
153+
s.executor.SetRunnerState(executor.ServeLogs)
154+
s.executor.Unlock()
153155

154156
var runCtx context.Context
155157
runCtx, s.cancelRun = context.WithCancel(context.Background())
158+
username, workingDir, err := s.executor.GetJobInfo(runCtx)
156159
go func() {
157160
_ = s.executor.Run(runCtx) // INFO: all errors are handled inside the Run()
158161
s.jobBarrierCh <- nil // notify server that job finished
159162
}()
160-
s.executor.SetRunnerState(executor.ServeLogs)
163+
164+
if err == nil {
165+
return &schemas.JobInfoResponse{
166+
Username: username,
167+
WorkingDir: workingDir,
168+
}, nil
169+
}
161170

162171
return nil, nil
163172
}

runner/internal/schemas/schemas.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ type PullResponse struct {
3535
LastUpdated int64 `json:"last_updated"`
3636
NoConnectionsSecs int64 `json:"no_connections_secs"`
3737
HasMore bool `json:"has_more"`
38-
// todo Result
38+
}
39+
40+
type JobInfoResponse struct {
41+
WorkingDir string `json:"working_dir"`
42+
Username string `json:"username"`
3943
}
4044

4145
type Run struct {

src/dstack/_internal/core/models/runs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,10 @@ class JobRuntimeData(CoreModel):
352352
volume_names: Optional[list[str]] = None # None for backward compatibility
353353
# Virtual shared offer
354354
offer: Optional[InstanceOfferWithAvailability] = None # None for backward compatibility
355+
# Resolved working directory and OS username reported by the runner.
356+
# None if the runner hasn't reported them yet or if it's an old runner.
357+
working_dir: Optional[str] = None
358+
username: Optional[str] = None
355359

356360

357361
class ClusterInfo(CoreModel):

src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1116,7 +1116,13 @@ def _submit_job_to_runner(
11161116
logger.debug("%s: uploading code", fmt(job_model))
11171117
runner_client.upload_code(code)
11181118
logger.debug("%s: starting job", fmt(job_model))
1119-
runner_client.run_job()
1119+
job_info = runner_client.run_job()
1120+
if job_info is not None:
1121+
jrd = get_job_runtime_data(job_model)
1122+
if jrd is not None:
1123+
jrd.working_dir = job_info.working_dir
1124+
jrd.username = job_info.username
1125+
job_model.job_runtime_data = jrd.json()
11201126

11211127
switch_job_status(session, job_model, JobStatus.RUNNING)
11221128
# do not log here, because the runner will send a new status

src/dstack/_internal/server/schemas/runner.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ class PullResponse(CoreModel):
4646
no_connections_secs: Optional[int] = None # Optional for compatibility with old runners
4747

4848

49+
class JobInfoResponse(CoreModel):
50+
working_dir: str
51+
username: str
52+
53+
4954
class SubmitBody(CoreModel):
5055
run: Annotated[
5156
Run,

src/dstack/_internal/server/services/runner/client.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
GPUDevice,
2525
HealthcheckResponse,
2626
InstanceHealthResponse,
27+
JobInfoResponse,
2728
LegacyPullResponse,
2829
LegacyStopBody,
2930
LegacySubmitBody,
@@ -124,9 +125,13 @@ def upload_code(self, file: Union[BinaryIO, bytes]):
124125
)
125126
resp.raise_for_status()
126127

127-
def run_job(self):
128+
def run_job(self) -> Optional[JobInfoResponse]:
128129
resp = requests.post(self._url("/api/run"), timeout=REQUEST_TIMEOUT)
129130
resp.raise_for_status()
131+
if not _is_json_response(resp):
132+
# Old runner or runner failed to get job info
133+
return None
134+
return JobInfoResponse.__response__.parse_obj(resp.json())
130135

131136
def pull(self, timestamp: int) -> PullResponse:
132137
resp = requests.get(
@@ -617,6 +622,13 @@ def _memory_to_bytes(memory: Optional[Memory]) -> int:
617622
return int(memory * 1024**3)
618623

619624

625+
def _is_json_response(response: requests.Response) -> bool:
626+
content_type = response.headers.get("content-type")
627+
if not content_type:
628+
return False
629+
return content_type.split(";", maxsplit=1)[0].strip() == "application/json"
630+
631+
620632
_TaskID = Union[uuid.UUID, str]
621633

622634
_Version = tuple[int, int, int]

0 commit comments

Comments
 (0)