Skip to content

Commit 187f291

Browse files
committed
Merge branch 'master' into pr_orjson
2 parents 994c5f7 + ab5dfbf commit 187f291

17 files changed

Lines changed: 296 additions & 67 deletions

File tree

.github/workflows/build.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ jobs:
123123
defaults:
124124
run:
125125
working-directory: runner
126-
runs-on: ubuntu-latest
126+
runs-on: ${{ matrix.os }}
127+
strategy:
128+
matrix:
129+
os: [ubuntu-latest, macos-latest]
127130
steps:
128131
- uses: actions/checkout@v4
129132
- name: Set up Go

frontend/src/pages/Runs/List/index.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export const RunList: React.FC = () => {
4848

4949
const { data, isLoading, refreshList, isLoadingMore } = useInfiniteScroll<IRun, TRunsRequestParams>({
5050
useLazyQuery: useLazyGetRunsQuery,
51-
args: { ...filteringRequestParams, limit: DEFAULT_TABLE_PAGE_SIZE },
51+
args: { ...filteringRequestParams, limit: DEFAULT_TABLE_PAGE_SIZE, job_submissions_limit: 1 },
5252
getPaginationParams: (lastRun) => ({ prev_submitted_at: lastRun.submitted_at }),
5353
});
5454

frontend/src/types/run.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ declare type TRunsRequestParams = {
77
prev_run_id?: string;
88
limit?: number;
99
ascending?: boolean;
10+
job_submissions_limit?: number;
1011
};
1112

1213
declare type TDeleteRunsRequestParams = {

runner/internal/executor/executor.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"os/exec"
1212
osuser "os/user"
1313
"path/filepath"
14+
"runtime"
1415
"strconv"
1516
"strings"
1617
"sync"
@@ -27,6 +28,12 @@ import (
2728
"github.com/prometheus/procfs"
2829
)
2930

31+
type ConnectionTracker interface {
32+
GetNoConnectionsSecs() int64
33+
Track(ticker <-chan time.Time)
34+
Stop()
35+
}
36+
3037
type RunExecutor struct {
3138
tempDir string
3239
homeDir string
@@ -51,9 +58,16 @@ type RunExecutor struct {
5158
timestamp *MonotonicTimestamp
5259

5360
killDelay time.Duration
54-
connectionTracker *connections.ConnectionTracker
61+
connectionTracker ConnectionTracker
5562
}
5663

64+
// stubConnectionTracker is a no-op implementation for when procfs is not available (only required for tests on darwin)
65+
type stubConnectionTracker struct{}
66+
67+
func (s *stubConnectionTracker) GetNoConnectionsSecs() int64 { return 0 }
68+
func (s *stubConnectionTracker) Track(ticker <-chan time.Time) {}
69+
func (s *stubConnectionTracker) Stop() {}
70+
5771
func NewRunExecutor(tempDir string, homeDir string, workingDir string, sshPort int) (*RunExecutor, error) {
5872
mu := &sync.RWMutex{}
5973
timestamp := NewMonotonicTimestamp()
@@ -65,15 +79,25 @@ func NewRunExecutor(tempDir string, homeDir string, workingDir string, sshPort i
6579
if err != nil {
6680
return nil, fmt.Errorf("failed to parse current user uid: %w", err)
6781
}
68-
proc, err := procfs.NewDefaultFS()
69-
if err != nil {
70-
return nil, fmt.Errorf("failed to initialize procfs: %w", err)
82+
83+
// Try to initialize procfs, but don't fail if it's not available (e.g., on macOS)
84+
var connectionTracker ConnectionTracker
85+
86+
if runtime.GOOS == "linux" {
87+
proc, err := procfs.NewDefaultFS()
88+
if err != nil {
89+
return nil, fmt.Errorf("failed to initialize procfs: %w", err)
90+
}
91+
connectionTracker = connections.NewConnectionTracker(connections.ConnectionTrackerConfig{
92+
Port: uint64(sshPort),
93+
MinConnDuration: 10 * time.Second, // shorter connections are likely from dstack-server
94+
Procfs: proc,
95+
})
96+
} else {
97+
// Use stub connection tracker (only required for tests on darwin)
98+
connectionTracker = &stubConnectionTracker{}
7199
}
72-
connectionTracker := connections.NewConnectionTracker(connections.ConnectionTrackerConfig{
73-
Port: uint64(sshPort),
74-
MinConnDuration: 10 * time.Second, // shorter connections are likely from dstack-server
75-
Procfs: proc,
76-
})
100+
77101
return &RunExecutor{
78102
tempDir: tempDir,
79103
homeDir: homeDir,

runner/internal/metrics/metrics_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
package metrics
22

33
import (
4+
"runtime"
45
"testing"
56

67
"github.com/dstackai/dstack/runner/internal/schemas"
78
"github.com/stretchr/testify/assert"
89
)
910

1011
func TestGetAMDGPUMetrics_OK(t *testing.T) {
12+
if runtime.GOOS == "darwin" {
13+
t.Skip("Skipping on macOS")
14+
}
1115
collector, err := NewMetricsCollector()
1216
assert.NoError(t, err)
1317

@@ -39,6 +43,9 @@ func TestGetAMDGPUMetrics_OK(t *testing.T) {
3943
}
4044

4145
func TestGetAMDGPUMetrics_ErrorGPUUtilNA(t *testing.T) {
46+
if runtime.GOOS == "darwin" {
47+
t.Skip("Skipping on macOS")
48+
}
4249
collector, err := NewMetricsCollector()
4350
assert.NoError(t, err)
4451
metrics, err := collector.getAMDGPUMetrics("gpu,gfx,gfx_clock,vram_used,vram_total\n0,N/A,N/A,283,196300\n")

src/dstack/_internal/core/backends/base/compute.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,8 @@ def get_shim_commands(
559559
backend_shim_env: Optional[Dict[str, str]] = None,
560560
arch: Optional[str] = None,
561561
) -> List[str]:
562-
commands = get_shim_pre_start_commands(
562+
commands = get_setup_cloud_instance_commands()
563+
commands += get_shim_pre_start_commands(
563564
base_path=base_path,
564565
bin_path=bin_path,
565566
arch=arch,
@@ -641,6 +642,23 @@ def get_dstack_shim_download_url(arch: Optional[str] = None) -> str:
641642
return url_template.format(version=version, arch=arch)
642643

643644

645+
def get_setup_cloud_instance_commands() -> list[str]:
646+
return [
647+
# Workaround for https://github.com/NVIDIA/nvidia-container-toolkit/issues/48
648+
# Attempts to patch /etc/docker/daemon.json while keeping any custom settings it may have.
649+
(
650+
"/bin/sh -c '" # wrap in /bin/sh to avoid interfering with other cloud init commands
651+
" grep -q nvidia /etc/docker/daemon.json"
652+
" && ! grep -q native.cgroupdriver /etc/docker/daemon.json"
653+
" && jq '\\''.\"exec-opts\" = ((.\"exec-opts\" // []) + [\"native.cgroupdriver=cgroupfs\"])'\\'' /etc/docker/daemon.json > /tmp/daemon.json"
654+
" && sudo mv /tmp/daemon.json /etc/docker/daemon.json"
655+
" && sudo service docker restart"
656+
" || true"
657+
"'"
658+
),
659+
]
660+
661+
644662
def get_shim_pre_start_commands(
645663
base_path: Optional[PathLike] = None,
646664
bin_path: Optional[PathLike] = None,

src/dstack/_internal/core/backends/cudo/compute.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,13 @@ def create_instance(
6565
public_keys = instance_config.get_public_keys()
6666
memory_size = round(instance_offer.instance.resources.memory_mib / 1024)
6767
disk_size = round(instance_offer.instance.resources.disk.size_mib / 1024)
68-
commands = get_shim_commands(authorized_keys=public_keys)
6968
gpus_no = len(instance_offer.instance.resources.gpus)
70-
shim_commands = " ".join([" && ".join(commands)])
71-
startup_script = (
72-
shim_commands if gpus_no > 0 else f"{install_docker_script()} && {shim_commands}"
73-
)
69+
if gpus_no > 0:
70+
# we'll need jq for patching /etc/docker/daemon.json, see get_shim_commands()
71+
commands = install_jq_commands()
72+
else:
73+
commands = install_docker_commands()
74+
commands += get_shim_commands(authorized_keys=public_keys)
7475

7576
try:
7677
resp_data = self.api_client.create_virtual_machine(
@@ -85,7 +86,7 @@ def create_instance(
8586
memory_gib=memory_size,
8687
vcpus=instance_offer.instance.resources.cpus,
8788
vm_id=vm_id,
88-
start_script=startup_script,
89+
start_script=" && ".join(commands),
8990
password=None,
9091
customSshKeys=public_keys,
9192
)
@@ -151,6 +152,19 @@ def _get_image_id(cuda: bool) -> str:
151152
return image_name
152153

153154

154-
def install_docker_script():
155-
commands = 'export DEBIAN_FRONTEND="noninteractive" && mkdir -p /etc/apt/keyrings && curl --max-time 60 -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg && echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null && apt-get update && apt-get --assume-yes install docker-ce docker-ce-cli containerd.io docker-compose-plugin'
156-
return commands
155+
def install_jq_commands():
156+
return [
157+
"export DEBIAN_FRONTEND=noninteractive",
158+
"apt-get --assume-yes install jq",
159+
]
160+
161+
162+
def install_docker_commands():
163+
return [
164+
"export DEBIAN_FRONTEND=noninteractive",
165+
"mkdir -p /etc/apt/keyrings",
166+
"curl --max-time 60 -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg",
167+
'echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null',
168+
"apt-get update",
169+
"apt-get --assume-yes install docker-ce docker-ce-cli containerd.io docker-compose-plugin",
170+
]

src/dstack/_internal/core/backends/gcp/compute.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import google.cloud.compute_v1 as compute_v1
99
from cachetools import TTLCache, cachedmethod
1010
from google.cloud import tpu_v2
11+
from google.cloud.compute_v1.types.compute import Instance
1112
from gpuhunt import KNOWN_TPUS
1213

1314
import dstack._internal.core.backends.gcp.auth as auth
@@ -19,6 +20,7 @@
1920
ComputeWithGatewaySupport,
2021
ComputeWithMultinodeSupport,
2122
ComputeWithPlacementGroupSupport,
23+
ComputeWithPrivateGatewaySupport,
2224
ComputeWithVolumeSupport,
2325
generate_unique_gateway_instance_name,
2426
generate_unique_instance_name,
@@ -83,6 +85,7 @@ class GCPCompute(
8385
ComputeWithMultinodeSupport,
8486
ComputeWithPlacementGroupSupport,
8587
ComputeWithGatewaySupport,
88+
ComputeWithPrivateGatewaySupport,
8689
ComputeWithVolumeSupport,
8790
Compute,
8891
):
@@ -395,11 +398,7 @@ def update_provisioning_data(
395398
if instance.status in ["PROVISIONING", "STAGING"]:
396399
return
397400
if instance.status == "RUNNING":
398-
if allocate_public_ip:
399-
hostname = instance.network_interfaces[0].access_configs[0].nat_i_p
400-
else:
401-
hostname = instance.network_interfaces[0].network_i_p
402-
provisioning_data.hostname = hostname
401+
provisioning_data.hostname = _get_instance_ip(instance, allocate_public_ip)
403402
provisioning_data.internal_ip = instance.network_interfaces[0].network_i_p
404403
return
405404
raise ProvisioningError(
@@ -500,7 +499,7 @@ def create_gateway(
500499
request.instance_resource = gcp_resources.create_instance_struct(
501500
disk_size=10,
502501
image_id=_get_gateway_image_id(),
503-
machine_type="e2-small",
502+
machine_type="e2-medium",
504503
accelerators=[],
505504
spot=False,
506505
user_data=get_gateway_user_data(configuration.ssh_key_pub),
@@ -512,6 +511,7 @@ def create_gateway(
512511
service_account=self.config.vm_service_account,
513512
network=self.config.vpc_resource_name,
514513
subnetwork=subnetwork,
514+
allocate_public_ip=configuration.public_ip,
515515
)
516516
operation = self.instances_client.insert(request=request)
517517
gcp_resources.wait_for_extended_operation(operation, "instance creation")
@@ -522,7 +522,7 @@ def create_gateway(
522522
instance_id=instance_name,
523523
region=configuration.region, # used for instance termination
524524
availability_zone=zone,
525-
ip_address=instance.network_interfaces[0].access_configs[0].nat_i_p,
525+
ip_address=_get_instance_ip(instance, configuration.public_ip),
526526
backend_data=json.dumps({"zone": zone}),
527527
)
528528

@@ -1024,3 +1024,9 @@ def _is_tpu_provisioning_data(provisioning_data: JobProvisioningData) -> bool:
10241024
backend_data_dict = json.loads(provisioning_data.backend_data)
10251025
is_tpu = backend_data_dict.get("is_tpu", False)
10261026
return is_tpu
1027+
1028+
1029+
def _get_instance_ip(instance: Instance, public_ip: bool) -> str:
1030+
if public_ip:
1031+
return instance.network_interfaces[0].access_configs[0].nat_i_p
1032+
return instance.network_interfaces[0].network_i_p

src/dstack/_internal/core/backends/lambdalabs/compute.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import hashlib
2+
import shlex
23
import subprocess
34
import tempfile
45
from threading import Thread
@@ -98,7 +99,7 @@ def update_provisioning_data(
9899
arch=provisioning_data.instance_type.resources.cpu_arch,
99100
)
100101
# shim is assumed to be run under root
101-
launch_command = "sudo sh -c '" + "&& ".join(commands) + "'"
102+
launch_command = "sudo sh -c " + shlex.quote(" && ".join(commands))
102103
thread = Thread(
103104
target=_start_runner,
104105
kwargs={

src/dstack/_internal/core/models/runs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,10 @@ def _get_status_message(self) -> Optional[str]:
559559
return self.status.value
560560

561561
last_job = self.jobs[0]
562+
# FIXME: status_message should not require all job submissions for status calculation
563+
# since it's very expensive and is not required for anything else.
564+
# May return a different status if not all job submissions requested.
565+
# TODO: Calculate status_message by looking at job models directly instead of job submissions.
562566
last_job_termination_reason = last_job.get_last_termination_reason()
563567

564568
if len(self.jobs) == 1:

0 commit comments

Comments (0)