Skip to content

Commit d4a6061

Browse files
authored
Reexport DCGM metrics from instances (#2364)
* shim: start dcgm-exporter if available, proxy requests * periodically collect and store last metrics * enrich metrics with dstack labels, export Closes: #2359
1 parent 2d030a8 commit d4a6061

File tree

23 files changed

+1196
-8
lines changed

23 files changed

+1196
-8
lines changed

docs/docs/guides/server-deployment.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,21 @@ To store logs using GCP Logging, set the `DSTACK_SERVER_GCP_LOGGING_PROJECT` env
202202

203203
</div>
204204

205+
## Metrics
206+
207+
If enabled, `dstack` collects and exports Prometheus metrics from running jobs. Metrics for jobs from all projects are available
208+
at the `/metrics` path, and metrics for jobs from a specific project are available at the `/metrics/project/<project-name>` path.
209+
210+
By default, metrics are disabled. To enable, set the `DSTACK_ENABLE_PROMETHEUS_METRICS` variable.
211+
212+
Each sample includes a set of `dstack_*` labels, e.g., `dstack_project_name="main"`, `dstack_run_name="vllm-llama32"`.
213+
214+
Currently, `dstack` collects the following metrics:
215+
216+
* A fixed subset of NVIDIA GPU metrics from [DCGM Exporter :material-arrow-top-right-thin:{ .external }](https://docs.nvidia.com/datacenter/dcgm/latest/gpu-telemetry/dcgm-exporter.html){:target="_blank"} on supported cloud backends — AWS, Azure, GCP, OCI — and SSH fleets.
217+
On supported cloud backends the required packages are already installed.
218+
If you use SSH fleets, install `datacenter-gpu-manager-4-core` and `datacenter-gpu-manager-exporter`.
219+
205220
## Encryption
206221

207222
By default, `dstack` stores data in plaintext. To enforce encryption, you

docs/docs/reference/environment-variables.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ For more details on the options below, refer to the [server deployment](../guide
105105
- `DSTACK_SERVER_CLOUDWATCH_LOG_REGION`{ #DSTACK_SERVER_CLOUDWATCH_LOG_REGION } – The CloudWatch Logs region. Defaults to `None`.
106106
- `DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE`{ #DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE } – Request body size limit for services running with a gateway, in bytes. Defaults to 64 MiB.
107107
- `DSTACK_FORBID_SERVICES_WITHOUT_GATEWAY`{ #DSTACK_FORBID_SERVICES_WITHOUT_GATEWAY } – Forbids registering new services without a gateway if set to any value.
108+
- `DSTACK_ENABLE_PROMETHEUS_METRICS`{ #DSTACK_ENABLE_PROMETHEUS_METRICS } — Enables Prometheus metrics collection and export.
108109

109110
??? info "Internal environment variables"
110111
The following environment variables are intended for development purposes:

runner/cmd/shim/main.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"github.com/dstackai/dstack/runner/internal/log"
1616
"github.com/dstackai/dstack/runner/internal/shim"
1717
"github.com/dstackai/dstack/runner/internal/shim/api"
18+
"github.com/dstackai/dstack/runner/internal/shim/dcgm"
19+
"github.com/dstackai/dstack/runner/internal/shim/host"
1820
"github.com/sirupsen/logrus"
1921
"github.com/urfave/cli/v2"
2022
)
@@ -95,6 +97,21 @@ func main() {
9597
Destination: &args.Runner.LogLevel,
9698
EnvVars: []string{"DSTACK_RUNNER_LOG_LEVEL"},
9799
},
100+
/* DCGM Exporter Parameters */
101+
&cli.IntFlag{
102+
Name: "dcgm-exporter-http-port",
103+
Usage: "DCGM Exporter http port",
104+
Value: 10997,
105+
Destination: &args.DCGMExporter.HTTPPort,
106+
EnvVars: []string{"DSTACK_DCGM_EXPORTER_HTTP_PORT"},
107+
},
108+
&cli.IntFlag{
109+
Name: "dcgm-exporter-interval",
110+
Usage: "DCGM Exporter collect interval, milliseconds",
111+
Value: 5000,
112+
Destination: &args.DCGMExporter.Interval,
113+
EnvVars: []string{"DSTACK_DCGM_EXPORTER_INTERVAL"},
114+
},
98115
/* Docker Parameters */
99116
&cli.BoolFlag{
100117
Name: "privileged",
@@ -178,8 +195,28 @@ func start(ctx context.Context, args shim.CLIArgs, serviceMode bool) (err error)
178195
return err
179196
}
180197

198+
var dcgmExporter *dcgm.DCGMExporter
199+
200+
if host.GetGpuVendor() == host.GpuVendorNvidia {
201+
dcgmExporterPath, err := dcgm.GetDCGMExporterExecPath(ctx)
202+
if err == nil {
203+
interval := time.Duration(args.DCGMExporter.Interval * int(time.Millisecond))
204+
dcgmExporter = dcgm.NewDCGMExporter(dcgmExporterPath, args.DCGMExporter.HTTPPort, interval)
205+
err = dcgmExporter.Start(ctx)
206+
}
207+
if err == nil {
208+
log.Info(ctx, "using DCGM Exporter")
209+
defer func() {
210+
_ = dcgmExporter.Stop(ctx)
211+
}()
212+
} else {
213+
log.Warning(ctx, "not using DCGM Exporter", "err", err)
214+
dcgmExporter = nil
215+
}
216+
}
217+
181218
address := fmt.Sprintf(":%d", args.Shim.HTTPPort)
182-
shimServer := api.NewShimServer(ctx, address, dockerRunner, Version)
219+
shimServer := api.NewShimServer(ctx, address, dockerRunner, dcgmExporter, Version)
183220

184221
defer func() {
185222
shutdownCtx, cancelShutdown := context.WithTimeout(ctx, 5*time.Second)

runner/internal/shim/api/handlers.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/dstackai/dstack/runner/internal/api"
99
"github.com/dstackai/dstack/runner/internal/log"
1010
"github.com/dstackai/dstack/runner/internal/shim"
11+
"github.com/dstackai/dstack/runner/internal/shim/dcgm"
1112
)
1213

1314
func (s *ShimServer) HealthcheckHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) {
@@ -121,3 +122,22 @@ func (s *ShimServer) TaskRemoveHandler(w http.ResponseWriter, r *http.Request) (
121122
log.Info(ctx, "removed", "task", taskID)
122123
return nil, nil
123124
}
125+
126+
func (s *ShimServer) TaskMetricsHandler(w http.ResponseWriter, r *http.Request) {
127+
if s.dcgmExporter == nil {
128+
http.Error(w, "DCGM Exporter is not available", http.StatusNotFound)
129+
return
130+
}
131+
taskInfo := s.runner.TaskInfo(r.PathValue("id"))
132+
if taskInfo.ID == "" {
133+
http.Error(w, "Task not found", http.StatusNotFound)
134+
return
135+
}
136+
expfmtBody, err := s.dcgmExporter.Fetch(r.Context())
137+
if err != nil {
138+
http.Error(w, err.Error(), http.StatusBadGateway)
139+
return
140+
}
141+
response := dcgm.FilterMetrics(expfmtBody, taskInfo.GpuIDs)
142+
_, _ = w.Write(response)
143+
}

runner/internal/shim/api/handlers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func TestHealthcheck(t *testing.T) {
1313
request := httptest.NewRequest("GET", "/api/healthcheck", nil)
1414
responseRecorder := httptest.NewRecorder()
1515

16-
server := NewShimServer(context.Background(), ":12345", NewDummyRunner(), "0.0.1.dev2")
16+
server := NewShimServer(context.Background(), ":12345", NewDummyRunner(), nil, "0.0.1.dev2")
1717

1818
f := common.JSONResponseHandler(server.HealthcheckHandler)
1919
f(responseRecorder, request)
@@ -30,7 +30,7 @@ func TestHealthcheck(t *testing.T) {
3030
}
3131

3232
func TestTaskSubmit(t *testing.T) {
33-
server := NewShimServer(context.Background(), ":12340", NewDummyRunner(), "0.0.1.dev2")
33+
server := NewShimServer(context.Background(), ":12340", NewDummyRunner(), nil, "0.0.1.dev2")
3434
requestBody := `{
3535
"id": "dummy-id",
3636
"name": "dummy-name",

runner/internal/shim/api/server.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/dstackai/dstack/runner/internal/api"
1010
"github.com/dstackai/dstack/runner/internal/shim"
11+
"github.com/dstackai/dstack/runner/internal/shim/dcgm"
1112
)
1213

1314
type TaskRunner interface {
@@ -27,10 +28,12 @@ type ShimServer struct {
2728

2829
runner TaskRunner
2930

31+
dcgmExporter *dcgm.DCGMExporter
32+
3033
version string
3134
}
3235

33-
func NewShimServer(ctx context.Context, address string, runner TaskRunner, version string) *ShimServer {
36+
func NewShimServer(ctx context.Context, address string, runner TaskRunner, dcgmExporter *dcgm.DCGMExporter, version string) *ShimServer {
3437
r := api.NewRouter()
3538
s := &ShimServer{
3639
HttpServer: &http.Server{
@@ -41,6 +44,8 @@ func NewShimServer(ctx context.Context, address string, runner TaskRunner, versi
4144

4245
runner: runner,
4346

47+
dcgmExporter: dcgmExporter,
48+
4449
version: version,
4550
}
4651

@@ -51,6 +56,7 @@ func NewShimServer(ctx context.Context, address string, runner TaskRunner, versi
5156
r.AddHandler("POST", "/api/tasks", s.TaskSubmitHandler)
5257
r.AddHandler("POST", "/api/tasks/{id}/terminate", s.TaskTerminateHandler)
5358
r.AddHandler("POST", "/api/tasks/{id}/remove", s.TaskRemoveHandler)
59+
r.HandleFunc("GET /metrics/tasks/{id}", s.TaskMetricsHandler)
5460

5561
return s
5662
}
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
package dcgm
2+
3+
import (
4+
"context"
5+
"encoding/csv"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"os"
11+
"os/exec"
12+
"strconv"
13+
"strings"
14+
"sync"
15+
"syscall"
16+
"time"
17+
18+
"github.com/alexellis/go-execute/v2"
19+
"github.com/dstackai/dstack/runner/internal/log"
20+
)
21+
22+
// Counter represents a single line in counters.csv, see
23+
// https://github.com/NVIDIA/dcgm-exporter/tree/5f9250c211?tab=readme-ov-file#changing-metrics
24+
// For list of supported types see
25+
// https://github.com/NVIDIA/dcgm-exporter/blob/5f9250c211/internal/pkg/counters/variables.go#L23
26+
// NB: Although it is called "counter" in dcgm-exporter, in fact it can be any Prometheus
27+
// metric type or even a label
28+
type Counter struct {
29+
Name string
30+
Type string
31+
Help string
32+
}
33+
34+
// Full list: https://docs.nvidia.com/datacenter/dcgm/latest/dcgm-api/dcgm-api-field-ids.html
35+
var counters = [...]Counter{
36+
{"DCGM_FI_DEV_GPU_UTIL", "gauge", "GPU utilization (in %)."},
37+
{"DCGM_FI_DEV_MEM_COPY_UTIL", "gauge", "Memory utilization (in %)."},
38+
{"DCGM_FI_DEV_ENC_UTIL", "gauge", "Encoder utilization (in %)."},
39+
{"DCGM_FI_DEV_DEC_UTIL", "gauge", "Decoder utilization (in %)."},
40+
{"DCGM_FI_DEV_FB_FREE", "gauge", "Framebuffer memory free (in MiB)."},
41+
{"DCGM_FI_DEV_FB_USED", "gauge", "Framebuffer memory used (in MiB)."},
42+
{"DCGM_FI_PROF_GR_ENGINE_ACTIVE", "gauge", "The ratio of cycles during which a graphics engine or compute engine remains active."},
43+
{"DCGM_FI_PROF_SM_ACTIVE", "gauge", "The ratio of cycles an SM has at least 1 warp assigned."},
44+
{"DCGM_FI_PROF_SM_OCCUPANCY", "gauge", "The ratio of number of warps resident on an SM."},
45+
{"DCGM_FI_PROF_PIPE_TENSOR_ACTIVE", "gauge", "Ratio of cycles the tensor (HMMA) pipe is active."},
46+
{"DCGM_FI_PROF_PIPE_FP64_ACTIVE", "gauge", "Ratio of cycles the fp64 pipes are active."},
47+
{"DCGM_FI_PROF_PIPE_FP32_ACTIVE", "gauge", "Ratio of cycles the fp32 pipes are active."},
48+
{"DCGM_FI_PROF_PIPE_FP16_ACTIVE", "gauge", "Ratio of cycles the fp16 pipes are active."},
49+
{"DCGM_FI_PROF_PIPE_INT_ACTIVE", "gauge", "Ratio of cycles the integer pipe is active."},
50+
{"DCGM_FI_PROF_DRAM_ACTIVE", "gauge", "Ratio of cycles the device memory interface is active sending or receiving data."},
51+
{"DCGM_FI_PROF_PCIE_TX_BYTES", "counter", "The number of bytes of active PCIe tx (transmit) data including both header and payload."},
52+
{"DCGM_FI_PROF_PCIE_RX_BYTES", "counter", "The number of bytes of active PCIe rx (read) data including both header and payload."},
53+
{"DCGM_FI_DEV_SM_CLOCK", "gauge", "SM clock frequency (in MHz)."},
54+
{"DCGM_FI_DEV_MEM_CLOCK", "gauge", "Memory clock frequency (in MHz)."},
55+
{"DCGM_FI_DEV_MEMORY_TEMP", "gauge", "Memory temperature (in C)."},
56+
{"DCGM_FI_DEV_GPU_TEMP", "gauge", "GPU temperature (in C)."},
57+
{"DCGM_FI_DEV_POWER_USAGE", "gauge", "Power draw (in W)."},
58+
{"DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION", "counter", "Total energy consumption since boot (in mJ)."},
59+
{"DCGM_FI_DEV_PCIE_REPLAY_COUNTER", "counter", "Total number of PCIe retries."},
60+
{"DCGM_FI_DEV_XID_ERRORS", "gauge", "Value of the last XID error encountered."},
61+
{"DCGM_FI_DEV_POWER_VIOLATION", "counter", "Throttling duration due to power constraints (in us)."},
62+
{"DCGM_FI_DEV_THERMAL_VIOLATION", "counter", "Throttling duration due to thermal constraints (in us)."},
63+
{"DCGM_FI_DEV_SYNC_BOOST_VIOLATION", "counter", "Throttling duration due to sync-boost constraints (in us)."},
64+
{"DCGM_FI_DEV_BOARD_LIMIT_VIOLATION", "counter", "Throttling duration due to board limit constraints (in us)."},
65+
{"DCGM_FI_DEV_LOW_UTIL_VIOLATION", "counter", "Throttling duration due to low utilization (in us)."},
66+
{"DCGM_FI_DEV_RELIABILITY_VIOLATION", "counter", "Throttling duration due to reliability constraints (in us)."},
67+
{"DCGM_FI_DEV_ECC_SBE_VOL_TOTAL", "counter", "Total number of single-bit volatile ECC errors."},
68+
{"DCGM_FI_DEV_ECC_DBE_VOL_TOTAL", "counter", "Total number of double-bit volatile ECC errors."},
69+
{"DCGM_FI_DEV_ECC_SBE_AGG_TOTAL", "counter", "Total number of single-bit persistent ECC errors."},
70+
{"DCGM_FI_DEV_ECC_DBE_AGG_TOTAL", "counter", "Total number of double-bit persistent ECC errors."},
71+
{"DCGM_FI_DEV_RETIRED_SBE", "counter", "Total number of retired pages due to single-bit errors."},
72+
{"DCGM_FI_DEV_RETIRED_DBE", "counter", "Total number of retired pages due to double-bit errors."},
73+
{"DCGM_FI_DEV_RETIRED_PENDING", "counter", "Total number of pages pending retirement."},
74+
{"DCGM_FI_DEV_UNCORRECTABLE_REMAPPED_ROWS", "counter", "Number of remapped rows for uncorrectable errors"},
75+
{"DCGM_FI_DEV_CORRECTABLE_REMAPPED_ROWS", "counter", "Number of remapped rows for correctable errors"},
76+
{"DCGM_FI_DEV_ROW_REMAP_FAILURE", "gauge", "Whether remapping of rows has failed"},
77+
{"DCGM_FI_DEV_NVLINK_CRC_FLIT_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink flow-control CRC errors."},
78+
{"DCGM_FI_DEV_NVLINK_CRC_DATA_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink data CRC errors."},
79+
{"DCGM_FI_DEV_NVLINK_REPLAY_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink retries."},
80+
{"DCGM_FI_DEV_NVLINK_RECOVERY_ERROR_COUNT_TOTAL", "counter", "Total number of NVLink recovery errors."},
81+
{"DCGM_FI_DEV_NVLINK_BANDWIDTH_TOTAL", "counter", "Total number of NVLink bandwidth counters for all lanes."},
82+
{"DCGM_FI_DEV_NVLINK_BANDWIDTH_L0", "counter", "The number of bytes of active NVLink rx or tx data including both header and payload."},
83+
{"DCGM_FI_PROF_NVLINK_RX_BYTES", "counter", "The number of bytes of active PCIe rx (read) data including both header and payload. "},
84+
{"DCGM_FI_PROF_NVLINK_TX_BYTES", "counter", "The number of bytes of active NvLink tx (transmit) data including both header and payload. "},
85+
}
86+
87+
const dcgmExporterExecName = "dcgm-exporter"
88+
89+
type DCGMExporter struct {
90+
cmd *exec.Cmd
91+
cancel context.CancelFunc
92+
execPath string
93+
listenAddr string
94+
client *http.Client
95+
url string
96+
interval time.Duration
97+
configPath string
98+
mu sync.Mutex
99+
lastFetchedAt time.Time
100+
lastResponse []byte
101+
}
102+
103+
func (c *DCGMExporter) Start(ctx context.Context) error {
104+
if c.cmd != nil {
105+
return errors.New("already started")
106+
}
107+
108+
configFile, err := os.CreateTemp("", "counters-*.csv")
109+
if err != nil {
110+
return err
111+
}
112+
defer configFile.Close()
113+
c.configPath = configFile.Name()
114+
configWriter := csv.NewWriter(configFile)
115+
for _, counter := range counters {
116+
err := configWriter.Write([]string{counter.Name, counter.Type, counter.Help})
117+
if err != nil {
118+
return err
119+
}
120+
}
121+
configWriter.Flush()
122+
123+
cmdCtx, cmdCancel := context.WithCancel(ctx)
124+
c.cancel = cmdCancel
125+
cmd := exec.CommandContext(
126+
cmdCtx, c.execPath,
127+
"-f", c.configPath,
128+
"-a", c.listenAddr,
129+
"-c", strconv.Itoa(int(c.interval.Milliseconds())),
130+
)
131+
c.cmd = cmd
132+
cmd.Cancel = func() error {
133+
return cmd.Process.Signal(syscall.SIGTERM)
134+
}
135+
cmd.WaitDelay = 5 * time.Second
136+
return cmd.Start()
137+
}
138+
139+
func (c *DCGMExporter) Stop(context.Context) error {
140+
if c.cmd == nil {
141+
return errors.New("not started")
142+
}
143+
c.cancel()
144+
os.Remove(c.configPath)
145+
return c.cmd.Wait()
146+
}
147+
148+
func (c *DCGMExporter) Fetch(ctx context.Context) ([]byte, error) {
149+
c.mu.Lock()
150+
defer c.mu.Unlock()
151+
152+
now := time.Now()
153+
154+
if now.Sub(c.lastFetchedAt) < c.interval {
155+
return c.lastResponse, nil
156+
}
157+
158+
req, err := http.NewRequestWithContext(ctx, "GET", c.url, nil)
159+
if err != nil {
160+
return nil, err
161+
}
162+
resp, err := c.client.Do(req)
163+
if err != nil {
164+
return nil, err
165+
}
166+
defer resp.Body.Close()
167+
if resp.StatusCode != http.StatusOK {
168+
return nil, fmt.Errorf("status is not OK: %d", resp.StatusCode)
169+
}
170+
response, err := io.ReadAll(resp.Body)
171+
if err != nil {
172+
return nil, err
173+
}
174+
c.lastFetchedAt = now
175+
c.lastResponse = response
176+
return response, nil
177+
}
178+
179+
func NewDCGMExporter(execPath string, port int, interval time.Duration) *DCGMExporter {
180+
listenAddr := fmt.Sprintf("localhost:%d", port)
181+
client := &http.Client{
182+
Timeout: 10 * time.Second,
183+
}
184+
return &DCGMExporter{
185+
execPath: execPath,
186+
listenAddr: listenAddr,
187+
client: client,
188+
url: fmt.Sprintf("http://%s/metrics", listenAddr),
189+
interval: interval,
190+
}
191+
}
192+
193+
func GetDCGMExporterExecPath(ctx context.Context) (string, error) {
194+
path, err := exec.LookPath(dcgmExporterExecName)
195+
if err != nil {
196+
return "", err
197+
}
198+
cmd := execute.ExecTask{
199+
Command: path,
200+
Args: []string{"-v"},
201+
StreamStdio: false,
202+
}
203+
res, err := cmd.Execute(ctx)
204+
if err != nil {
205+
return "", err
206+
}
207+
if res.ExitCode != 0 {
208+
return "", fmt.Errorf("%s returned %d, stderr: %s, stdout: %s", path, res.ExitCode, res.Stderr, res.Stdout)
209+
}
210+
log.Debug(ctx, "detected", "path", path, "version", strings.TrimSpace(res.Stdout))
211+
return path, nil
212+
}

0 commit comments

Comments
 (0)