Skip to content

Commit 0ed127d

Browse files
Report working_dir and username from runner to server via JobRuntimeData
The runner now reports the resolved working directory and OS username back to the server through the PullResponse. The server persists these in JobRuntimeData (write-once), and the frontend uses job_runtime_data.working_dir to construct correct IDE deep-link URLs instead of the hardcoded legacy /workflow path.

Made-with: Cursor
1 parent 733a17c commit 0ed127d

10 files changed

Lines changed: 197 additions & 6 deletions

File tree

frontend/src/pages/Runs/Details/RunDetails/ConnectToRunWithDevEnvConfiguration/index.tsx

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,11 @@ export const ConnectToRunWithDevEnvConfiguration: FC<{ run: IRun }> = ({ run })
5454
const [sshCommand, copySSHCommand] = getSSHCommand(run);
5555

5656
const configuration = run.run_spec.configuration as TDevEnvironmentConfiguration;
57-
const openInIDEUrl = `${configuration.ide}://vscode-remote/ssh-remote+${run.run_spec.run_name}/${run.run_spec.working_dir || 'workflow'}`;
57+
const latestSubmission = run.jobs[0]?.job_submissions?.slice(-1)[0];
58+
const workingDir = latestSubmission?.job_runtime_data?.working_dir;
59+
const openInIDEUrl = workingDir
60+
? `${configuration.ide}://vscode-remote/ssh-remote+${run.run_spec.run_name}${workingDir}`
61+
: null;
5862
const ideDisplayName = getIDEDisplayName(configuration.ide);
5963

6064
const [configCliCommand, copyCliCommand] = useConfigProjectCliCommand({ projectName: run.project_name });
@@ -63,7 +67,7 @@ export const ConnectToRunWithDevEnvConfiguration: FC<{ run: IRun }> = ({ run })
6367
<Container>
6468
<Header variant="h2">Connect</Header>
6569

66-
{run.status === 'running' && (
70+
{run.status === 'running' && openInIDEUrl && (
6771
<Wizard
6872
i18nStrings={{
6973
stepNumberLabel: (stepNumber) => `Step ${stepNumber}`,
@@ -267,10 +271,10 @@ export const ConnectToRunWithDevEnvConfiguration: FC<{ run: IRun }> = ({ run })
267271
/>
268272
)}
269273

270-
{run.status !== 'running' && (
274+
{(run.status !== 'running' || !openInIDEUrl) && (
271275
<SpaceBetween size="s">
272276
<Box />
273-
<Alert type="info">Waiting for the run to start.</Alert>
277+
<Alert type="info">Waiting for the dev environment to be ready.</Alert>
274278
</SpaceBetween>
275279
)}
276280
</Container>

frontend/src/types/run.d.ts

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -293,9 +293,15 @@ declare interface IJobProvisioningData {
293293
backend_data?: string;
294294
}
295295

296+
declare interface IJobRuntimeData {
297+
working_dir?: string | null;
298+
username?: string | null;
299+
}
300+
296301
declare interface IJobSubmission {
297302
id: string;
298303
job_provisioning_data?: IJobProvisioningData | null;
304+
job_runtime_data?: IJobRuntimeData | null;
299305
error_code?: TJobErrorCode | null;
300306
submission_num: number;
301307
status: TJobStatus;

runner/internal/executor/executor_test.go

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -353,6 +353,30 @@ func TestExecutor_LogsAnsiCodeHandling(t *testing.T) {
353353
}
354354
}
355355

356+
func TestGetHistory_IncludesWorkingDirAndUsername(t *testing.T) {
357+
ex := makeTestExecutor(t)
358+
resp := ex.GetHistory(0)
359+
assert.NotEmpty(t, resp.WorkingDir)
360+
assert.True(t, path.IsAbs(resp.WorkingDir))
361+
assert.NotEmpty(t, resp.Username)
362+
}
363+
364+
func TestGetHistory_BeforeRun(t *testing.T) {
365+
baseDir, err := filepath.EvalSymlinks(t.TempDir())
366+
require.NoError(t, err)
367+
tempDir := filepath.Join(baseDir, "temp")
368+
require.NoError(t, os.Mkdir(tempDir, 0o700))
369+
dstackDir := filepath.Join(baseDir, "dstack")
370+
require.NoError(t, os.Mkdir(dstackDir, 0o755))
371+
currentUser, err := linuxuser.FromCurrentProcess()
372+
require.NoError(t, err)
373+
ex, err := NewRunExecutor(tempDir, dstackDir, *currentUser, new(sshdMock))
374+
require.NoError(t, err)
375+
resp := ex.GetHistory(0)
376+
assert.Empty(t, resp.WorkingDir)
377+
assert.Empty(t, resp.Username)
378+
}
379+
356380
type sshdMock struct{}
357381

358382
func (d *sshdMock) Port() int {

runner/internal/executor/query.go

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -9,14 +9,19 @@ func (ex *RunExecutor) GetJobWsLogsHistory() []schemas.LogEvent {
99
}
1010

1111
func (ex *RunExecutor) GetHistory(timestamp int64) *schemas.PullResponse {
12-
return &schemas.PullResponse{
12+
resp := &schemas.PullResponse{
1313
JobStates: eventsAfter(ex.jobStateHistory, timestamp),
1414
JobLogs: eventsAfter(ex.jobLogs.history, timestamp),
1515
RunnerLogs: eventsAfter(ex.runnerLogs.history, timestamp),
1616
LastUpdated: ex.timestamp.GetLatest(),
1717
NoConnectionsSecs: ex.connectionTracker.GetNoConnectionsSecs(),
1818
HasMore: ex.state != WaitLogsFinished,
19+
WorkingDir: ex.jobWorkingDir,
1920
}
21+
if ex.jobUser != nil {
22+
resp.Username = ex.jobUser.Username
23+
}
24+
return resp
2025
}
2126

2227
func (ex *RunExecutor) GetRunnerState() string {

runner/internal/schemas/schemas.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,8 @@ type PullResponse struct {
3535
LastUpdated int64 `json:"last_updated"`
3636
NoConnectionsSecs int64 `json:"no_connections_secs"`
3737
HasMore bool `json:"has_more"`
38-
// todo Result
38+
WorkingDir string `json:"working_dir,omitempty"`
39+
Username string `json:"username,omitempty"`
3940
}
4041

4142
type Run struct {

src/dstack/_internal/core/models/runs.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -352,6 +352,10 @@ class JobRuntimeData(CoreModel):
352352
volume_names: Optional[list[str]] = None # None for backward compatibility
353353
# Virtual shared offer
354354
offer: Optional[InstanceOfferWithAvailability] = None # None for backward compatibility
355+
# Resolved working directory and OS username reported by the runner.
356+
# None if the runner hasn't reported them yet or if it's an old runner.
357+
working_dir: Optional[str] = None
358+
username: Optional[str] = None
355359

356360

357361
class ClusterInfo(CoreModel):

src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -774,6 +774,18 @@ def _process_running(
774774
timestamp = job_model.runner_timestamp
775775
resp = runner_client.pull(timestamp) # raises error if runner is down, causes retry
776776
job_model.runner_timestamp = resp.last_updated
777+
if resp.working_dir or resp.username:
778+
jrd = get_job_runtime_data(job_model)
779+
if jrd is not None:
780+
updated = False
781+
if resp.working_dir and jrd.working_dir is None:
782+
jrd.working_dir = resp.working_dir
783+
updated = True
784+
if resp.username and jrd.username is None:
785+
jrd.username = resp.username
786+
updated = True
787+
if updated:
788+
job_model.job_runtime_data = jrd.json()
777789
# may raise LogStorageError, causing a retry
778790
logs_services.write_logs(
779791
project=run_model.project,

src/dstack/_internal/server/schemas/runner.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,8 @@ class PullResponse(CoreModel):
4444
runner_logs: List[LogEvent]
4545
last_updated: int
4646
no_connections_secs: Optional[int] = None # Optional for compatibility with old runners
47+
working_dir: Optional[str] = None # Optional for compatibility with old runners
48+
username: Optional[str] = None # Optional for compatibility with old runners
4749

4850

4951
class SubmitBody(CoreModel):

src/dstack/_internal/server/testing/common.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -456,6 +456,8 @@ def get_job_runtime_data(
456456
ports: Optional[dict[int, int]] = None,
457457
offer: Optional[InstanceOfferWithAvailability] = None,
458458
volume_names: Optional[list[str]] = None,
459+
working_dir: Optional[str] = None,
460+
username: Optional[str] = None,
459461
) -> JobRuntimeData:
460462
return JobRuntimeData(
461463
network_mode=NetworkMode(network_mode),
@@ -465,6 +467,8 @@ def get_job_runtime_data(
465467
ports=ports,
466468
offer=offer,
467469
volume_names=volume_names,
470+
working_dir=working_dir,
471+
username=username,
468472
)
469473

470474

src/tests/_internal/server/background/scheduled_tasks/test_running_jobs.py

Lines changed: 129 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1118,6 +1118,135 @@ async def test_registers_service_replica_only_after_probes_pass(
11181118
assert not job.registered
11191119
assert not events
11201120

1121+
@pytest.mark.asyncio
1122+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
1123+
async def test_process_running_persists_runtime_data(
1124+
self, test_db, session: AsyncSession, tmp_path: Path
1125+
):
1126+
project = await create_project(session=session)
1127+
user = await create_user(session=session)
1128+
repo = await create_repo(session=session, project_id=project.id)
1129+
run = await create_run(session=session, project=project, repo=repo, user=user)
1130+
instance = await create_instance(
1131+
session=session, project=project, status=InstanceStatus.BUSY
1132+
)
1133+
job = await create_job(
1134+
session=session,
1135+
run=run,
1136+
status=JobStatus.RUNNING,
1137+
job_provisioning_data=get_job_provisioning_data(dockerized=False),
1138+
job_runtime_data=get_job_runtime_data(),
1139+
instance=instance,
1140+
instance_assigned=True,
1141+
)
1142+
with (
1143+
patch("dstack._internal.server.services.runner.ssh.SSHTunnel"),
1144+
patch(
1145+
"dstack._internal.server.services.runner.client.RunnerClient"
1146+
) as RunnerClientMock,
1147+
patch.object(server_settings, "SERVER_DIR_PATH", tmp_path),
1148+
):
1149+
runner_client_mock = RunnerClientMock.return_value
1150+
runner_client_mock.pull.return_value = PullResponse(
1151+
job_states=[],
1152+
job_logs=[],
1153+
runner_logs=[],
1154+
last_updated=1,
1155+
working_dir="/dstack/run",
1156+
username="root",
1157+
)
1158+
await process_running_jobs()
1159+
await session.refresh(job)
1160+
jrd = JobRuntimeData.__response__.parse_raw(job.job_runtime_data)
1161+
assert jrd.working_dir == "/dstack/run"
1162+
assert jrd.username == "root"
1163+
1164+
@pytest.mark.asyncio
1165+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
1166+
async def test_process_running_does_not_overwrite_runtime_data(
1167+
self, test_db, session: AsyncSession, tmp_path: Path
1168+
):
1169+
project = await create_project(session=session)
1170+
user = await create_user(session=session)
1171+
repo = await create_repo(session=session, project_id=project.id)
1172+
run = await create_run(session=session, project=project, repo=repo, user=user)
1173+
instance = await create_instance(
1174+
session=session, project=project, status=InstanceStatus.BUSY
1175+
)
1176+
job = await create_job(
1177+
session=session,
1178+
run=run,
1179+
status=JobStatus.RUNNING,
1180+
job_provisioning_data=get_job_provisioning_data(dockerized=False),
1181+
job_runtime_data=get_job_runtime_data(
1182+
working_dir="/original/path", username="originaluser"
1183+
),
1184+
instance=instance,
1185+
instance_assigned=True,
1186+
)
1187+
with (
1188+
patch("dstack._internal.server.services.runner.ssh.SSHTunnel"),
1189+
patch(
1190+
"dstack._internal.server.services.runner.client.RunnerClient"
1191+
) as RunnerClientMock,
1192+
patch.object(server_settings, "SERVER_DIR_PATH", tmp_path),
1193+
):
1194+
runner_client_mock = RunnerClientMock.return_value
1195+
runner_client_mock.pull.return_value = PullResponse(
1196+
job_states=[],
1197+
job_logs=[],
1198+
runner_logs=[],
1199+
last_updated=1,
1200+
working_dir="/new/path",
1201+
username="ubuntu",
1202+
)
1203+
await process_running_jobs()
1204+
await session.refresh(job)
1205+
jrd = JobRuntimeData.__response__.parse_raw(job.job_runtime_data)
1206+
assert jrd.working_dir == "/original/path"
1207+
assert jrd.username == "originaluser"
1208+
1209+
@pytest.mark.asyncio
1210+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
1211+
async def test_process_running_handles_old_runner_without_runtime_fields(
1212+
self, test_db, session: AsyncSession, tmp_path: Path
1213+
):
1214+
project = await create_project(session=session)
1215+
user = await create_user(session=session)
1216+
repo = await create_repo(session=session, project_id=project.id)
1217+
run = await create_run(session=session, project=project, repo=repo, user=user)
1218+
instance = await create_instance(
1219+
session=session, project=project, status=InstanceStatus.BUSY
1220+
)
1221+
job = await create_job(
1222+
session=session,
1223+
run=run,
1224+
status=JobStatus.RUNNING,
1225+
job_provisioning_data=get_job_provisioning_data(dockerized=False),
1226+
job_runtime_data=get_job_runtime_data(),
1227+
instance=instance,
1228+
instance_assigned=True,
1229+
)
1230+
with (
1231+
patch("dstack._internal.server.services.runner.ssh.SSHTunnel"),
1232+
patch(
1233+
"dstack._internal.server.services.runner.client.RunnerClient"
1234+
) as RunnerClientMock,
1235+
patch.object(server_settings, "SERVER_DIR_PATH", tmp_path),
1236+
):
1237+
runner_client_mock = RunnerClientMock.return_value
1238+
runner_client_mock.pull.return_value = PullResponse(
1239+
job_states=[],
1240+
job_logs=[],
1241+
runner_logs=[],
1242+
last_updated=1,
1243+
)
1244+
await process_running_jobs()
1245+
await session.refresh(job)
1246+
jrd = JobRuntimeData.__response__.parse_raw(job.job_runtime_data)
1247+
assert jrd.working_dir is None
1248+
assert jrd.username is None
1249+
11211250

11221251
class TestPatchBaseImageForAwsEfa:
11231252
@staticmethod

0 commit comments

Comments
 (0)